root/aws/aws/sqs.py

Revision 14, 7.4 kB (checked in by devja..@anarkystic.com, 1 year ago)

most functionality in

Line 
1
2 from StringIO import StringIO
3 import urllib
4 import mimetypes
5 import os, os.path
6 import cPickle as pickle
7
8 from twisted.internet import task, reactor, defer, protocol
9 from twisted.application import service
10 from twisted.python import usage
11
12 from twisted.web2.client import http
13 from twisted.web2 import stream, http_headers
14
15 from lxml import etree
16
17 from aws.client import rest
18 from aws import s3_example, util
19
20 AMAZON_SQS_NS = "http://queue.amazonaws.com/doc/2007-05-01/"
21 default_ns = {"sqs": AMAZON_SQS_NS}
22
23 class Options(usage.Options):
24     optParameters = [
25             ['access_key', "k", 'I SHOULD BE A KEY'],
26             ['secret_access_key', "s", 'I SHOULD BE THE ULTRA SUPER SECRET KEY'],
27             ['base_url', None, "http://queue.amazonaws.com"],
28             ['aws_version', None, "2007-05-01"],
29         ]
30    
31 def _print(s):
32     print s
33     return s
34
35 class SQS(object):
36     def __init__(self, config):
37         self.config = config
38         self.access_key = config['access_key']
39         self.secret_access_key = config['secret_access_key']
40         self.base_url = config['base_url']
41    
42     # interface
43     def postQueue(self, name):
44         options = {"QueueName": name}
45         query = urllib.urlencode(options)
46
47         url = self.base_url + "/?" + query
48        
49         params = dict(url=url, method="POST")
50    
51
52         def _checkErrors(r):
53             #print r.headers.getAllRawHeaders()
54             r.stream.read().addCallback(_print)
55             if r.code != 200:
56                 raise Exception("bad status", r)
57             return r
58
59         def _grabTheUrl(r):
60             return r.xpath("//sqs:QueueUrl/text()", namespaces=default_ns)[0]
61
62         d = self.getPage(**params)
63         d.addCallback(_checkErrors)
64         d.addCallback(build_xml_response)
65         d.addCallback(_grabTheUrl)
66         d.addBoth(_print)
67
68         return d
69     def listQueues(self, prefix=None):
70         url = self.base_url
71         if prefix:
72             query = urllib.urlencode({"QueueNamePrefix": prefix})
73             url = url + "/?" + query
74
75         params = dict(url=url, method="GET")
76
77         def _checkErrors(r):
78             #r.stream.read().addCallback(_print)
79             if r.code != 200:
80                 raise Exception("bad status", r)
81             return r
82
83         def _grabTheUrls(r):
84             return r.xpath("//sqs:QueueUrl/text()", namespaces=default_ns)
85
86         d = self.getPage(**params)
87         d.addCallback(_checkErrors)
88         d.addCallback(build_xml_response)
89         d.addCallback(_grabTheUrls)
90         return d
91     def getQueue(self, queue, max=None, timeout=None):
92         url = queue + "/front"
93
94         options = {}
95         if max is not None:
96             options['NumberOfMessages'] = max
97         if timeout is not None:
98             options['VisibilityTimeout'] = timeout
99         if options:
100             query = urllib.urlencode(options)
101             url = url + "?" + query
102         params = dict(url=url, method="GET")
103
104         def _checkErrors(r):
105             r.stream.read().addCallback(_print)
106             if r.code != 200:
107                 raise Exception("bad status", r)
108             return r
109
110         def _grabTheMessages(r):
111             elms = r.xpath("//sqs:Message", namespaces=default_ns)
112             messages = []
113             for elm in elms:
114                 messages.append(SQSMessage(
115                     elm.xpath("sqs:MessageId/text()",
116                               namespaces=default_ns)[0],
117                     elm.xpath("sqs:MessageBody/text()",
118                               namespaces=default_ns)[0],
119                     queue))
120                    
121             return messages
122
123         d = self.getPage(**params)
124         d.addCallback(_checkErrors)
125         d.addCallback(build_xml_response)
126         d.addCallback(_grabTheMessages)
127         return d
128     def putMessage(self, queue, data, headers=None):
129         url = "%s/back" % (queue)
130         if not headers:
131             headers = {}
132        
133         headers.setdefault("Content-type", "text/plain")
134                    
135         params = dict(url=url, method="PUT", headers=headers, data=data)
136
137         def _checkErrors(r):
138             print r.headers.getAllRawHeaders()
139             r.stream.read().addCallback(_print)
140             if r.code != 200:
141                 raise Exception("bad status", r)
142             return r
143
144         d = self.getPage(**params)
145         d.addCallback(_checkErrors)
146         return d
147     def putMessageFromFile(self, queue, file, headers=None):
148         url = "%s/back" % (queue)
149         if not headers:
150             headers = {}
151        
152         content_type, content_encoding = mimetypes.guess_type(file.name)
153         headers.setdefault("Content-type", content_type)
154         #headers.setdefault("Content-encoding", content_encoding)
155        
156         data = stream.FileStream(file)
157
158         return self.putMessage(queue=queue, data=data, headers=headers)
159     def getMessage(self, queue, message_id):
160         url = queue + "/" + message_id
161
162         params = dict(url=url, method="GET")
163
164         def _checkErrors(r):
165             if r.code != 200:
166                 raise Exception("bad status", r)
167             return r
168
169         def _grabTheMessage(r):
170             elms = r.xpath("//sqs:Message", namespaces=default_ns)
171             messages = []
172             for elm in elms:
173                 messages.append(SQSMessage(
174                     elm.xpath("sqs:MessageId/text()",
175                               namespaces=default_ns)[0],
176                     elm.xpath("sqs:MessageBody/text()",
177                               namespaces=default_ns)[0],
178                     queue))
179                    
180             return messages[0]
181
182         d = self.getPage(**params)
183         d.addBoth(_checkErrors)
184         d.addCallback(build_xml_response)
185         d.addCallback(_grabTheMessage)
186         return d
187     def deleteMessage(self, queue, message_id):
188         url = queue + "/" + message_id
189
190         params = dict(url=url, method="DELETE")
191
192         def _checkErrors(r):
193             if r.code != 200:
194                 raise Exception("bad status", r)
195             return r
196
197         d = self.getPage(**params)
198         d.addBoth(_checkErrors)
199         return d
200    
201     # util
202     def getPage(self, **kw):
203         kw.setdefault("access_key", self.access_key)
204         kw.setdefault("secret_access_key", self.secret_access_key)
205         kw.setdefault("headers", {})
206         kw['headers'].setdefault("AWS-Version", self.config['aws_version'])
207         kw['headers'].setdefault("Content-type", "text/plain")
208         return rest.getPage(**kw)
209
210 class SQSMessage(object):
211     def __init__(self, id, body, queue):
212         self.id = id
213         self.body = body
214         self.queue = queue
215     def __str__(self):
216         return "<SQSMessage id='%s' data='%s'>" % (self.id, self.body.strip())
217     def __repr__(self):
218         return str(self)
219
220
221 def build_xml_response(r):
222     d = util.exhaust_stream(r.stream)
223     d.addCallback(StringIO)
224     d.addCallback(etree.parse)
225     return d
226
227
228 def makeService(config):
229     s = SQS(config)
230     return s
231
232
233 ### Support for Amazon S3 example code
234 class OldResponse(object):
235     def __init__(self, status, headers, body):
236         self.headers = headers
237         self.msg = headers
238         self.body = StringIO(body)
239         self.status = status
240    
241     def read(self, *args, **kw):
242         return self.body.read(*args, **kw)
243 def build_old_response(response):
244     def _assemble(b):
245         headers = dict(response.headers.getAllRawHeaders())
246         o = OldResponse(response.code, headers, b)
247         return o
248     d = util.exhaust_stream(response.stream)
249     d.addCallback(_assemble)
250     return d
Note: See TracBrowser for help on using the browser.