Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/branches/crazycache/dejavu/storage/storememcached.py

Revision 578 (checked in by fumanchu, 2 years ago)

Crazycache: I shouldn't have promoted IndexSet.indices. The order is too important.

Line 
1 import md5
2 import memcache
3 import re
4 import sys
5
6 try:
7     set
8 except NameError:
9     from sets import Set as set
10
11 import dejavu
12 from dejavu import errors, logflags, storage
13 from geniusql import logic
14
15
def bytecode_regex(bits):
    """Build a regular-expression source string from mixed bytecode bits.

    Each bit which is not already a string is taken to be a byte value and
    contributes re.escape(chr(bit)) to the pattern. String bits (for
    example the "." wildcard) are spliced into the pattern verbatim.
    """
    def as_pattern(bit):
        # Strings are pattern fragments already; anything else is a byte.
        if isinstance(bit, basestring):
            return bit
        return re.escape(chr(bit))

    return "".join([as_pattern(bit) for bit in bits])
28
# Bytecode templates used to recognise "simple" filter functions, i.e. a
# chain of attribute-equals-constant tests such as
#     lambda x: x.a == 1 and x.b == 2
# Integers are literal bytecode bytes; each "." is a regex wildcard which
# stands in for one byte of a two-byte operand.
# NOTE(review): the literal values look like CPython 2.4-2.6 opcodes
# (124 LOAD_FAST, 105 LOAD_ATTR, 100 LOAD_CONST, 106 COMPARE_OP with
# operand 2 meaning "==", 111 JUMP_IF_FALSE, 1 POP_TOP, and the trailing
# "S" RETURN_VALUE) -- confirm against the target interpreter's opcodes.
simple_compare = bytecode_regex([124, 0, 0, 105, ".", ".", 100, ".", ".", 106, 2, 0])
simple_and = bytecode_regex([111, ".", ".", 1])
# re.DOTALL is required because operand bytes are arbitrary: without it a
# "." wildcard refuses to match an operand byte equal to 10 ("\n"), and an
# otherwise simple expression would not be recognised as indexable.
indexable_regex = re.compile("^(%s(%s)?)+S$" % (simple_compare, simple_and),
                             re.DOTALL)
