Package Anomos :: Module Downloader
[hide private]
[frames] | no frames]

Source Code for Module Anomos.Downloader

  1  # This program is free software: you can redistribute it and/or modify 
  2  # it under the terms of the GNU General Public License as published by 
  3  # the Free Software Foundation, either version 3 of the License, or 
  4  # (at your option) any later version. 
  5  # 
  6  # This program is distributed in the hope that it will be useful, 
  7  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
  8  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
  9  # GNU General Public License for more details. 
 10  # 
 11  # You should have received a copy of the GNU General Public License 
 12  # along with this program.  If not, see <http://www.gnu.org/licenses/>. 
 13   
 14  # Written by Bram Cohen 
 15   
 16  from random import shuffle 
 17   
 18  from Anomos import bttime, LOG as log 
 19  from Anomos.Measure import Measure 
 20  from Anomos.bitfield import Bitfield 
 21   
22 -class PerStreamStats(object):
23
24 - def __init__(self):
25 self.numgood = 0 26 self.bad = {} 27 self.numstreams = 0 28 self.lastdownload = None
29 30
31 -class BadDataGuard(object):
32
33 - def __init__(self, download):
34 self.download = download 35 self.uid = download.stream.uniq_id() 36 self.downloader = download.downloader 37 self.stats = self.downloader.sstats[self.uid] 38 self.lastindex = None
39
40 - def bad(self, index, bump = False):
41 self.stats.bad.setdefault(index, 0) 42 self.stats.bad[index] += 1 43 if self.uid not in self.downloader.bad_peers: 44 self.downloader.bad_peers[self.uid] = (False, self.stats) 45 if self.download is not None: 46 self.downloader.kick(self.download) 47 self.download = None 48 elif len(self.stats.bad) > 1 and self.stats.numstreams == 1 and \ 49 self.stats.lastdownload is not None: 50 # kick new stream from same IP if previous one sent bad data, 51 # mainly to give the algorithm time to find other bad pieces 52 # in case the peer is sending a lot of bad data 53 self.downloader.kick(self.stats.lastdownload) 54 #if len(self.stats.bad) >= 3 and len(self.stats.bad) > \ 55 # self.stats.numgood // 30: 56 # self.downloader.ban(self.uid) 57 if bump: 58 self.downloader.picker.bump(index)
59
60 - def good(self, index):
61 # lastindex is a hack to only increase numgood for by one for each good 62 # piece, however many chunks came from the stream(s) from this IP 63 if index != self.lastindex: 64 self.stats.numgood += 1 65 self.lastindex = index
66 67
68 -class SingleDownload(object):
69
70 - def __init__(self, downloader, stream):
71 self.downloader = downloader 72 self.stream = stream 73 self.choked = True 74 self.interested = False 75 self.active_requests = [] 76 self.measure = Measure(downloader.config['max_rate_period']) 77 self.peermeasure = Measure(max(downloader.storage.piece_size / 10000, 78 20)) 79 self.have = Bitfield(downloader.numpieces) 80 self.last = 0 81 self.example_interest = None 82 self.backlog = 2 83 self.guard = BadDataGuard(self)
84
85 - def _backlog(self):
86 backlog = 2 + int(4 * self.measure.get_rate() / 87 self.downloader.chunksize) 88 if backlog > 50: 89 backlog = max(50, int(.075 * backlog)) 90 self.backlog = backlog 91 return backlog
92
93 - def disconnected(self):
94 self.downloader.lost_peer(self) 95 for i in xrange(len(self.have)): 96 if self.have[i]: 97 self.downloader.picker.lost_have(i) 98 self._letgo() 99 self.guard.download = None
100
101 - def _letgo(self):
102 if not self.active_requests: 103 return 104 if self.downloader.storage.endgame: 105 self.active_requests = [] 106 return 107 lost = [] 108 for index, begin, length in self.active_requests: 109 self.downloader.storage.request_lost(index, begin, length) 110 if index not in lost: 111 lost.append(index) 112 self.active_requests = [] 113 ds = [d for d in self.downloader.downloads if not d.choked] 114 shuffle(ds) 115 for d in ds: 116 d._request_more(lost) 117 for d in self.downloader.downloads: 118 if d.choked and not d.interested: 119 for l in lost: 120 if d.have[l] and self.downloader.storage.do_I_have_requests(l): 121 d.interested = True 122 d.stream.send_interested() 123 break
124
125 - def got_choke(self):
126 if not self.choked: 127 self.choked = True 128 self._letgo()
129
130 - def got_unchoke(self):
131 if self.choked: 132 self.choked = False 133 if self.interested: 134 self._request_more()
135
136 - def got_piece(self, index, begin, piece):
137 try: 138 self.active_requests.remove((index, begin, len(piece))) 139 except ValueError: 140 self.downloader.discarded_bytes += len(piece) 141 return False 142 if self.downloader.storage.endgame: 143 self.downloader.all_requests.remove((index, begin, len(piece))) 144 self.last = bttime() 145 self.measure.update_rate(len(piece)) 146 self.downloader.measurefunc(len(piece)) 147 self.downloader.downmeasure.update_rate(len(piece)) 148 if not self.downloader.storage.piece_came_in(index, begin, piece, 149 self.guard): 150 if self.downloader.storage.endgame: 151 while self.downloader.storage.do_I_have_requests(index): 152 nb, nl = self.downloader.storage.new_request(index) 153 self.downloader.all_requests.append((index, nb, nl)) 154 for d in self.downloader.downloads: 155 d.fix_download_endgame() 156 return False 157 ds = [d for d in self.downloader.downloads if not d.choked] 158 shuffle(ds) 159 for d in ds: 160 d._request_more([index]) 161 return False 162 if self.downloader.storage.do_I_have(index): 163 self.downloader.picker.complete(index) 164 if self.downloader.storage.endgame: 165 for d in self.downloader.downloads: 166 if d is not self and d.interested: 167 if d.choked: 168 d.fix_download_endgame() 169 else: 170 try: 171 d.active_requests.remove((index, begin, len(piece))) 172 except ValueError: 173 continue 174 d.stream.send_cancel(index, begin, len(piece)) 175 d.fix_download_endgame() 176 self._request_more() 177 if self.downloader.picker.am_I_complete(): 178 for d in [i for i in self.downloader.downloads if i.have.numfalse == 0]: 179 d.stream.close() 180 return self.downloader.storage.do_I_have(index)
181
182 - def _want(self, index):
183 return self.have[index] and self.downloader.storage.do_I_have_requests(index)
184
185 - def _request_more(self, indices = None):
186 assert not self.choked 187 if len(self.active_requests) >= self._backlog(): 188 return 189 if self.downloader.storage.endgame: 190 self.fix_download_endgame() 191 return 192 lost_interests = [] 193 while len(self.active_requests) < self.backlog: 194 if indices is None: 195 interest = self.downloader.picker.next(self._want, self.have.numfalse == 0) 196 else: 197 interest = None 198 for i in indices: 199 if self.have[i] and self.downloader.storage.do_I_have_requests(i): 200 interest = i 201 break 202 if interest is None: 203 break 204 if not self.interested: 205 self.interested = True 206 self.stream.send_interested() 207 self.example_interest = interest 208 self.downloader.picker.requested(interest, self.have.numfalse == 0) 209 while len(self.active_requests) < (self.backlog-2) * 5 + 2: 210 begin, length = self.downloader.storage.new_request(interest) 211 self.active_requests.append((interest, begin, length)) 212 self.stream.send_request(interest, begin, length) 213 if not self.downloader.storage.do_I_have_requests(interest): 214 lost_interests.append(interest) 215 break 216 if not self.active_requests and self.interested: 217 self.interested = False 218 self.stream.send_not_interested() 219 if lost_interests: 220 for d in self.downloader.downloads: 221 if d.active_requests or not d.interested: 222 continue 223 if d.example_interest is not None and self.downloader.storage.do_I_have_requests(d.example_interest): 224 continue 225 for lost in lost_interests: 226 if d.have[lost]: 227 break 228 else: 229 continue 230 interest = self.downloader.picker.next(d._want, d.have.numfalse == 0) 231 if interest is None: 232 d.interested = False 233 d.stream.send_not_interested() 234 else: 235 d.example_interest = interest 236 if self.downloader.storage.endgame: 237 self.downloader.all_requests = [] 238 for d in self.downloader.downloads: 239 self.downloader.all_requests.extend(d.active_requests) 240 for d in self.downloader.downloads: 241 d.fix_download_endgame()
242
243 - def fix_download_endgame(self):
244 want = [a for a in self.downloader.all_requests if self.have[a[0]] and a not in self.active_requests] 245 if self.interested and not self.active_requests and not want: 246 self.interested = False 247 self.stream.send_not_interested() 248 return 249 if not self.interested and want: 250 self.interested = True 251 self.stream.send_interested() 252 if self.choked or len(self.active_requests) >= self._backlog(): 253 return 254 shuffle(want) 255 del want[self.backlog - len(self.active_requests):] 256 self.active_requests.extend(want) 257 for piece, begin, length in want: 258 self.stream.send_request(piece, begin, length)
259
260 - def got_have(self, index):
261 if self.have[index]: 262 return 263 if index == self.downloader.numpieces-1: 264 self.peermeasure.update_rate(self.downloader.storage.total_length- 265 (self.downloader.numpieces-1)*self.downloader.storage.piece_size) 266 else: 267 self.peermeasure.update_rate(self.downloader.storage.piece_size) 268 self.have[index] = True 269 self.downloader.picker.got_have(index) 270 if self.downloader.picker.am_I_complete() and self.have.numfalse == 0: 271 self.stream.close() 272 return 273 if self.downloader.storage.endgame: 274 self.fix_download_endgame() 275 elif self.downloader.storage.do_I_have_requests(index): 276 if not self.choked: 277 self._request_more([index]) 278 else: 279 if not self.interested: 280 self.interested = True 281 self.stream.send_interested()
282
283 - def got_have_bitfield(self, have):
284 if self.downloader.picker.am_I_complete() and have.numfalse == 0: 285 self.stream.close() 286 return 287 self.have = have 288 for i in xrange(len(self.have)): 289 if self.have[i]: 290 self.downloader.picker.got_have(i) 291 if self.downloader.storage.endgame: 292 for piece, begin, length in self.downloader.all_requests: 293 if self.have[piece]: 294 self.interested = True 295 self.stream.send_interested() 296 return 297 for i in xrange(len(self.have)): 298 if self.have[i] and self.downloader.storage.do_I_have_requests(i): 299 self.interested = True 300 self.stream.send_interested() 301 return
302
303 - def get_rate(self):
304 return self.measure.get_rate()
305
306 - def is_snubbed(self):
307 return bttime() - self.last > self.downloader.snub_time
308 309
310 -class Downloader(object):
311
312 - def __init__(self, config, storage, picker, numpieces, downmeasure, 313 measurefunc, kickfunc):
314 self.config = config 315 self.storage = storage 316 self.picker = picker 317 self.chunksize = config['download_slice_size'] 318 self.downmeasure = downmeasure 319 self.numpieces = numpieces 320 self.snub_time = config['snub_time'] 321 self.measurefunc = measurefunc 322 self.kickfunc = kickfunc 323 self.downloads = [] 324 self.sstats = {} 325 self.bad_peers = {} 326 self.discarded_bytes = 0
327
328 - def make_download(self, stream):
329 uid = stream.uniq_id() 330 sstats = self.sstats.get(uid) 331 if sstats is None: 332 sstats = PerStreamStats() 333 self.sstats[uid] = sstats 334 sstats.numstreams += 1 335 d = SingleDownload(self, stream) 336 sstats.lastdownload = d 337 self.downloads.append(d) 338 return d
339
340 - def lost_peer(self, download):
341 self.downloads.remove(download) 342 uid = download.stream.uniq_id() 343 self.sstats[uid].numstreams -= 1 344 if self.sstats[uid].lastdownload == download: 345 self.sstats[uid].lastdownload = None
346
347 - def kick(self, download):
348 if not self.config['retaliate_to_garbled_data']: 349 return 350 uid = download.stream.uniq_id() 351 # kickfunc will schedule stream.close() to be executed later; we 352 # might now be inside RawServer event loop with events from that 353 # stream already queued, and trying to handle them after doing 354 # close() now could cause problems. 355 self.kickfunc(download.stream)
356