1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 from random import shuffle
17
18 from Anomos import bttime, LOG as log
19 from Anomos.Measure import Measure
20 from Anomos.bitfield import Bitfield
21
23
# NOTE(review): the enclosing `class`/`def` lines are missing from this listing.
# This looks like the initializer of the per-stream statistics record that is
# instantiated further down as PerStreamStats() -- confirm against the full file.
# Number of good pieces credited to the peer (incremented by `good`).
25 self.numgood = 0
# Piece index -> number of times data for that piece was reported bad.
26 self.bad = {}
# Number of live downloads sharing this record; adjusted where downloads are
# created and removed further down.
27 self.numstreams = 0
# Most recently created download for this stream id, or None.
28 self.lastdownload = None
29
30
32
39
def bad(self, index, bump=False):
    """Record that data for piece ``index`` was bad and retaliate.

    The per-stream bad counter for ``index`` is incremented and the peer is
    registered in ``downloader.bad_peers`` if it is not there yet.

    If this guard still holds its download, that download is kicked and the
    guard lets go of it.  Otherwise, when the peer has bad reports for more
    than one piece and currently has exactly one stream, its most recent
    download is kicked instead.

    Args:
        index: index of the piece that turned out bad.
        bump: when true, ask the picker to ``bump`` the piece.
    """
    bad_counts = self.stats.bad
    bad_counts[index] = bad_counts.get(index, 0) + 1

    if self.uid not in self.downloader.bad_peers:
        self.downloader.bad_peers[self.uid] = (False, self.stats)

    if self.download is not None:
        self.downloader.kick(self.download)
        # Forget the download so it is only kicked once through this guard.
        self.download = None
    elif (len(bad_counts) > 1 and self.stats.numstreams == 1
          and self.stats.lastdownload is not None):
        # Our own download is already gone, but the same peer has a newer
        # stream open and has sent bad data for several pieces: kick that one.
        self.downloader.kick(self.stats.lastdownload)

    if bump:
        self.downloader.picker.bump(index)
59
def good(self, index):
    """Credit the peer with a good piece.

    ``numgood`` is only incremented when ``index`` differs from the index of
    the previous call, so repeated reports for the same piece in a row count
    once.

    Args:
        index: index of the piece that was received correctly.
    """
    if index == self.lastindex:
        return
    self.stats.numgood += 1
    self.lastindex = index
