Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

root/branches/crazycache/dejavu/storage/storememcached.py

Revision 578 (checked in by fumanchu, 9 months ago)

Crazycache: I shouldn't have promoted IndexSet.indices. The order is too important.

Line 
1 import md5
2 import memcache
3 import re
4 import sys
5
6 try:
7     set
8 except NameError:
9     from sets import Set as set
10
11 import dejavu
12 from dejavu import errors, logflags, storage
13 from geniusql import logic
14
15
def bytecode_regex(bits):
    """Make a regular expression out of the given mixed bytecode bits.
    
    If any bit is an integer, it will be replaced with re.escape(chr(bit)),
    i.e. a pattern matching exactly that one character/byte. Any bits which
    are already strings are added as-is, so "." can stand for an arbitrary
    operand byte.
    
    Patterns that use "." wildcards for operand bytes should be compiled
    with re.DOTALL: an operand byte may be 10 (a newline), which a plain
    "." does not match.
    """
    # Same compatibility idiom the module uses for 'set': fall back when
    # the builtin does not exist (Python 3 has no basestring).
    try:
        string_types = basestring
    except NameError:
        string_types = str
    
    parts = []
    for bit in bits:
        if not isinstance(bit, string_types):
            bit = re.escape(chr(bit))
        parts.append(bit)
    return "".join(parts)


# Regex fragments over CPython 2.x bytecode strings (co_code).
# NOTE(review): the integers are presumably these CPython 2.x opcodes --
# confirm against the opcode module of the targeted interpreter:
#   124 LOAD_FAST, 105 LOAD_ATTR, 100 LOAD_CONST, 106 COMPARE_OP (arg 2
#   is '=='), 111 JUMP_IF_FALSE, 1 POP_TOP, 83 ('S') RETURN_VALUE.
# Each "." is one operand byte whose value does not matter.
simple_compare = bytecode_regex([124, 0, 0, 105, ".", ".", 100, ".", ".", 106, 2, 0])
simple_and = bytecode_regex([111, ".", ".", 1])
# One or more simple comparisons, optionally joined by simple_and, ending in
# 'S'. re.DOTALL is required so that "." also matches an operand byte of 10
# (a newline); without it, e.g. an attribute or constant at index 10 would
# make an otherwise-indexable expression fail to match.
indexable_regex = re.compile("^(%s(%s)?)+S$" % (simple_compare, simple_and),
                             re.DOTALL)
