Changeset 11

Show
Ignore:
Timestamp:
07/18/07 03:23:54 (1 year ago)
Author:
devja..@anarkystic.com
Message:

reorg

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • aws/aws/s3.py

    r8 r11  
    1 import base64 
    2 import hmac 
     1 
    32import httplib 
    4 import re 
    5 import sha 
    6 import time 
    73from StringIO import StringIO 
    84import xml.sax 
    95import urllib 
    106import mimetypes 
     7import os, os.path 
     8import cPickle as pickle 
    119 
    1210from twisted.internet import task, reactor, defer, protocol 
     
    1816from twisted.web2 import stream, http_headers 
    1917 
     18from aws.client import getPage, addAuthHeader 
    2019 
    2120class Options(usage.Options): 
     
    2625        ] 
    2726     
    28 AMAZON_HEADER_PREFIX = 'x-amz-' 
    29 AMAZON_NS = "http://s3.amazonaws.com/doc/2006-03-01/" 
    30 METADATA_PREFIX = 'x-amz-meta-' 
    31  
    32 class S3(service.Service): 
     27class S3(onject): 
    3328    def __init__(self, config): 
    3429        self.config = config 
     
    170165 
    171166        d = self.getPage(**params) 
    172         d.addCallback(build_old_response) 
    173         d.addCallback(GetResponse) 
     167        def _build_s3(r): 
     168            metadata = {} 
     169            headers = response.headers.getAllHeaders() 
     170            for hkey in headers.keys(): 
     171                if hkey.lower().startswith(METADATA_PREFIX): 
     172                    metadata[hkey[len(METADATA_PREFIX):]] = headers[hkey] 
     173                    del headers[hkey] 
     174            return S3Object(response.stream, metadata) 
     175 
     176        d.addCallback(_build_s3) 
    174177        return d 
    175178    def headObject(self, bucket, name, headers=None): 
     
    201204        return getPage(**kw) 
    202205 
     206 
     207class CachedS3(S3): 
     208    def __init__(self, config, cache): 
     209        S3.__init__(self, config) 
     210        self.cache = cache 
     211 
     212    def getObject(self, *args, **kw): 
     213        item = self.cache.get(*args, **kw) 
     214        if not item: 
     215            item = S3.getObject(self, *args, **kw) 
     216            self.cache.set(*args, **kw) 
     217        return item 
     218 
     219    def putObject(self, *args, **kw): 
     220        rv = S3.putObject(self, *args, **kw) 
     221        self.cache.clear(*args, **kw) 
     222        return rv 
     223 
     224    def deleteObject(self, *args, **kw): 
     225        rv = S3.deleteObject(self, *args, **kw) 
     226        self.cache.clear(*args, **kw) 
     227        return rv 
     228 
    203229def makeService(config): 
    204230    s = S3(config) 
     
    206232 
    207233 
    208 # util 
    209 def getPage(url, access_key=None, secret_access_key=None, contextFactory=None,  
    210             headers=None, method="GET", data=None, *args, **kwargs): 
    211     """ small mod to client.getPage """ 
    212     if not headers: 
    213         headers = {} 
    214     scheme, host, port, path = _parse(url) 
    215      
    216     headers.setdefault("Host", host) 
    217  
    218     if type(data) is type(""): 
    219         data = stream.MemoryStream(data) 
    220      
    221     # this is where we add the amazon auth 
    222     add_auth_header(method, path, headers, access_key, secret_access_key) 
    223      
    224     # build the request 
    225     stupid_headers_part_1 = dict([(k, (v,)) for k, v in headers.items()]) 
    226     stupid_headers = http_headers.Headers(rawHeaders=stupid_headers_part_1) 
    227     req = http.ClientRequest(method, path, stupid_headers, stream=data) 
    228  
    229     d = protocol.ClientCreator(reactor, http.HTTPClientProtocol).connectTCP(host, 80) 
    230     d.addCallback(lambda p: p.submitRequest(req, closeAfter=False)) 
    231  
    232     return d 
     234 
     235 
     236 
     237### Cache 
     238class S3SimpleFileCache(object): 
     239    def __init__(self, config): 
     240        self.base = config['base'] 
     241         
     242    def get(self, bucket, name, headers=None): 
     243        data_path = os.path.join(self.base, 'data', bucket, name) 
     244        meta_path = os.path.join(self.base, '.metadata', bucket, name) 
     245        if not os.path.exists(data_path): 
     246            return None 
     247        metadata = pickle.load(open(meta_path)) 
     248        data = stream.FileStream(open(data_path)) 
     249        return S3Object(data, metadata) 
     250 
     251 
     252    def put(self, bucket, name, headers=None, acl=None, meta=None): 
     253        data_path = os.path.join(self.base, 'data', bucket, name) 
     254        meta_path = os.path.join(self.base, '.metadata', bucket, name) 
     255        data_dir = os.path.dirname(data_path) 
     256        meta_dir = os.path.dirname(meta_path) 
     257        for dir in [data_dir, meta_dir]: 
     258            try: 
     259                os.makedirs(dir) 
     260            except: 
     261                pass 
     262        pickle.dump(meta, open(meta_path)) 
     263        f = open(data_path) 
    233264 
    234265### Support for Amazon S3 example code 
     
    242273    def read(self, *args, **kw): 
    243274        return self.body.read(*args, **kw) 
    244 def build_old_response(response): 
     275 
     276 
     277def exhaust_stream(s): 
    245278    buf = [] 
    246279    def _buffer(s): 
     
    248281            return "".join(buf) 
    249282        buf.append(s) 
    250         next = defer.maybeDeferred(response.stream.read) 
     283        next = defer.maybeDeferred(s.read) 
    251284        next.addCallback(_buffer) 
    252285        return next 
    253286         
     287    d = defer.maybeDeferred(response.stream.read) 
     288    d.addCallback(_buffer) 
     289    return d 
     290 
     291def build_old_response(response): 
    254292    def _assemble(b): 
    255293        headers = dict(response.headers.getAllRawHeaders()) 
    256294        o = OldResponse(response.code, headers, b) 
    257295        return o 
    258     d = defer.maybeDeferred(response.stream.read) 
    259     d.addCallback(_buffer) 
     296    d = exhaust_sream(response.stream) 
    260297    d.addCallback(_assemble) 
    261298    return d 
     
    264301### not my favorite interface but using SAX is probably faster than  
    265302### whatever I was going to do 
    266 def add_auth_header(method, path, headers, access_key, secret_access_key): 
    267     headers.setdefault("Date", time.strftime("%a, %d %b %Y %X GMT", time.gmtime())) 
    268  
    269     interesting_headers = {} 
    270     for key in headers: 
    271         lk = key.lower() 
    272         if lk in ['content-md5', 'content-type', 'date'] or lk.startswith(AMAZON_HEADER_PREFIX): 
    273             interesting_headers[lk] = headers[key] 
    274  
    275     # these keys get empty strings if they don't exist 
    276     interesting_headers.setdefault('content-type', '') 
    277     interesting_headers.setdefault('content-md5', '') 
    278  
    279     # just in case someone used this.  it's not necessary in this lib. 
    280     if interesting_headers.has_key('x-amz-date'): 
    281         interesting_headers['date'] = '' 
    282  
    283     sorted_header_keys = interesting_headers.keys() 
    284     sorted_header_keys.sort() 
    285      
    286     canonical_string = "%s\n" % method 
    287     for key in sorted_header_keys: 
    288         if key.startswith(AMAZON_HEADER_PREFIX): 
    289             canonical_string += "%s:%s\n" % (key, interesting_headers[key]) 
    290         else: 
    291             canonical_string += "%s\n" % interesting_headers[key] 
    292      
    293      
    294     # don't include anything after the first ? in the resource... 
    295     canonical_string += path.split('?')[0] 
    296      
    297     # ...unless there is an acl or torrent parameter 
    298     if re.search("[&?]acl($|=|&)", path): 
    299         canonical_string += "?acl" 
    300     elif re.search("[&?]torrent($|=|&)", path): 
    301         canonical_string += "?torrent" 
    302      
    303     encoded_canonical = base64.encodestring(hmac.new(secret_access_key, canonical_string, sha).digest()).strip() 
    304     headers['Authorization'] = "AWS %s:%s" % (access_key, encoded_canonical)  
    305     return headers 
    306303class S3Object: 
    307304    def __init__(self, data, metadata={}):