root/spreader/spreader.py

Revision 7, 6.7 kB (checked in by devja..@anarkystic.com, 1 year ago)

no merge in twisted's usage

Line 
1 import select
2 import socket
3 import md5
4
5 from twisted.internet import task
6 from twisted.application import service
7 from twisted.python import usage
8
9 from logging import getLogger
10
11
12 log = getLogger()
13
14 import spread
15
class SpreaderOptions(usage.Options):
    """Command-line options that configure a Spreader service.

    Each entry is [long name, short name, default, description].  All
    defaults are strings; Spreader.__init__ converts the numeric ones.
    """

    optParameters = [
            ['spread_poll_interval', None, '0.05',
             'Seconds between polls of the Spread mailbox'],
            ['spread_host', None, 'localhost',
             'Host part of the "port@host" name given to spread.connect'],
            ['spread_port', None, str(spread.DEFAULT_SPREAD_PORT),
             'Port part of the "port@host" name given to spread.connect'],
            ['spread_private', None, 'dummy',
             'Prefix of the private connection name '
             '(the local hostname is appended)'],
            ['spread_membership', None, '1',
             'Integer membership flag given to spread.connect'],
            ['spread_timeout', None, '0.01',
             'Seconds each poll waits in select() for the mailbox '
             'to become readable'],
        ]
25    
class ParallelSpreaderOptions(usage.Options):
    """Command-line options for a ParallelSpreader service.

    Adds 'spread_groups' in front of every option SpreaderOptions defines.
    """

    optParameters = [
            ['spread_groups', None, '',
             'Comma-separated names of the Spread groups to join'],
        ] + SpreaderOptions.optParameters
30
class Spreader(service.Service):
    """Twisted service that polls a Spread mailbox and dispatches messages.

    While the service runs, a LoopingCall invokes poll() every
    ``pollInterval`` seconds.  Each poll reads at most one message from the
    mailbox and routes it to regularMessageReceived() or
    membershipMessageReceived(), which in turn call the overridable hooks
    (messageReceived, groupTransitioned, groupJoined, groupLeft,
    groupChanged).  The hooks in this class only log at debug level.
    """

    # Connection returned by spread.connect(); None while disconnected.
    mailbox = None
    # The mailbox's private_group; None while disconnected.
    privateGroup = None

    def __init__(self, config, service_):
        """Read connection settings from ``config``.

        ``config`` is a mapping providing the SpreaderOptions keys;
        ``service_`` is stored unchanged as ``self.service``.
        """
        self.pollInterval = float(config['spread_poll_interval'])
        self.service = service_
        self.poller = task.LoopingCall(self.poll)
        self.spreadName = "%s@%s" % (config['spread_port'],
                                     config['spread_host'])
        self.privateName = "%s-%s" % (config['spread_private'],
                                      socket.gethostname())
        self.priority = 0
        self.membership = int(config['spread_membership'])
        self.timeout = float(config['spread_timeout'])

    def startService(self):
        """Connect to Spread and start the polling loop."""
        service.Service.startService(self)
        self.connect()
        self.poller.start(self.pollInterval)

    def stopService(self):
        """Stop the polling loop (if it is running) and disconnect."""
        service.Service.stopService(self)
        if self.poller.running:
            self.poller.stop()
        self.disconnect()

    def poll(self):
        """Receive and dispatch at most one pending message.

        Does nothing when there is no mailbox or when the mailbox does not
        become readable within ``self.timeout`` seconds.
        """
        if not self.mailbox:
            return
        readable = select.select([self.mailbox.fileno()], [], [],
                                 self.timeout)[0]
        if not readable:
            return
        m = self.mailbox.receive()
        if isinstance(m, spread.RegularMsgType):
            self.regularMessageReceived(m)
        elif isinstance(m, spread.MembershipMsgType):
            self.membershipMessageReceived(m)

    def regularMessageReceived(self, message):
        """Unpack a regular message and hand it to messageReceived()."""
        self.messageReceived(message.sender, message.groups, message.message,
                             message.msg_type, message.endian)

    def membershipMessageReceived(self, message):
        """Translate a membership message into the group* hooks.

        Raises Exception(message) for a reason this class does not know.
        """
        reason = message.reason
        if reason == 0:
            self.groupTransitioned(message.group)
        elif reason == spread.CAUSED_BY_JOIN:
            self.groupJoined(message.group, message.extra)
            self.groupChanged(message.group, message.members)
        elif reason in (spread.CAUSED_BY_LEAVE, spread.CAUSED_BY_DISCONNECT):
            self.groupLeft(message.group, message.extra)
            self.groupChanged(message.group, message.members)
        elif reason == spread.CAUSED_BY_NETWORK:
            # A network-caused change still carries the new member list.
            # Raising here would propagate out of poll() and stop the
            # LoopingCall, so report it as a plain membership change.
            self.groupChanged(message.group, message.members)
        else:
            raise Exception(message)

    def messageReceived(self, sender, groups, message, msg_type, extra):
        """Hook: a regular message arrived.  ``extra`` is message.endian."""
        log.debug("messageReceived: %s %s %s", sender, groups, message)

    def groupTransitioned(self, group):
        """Hook: a membership message with reason 0 arrived for ``group``."""
        log.debug("groupTransitioned: %s", group)

    def groupJoined(self, group, who):
        """Hook: ``who`` (the message's ``extra``) joined ``group``."""
        log.debug("groupJoined: %s %s", group, who)

    def groupLeft(self, group, who):
        """Hook: ``who`` (the message's ``extra``) left ``group``."""
        log.debug("groupLeft: %s %s", group, who)

    def groupChanged(self, group, current):
        """Hook: ``group`` now consists of the members in ``current``."""
        log.debug("groupChanged: %s %s", group, current)

    def connect(self):
        """Open the Spread connection and remember its private group."""
        self.mailbox = spread.connect(self.spreadName, self.privateName,
                                      self.priority, self.membership)
        self.privateGroup = self.mailbox.private_group

    def disconnect(self):
        """Close the Spread connection, if any, and clear related state."""
        if self.mailbox:
            self.mailbox.disconnect()
            self.mailbox = None
            self.privateGroup = None

    def _requireMailbox(self):
        """Return the current mailbox or raise Exception("no mailbox")."""
        if not self.mailbox:
            raise Exception("no mailbox")
        return self.mailbox

    def join(self, group):
        """Join ``group``; raises Exception when not connected."""
        self._requireMailbox().join(group)

    def leave(self, group):
        """Leave ``group``; raises Exception when not connected."""
        self._requireMailbox().leave(group)

    def multicast(self, service_type, group, message, message_type=0):
        """Send ``message`` to ``group`` and return the mailbox's result.

        Raises Exception when not connected.
        """
        return self._requireMailbox().multicast(service_type, group, message,
                                                message_type)

    def multigroup_multicast(self, *a, **kw):
        """Forward all arguments to the mailbox's multigroup_multicast().

        Raises Exception when not connected.
        """
        return self._requireMailbox().multigroup_multicast(*a, **kw)
