Package Anomos :: Module Relayer
[hide private]
[frames] | no frames]

Source Code for Module Anomos.Relayer

  1  # Relayer.py 
  2  # 
  3  # This program is free software: you can redistribute it and/or modify 
  4  # it under the terms of the GNU General Public License as published by 
  5  # the Free Software Foundation, either version 3 of the License, or 
  6  # (at your option) any later version. 
  7  # 
  8  # This program is distributed in the hope that it will be useful, 
  9  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 10  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 11  # GNU General Public License for more details. 
 12  # 
 13  # You should have received a copy of the GNU General Public License 
 14  # along with this program.  If not, see <http://www.gnu.org/licenses/>. 
 15   
 16  # Written by Rich Jones, John Schanck 
 17   
 18  from Anomos.Protocol.AnomosRelayerProtocol import AnomosRelayerProtocol 
 19  from Anomos.Measure import Measure 
 20  from Anomos import LOG as log 
 21   
22 -class Relayer(AnomosRelayerProtocol):
23 """ As a tracking code is being sent, each peer it reaches (other than the 24 uploader and downloader) creates a Relayer object to maintain the 25 association between the incoming socket and the outgoing socket (so 26 that the TC only needs to be sent once). 27 """
28 - def __init__(self, stream_id, neighbor, outnid, data=None, orelay=None):
29 #storage, uprate, downrate, choker, key): 30 AnomosRelayerProtocol.__init__(self) 31 self.partial_recv = '' 32 self.recvd_break = False 33 self.sent_break = False 34 35 self.stream_id = stream_id 36 self.neighbor = neighbor 37 self.manager = neighbor.manager 38 self.ratelimiter = neighbor.ratelimiter 39 self.measurer = self.manager.relay_measure 40 self.choked = True 41 self.pre_complete_buffer = [] 42 self.complete = False 43 self.closed = False 44 self.next_upload = None 45 self.decremented_count = False # Hack to prevent double decrementing of relay count 46 self.orelay = orelay 47 # Make the other relayer which we'll send data through 48 if orelay is None: 49 self.manager.make_relay(outnid, data, self) 50 elif data is not None: 51 self.send_tracking_code(data)
52
53 - def set_other_relay(self, r):
54 self.orelay = r
55
56 - def set_measurer(self, measurer):
57 self.measurer = measurer
58
59 - def _complete_relay_message(self, msg):
60 if not self.orelay.closed: 61 self.orelay.send_relay_message(msg)
62
63 - def relay_message(self, msg):
64 if self.complete: 65 self.relay_message = self._complete_relay_message 66 self.relay_message(msg) 67 else: 68 self.pre_complete_buffer.append(msg)
69
70 - def send_partial(self, bytes):
71 if self.closed: 72 return 0 73 b = self.neighbor.send_partial(self.stream_id, bytes) 74 self.measurer.update_rate(b) 75 return b
76
77 - def connection_completed(self):
78 log.info("Relay connection [%02x:%d] established" % 79 (int(ord(self.neighbor.id)),self.stream_id)) 80 self.complete = True 81 self.flush_pre_buffer() 82 self.orelay.complete = True 83 self.orelay.flush_pre_buffer()
84
85 - def connection_flushed(self):
86 if self.should_queue(): 87 self.ratelimiter.queue(self)
88
89 - def should_queue(self):
90 return (self.next_upload is None) and self.neighbor.in_queue(self.stream_id)
91
92 - def close(self):
93 # Connection was closed locally (as opposed to 94 # being closed by receiving a BREAK message) 95 if self.closed: 96 log.warning("Double close") 97 return 98 log.info("Closing %s"%self.uniq_id()) 99 if not self.sent_break: 100 self.send_break() 101 self.shutdown()
102
103 - def shutdown(self):
104 if self.closed: 105 log.warning("Double close") 106 return 107 self.closed = True 108 self.ratelimiter.clean_closed() 109 if not self.complete: 110 return 111 if not (self.decremented_count or self.orelay.decremented_count): 112 self.manager.dec_relay_count() 113 self.decremented_count = True 114 # Tell our orelay to close. 115 if not self.orelay.closed: 116 self.orelay.ore_closed()
117
118 - def ore_closed(self):
119 """ Closes the connection when a Break has been received by our 120 other relay (ore). Called by this object's ore during 121 shutdown """ 122 if self.closed: 123 log.warning("Double close") 124 return 125 if not self.sent_break: 126 self.send_break()
127
128 - def flush_pre_buffer(self):
129 for msg in self.pre_complete_buffer: 130 self.relay_message(msg) 131 self.pre_complete_buffer = []
132
133 - def is_flushed(self):
134 return self.neighbor.socket.flushed()
135
136 - def got_exception(self, e):
137 log.error(e)
138 #self.torrent.handle_exception(e) 139
140 - def uniq_id(self):
141 return "%02x:%04x" % (ord(self.neighbor.id), self.stream_id)
142