66
67
69
# NOTE(review): the `def` line for this body is missing from the listing. It
# reads `downloader` and `stream`, so it is presumably the initializer of the
# per-stream download object constructed further down as
# SingleDownload(self, stream) -- confirm against the full file.
71 self.downloader = downloader
72 self.stream = stream
# A new download starts out choked and not interested.
73 self.choked = True
74 self.interested = False
# Outstanding requests, stored as (index, begin, length) tuples.
75 self.active_requests = []
# Fed with len(piece) whenever requested piece data arrives on this stream.
76 self.measure = Measure(downloader.config['max_rate_period'])
# Fed with the piece size each time the peer announces a newly acquired piece.
77 self.peermeasure = Measure(max(downloader.storage.piece_size / 10000,
78 20))
# Which pieces the remote peer has.
79 self.have = Bitfield(downloader.numpieces)
# bttime() of the last requested piece data that arrived; 0 until then.
80 self.last = 0
# A piece index this peer was last found useful for, or None.
81 self.example_interest = None
# Target number of outstanding requests; recomputed from the download rate.
82 self.backlog = 2
# Passed to storage together with incoming piece data.
83 self.guard = BadDataGuard(self)
84
# NOTE(review): the `def` line is missing from the listing; presumably this is
# the `_backlog` helper invoked elsewhere as self._backlog() -- confirm.
# Recomputes the request backlog from the measured download rate, stores it on
# self.backlog and returns it.
86 backlog = 2 + int(4 * self.measure.get_rate() /
87 self.downloader.chunksize)
# Above 50 the value is scaled down to 7.5% of itself, but never below 50.
88 if backlog > 50:
89 backlog = max(50, int(.075 * backlog))
90 self.backlog = backlog
91 return backlog
92
# NOTE(review): the `def` line is missing from the listing, so the method name
# is not visible here. The body tears this download down.
# Deregister from the owning downloader.
94 self.downloader.lost_peer(self)
# Tell the picker about every piece this peer had.
# NOTE(review): xrange exists only in Python 2.
95 for i in xrange(len(self.have)):
96 if self.have[i]:
97 self.downloader.picker.lost_have(i)
# Give back any requests that are still outstanding.
98 self._letgo()
# Detach the guard so a later bad-data report does not kick this download.
99 self.guard.download = None
100
# NOTE(review): the `def` line is missing from the listing; presumably this is
# the `_letgo` helper invoked elsewhere as self._letgo() -- confirm.
# Releases all outstanding requests and lets other downloads pick them up.
102 if not self.active_requests:
103 return
# In endgame mode the requests are simply forgotten; nothing is handed back.
104 if self.downloader.storage.endgame:
105 self.active_requests = []
106 return
# Hand every request back to storage, remembering each affected piece once.
107 lost = []
108 for index, begin, length in self.active_requests:
109 self.downloader.storage.request_lost(index, begin, length)
110 if index not in lost:
111 lost.append(index)
112 self.active_requests = []
# Offer the freed pieces to the unchoked downloads, in random order.
113 ds = [d for d in self.downloader.downloads if not d.choked]
114 shuffle(ds)
115 for d in ds:
116 d._request_more(lost)
# A choked, uninterested download that has one of the freed pieces (and storage
# still has requests for it) declares interest.
117 for d in self.downloader.downloads:
118 if d.choked and not d.interested:
119 for l in lost:
120 if d.have[l] and self.downloader.storage.do_I_have_requests(l):
121 d.interested = True
122 d.stream.send_interested()
123 break
124
# NOTE(review): the `def` line is missing from the listing, so the method name
# is not visible here. The body marks the download as choked.
# Only act on the transition from unchoked to choked; outstanding requests are
# released at that point.
126 if not self.choked:
127 self.choked = True
128 self._letgo()
129
135
# NOTE(review): the `def` line is missing from the listing. The body reads
# `index`, `begin` and `piece`, so it handles a block of piece data arriving on
# this stream -- confirm the signature against the full file. Indentation was
# lost too, so the nesting described below should be re-checked.
# Data that does not match an outstanding request is counted as discarded and
# rejected.
137 try:
138 self.active_requests.remove((index, begin, len(piece)))
139 except ValueError:
140 self.downloader.discarded_bytes += len(piece)
141 return False
# In endgame mode the request is also tracked in the shared all_requests list.
142 if self.downloader.storage.endgame:
143 self.downloader.all_requests.remove((index, begin, len(piece)))
# Record the arrival time and feed the rate measures.
144 self.last = bttime()
145 self.measure.update_rate(len(piece))
146 self.downloader.measurefunc(len(piece))
147 self.downloader.downmeasure.update_rate(len(piece))
# A falsy result from storage means the data was not accepted (presumably a
# failed check -- confirm); the piece's requests are re-issued and False returned.
148 if not self.downloader.storage.piece_came_in(index, begin, piece,
149 self.guard):
# Endgame: pull all fresh requests for the piece into the shared list and let
# every download re-evaluate.
150 if self.downloader.storage.endgame:
151 while self.downloader.storage.do_I_have_requests(index):
152 nb, nl = self.downloader.storage.new_request(index)
153 self.downloader.all_requests.append((index, nb, nl))
154 for d in self.downloader.downloads:
155 d.fix_download_endgame()
156 return False
# Otherwise let the unchoked downloads, in random order, re-request the piece.
157 ds = [d for d in self.downloader.downloads if not d.choked]
158 shuffle(ds)
159 for d in ds:
160 d._request_more([index])
161 return False
# Storage now holds the whole piece: tell the picker.
162 if self.downloader.storage.do_I_have(index):
163 self.downloader.picker.complete(index)
# In endgame mode other interested downloads may have requested the same block:
# unchoked ones that did get a cancel, and they re-evaluate what to ask for.
164 if self.downloader.storage.endgame:
165 for d in self.downloader.downloads:
166 if d is not self and d.interested:
167 if d.choked:
168 d.fix_download_endgame()
169 else:
170 try:
171 d.active_requests.remove((index, begin, len(piece)))
172 except ValueError:
173 continue
174 d.stream.send_cancel(index, begin, len(piece))
175 d.fix_download_endgame()
# Refill this download's request queue.
176 self._request_more()
# Once the picker reports completion, close streams to peers that also have
# every piece.
177 if self.downloader.picker.am_I_complete():
178 for d in [i for i in self.downloader.downloads if i.have.numfalse == 0]:
179 d.stream.close()
# Result: whether storage now has the complete piece.
180 return self.downloader.storage.do_I_have(index)
181
184
# NOTE(review): the `def` line is missing from the listing; presumably this is
# `_request_more`, called elsewhere both with a list of piece indices and with
# no argument (the body treats `indices is None` as "let the picker choose").
# Indentation was lost too, so the nesting described below should be re-checked.
# Must only run while unchoked.
186 assert not self.choked
# Nothing to do when the (freshly recomputed) backlog is already full.
187 if len(self.active_requests) >= self._backlog():
188 return
# Endgame mode has its own request logic.
189 if self.downloader.storage.endgame:
190 self.fix_download_endgame()
191 return
# Pieces whose remaining requests were all handed out by this call.
192 lost_interests = []
193 while len(self.active_requests) < self.backlog:
# Choose a piece: ask the picker, or take the first candidate index the peer
# has and storage still has requests for.
194 if indices is None:
195 interest = self.downloader.picker.next(self._want, self.have.numfalse == 0)
196 else:
197 interest = None
198 for i in indices:
199 if self.have[i] and self.downloader.storage.do_I_have_requests(i):
200 interest = i
201 break
202 if interest is None:
203 break
204 if not self.interested:
205 self.interested = True
206 self.stream.send_interested()
207 self.example_interest = interest
208 self.downloader.picker.requested(interest, self.have.numfalse == 0)
# Request blocks of the chosen piece up to a limit of (backlog - 2) * 5 + 2
# outstanding requests, or until the piece has no requests left.
209 while len(self.active_requests) < (self.backlog-2) * 5 + 2:
210 begin, length = self.downloader.storage.new_request(interest)
211 self.active_requests.append((interest, begin, length))
212 self.stream.send_request(interest, begin, length)
213 if not self.downloader.storage.do_I_have_requests(interest):
214 lost_interests.append(interest)
215 break
# With nothing outstanding there is no reason to stay interested.
216 if not self.active_requests and self.interested:
217 self.interested = False
218 self.stream.send_not_interested()
# Other downloads that are interested but idle may have been counting on a piece
# that was just used up; give each a new example interest or withdraw interest.
219 if lost_interests:
220 for d in self.downloader.downloads:
221 if d.active_requests or not d.interested:
222 continue
223 if d.example_interest is not None and self.downloader.storage.do_I_have_requests(d.example_interest):
224 continue
# for/else: skip this download unless it has at least one of the used-up pieces.
225 for lost in lost_interests:
226 if d.have[lost]:
227 break
228 else:
229 continue
230 interest = self.downloader.picker.next(d._want, d.have.numfalse == 0)
231 if interest is None:
232 d.interested = False
233 d.stream.send_not_interested()
234 else:
235 d.example_interest = interest
# If storage is in endgame mode at this point, rebuild the shared request list
# from every download's outstanding requests and let each download re-evaluate.
236 if self.downloader.storage.endgame:
237 self.downloader.all_requests = []
238 for d in self.downloader.downloads:
239 self.downloader.all_requests.extend(d.active_requests)
240 for d in self.downloader.downloads:
241 d.fix_download_endgame()
242
# NOTE(review): the `def` line is missing from the listing; presumably this is
# `fix_download_endgame`, invoked elsewhere with no arguments -- confirm.
# Shared endgame requests this peer can serve and that were not already sent to it.
244 want = [a for a in self.downloader.all_requests if self.have[a[0]] and a not in self.active_requests]
# Nothing outstanding and nothing wanted: withdraw interest.
245 if self.interested and not self.active_requests and not want:
246 self.interested = False
247 self.stream.send_not_interested()
248 return
249 if not self.interested and want:
250 self.interested = True
251 self.stream.send_interested()
# Requests can only be sent while unchoked and while the backlog has room.
252 if self.choked or len(self.active_requests) >= self._backlog():
253 return
# Take a random selection of the wanted requests, just enough to fill the backlog.
254 shuffle(want)
255 del want[self.backlog - len(self.active_requests):]
256 self.active_requests.extend(want)
257 for piece, begin, length in want:
258 self.stream.send_request(piece, begin, length)
259
# NOTE(review): the `def` line is missing from the listing. The body reads
# `index` and records that the peer has acquired that piece -- confirm the
# signature against the full file.
# Ignore announcements for pieces already known.
261 if self.have[index]:
262 return
# Feed the peer's rate measure with the piece size. The last piece gets whatever
# remains of total_length after all the full-size pieces.
263 if index == self.downloader.numpieces-1:
264 self.peermeasure.update_rate(self.downloader.storage.total_length-
265 (self.downloader.numpieces-1)*self.downloader.storage.piece_size)
266 else:
267 self.peermeasure.update_rate(self.downloader.storage.piece_size)
268 self.have[index] = True
269 self.downloader.picker.got_have(index)
# Both sides have everything: close the stream.
270 if self.downloader.picker.am_I_complete() and self.have.numfalse == 0:
271 self.stream.close()
272 return
273 if self.downloader.storage.endgame:
274 self.fix_download_endgame()
# Outside endgame: if storage has requests for the piece, request it right away
# when unchoked, otherwise make sure interest has been declared.
275 elif self.downloader.storage.do_I_have_requests(index):
276 if not self.choked:
277 self._request_more([index])
278 else:
279 if not self.interested:
280 self.interested = True
281 self.stream.send_interested()
282
# NOTE(review): the `def` line is missing from the listing. The body reads
# `have` and installs it as the peer's complete bitfield -- confirm the
# signature against the full file.
# Both sides have everything: close the stream.
284 if self.downloader.picker.am_I_complete() and have.numfalse == 0:
285 self.stream.close()
286 return
287 self.have = have
# Register every piece the peer has with the picker.
# NOTE(review): xrange exists only in Python 2.
288 for i in xrange(len(self.have)):
289 if self.have[i]:
290 self.downloader.picker.got_have(i)
# Endgame: declare interest as soon as the peer has any piece that appears in
# the shared request list.
291 if self.downloader.storage.endgame:
292 for piece, begin, length in self.downloader.all_requests:
293 if self.have[piece]:
294 self.interested = True
295 self.stream.send_interested()
296 return
# Otherwise declare interest at the first piece the peer has that storage has
# requests for.
297 for i in xrange(len(self.have)):
298 if self.have[i] and self.downloader.storage.do_I_have_requests(i):
299 self.interested = True
300 self.stream.send_interested()
301 return
302
305
# NOTE(review): the `def` line is missing from the listing, so the method name
# is not visible here.
# True when more than downloader.snub_time has passed since self.last, the time
# requested piece data last arrived on this stream.
307 return bttime() - self.last > self.downloader.snub_time
308
309
311
def __init__(self, config, storage, picker, numpieces, downmeasure,
             measurefunc, kickfunc):
    """Set up the downloader shared by all per-stream downloads.

    Args:
        config: mapping of settings; ``'download_slice_size'`` and
            ``'snub_time'`` are read here.
        storage: piece storage object, kept as ``self.storage``.
        picker: piece picker object, kept as ``self.picker``.
        numpieces: total number of pieces.
        downmeasure: overall download-rate measure.
        measurefunc: callable, kept as ``self.measurefunc``.
        kickfunc: callable that ``kick`` invokes with a stream.
    """
    self.config = config
    self.storage = storage
    self.picker = picker
    self.chunksize = config['download_slice_size']
    self.downmeasure = downmeasure
    self.numpieces = numpieces
    self.snub_time = config['snub_time']
    self.measurefunc = measurefunc
    self.kickfunc = kickfunc
    # Live per-stream download objects.
    self.downloads = []
    # Stream uniq_id -> per-stream statistics record.
    self.sstats = {}
    # Peer uid -> (False, stats), filled in when a peer sends bad data.
    self.bad_peers = {}
    # Bytes received that did not match any outstanding request.
    self.discarded_bytes = 0