136
137
class ParallelSpreader(Spreader):
    """Spreader whose group members share out incoming regular messages.

    For every configured group the service tracks the current member list
    and its own position in it.  A regular message is passed on to
    messageReceived() only when its message type, modulo the number of
    members, equals that position -- or when the group is in transition,
    in which case everything is handled.
    """

    # Maps group name -> {"index", "members", "handle_all"}; set per instance.
    groups = None

    def __init__(self, config, service_):
        """Set up tracking state for each name in config['spread_groups']."""
        Spreader.__init__(self, config, service_)
        self.groups = {}
        for name in config['spread_groups'].split(","):
            name = name.strip()
            if not name:
                # ''.split(",") yields [''], so the default empty option
                # would otherwise create (and later join) a group named ''.
                continue
            self.groups[name] = {"index": None,
                                 "members": None,
                                 "handle_all": None}

    def connect(self):
        """Connect, then join every configured group."""
        Spreader.connect(self)
        # join the groups we want to listen to
        for name in self.groups:
            self.join(name)

    def groupChanged(self, group, current):
        """Record the new member list of ``group`` and our index in it."""
        Spreader.groupChanged(self, group, current)
        entry = self.groups.get(group)
        if entry is None:
            # Not one of the configured groups: nothing to track.
            return
        current = list(current)
        if self.privateGroup not in current:
            entry['index'] = None
            return

        # update the group entry to have the new members,
        # disable handle_all, and find our new index
        entry['members'] = current
        entry['handle_all'] = False
        entry['index'] = current.index(self.privateGroup)

    def groupTransitioned(self, group):
        """Handle every message for ``group`` until membership settles."""
        Spreader.groupTransitioned(self, group)
        entry = self.groups.get(group)
        if entry is not None:
            entry['handle_all'] = True

    def regularMessageReceived(self, message):
        """Dispatch ``message`` only if shouldHandle() selects it."""
        if not self.shouldHandle(message.groups, message.msg_type):
            return
        Spreader.regularMessageReceived(self, message)

    def safecast(self, group, message, message_type=None):
        """Multicast ``message`` to ``group`` with spread.SAFE_MESS.

        When ``message_type`` is None it is derived from the message content
        via hash().
        """
        if message_type is None:
            message_type = self.hash(message)
        service_type = spread.SAFE_MESS
        return self.multicast(service_type, group, message, message_type)

    def hash(self, s):
        """Return an int in 0..255 from the first MD5 hex byte of ``s``."""
        return int(md5.new(s).hexdigest()[:2], 16)

    def shouldHandle(self, groups, id):
        """Return True if this member should handle a message.

        ``groups`` are the groups the message was sent to and ``id`` is its
        integer message type.
        """
        # XXX doesn't really handle multiple groups the best way probably
        for g in groups:
            entry = self.groups.get(g)
            if entry is None:
                # if we are not in the group, ignore (this shouldn't happen)
                continue

            # if we are transitioning, handle anything you see
            if entry['handle_all']:
                return True

            index = entry['index']
            members = entry['members']
            if index is None or not members:
                # No membership known yet, or we are not a member: the
                # modulo below would fail or could never match.
                continue

            # if i hash right, do it too
            if id % len(members) == index:
                return True
        return False
204
205    
Note: See TracBrowser for help on using the browser.