1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 from Anomos.Protocol.AnomosRelayerProtocol import AnomosRelayerProtocol
19 from Anomos.Measure import Measure
20 from Anomos import LOG as log
21
22 -class Relayer(AnomosRelayerProtocol):
23 """ As a tracking code is being sent, each peer it reaches (other than the
24 uploader and downloader) creates a Relayer object to maintain the
25 association between the incoming socket and the outgoing socket (so
26 that the TC only needs to be sent once).
27 """
28 - def __init__(self, stream_id, neighbor, outnid, data=None, orelay=None):
29
30 AnomosRelayerProtocol.__init__(self)
31 self.partial_recv = ''
32 self.recvd_break = False
33 self.sent_break = False
34
35 self.stream_id = stream_id
36 self.neighbor = neighbor
37 self.manager = neighbor.manager
38 self.ratelimiter = neighbor.ratelimiter
39 self.measurer = self.manager.relay_measure
40 self.choked = True
41 self.pre_complete_buffer = []
42 self.complete = False
43 self.closed = False
44 self.next_upload = None
45 self.decremented_count = False
46 self.orelay = orelay
47
48 if orelay is None:
49 self.manager.make_relay(outnid, data, self)
50 elif data is not None:
51 self.send_tracking_code(data)
52
55
57 self.measurer = measurer
58
62
69
76
84
88
90 return (self.next_upload is None) and self.neighbor.in_queue(self.stream_id)
91
102
104 if self.closed:
105 log.warning("Double close")
106 return
107 self.closed = True
108 self.ratelimiter.clean_closed()
109 if not self.complete:
110 return
111 if not (self.decremented_count or self.orelay.decremented_count):
112 self.manager.dec_relay_count()
113 self.decremented_count = True
114
115 if not self.orelay.closed:
116 self.orelay.ore_closed()
117
119 """ Closes the connection when a Break has been received by our
120 other relay (ore). Called by this object's ore during
121 shutdown """
122 if self.closed:
123 log.warning("Double close")
124 return
125 if not self.sent_break:
126 self.send_break()
127
129 for msg in self.pre_complete_buffer:
130 self.relay_message(msg)
131 self.pre_complete_buffer = []
132
134 return self.neighbor.socket.flushed()
135
138
139
141 return "%02x:%04x" % (ord(self.neighbor.id), self.stream_id)
142