32
33
34 class MemcachedStorageManager(storage.StorageManager):
35     """A Storage Manager which keeps all data in memcached.
36     
37     memcached is a high-performance, distributed memory object caching
38     system, generic in nature, but intended for use in speeding up
39     dynamic web applications by alleviating database load.
40     
41     See http://www.danga.com/memcached/
42     and ftp://ftp.tummy.com/pub/python-memcached/
43     
44     IMPORTANT: data stuck into memcached is not guaranteed to be stable.
45     It may disappear at any time, according to an internal LRU algorithm.
46     In particular, you should be aware that the LRU algorithm is itself
47     partitioned by object size (into "slabs"), so that a newer object
48     may be removed before an older one if they are of significantly
49     different sizes.
50     
51     Options:
52         memcached.servers: a list of strings of the form 'IP-address:port'.
53             These will be passed directly into the memcache.Client instance.
54         
55         memcached.global_index: if True (the default), this store will
56             maintain a index over the identifiers of all stored objects in
57             memcached itself. This is the 'safe' choice, and necessary if
58             your only store is memcached. However, if you run this store as
59             an ObjectCache.cache, you should turn this off, allowing
60             ObjectCache.nextstore to maintain the primary indexes--this
61             allows the cache to run orders of magnitude faster.
62         
63         memcached.index_time: the timeout, in seconds, for any cached indexes.
64             Default is 300 seconds.
65     """
66    
67     def __init__(self, allOptions={}):
68         storage.StorageManager.__init__(self, allOptions)
69        
70         self.name = allOptions['name']
71         self.global_index = allOptions.pop("memcached.global_index", True)
72         self.index_time = allOptions.pop("memcached.index_time", 5 * 60)
73         self.primary_keys = {}
74         self.indexsets = {}
75         self.index_stride = 50
76        
77         cache_opts = dict([(k[10:], v) for k, v in allOptions.iteritems()
78                            if k.startswith("memcached.")])
79         self.client = memcache.Client(**cache_opts)
80    
81     def hash(self, object):
82         """Return a consistent hash for object (for use in a memcached key)."""
83         # TODO: can we add overflow support for collisions?
84         return md5.new(repr(object)).hexdigest()
85    
86     def _unit_key(self, unit):
87         """Return (ident, memcached key) for the given unit."""
88         cls = unit.__class__
89         ident = tuple([getattr(unit, name) for name in self.primary_keys[cls]])
90         key = "%s:%s:%s" % (self.name, cls.__name__, self.hash(ident))
91         return key
92    
93     def unit(self, cls, **kwargs):
94         """A single Unit which matches the given kwargs, else None.
95         
96         The first Unit matching the kwargs is returned; if no Units match,
97         None is returned.
98         """
99         keyset = set(kwargs.keys())
100        
101         # Try to retrieve a matching unit using its primary_keys.
102         # This will skip grabbing any indices (a HUGE optimization).
103         pk = self.primary_keys[cls]
104         if keyset >= set(pk):
105             return self._unit_by_primary_key(cls, pk, kwargs)
106        
107         # Try to retrieve a matching unit using an index.
108         # If self.global_index is True, the last one should
109         # be an index with propnames == []. See self.register.
110         indexset = self.indexsets[cls]
111         for index in indexset:
112             if keyset >= set(index):
113                 unit = indexset.unit(index, kwargs)
114                 if unit is not None:
115                     if self.logflags & logflags.RECALL:
116                         self.log(logflags.RECALL.message(cls, ('HIT', kwargs)))
117                     return unit
118        
119         # Return None since we have no more access paths.
120         if self.logflags & logflags.RECALL:
121             self.log(logflags.RECALL.message(cls, ('DEFER', kwargs)))
122         return None
123    
124     def xrecall(self, classes, expr=None, order=None, limit=None, offset=None):
125         """Yield units of the given cls which match the given expr."""
126         if isinstance(classes, dejavu.UnitJoin):
127             for units in self._xmultirecall(classes, expr, order=order,
128                                             limit=limit, offset=offset):
129                 yield units
130             return
131        
132         cls = classes
133         indexset = self.indexsets[cls]
134        
135         if not isinstance(expr, logic.Expression):
136             expr = logic.Expression(expr)
137         if self.logflags & logflags.RECALL:
138             self.log(logflags.RECALL.message(cls, expr))
139        
140         if limit == 0:
141             return
142        
143         if offset and not order:
144             raise TypeError("Order argument expected when offset is provided.")
145        
146         filters = self.extract_filters(expr)
147        
148         # Try to retrieve a single matching unit using its primary_keys.
149         # This will skip grabbing any indices (a HUGE optimization).
150         pk = self.primary_keys[cls]
151         if set(filters.keys()) >= set(pk):
152             yield self._unit_by_primary_key(cls, pk, filters)
153             return
154        
155         # Try to retrieve matching units using an index.
156         # If self.global_index is True, the last one should
157         # be an index with propnames == []. See self.register.
158         for index in indexset:
159             if set(filters.keys()) >= set(index):
160                 data = indexset.xrecall(index, filters)
161                 data = self._xrecall_inner(data, expr)
162                 for unit in self._paginate(data, order, limit, offset, single=True):
163                     yield unit
164                 return
165    
166     def _xrecall_inner(self, units, expr=None):
167         """Private helper for self.xrecall."""
168         for unit in units:
169             if expr is None or expr(unit):
170                 # Must yield a sequence for use in _paginate.
171                 yield (unit,)
172    
173     def save(self, unit, forceSave=False):
174         """Store the unit."""
175         if self.logflags & logflags.SAVE:
176             self.log(logflags.SAVE.message(unit, forceSave))
177        
178         if forceSave or unit.dirty():
179             # Cleanse first because pickle state
180             # includes _initial_property_hash.
181             unit.cleanse()
182             self.client.set(self._unit_key(unit), unit)
183             self.indexsets[unit.__class__].add(unit)
184    
185     def destroy(self, unit):
186         """Delete the unit."""
187         if self.logflags & logflags.DESTROY:
188             self.log(logflags.DESTROY.message(unit))
189        
190         self.client.delete(self._unit_key(unit))
191         self.indexsets[unit.__class__].discard(unit)
192    
193     def reserve(self, unit):
194         """Reserve storage space for the Unit."""
195         if unit.identifiers:
196             cls = unit.__class__
197             indexset = self.indexsets[cls]
198            
199             if not unit.sequencer.valid_id(unit.identity()):
200                 if () in indexset:
201                     # Try to generate an identifier by looking
202                     # up all units in the global index.
203                     index = indexset.get({}) or []
204                     ids = [u.identity()
205                            for u in indexset.scan(index).itervalues()]
206                     unit.sequencer.assign(unit, ids)
207                 else:
208                     raise NotImplementedError(
209                         "Unindexed memcache cannot generate identifiers.")
210            
211             unit.cleanse()
212            
213             # Add the unit to the cache.
214             try:
215                 self.client.add(self._unit_key(unit), unit)
216             except IOError, exc:
217                 if exc.args[0] == 'NOT_STORED':
218                     pass
219                 raise
220            
221             # Add the unit to all indices.
222             indexset.add(unit)
223         else:
224             # This class has no identifiers, so skip reserve and wait for save.
225             pass
226        
227         # Usually we log ASAP, but here we log after
228         # the unit has had a chance to get an auto ID.
229         if self.logflags & logflags.RESERVE:
230             self.log(logflags.RESERVE.message(unit))
231    
232     def shutdown(self, conflicts='error'):
233         """Shut down all connections to internal storage.
234         
235         conflicts: see errors.conflict.
236         """
237         self.client.disconnect_all()
238    
239     def create_database(self, conflicts='error'):
240         """Create internal structures for the entire database.
241         
242         conflicts: see errors.conflict.
243         """
244         pass
245    
246     def drop_database(self, conflicts='error'):
247         """Destroy internal structures for the entire database.
248         
249         conflicts: see errors.conflict.
250         """
251         for cls in self.classes:
252             self.flush(cls)
253    
254     def create_storage(self, cls, conflicts='error'):
255         """Create internal structures for the given class.
256         
257         conflicts: see errors.conflict.
258         """
259         if self.logflags & logflags.DDL:
260             self.log(logflags.DDL.message("create storage %s" % cls))
261        
262         indexset = self.indexsets[cls]
263         if () in indexset:
264             try:
265                 self.client.add(indexset.key({}), [])
266             except IOError, exc:
267                 if exc.args[0] == 'NOT STORED':
268                     errors.conflict(conflicts, "Class %r already has storage."
269                                     % cls)
270                 else:
271                     raise
272    
273     def has_storage(self, cls):
274         """If storage structures exist for the given class, return True."""
275         return True
276    
277     def drop_storage(self, cls, conflicts='error'):
278         """Destroy internal structures for the given class.
279         
280         conflicts: see errors.conflict.
281         """
282         if self.logflags & logflags.DDL:
283             self.log(logflags.DDL.message("drop storage %s" % cls))
284         self.flush(cls)
285    
286     def add_property(self, cls, name, conflicts='error'):
287         """Add internal structures for the given property.
288         
289         conflicts: see errors.conflict.
290         """
291         clsname = cls.__name__
292         if self.logflags & logflags.DDL:
293             self.log(logflags.DDL.message("add property %s %s" %
294                                           (clsname, name)))
295        
296         indexset = self.indexsets[cls]
297         if () in indexset:
298             # TODO: recalculate if primary_keys changed
299             ci = self.client.get(indexset.key({})) or []
300             for id in ci:
301                 key = "%s:%s:%s" % (self.name, clsname, self.hash(id))
302                 unit = self.client.get(key)
303                 if unit is not None:
304                     unit._properties[name] = None
305                     unit.cleanse()
306                     self.client.set(key, unit)
307    
308     def has_property(self, cls, name):
309         """If storage structures exist for the given property, return True."""
310         indexset = self.indexsets[cls]
311         if () in indexset:
312             clsname = cls.__name__
313             ci = self.client.get(indexset.key({}))
314            
315             if not ci:
316                 # We don't have any items, so there's nothing to
317                 # declare as 'unprepared'.
318                 return True
319            
320             for id in ci:
321                 key = "%s:%s:%s" % (self.name, clsname, self.hash(id))
322                 unit = self.client.get(key)
323                 if unit is not None:
324                     return name in unit._properties
325        
326         return True
327    
328     def drop_property(self, cls, name, conflicts='error'):
329         """Destroy internal structures for the given property.
330         
331         conflicts: see errors.conflict.
332         """
333         clsname = cls.__name__
334         if self.logflags & logflags.DDL:
335             self.log(logflags.DDL.message("drop property %s %s" %
336                                           (clsname, name)))
337        
338         indexset = self.indexsets[cls]
339         if () in indexset:
340             ci = self.client.get(indexset.key({})) or []
341             for id in ci:
342                 key = "%s:%s:%s" % (self.name, clsname, self.hash(id))
343                 unit = self.client.get(key)
344                 if unit is not None:
345                     del unit._properties[name]
346                     unit.cleanse()
347                     self.client.set(key, unit)
348    
349     def rename_property(self, cls, oldname, newname, conflicts='error'):
350         """Rename internal structures for the given property.
351         
352         conflicts: see errors.conflict.
353         """
354         clsname = cls.__name__
355         if self.logflags & logflags.DDL:
356             self.log(logflags.DDL.message("rename property %s from %s to %s"
357                                           % (cls, oldname, newname)))
358        
359         indexset = self.indexsets[cls]
360         if () in indexset:
361             ci = self.client.get(indexset.key({})) or []
362             for id in ci:
363                 key = "%s:%s:%s" % (self.name, clsname, self.hash(id))
364                 unit = self.client.get(key)
365                 if unit is not None:
366                     unit._properties[newname] = unit._properties[oldname]
367                     del unit._properties[oldname]
368                     unit.cleanse()
369                     self.client.set(key, unit)
370    
371    
372     #                   Extra methods for use as a cache                   #
373    
374     def cachelen(self, cls):
375         indexset = self.indexsets[cls]
376         if () in indexset:
377             return len(self.client.get(indexset.key({})))
378         else:
379             return 0
380    
381     def cached_units(self, cls):
382         units = []
383         indexset = self.indexsets[cls]
384         if () in indexset:
385             for key in self.client.get(indexset.key({})):
386                 unit = self.client.get(key)
387                 if unit is not None:
388                     unit.cleanse()
389                     units.append(unit)
390         return units
391    
392     def flush(self, cls):
393         """Dump all objects of the given class."""
394         clsname = cls.__name__
395        
396         indexset = self.indexsets[cls]
397         if () in indexset:
398             gi_key = indexset.key({})
399             # Delete all units in the global index.
400             for id in self.client.get(gi_key) or []:
401                 key = "%s:%s:%s" % (self.name, clsname, self.hash(id))
402                 self.client.delete(key)
403            
404             # Delete the global index.
405             self.client.delete(gi_key)
406         # TODO:
407         # else:
408         #     self.increment_generation(cls)
409    
410     def register(self, cls):
411         """Assert that Units of class 'cls' will be handled."""
412         # Set a default primary key for the class. Consumers are free to
413         # change this if another unique property is looked up more often.
414         self.primary_keys[cls] = tuple(cls.identifiers or cls.properties)
415        
416         # Add indices based on the .index attribute of each UnitProperty.
417         self.indexsets[cls] = i = IndexSet(self, cls)
418         for propname in cls.properties:
419             prop = getattr(cls, propname)
420             if prop.index:
421                 # There's usually no need for an index on the primary key;
422                 # we can just fetch each one directly by cache key.
423                 # Callers are free to add one in explicitly if needed,
424                 # for example, if wanting to retrive all units without
425                 # any filtering criteria.
426                 if propname not in cls.identifiers:
427                     i.add_index(propname)
428        
429         # Add an index with no propnames. This is a special
430         # sentinel value for the global index that keeps us DRY.
431         if self.global_index:
432             i.add_index()
433        
434         storage.StorageManager.register(self, cls)
435    
436     def _unit_by_primary_key(self, cls, keys, filters):
437         """Return a unit (or None) by primary keys which matches the filters dict.
438         
439         The filters argument must contain an entry for each key in the
440         given list of keys, although it may and often should contain
441         additional entries.
442         """
443         ident = tuple([filters[k] for k in keys])
444         key = "%s:%s:%s" % (self.name, cls.__name__, self.hash(ident))
445         unit = self.client.get(key)
446         if unit is not None:
447             matching = True
448             if set(filters.keys()) > set(keys):
449                 # We retrieved the Unit using a subset of the filters.
450                 # Filter in full now.
451                 for k, v in filters.iteritems():
452                     if getattr(unit, k) != v:
453                         matching =