327
# NOTE(review): the `def` line is missing from the listing. The body reads
# `stream`, then creates, registers and returns a new download for it --
# confirm the signature against the full file.
329 uid = stream.uniq_id()
# One statistics record per stream uniq_id, created on first use and reused for
# later streams with the same id.
330 sstats = self.sstats.get(uid)
331 if sstats is None:
332 sstats = PerStreamStats()
333 self.sstats[uid] = sstats
334 sstats.numstreams += 1
335 d = SingleDownload(self, stream)
# Remember the newest download for this id.
336 sstats.lastdownload = d
337 self.downloads.append(d)
338 return d
339
# NOTE(review): the `def` line is missing from the listing; presumably this is
# `lost_peer`, invoked elsewhere as downloader.lost_peer(download) -- confirm.
# Unregisters the download and updates the statistics record for its stream id.
341 self.downloads.remove(download)
342 uid = download.stream.uniq_id()
343 self.sstats[uid].numstreams -= 1
# Clear lastdownload if it still refers to the departed download.
344 if self.sstats[uid].lastdownload == download:
345 self.sstats[uid].lastdownload = None
346
def kick(self, download):
    """Kick the stream behind ``download`` if retaliation is enabled.

    Does nothing unless ``config['retaliate_to_garbled_data']`` is truthy;
    otherwise hands the download's stream to ``kickfunc``.

    Args:
        download: the download whose stream should be kicked.
    """
    if not self.config['retaliate_to_garbled_data']:
        return
    self.kickfunc(download.stream)
356