kraken
148 строк · 4.1 Кб
# Copyright (c) 2016-2019 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14import json
15import random
16import sets
17
18import networkx as nx
19
"""
Random regular graph with connection limit of 5:
- 5000 peers, 500MB: 17 iterations
- 1000 peers, 10GB: p50 294 iterations, p100 298 iterations (84% ~ 85% speed)
"""
# Simulation parameters.
PEER_COUNT = 5000  # Number of peers (nodes of the random regular graph)
PIECE_COUNT = 125  # Number of pieces every peer has to collect
PIECE_TRANSMIT_LIMIT = 10  # Number of pieces uploaded/downloaded per iteration
DEGREE = 5  # Number of connections per peer (degree of the regular graph)
29
30
class Peer(object):
    """A simulated swarm member that exchanges pieces with its neighbors.

    Attributes:
        name: Label used in the log output.
        neighbors: Set of Peer objects this peer is directly connected to.
        pieces: List of 0/1 flags, one per piece (1 means the piece is held).
        completed: Number of pieces currently held.
        time: Iteration at which the final piece was downloaded; stays 0
            until then (and for a peer that was seeded rather than fetched).
        uploaded_current_turn: Pieces uploaded during the current iteration.
        downloaded_current_turn: Pieces downloaded during the current
            iteration.
    """

    def __init__(self, name, piece_count):
        """Create a peer called `name` that holds none of `piece_count` pieces."""
        self.name = name
        # Built-in set instead of the long-deprecated `sets.Set`, which no
        # longer exists on Python 3.
        self.neighbors = set()
        self.pieces = [0] * piece_count
        self.completed = 0
        self.time = 0

        self.uploaded_current_turn = 0
        self.downloaded_current_turn = 0

    def connect(self, other):
        """Link this peer and `other` as neighbors of each other."""
        self.neighbors.add(other)
        other.neighbors.add(self)

    def done(self):
        """Return True once every piece is held."""
        return self.completed == len(self.pieces)

    def fetch_step(self, time):
        """Try to download one missing piece from a random eligible neighbor.

        Does nothing when the peer is already complete, has used up its
        download budget for this iteration, or no neighbor with spare upload
        budget holds a piece this peer is missing.

        Args:
            time: Current iteration number; stored in `self.time` when the
                download completes the peer.
        """
        if self.done():
            return

        if self.downloaded_current_turn >= PIECE_TRANSMIT_LIMIT:
            return

        # Collect every (neighbor, piece index) pair that could be fetched now.
        candidates = []
        for neighbor in self.neighbors:
            # A neighbor's upload counter cannot change while we only scan,
            # so checking its budget once per neighbor is sufficient.
            if neighbor.uploaded_current_turn >= PIECE_TRANSMIT_LIMIT:
                continue

            for index, have in enumerate(self.pieces):
                if neighbor.pieces[index] == 1 and have == 0:
                    candidates.append((neighbor, index))

        if not candidates:
            return

        source, index = random.choice(candidates)

        self.pieces[index] = 1
        self.completed += 1
        self.downloaded_current_turn += 1
        source.uploaded_current_turn += 1

        print('Peer %s downloaded one piece from neighbor %s. Total completed: %d.' % (self.name, source.name, self.completed))

        # Report completion only when the last piece has actually arrived
        # (the previous `len(self.pieces)-1` check fired one piece too early).
        if self.done():
            self.time = time
            print('Peer %s finished downloading at time %d.' % (self.name, time))

    def fetch_cleanup(self):
        """Reset the per-iteration upload/download counters."""
        self.uploaded_current_turn = 0
        self.downloaded_current_turn = 0
87
class PeerManager(object):
    """Builds the swarm of peers and drives the piece-exchange simulation."""

    def __init__(self):
        """Create PEER_COUNT peers wired as a random DEGREE-regular graph.

        Peer 0 is turned into the seeder by marking all of its pieces as held.
        """
        graph = nx.random_regular_graph(DEGREE, PEER_COUNT)
        self.peers = [Peer(str(node), PIECE_COUNT) for node in graph]

        # Edge endpoints are used directly as indices into self.peers.
        # NOTE(review): this assumes the graph's nodes are the integers
        # 0..PEER_COUNT-1 and are iterated in that order - confirm against
        # the networkx documentation.
        for src, dst in graph.edges():
            self.peers[src].connect(self.peers[dst])

        for peer in self.peers:
            neighbor_names = "".join(nb.name + "; " for nb in peer.neighbors)
            print('Peer %s is connected to peers %s' % (peer.name, neighbor_names))

        # Set peer 0 to be the seeder.
        seeder = self.peers[0]
        seeder.pieces = [1] * PIECE_COUNT
        seeder.completed = len(seeder.pieces)

    def start(self):
        """Run iterations until every peer is complete or time exceeds 1000.

        In each iteration every unfinished peer gets PIECE_TRANSMIT_LIMIT
        fetch attempts, executed in random order, after which all per-turn
        counters are reset. The final "Done" line is printed in both exit
        cases.
        """
        time = 0
        while True:
            print('current time: %d.' % time)
            time += 1

            # One slot per allowed download for each peer still fetching.
            plan = []
            for peer in self.peers:
                if not peer.done():
                    plan.extend([peer] * PIECE_TRANSMIT_LIMIT)
            random.shuffle(plan)
            for peer in plan:
                peer.fetch_step(time)

            for peer in self.peers:
                peer.fetch_cleanup()

            everyone_done = all(
                peer.completed == len(peer.pieces) for peer in self.peers
            )
            if everyone_done or time > 1000:
                break

        print('Done. Total time: %d.' % time)
141
142
def main():
    """Build the peer swarm and run the simulation."""
    PeerManager().start()


if __name__ == "__main__":
    main()
149