32
33
34 class MemcachedStorageManager(storage.StorageManager):
35     """A Storage Manager which keeps all data in memcached.
36     
37     memcached is a high-performance, distributed memory object caching
38     system, generic in nature, but intended for use in speeding up
39     dynamic web applications by alleviating database load.
40     
41     See http://www.danga.com/memcached/
42     and ftp://ftp.tummy.com/pub/python-memcached/
43     
44     IMPORTANT: data stuck into memcached is not guaranteed to be stable.
45     It may disappear at any time, according to an internal LRU algorithm.
46     In particular, you should be aware that the LRU algorithm is itself
47     partitioned by object size (into "slabs"), so that a newer object
48     may be removed before an older one if they are of significantly
49     different sizes.
50     
51     Options:
52         memcached.servers: a list of strings of the form 'IP-address:port'.
53             These will be passed directly into the memcache.Client instance.
54         
55         memcached.global_index: if True (the default), this store will
56             maintain a index over the identifiers of all stored objects in
57             memcached itself. This is the 'safe' choice, and necessary if
58             your only store is memcached. However, if you run this store as
59             an ObjectCache.cache, you should turn this off, allowing
60             ObjectCache.nextstore to maintain the primary indexes--this
61             allows the cache to run orders of magnitude faster.
62         
63         memcached.index_time: the timeout, in seconds, for any cached indexes.
64             Default is 300 seconds.
65     """
66    
67     def __init__(self, allOptions={}):
68         storage.StorageManager.__init__(self, allOptions)
69        
70         self.name = allOptions['name']
71         self.global_index = allOptions.pop("memcached.global_index", True)
72         self.index_time = allOptions.pop("memcached.index_time", 5 * 60)
73         self.primary_keys = {}
74         self.indexsets = {}
75         self.index_stride = 50
76        
77         cache_opts = dict([(k[10:], v) for k, v in allOptions.iteritems()
78                            if k.startswith("memcached.")])
79         self.client = memcache.Client(**cache_opts)
80    
81     def hash(self, object):
82         """Return a consistent hash for object (for use in a memcached key)."""
83         # TODO: can we add overflow support for collisions?
84         return md5.new(repr(object)).hexdigest()
85    
86     def _unit_key(self, unit):
87         """Return (ident, memcached key) for the given unit."""
88         cls = unit.__class__
89         ident = tuple([getattr(unit, name) for name in self.primary_keys[cls]])
90         key = "%s:%s:%s" % (self.name, cls.__name__, self.hash(ident))
91         return key
92    
93     def unit(self, cls, **kwargs):
94         """A single Unit which matches the given kwargs, else None.
95         
96         The first Unit matching the kwargs is returned; if no Units match,
97         None is returned.
98         """
99         keyset = set(kwargs.keys())
100        
101         # Try to retrieve a matching unit using its primary_keys.
102         # This will skip grabbing any indices (a HUGE optimization).
103         pk = self.primary_keys[cls]
104         if keyset >= set(pk):
105             return self._unit_by_primary_key(cls, pk, kwargs)
106        
107         # Try to retrieve a matching unit using an index.
108         # If self.global_index is True, the last one should
109         # be an index with propnames == []. See self.register.
110         indexset = self.indexsets[cls]
111         for index in indexset:
112             if keyset >= set(index):
113                 unit = indexset.unit(index, kwargs)
114                 if unit is not None:
115                     if self.logflags & logflags.RECALL:
116                         self.log(logflags.RECALL.message(cls, ('HIT', kwargs)))
117                     return unit
118        
119         # Return None since we have no more access paths.
120         if self.logflags & logflags.RECALL:
121             self.log(logflags.RECALL.message(cls, ('DEFER', kwargs)))
122         return None
123    
124     def xrecall(self, classes, expr=None, order=None, limit=None, offset=None):
125         """Yield units of the given cls which match the given expr."""
126         if isinstance(classes, dejavu.UnitJoin):
127             for units in self._xmultirecall(classes, expr, order=order,
128                                             limit=limit, offset=offset):
129                 yield units
130             return
131        
132         cls = classes
133         indexset = self.indexsets[cls]
134        
135         if not isinstance(expr, logic.Expression):
136             expr = logic.Expression(expr)
137         if self.logflags & logflags.RECALL:
138             self.log(logflags.RECALL.message(cls, expr))
139        
140         if limit == 0:
141             return
142        
143         if offset and not order:
144             raise TypeError("Order argument expected when offset is provided.")
145        
146         filters = self.extract_filters(expr)
147        
148         # Try to retrieve a single matching unit using its primary_keys.
149         # This will skip grabbing any indices (a HUGE optimization).
150         pk = self.primary_keys[cls]
151         if set(filters.keys()) >= set(pk):
152             yield self._unit_by_primary_key(cls, pk, filters)
153             return
154        
155         # Try to retrieve matching units using an index.
156         # If self.global_index is True, the last one should
157         # be an index with propnames == []. See self.register.
158         for index in indexset:
159             if set(filters.keys()) >= set(index):
160                 data = indexset.xrecall(index, filters)
161                 data = self._xrecall_inner(data, expr)
162                 for unit in self._paginate(data, order, limit, offset, single=True):
163                     yield unit
164                 return
165    
166     def _xrecall_inner(self, units, expr=None):
167         """Private helper for self.xrecall."""
168         for unit in units:
169             if expr is None or expr(unit):
170                 # Must yield a sequence for use in _paginate.
171                 yield (unit,)
172    
173     def save(self, unit, forceSave=False):
174         """Store the unit."""
175         if self.logflags & logflags.SAVE:
176             self.log(logflags.SAVE.message(unit, forceSave))
177        
178         if forceSave or unit.dirty():
179             # Cleanse first because pickle state
180             # includes _initial_property_hash.
181             unit.cleanse()
182             self.client.set(self._unit_key(unit), unit)
183             self.indexsets[unit.__class__].add(unit)
184    
185     def destroy(self, unit):
186         """Delete the unit."""
187         if self.logflags & logflags.DESTROY:
188             self.log(logflags.DESTROY.message(unit))
189        
190         self.client.delete(self._unit_key(unit))
191         self.indexsets[unit.__class__].discard(unit)
192    
193     def reserve(self, unit):
194         """Reserve storage space for the Unit."""
195         if unit.identifiers:
196             cls = unit.__class__
197             indexset = self.indexsets[cls]
198            
199             if not unit.sequencer.valid_id(unit.identity()):
200                 if () in indexset:
201                     # Try to generate an identifier by looking
202                     # up all units in the global index.
203                     index = indexset.get({}) or []
204                     ids = [u.identity()
205                            for u in indexset.scan(index).itervalues()]
206                     unit.sequencer.assign(unit, ids)
207                 else:
208                     raise NotImplementedError(
209                         "Unindexed memcache cannot generate identifiers.")
210            
211             unit.cleanse()
212            
213             # Add the unit to the cache.
214             try:
215                 self.client.add(self._unit_key(unit), unit)
216             except IOError, exc:
217                 if exc.args[0] == 'NOT_STORED':
218                     pass
219                 raise
220            
221             # Add the unit to all indices.
222             indexset.add(unit)
223         else:
224             # This class has no identifiers, so skip reserve and wait for save.
225             pass
226        
227         # Usually we log ASAP, but here we log after
228         # the unit has had a chance to get an auto ID.
229         if self.logflags & logflags.RESERVE:
230             self.log(logflags.RESERVE.message(unit))
231    
232     def shutdown(self, conflicts='error'):
233         """Shut down all connections to internal storage.
234         
235         conflicts: see errors.conflict.
236         """
237         self.client.disconnect_all()
238    
239     def create_database(self, conflicts='error'):
240         """Create internal structures for the entire database.
241         
242         conflicts: see errors.conflict.
243         """
244         pass
245    
246     def drop_database(self, conflicts='error'):
247         """Destroy internal structures for the entire database.
248         
249         conflicts: see errors.conflict.
250         """
251         for cls in self.classes:
252             self.flush(cls)
253    
254     def create_storage(self, cls, conflicts='error'):
255         """Create internal structures for the given class.
256         
257         conflicts: see errors.conflict.
258         """
259         if self.logflags & logflags.DDL:
260             self.log(logflags.DDL.message("create storage %s" % cls))
261        
262         indexset = self.indexsets[cls]
263         if () in indexset:
264             try:
265                 self.client.add(indexset.key({}), [])
266             except IOError, exc:
267                 if exc.args[0] == 'NOT STORED':
268                     errors.conflict(conflicts, "Class %r already has storage."
269                                     % cls)
270                 else:
271                     raise
272    
273     def has_storage(self, cls):
274         """If storage structures exist for the given class, return True."""
275         return True
276    
277     def drop_storage(self, cls, conflicts='error'):
278         """Destroy internal structures for the given class.
279         
280         conflicts: see errors.conflict.
281         """
282         if self.logflags & logflags.DDL:
283             self.log(logflags.DDL.message("drop storage %s" % cls))
284         self.flush(cls)
285    
286     def add_property(self, cls, name, conflicts='error'):
287         """Add internal structures for the given property.
288         
289         conflicts: see errors.conflict.
290         """
291         clsname = cls.__name__
292         if self.logflags & logflags.DDL:
293             self.log(logflags.DDL.message("add property %s %s" %
294                                           (clsname, name)))
295        
296         indexset = self.indexsets[cls]
297         if () in indexset:
298             # TODO: recalculate if primary_keys changed
299             ci = self.client.get(indexset.key({})) or []
300             for id in ci:
301                 key = "%s:%s:%s" % (self.name, clsname, self.hash(id))
302                 unit = self.client.get(key)
303                 if unit is not None:
304                     unit._properties[name] = None
305                     unit.cleanse()
306                     self.client.set(key, unit)
307    
308     def has_property(self, cls, name):
309         """If storage structures exist for the given property, return True."""
310         indexset = self.indexsets[cls]
311         if () in indexset:
312             clsname = cls.__name__
313             ci = self.client.get(indexset.key({}))
314            
315             if not ci:
316                 # We don't have any items, so there's nothing to
317                 # declare as 'unprepared'.
318                 return True
319            
320             for id in ci:
321                 key = "%s:%s:%s" % (self.name, clsname, self.hash(id))
322                 unit = self.client.get(key)
323                 if unit is not None:
324                     return name in unit._properties
325        
326         return True
327    
328     def drop_property(self, cls, name, conflicts='error'):
329         """Destroy internal structures for the given property.
330         
331         conflicts: see errors.conflict.
332         """
333         clsname = cls.__name__
334         if self.logflags & logflags.DDL:
335             self.log(logflags.DDL.message("drop property %s %s" %
336                                           (clsname, name)))
337        
338         indexset = self.indexsets[cls]
339         if () in indexset:
340             ci = self.client.get(indexset.key({})) or []
341             for id in ci:
342                 key = "%s:%s:%s" % (self.name, clsname, self.hash(id))
343                 unit = self.client.get(key)
344                 if unit is not None:
345                     del unit._properties[name]
346                     unit.cleanse()
347                     self.client.set(key, unit)
348    
349     def rename_property(self, cls, oldname, newname, conflicts='error'):
350         """Rename internal structures for the given property.
351         
352         conflicts: see errors.conflict.
353         """
354         clsname = cls.__name__
355         if self.logflags & logflags.DDL:
356             self.log(logflags.DDL.message("rename property %s from %s to %s"
357                                           % (cls, oldname, newname)))
358        
359         indexset = self.indexsets[cls]
360         if () in indexset:
361             ci = self.client.get(indexset.key({})) or []
362             for id in ci:
363                 key = "%s:%s:%s" % (self.name, clsname, self.hash(id))
364                 unit = self.client.get(key)
365                 if unit is not None:
366                     unit._properties[newname] = unit._properties[oldname]
367                     del unit._properties[oldname]
368                     unit.cleanse()
369                     self.client.set(key, unit)
370    
371    
372     #                   Extra methods for use as a cache                   #
373    
374     def cachelen(self, cls):
375         indexset = self.indexsets[cls]
376         if () in indexset:
377             return len(self.client.get(indexset.key({})))
378         else:
379             return 0
380    
381     def cached_units(self, cls):
382         units = []
383         indexset = self.indexsets[cls]
384         if () in indexset:
385             for key in self.client.get(indexset.key({})):
386                 unit = self.client.get(key)
387                 if unit is not None:
388                     unit.cleanse()
389                     units.append(unit)
390         return units
391    
392     def flush(self, cls):
393         """Dump all objects of the given class."""
394         clsname = cls.__name__
395        
396         indexset = self.indexsets[cls]
397         if () in indexset:
398             gi_key = indexset.key({})
399             # Delete all units in the global index.
400             for id in self.client.get(gi_key) or []:
401                 key = "%s:%s:%s" % (self.name, clsname, self.hash(id))
402                 self.client.delete(key)
403            
404             # Delete the global index.
405             self.client.delete(gi_key)
406         # TODO:
407         # else:
408         #     self.increment_generation(cls)
409    
410     def register(self, cls):
411         """Assert that Units of class 'cls' will be handled."""
412         # Set a default primary key for the class. Consumers are free to
413         # change this if another unique property is looked up more often.
414         self.primary_keys[cls] = tuple(cls.identifiers or cls.properties)
415        
416         # Add indices based on the .index attribute of each UnitProperty.
417         self.indexsets[cls] = i = IndexSet(self, cls)
418         for propname in cls.properties:
419             prop = getattr(cls, propname)
420             if prop.index:
421                 # There's usually no need for an index on the primary key;
422                 # we can just fetch each one directly by cache key.
423                 # Callers are free to add one in explicitly if needed,
424                 # for example, if wanting to retrive all units without
425                 # any filtering criteria.
426                 if propname not in cls.identifiers:
427                     i.add_index(propname)
428        
429         # Add an index with no propnames. This is a special
430         # sentinel value for the global index that keeps us DRY.
431         if self.global_index:
432             i.add_index()
433        
434         storage.StorageManager.register(self, cls)
435    
436     def _unit_by_primary_key(self, cls, keys, filters):
437         """Return a unit (or None) by primary keys which matches the filters dict.
438         
439         The filters argument must contain an entry for each key in the
440         given list of keys, although it may and often should contain
441         additional entries.
442         """
443         ident = tuple([filters[k] for k in keys])
444         key = "%s:%s:%s" % (self.name, cls.__name__, self.hash(ident))
445         unit = self.client.get(key)
446         if unit is not None:
447             matching = True
448             if set(filters.keys()) > set(keys):
449                 # We retrieved the Unit using a subset of the filters.
450                 # Filter in full now.
451                 for k, v in filters.iteritems():
452                     if getattr(unit, k) != v:
453                         matching = False
454                         break
455            
456             if matching:
457                 if self.logflags & logflags.IO:
458                     self.log(logflags.IO.message('PK HIT (%s) %s' % (key, filters)))
459                 unit.cleanse()
460                 return unit
461        
462         if self.logflags & logflags.IO:
463             self.log(logflags.IO.message('PK MISS (%s) %s' % (key, filters)))
464         return None
465    
466     def extract_filters(self, expr):
467         """Return a dict of (key == value) pairs from the given expr.
468         
469         If the given Expression contains operators other than ==, or if a
470         set of filters cannot be obtained for some other reason, returns {}.
471         In theory, we should be able to ignore other operators but the
472         simple regex we use isn't that smart; we'd have to do a full parse
473         of the expr and then functionally decompose it.
474         
475         This function is only designed to work on Expressions for a single
476         class (i.e. - no joins).
477         """
478         if expr.is_constant(True):
479             return {}
480        
481         fc = expr.func.func_code
482         if indexable_regex.match(fc.co_code):
483             if sys.version_info >= (2, 5):
484                 # Python 2.5 stopped including args in co_names.
485                 compkeys = fc.co_names
486             else:
487                 # The first co_names will be the positional args for the class.
488                 compkeys = fc.co_names[fc.co_argcount:]
489            
490             # "If a code object represents a function, the first item
491             # in co_consts is the documentation string of the function,
492             # or None if undefined."
493             compvals = fc.co_consts[1:]
494            
495             return dict(zip(compkeys, compvals))
496        
497         return {}
498    
499     def scan(self, mainstore, cls, expr):
500         """Return a list of units from a cached index (or None).
501         
502         The class and expression will be used to find a cached index;
503         if not found, the mainstore will be used to create one, and it
504         will be cached.
505         
506         Once an index has been obtained, it will be iterated over against
507         the cache. Each unit in the index which is not available in the
508         cache will be pulled from mainstore.
509         
510         If no index intersects with the given expression, None is returned.
511         """
512         filters = self.extract_filters(expr)
513         indexset = self.indexsets[cls]
514         keyattrs = self.primary_keys[cls]
515        
516         # Find the best index for the given filters.
517         for index in indexset:
518             if set(filters.keys()) >= set(index):
519                 break
520         else:
521             # Signal the caller that no index scan was performed.
522             return None
523        
524         criteria = [(k, filters[k]) for k in index]
525         ids = indexset.get(criteria)
526         if ids is None:
527             # Not in the cache. Grab the list of id-tuples from nextstore.
528             # Note well: we're NOT grabbing view(.., filters), because
529             # if filters > criteria, that would cache a subset of
530             # the index leaf node.
531             ids = mainstore.view((cls, keyattrs, dict(criteria)))
532             # Then cache the list result for next time. Note that index
533             # contents are unordered.
534             indexset.put(criteria, ids, time=self.index_time)
535        
536         # Query the cache for multiple units (by id).
537         units = indexset.scan(ids)
538        
539         # Now query the nextstore for any units that the cache missed...
540         misses = [k for k in ids if k not in units]
541         if self.index_stride:
542             # ...in chunks of length: self.index_stride.
543             for step in xrange(0, len(misses), self.index_stride):
544                 # TODO: allow for multiple identifiers
545                 misstep = zip(*misses[step:step + self.index_stride])[0]
546                 f = lambda x: getattr(x, keyattrs[0]) in misstep
547                 for unit in mainstore.recall(cls, f):
548                     units[tuple([getattr(unit, a) for a in keyattrs])] = unit
549                     try:
550                         self.save(unit, forceSave=True)
551                     except KeyError:
552                         # The cache refused to save the unit (possibly full).
553                         pass
554         elif misses:
555             # ...or all in one chunk if desired.
556             # TODO: allow for multiple identifiers
557             misstep = zip(*misses)[0]
558             f = lambda x: getattr(x, keyattrs[0]) in misstep
559             for unit in mainstore.recall(cls, f):
560                 units[tuple([getattr(unit, a) for a in keyattrs])] = unit
561                 try:
562                     self.save(unit, forceSave=True)
563                 except KeyError:
564                     # The cache refused to save the unit (possibly full).
565                     pass
566        
567         indexset.filter(index, filters, units)
568        
569         return units.values()
570
571
class IndexSet(object):
    """A set of indices for a single class.

    Each index covers a tuple of unit attributes. The empty tuple is the
    'global index', which covers every unit of the class.

    Each leaf node of each index is stored in memcached under its own key;
    each value is a list of tuple([unit.k for k in primary_keys[cls]]).
    For example, given an index over ("age", ) and a single primary key,
    each distinct recall operation will produce its own index node:

        recall(Person, {age: 31}) -> ns:Person:index(age=31) = [(1132,), (663,)]
        recall(Person, {age: 25}) -> ns:Person:index(age=25) = [(12,), (34,), (22,)]
        recall(Person, {age: 64}) -> ns:Person:index(age=64) = [(7,), (17,), (27,)]
    """

    def __init__(self, store, cls):
        """Create an empty set of indices for 'cls', kept in 'store'.

        store: the owning store; its name, client, hash, logflags, log,
            primary_keys and index_time attributes are used.
        cls: the Unit class which the indices cover.
        """
        self.store = store
        self.cls = cls
        self._key_template = '%s:%s:index(%%s)' % (store.name, cls.__name__)
        self._indices = []

    def add_index(self, *attributes):
        """Add an index over the given attributes (if not already present)."""
        if attributes not in self._indices:
            self._indices.append(attributes)
            # Sort them from most-specific (most properties) to least.
            self._indices.sort(lambda x, y: cmp(len(y), len(x)))

    def __iter__(self):
        """Iterate over the indices (attribute tuples), most-specific first."""
        return iter(self._indices)

    def _unit_key(self, ident):
        """Return the cache key of the unit with the given identity tuple."""
        return "%s:%s:%s" % (self.store.name, self.cls.__name__,
                             self.store.hash(ident))

    def key(self, criteria):
        """Return the cache key for the index node for the given criteria.

        The given criteria must be an iterable of (key, value) tuples,
        and must only contain keys for an existing index.

        If criteria is empty, the 'global index' key is returned.
        """
        # Spaces within values are replaced in the key.
        criteria = ["%s=%s" % (k, str(v).replace(" ", "+"))
                    for k, v in criteria]
        return self._key_template % ",".join(criteria)

    def get(self, criteria):
        """Return a cached list of unit ids which match the given criteria.

        The given criteria must be an iterable of (key, value) tuples,
        and must only contain keys for an existing index.

        If criteria is empty, the 'global index' node is returned.

        The ids returned will be a list of tuples of the form:
            tuple([getattr(unit, name) for name in primary_keys[cls]])
        The raw client result is returned, so a node which is not in
        the cache comes back as whatever the client returns for a miss.
        """
        cache_key = self.key(criteria)
        ids = self.store.client.get(cache_key)
        if self.store.logflags & logflags.IO:
            if ids is None:
                idlen = None
            else:
                idlen = len(ids)
            self.store.log(logflags.IO.message("INDEX GET (%s) len %r" %
                                               (cache_key, idlen)))
        return ids

    def put(self, criteria, ids, time=0):
        """Cache a list of unit identifiers which match the given criteria.

        The given criteria must be an iterable of (key, value) tuples,
        and must only contain keys for an existing index.

        If criteria is empty, the 'global index' node is written.

        The ids provided MUST be a list of tuples of the form:
            tuple([getattr(unit, name) for name in primary_keys[cls]])

        time: passed through to the memcache client's set() call.
        """
        cache_key = self.key(criteria)
        if self.store.logflags & logflags.IO:
            self.store.log(logflags.IO.message("INDEX PUT (%s) len %r: %r" %
                                               (cache_key, len(ids), ids)))
        self.store.client.set(cache_key, ids, time=time)

    def scan(self, ids):
        """Return a dict of multiple units from the given list of ids.

        The ids provided MUST be a list of tuples of the form:
            tuple([getattr(unit, name) for name in primary_keys[cls]])

        The returned dict will not contain entries for any units which
        have expired from the cache. Callers may use this information
        to request missed units from another store.
        """
        units = {}
        if ids:
            keys = [self._unit_key(ident) for ident in ids]
            data = self.store.client.get_multi(keys)

            # Transform the dict back to id keys instead of cache keys.
            for ident, key in zip(ids, keys):
                unit = data.get(key, None)
                if unit is not None:
                    unit.cleanse()
                    units[ident] = unit

        if self.store.logflags & logflags.IO:
            # 'ids' may be None or empty here; report that as 0.
            self.store.log(logflags.IO.message("INDEX SCAN %s (%r hits of %r)" %
                                               (self.cls.__name__, len(units),
                                                len(ids or ()))))
        return units

    def unit(self, index, filters):
        """Return a unit from the index which matches the filters dict (or None).

        The filters argument must contain an entry for each key in the given
        index, although it may and often should contain additional entries.
        """
        if set(filters.keys()) > set(index):
            for unit in self.xrecall(index, filters):
                return unit
            return None

        # If the filters and index keys are equal, it should be faster
        # to perform single gets against memcached, rather than the
        # get_multi calls that self.xrecall performs.
        criteria = [(k, filters[k]) for k in index]
        ids = self.get(criteria)
        if not ids:
            if self.store.logflags & logflags.IO:
                self.store.log(logflags.IO.message(
                    'INDEX EMPTY (%s) %s' % (self.cls.__name__, filters)))
            return None

        for ident in ids:
            cache_key = self._unit_key(ident)
            # The memcache client and the hash function live on the store.
            unit = self.store.client.get(cache_key)
            if unit is None:
                if self.store.logflags & logflags.IO:
                    self.store.log(logflags.IO.message(
                        'INDEX MISS (%s) %s' % (cache_key, filters)))
            else:
                if self.store.logflags & logflags.IO:
                    self.store.log(logflags.IO.message(
                        'INDEX HIT (%s) %s' % (cache_key, filters)))
                unit.cleanse()
                return unit
        return None

    def xrecall(self, index, filters):
        """Yield units from the given index which match the filters dict.

        The filters argument must contain an entry for each key in the given
        index, although it may and often should contain additional entries.
        """
        criteria = [(k, filters[k]) for k in index]
        ids = self.get(criteria)
        if ids:
            units = self.scan(ids)
            self.filter(index, filters, units)
            for unit in units.itervalues():
                unit.cleanse()
                yield unit

    def filter(self, index, filters, units):
        """Remove any units which don't match filters, and update the index.

        The filters argument must contain an entry for each key in the given
        index, although it may and often should contain additional entries.

        The 'units' arg must be a dict of (id, unit) pairs, and must be
        the complete set of units from an index node; that is, the result
        of an indexset.get() call for the same index. It is modified in
        place.
        """
        ids = list(units.keys())
        removals = False
        for ident, unit in list(units.items()):
            mismatched = [key for key, value in filters.iteritems()
                          if getattr(unit, key) != value]
            if not mismatched:
                continue

            # Drop the unit exactly once, however many filters it fails.
            del units[ident]

            # Remove any idents from the index node that no longer
            # satisfy the index criteria. This is how we update
            # index nodes--eager adds but late discards.
            for key in mismatched:
                if key in index:
                    removals = True
                    ids.remove(ident)
                    break

        if removals:
            criteria = [(k, filters[k]) for k in index]
            self.put(criteria, ids, time=self.store.index_time)

    def add(self, unit):
        """Add the given unit to all indices."""
        ident = tuple([getattr(unit, name)
                       for name in self.store.primary_keys[self.cls]])
        for index in self._indices:
            criteria = [(k, getattr(unit, k)) for k in index]
            indexnode = self.get(criteria) or []
            if ident not in indexnode:
                indexnode.append(ident)
                self.put(criteria, indexnode)

    def discard(self, unit):
        """Discard the given unit from all indices."""
        ident = tuple([getattr(unit, name)
                       for name in self.store.primary_keys[self.cls]])
        for index in self._indices:
            criteria = [(k, getattr(unit, k)) for k in index]
            indexnode = self.get(criteria) or []
            if ident in indexnode:
                indexnode.remove(ident)
                self.put(criteria, indexnode)
783
Note: See TracBrowser for help on using the browser.