Changeset 572
- Timestamp: 11/05/07 09:14:11
- Files:
  - branches/crazycache/dejavu/storage/caching.py (modified) (3 diffs)
  - branches/crazycache/dejavu/storage/storememcached.py (modified) (22 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/crazycache/dejavu/storage/caching.py
r571 r572 47 47 self.fulljoin = allOptions.get("fulljoin", False) 48 48 self.cache_recalls = [] 49 self.xspecial_stride = 5050 49 51 50 self.cache = allOptions.get("cache") … … 74 73 return u 75 74 76 def xspecial(self, cls, key, value, order=None):77 """Yield multiple units of the given cls where key=value."""78 keyattrs = self.cache.primary_keys[cls]79 80 # Get a cached list of identifier-tuples, ordered if requested.81 # TODO: add order to the idkey.82 ids = self.cache.get_index(cls, {key: value})83 if ids is None:84 # Not in the cache. Grab the list of id-tuples from nextstore.85 ids = self.view((cls, keyattrs, {key: value}), order=order)86 # Then cache the list result for next time.87 self.cache.put_index(cls, {key: value}, ids, time = 5 * 60)88 89 # Query the cache for multiple units (by id).90 items = self.cache.scan_index(cls, ids)91 misses = [k for k in ids if k not in items]92 93 # Now query the DB for any items that the cache missed...94 if self.xspecial_stride:95 # ...in chunks of length: self.xspecial_stride.96 for step in xrange(0, len(misses), self.xspecial_stride):97 # TODO: allow for multiple identifiers98 f = lambda x: ((getattr(x, keyattrs[0]), ) in99 misses[step:step + self.xspecial_stride])100 for unit in self.recall(cls, f):101 items[tuple([getattr(unit, a) for a in keyattrs])] = unit102 elif misses:103 # ...or all in one chunk if desired.104 f = lambda x: (getattr(x, keyattrs[0]), ) in misses105 for unit in self.recall(cls, f):106 items[tuple([getattr(unit, a) for a in keyattrs])] = unit107 108 # Return non-null items only, preserving order109 for k in ids:110 yield items[k]111 ## unit = items[k]112 ## if unit is not None:113 ## yield unit114 115 75 def xrecall(self, classes, expr=None, order=None, limit=None, offset=None): 116 76 """Return a Unit iterator.""" … … 137 97 return 138 98 99 if offset and not order: 100 raise TypeError("Order argument expected when offset is provided.") 101 102 if cls.identifiers and cls in self.cache.classes: 
103 fc = expr.func.func_code 104 compkeys = fc.co_names[1:] 105 # TODO: allow multiple keys (as long as one of them is indexed). 106 if (fc.co_code == simple_attr_lookup and len(compkeys) == 1 107 # Hmmmmm. Should we allow on-the-fly indices? 108 # Something bugs me about that idea, but I don't know what. 109 and getattr(cls, compkeys[0]).index 110 ): 111 filters = {compkeys[0]: fc.co_consts[1]} 112 try: 113 units = self.cache.scan(self.nextstore, cls, filters, order) 114 except ValueError: 115 pass 116 else: 117 for unit in units: 118 yield unit 119 return 120 139 121 seen = {} 140 if order:122 if not order: 141 123 # If an order is supplied, there's no point in running the 142 124 # query against our cache (because we'd have to interleave 143 # the results with those from storage anyway). 144 pass 145 elif offset: 146 raise TypeError("Order argument expected when offset is provided.") 147 elif cls.identifiers and cls in self.cache.classes: 148 fc = expr.func.func_code 149 compkeys = fc.co_names[1:] 150 if (fc.co_code == simple_attr_lookup and len(compkeys) == 1 151 # TODO: use getattr(cls, compkeys[0]).index to dispatch. 152 ## and (cls, compkeys[0]) in self.cache_recalls 153 ): 154 for unit in self.xspecial(cls, compkeys[0], fc.co_consts[1], order): 155 yield unit 156 return 157 125 # the results with those from storage anyway). In fact, the 126 # only reason to hit the cache at all here is to either hit 127 # the limit or allow the caller to stop iterating before 128 # reaching the DB. 158 129 if self.fullquery: 159 130 # Query the cache. 160 for unit in self.cache.xrecall(cls, expr, limit=limit , offset=offset):131 for unit in self.cache.xrecall(cls, expr, limit=limit): 161 132 seen[unit.identity()] = None 162 133 yield unit branches/crazycache/dejavu/storage/storememcached.py
r571 r572 44 44 ObjectCache.nextstore to maintain the primary indexes--this 45 45 allows the cache to run orders of magnitude faster. 46 47 Secondary Indexes: 48 49 We don't even try to invalidate any of the secondary indexes, because 50 to do so we'd either have to 1) keep around the old criteria for each 51 modified unit (to determine from which index to remove the unit), or 52 2) rely on an index of indexes, neither of which we want to do. 53 They're just there for x/recall() and consumers of ObjectCache; 54 we invalidate by timeout, not by event. 46 47 memcached.index_time: the timeout, in seconds, for any cached indexes. 48 Default is 300 seconds. 55 49 """ 56 50 … … 60 54 self.name = allOptions['name'] 61 55 self.global_index = allOptions.pop("memcached.global_index", True) 56 self.index_time = allOptions.pop("memcached.index_time", 5 * 60) 62 57 self.primary_keys = {} 63 self.indices = {} 58 self.indexsets = {} 59 self.index_stride = 50 64 60 65 61 cache_opts = dict([(k[10:], v) for k, v in allOptions.iteritems() … … 77 73 ident = tuple([getattr(unit, name) for name in self.primary_keys[cls]]) 78 74 key = "%s:%s:%s" % (self.name, cls.__name__, self.hash(ident)) 79 return ident,key75 return key 80 76 81 77 def unit(self, cls, **kwargs): … … 94 90 95 91 # Try to retrieve a matching unit using an index. 96 # Sort them from most-specific (most properties) to least.97 92 # If self.global_index is True, the last one should 98 93 # be an index with propnames == []. See self.register. 
99 indices = list(self.indices[cls]) 100 indices.sort(lambda x, y: cmp(len(y), len(x))) 101 for propnames in indices: 102 if keyset >= set(propnames): 103 unit = self._unit_from_index(cls, propnames, kwargs) 94 indexset = self.indexsets[cls] 95 for index in indexset: 96 if keyset >= set(index): 97 unit = indexset.unit(index, kwargs) 104 98 if unit is not None: 105 99 if self.logflags & logflags.RECALL: … … 121 115 122 116 cls = classes 117 indexset = self.indexsets[cls] 123 118 124 119 if expr: … … 140 135 141 136 # Try to retrieve matching units using an index. 142 # Sort them from most-specific (most properties) to least.143 137 # If self.global_index is True, the last one should 144 138 # be an index with propnames == []. See self.register. 145 indices = list(self.indices[cls]) 146 indices.sort(lambda x, y: cmp(len(y), len(x))) 147 for propnames in indices: 148 if set(compkeys) >= set(propnames): 149 data = self._xrecall_from_index(cls, propnames, filters) 139 for index in indexset: 140 if set(compkeys) >= set(index): 141 data = indexset.xrecall(index, filters) 150 142 data = self._xrecall_inner(data, expr) 151 143 for unit in self._paginate(data, order, limit, offset, single=True): … … 154 146 155 147 if self.global_index: 156 data = self._xrecall_from_index(cls,[], {})148 data = indexset.xrecall([], {}) 157 149 data = self._xrecall_inner(data, expr) 158 150 for unit in self._paginate(data, order, limit, offset, single=True): … … 178 170 # includes _initial_property_hash. 179 171 unit.cleanse() 180 181 ident, key = self._unit_key(unit) 182 self.client.set(key, unit) 183 184 # See the class doc for more information about secondary indexes. 
185 if self.global_index: 186 cls = unit.__class__ 187 index = self.get_index(cls, {}) or set() 188 index.add(ident) 189 self.put_index(cls, {}, index) 172 self.client.set(self._unit_key(unit), unit) 173 self.indexsets[unit.__class__].add(unit) 190 174 191 175 def destroy(self, unit): … … 194 178 self.log(logflags.DESTROY.message(unit)) 195 179 196 ident, key = self._unit_key(unit) 197 self.client.delete(key) 198 199 # See the class doc for more information about secondary indexes. 200 if self.global_index: 201 cls = unit.__class__ 202 index = self.get_index(cls, {}) or set() 203 index.discard(ident) 204 self.put_index(cls, {}, index) 180 self.client.delete(self._unit_key(unit)) 181 self.indexsets[unit.__class__].discard(unit) 205 182 206 183 def reserve(self, unit): … … 208 185 if unit.identifiers: 209 186 cls = unit.__class__ 210 if self.global_index: 211 # See the class doc for more information about secondary indexes. 212 index = self.get_index(cls, {}) or set() 213 214 if not unit.sequencer.valid_id(unit.identity()): 215 ids = [u.identity() for u in 216 self.scan_index(cls, index).itervalues()] 187 indexset = self.indexsets[cls] 188 189 if not unit.sequencer.valid_id(unit.identity()): 190 if self.global_index: 191 # Try to generate an identifier by looking 192 # up all units in the global index. 193 index = indexset.get({}) or [] 194 ids = [u.identity() 195 for u in indexset.scan(index).itervalues()] 217 196 unit.sequencer.assign(unit, ids) 218 unit.cleanse() 219 220 # Add the unit to the cache. 221 ident, key = self._unit_key(unit) 222 self.client.add(key, unit) 223 224 # Add the unit to the global index. 
225 index.add(ident) 226 self.put_index(cls, {}, index) 227 else: 228 if not unit.sequencer.valid_id(unit.identity()): 197 else: 229 198 raise NotImplementedError( 230 199 "Unindexed memcache cannot generate identifiers.") 231 232 unit.cleanse() 233 ident, key = self._unit_key(unit) 234 try: 235 self.client.add(key, unit) 236 except IOError, exc: 237 if exc.args[0] == 'NOT_STORED': 238 pass 239 raise 200 201 unit.cleanse() 202 203 # Add the unit to the cache. 204 try: 205 self.client.add(self._unit_key(unit), unit) 206 except IOError, exc: 207 if exc.args[0] == 'NOT_STORED': 208 pass 209 raise 210 211 # Add the unit to all indices. 212 indexset.add(unit) 240 213 else: 241 214 # This class has no identifiers, so skip reserve and wait for save. … … 279 252 if self.global_index: 280 253 try: 281 self.client.add(self. _index_key(cls, {}), set())254 self.client.add(self.indexsets[cls].key({}), []) 282 255 except IOError, exc: 283 256 if exc.args[0] == 'NOT STORED': … … 312 285 if self.global_index: 313 286 # TODO: recalculate if primary_keys changed 314 ci = self.client.get(self. _index_key(cls, {})) or set()287 ci = self.client.get(self.indexsets[cls].key({})) or [] 315 288 for id in ci: 316 289 key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) … … 325 298 if self.global_index: 326 299 clsname = cls.__name__ 327 ci = self.client.get(self. _index_key(cls,{}))300 ci = self.client.get(self.indexsets[cls].key({})) 328 301 329 302 if not ci: … … 351 324 352 325 if self.global_index: 353 ci = self.client.get(self. _index_key(cls, {})) or set()326 ci = self.client.get(self.indexsets[cls].key({})) or [] 354 327 for id in ci: 355 328 key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) … … 371 344 372 345 if self.global_index: 373 ci = self.client.get(self. 
_index_key(cls, {})) or set()346 ci = self.client.get(self.indexsets[cls].key({})) or [] 374 347 for id in ci: 375 348 key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) … … 386 359 def cachelen(self, cls): 387 360 if self.global_index: 388 return len(self.client.get(self. _index_key(cls,{})))361 return len(self.client.get(self.indexsets[cls].key({}))) 389 362 else: 390 363 return 0 … … 393 366 units = [] 394 367 if self.global_index: 395 for key in self.client.get(self. _index_key(cls,{})):368 for key in self.client.get(self.indexsets[cls].key({})): 396 369 unit = self.client.get(key) 397 370 if unit is not None: … … 404 377 405 378 if self.global_index: 379 gi_key = self.indexsets[cls].key({}) 406 380 # Delete all units in the global index. 407 for id in self.client.get( self._index_key(cls, {})) or set():381 for id in self.client.get(gi_key) or []: 408 382 key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 409 383 self.client.delete(key) 410 384 411 385 # Delete the global index. 412 self.client.delete( self._index_key(cls, {}))386 self.client.delete(gi_key) 413 387 # TODO: 414 388 # else: 415 389 # self.increment_generation(cls) 416 417 418 # Indexing #419 390 420 391 def register(self, cls): … … 425 396 426 397 # Add indices based on the .index attribute of each UnitProperty. 427 self.ind ices[cls] = i = set()398 self.indexsets[cls] = i = IndexSet(self, cls) 428 399 for propname in cls.properties: 429 400 prop = getattr(cls, propname) … … 431 402 # No need for an index on the primary key; 432 403 # we can just fetch each one directly by cache key. 433 if not prop.key:434 i.add ((prop,))404 if propname not in cls.identifiers: 405 i.add_index(propname) 435 406 436 407 # Add an index with no propnames. This is a special 437 408 # sentinel value for the global index that keeps us DRY. 
438 409 if self.global_index: 439 i.add (())410 i.add_index() 440 411 441 412 storage.StorageManager.register(self, cls) 442 443 def _index_key(self, cls, filters):444 """Return the cache key for the index of the given class and filters.445 446 If filters is an empty dict, the 'global index' key is returned.447 """448 criteria = ["%s=%s" % (k, str(v).replace(" ", "+"))449 for k, v in filters.iteritems()]450 return '%s:%s:index(%s)' % (self.name, cls.__name__,451 ",".join(criteria))452 453 def get_index(self, cls, filters):454 """Return a cached list of unit ids which match the given filters dict.455 456 The ids returned will be a list of tuples of the form:457 tuple([getattr(unit, name) for name in self.primary_keys[cls]])458 459 In general, callers should use get_index, put_index, and scan_index460 together:461 462 ids = get_index(cls, ...)463 if ids is None:464 ids = expensive_lookup(cls, ...)465 put_index(cls, ...)466 items = scan_index(cls, ids)467 misses = [k for k in ids if k not in items]468 """469 cache_key = self._index_key(cls, filters)470 ids = self.client.get(cache_key)471 if self.logflags & logflags.IO:472 if ids is None:473 idlen = None474 else:475 idlen = len(ids)476 self.log(logflags.IO.message("INDEX GET (%s) len %r" %477 (cache_key, idlen)))478 return ids479 480 def put_index(self, cls, filters, ids, time=0):481 """Cache a list of unit identifiers which match the given filters dict.482 483 The ids provided MUST be a list of tuples of the form:484 tuple([getattr(unit, name) for name in self.primary_keys[cls]])485 """486 cache_key = self._index_key(cls, filters)487 if self.logflags & logflags.IO:488 self.log(logflags.IO.message("INDEX PUT (%s) len %r" %489 (cache_key, len(ids))))490 self.client.set(cache_key, ids, time=time)491 492 def scan_index(self, cls, ids):493 """Return a dict of multiple units from the given set of ids.494 495 The ids provided MUST be a list of tuples of the form:496 tuple([getattr(unit, name) for name in 
self.primary_keys[cls]])497 498 The returned dict will not contain entries for missed ids.499 """500 if ids:501 clsname = cls.__name__502 keys = ["%s:%s:%s" % (self.name, clsname, self.hash(id))503 for id in ids]504 data = self.client.get_multi(keys)505 506 # Transform the dict back to id keys instead of cache keys.507 units = {}508 for i, k in zip(ids, keys):509 unit = data.get(k, None)510 if unit is not None:511 units[i] = unit512 else:513 units = {}514 515 if self.logflags & logflags.IO:516 self.log(logflags.IO.message("INDEX SCAN %s (%r hits of %r)" %517 (cls.__name__, len(units), len(ids))))518 return units519 413 520 414 def _unit_by_primary_key(self, cls, keys, filters): … … 548 442 return None 549 443 550 def _unit_from_index(self, cls, keys, filters): 551 """Scan the (cls, keys) index for a unit which matches the filters dict. 552 553 The filters argument must contain an entry for each key in the 554 given list of keys, although it may and often should contain 555 additional entries. 556 """ 557 if set(filters.keys()) > set(keys): 558 for unit in self._xrecall_from_index(cls, keys, filters): 444 def scan(self, mainstore, cls, filters, order): 445 """Return units from a cached index, if possible.""" 446 indexset = self.indexsets[cls] 447 keyattrs = self.primary_keys[cls] 448 449 # Get a cached list of identifier-tuples, ordered if requested. 450 # TODO: add order to the idkey. 451 for index in indexset: 452 if set(filters.keys()) >= set(index): 453 indexcriteria = dict([(k, filters[k]) for k in index]) 454 break 455 else: 456 raise ValueError("The given filters %r are not indexed for %r." % 457 (tuple(filters.keys()), cls.__name__)) 458 459 ids = indexset.get(indexcriteria) 460 if ids is None: 461 # Not in the cache. Grab the list of id-tuples from nextstore. 462 ids = mainstore.view((cls, keyattrs, filters), order=order) 463 # Then cache the list result for next time. 
464 indexset.put(indexcriteria, ids, time=self.index_time) 465 466 # Query the cache for multiple units (by id). 467 units = indexset.scan(ids) 468 print units 469 misses = [k for k in ids if k not in units] 470 print misses 471 else: 472 # Query the cache for multiple units (by id). 473 units = indexset.scan(ids) 474 475 # Remove any idents from the index node that no longer 476 # satisfy the index criteria. This is how we update 477 # index nodes--eager adds but late discards. 478 removals = False 479 for id, unit in units.items(): 480 for key, value in filters.iteritems(): 481 if getattr(unit, key) != value: 482 removals = True 483 del units[id] 484 ids.remove(id) 485 break 486 if removals: 487 indexset.put(indexcriteria, ids, time=self.index_time) 488 489 misses = [k for k in ids if k not in units] 490 491 # Now query the nextstore for any units that the cache missed... 492 if self.index_stride: 493 # ...in chunks of length: self.index_stride. 494 for step in xrange(0, len(misses), self.index_stride): 495 # TODO: allow for multiple identifiers 496 misstep = zip(*misses[step:step + self.index_stride])[0] 497 f = lambda x: getattr(x, keyattrs[0]) in misstep 498 print f 499 for unit in mainstore.recall(cls, f): 500 units[tuple([getattr(unit, a) for a in keyattrs])] = unit 501 elif misses: 502 # ...or all in one chunk if desired. 503 # TODO: allow for multiple identifiers 504 misstep = zip(*misses)[0] 505 f = lambda x: getattr(x, keyattrs[0]) in misstep 506 print f 507 for unit in mainstore.recall(cls, f): 508 units[tuple([getattr(unit, a) for a in keyattrs])] = unit 509 510 # Preserve order 511 for k in ids: 512 yield units[k] 513 514 515 class IndexSet(object): 516 """A set of indices for a single class. 517 518 Each index covers a tuple of unit attributes. 519 520 Each leaf node of each index is stored in memcached under its own key; 521 each value is a list of tuple([unit.k for k in primary_keys[cls]]). 
522 For example, given an index over ("age", ), each distinct recall 523 operation will produce its own index node: 524 525 recall(Person, {age: 31}) -> ns:Person:index(age=31) = [(1132, 663)] 526 recall(Person, {age: 25}) -> ns:Person:index(age=25) = [(12, 34, 22)] 527 recall(Person, {age: 64}) -> ns:Person:index(age=64) = [(7, 17, 27)] 528 529 In general, callers should use get, put, and scan together: 530 531 ids = index.get(filters) 532 if ids is None: 533 ids = expensive_lookup(cls, filters) 534 index.put(filters, ids) 535 units = index.scan(ids) 536 misses = [k for k in ids if k not in units] 537 """ 538 539 def __init__(self, store, cls): 540 self.store = store 541 self.cls = cls 542 self._key_template = '%s:%s:index(%%s)' % (store.name, cls.__name__) 543 self._indices = [] 544 545 def add_index(self, *attributes): 546 """Add an index over the given attributes.""" 547 # Sort them from most-specific (most properties) to least. 548 if attributes not in self._indices: 549 self._indices.append(attributes) 550 self._indices.sort(lambda x, y: cmp(len(y), len(x))) 551 552 def __iter__(self): 553 return iter(self._indices) 554 555 def key(self, filters): 556 """Return the cache key for the given filters. 557 558 If filters is an empty dict, the 'global index' key is returned. 559 """ 560 criteria = ["%s=%s" % (k, str(v).replace(" ", "+")) 561 for k, v in filters.iteritems()] 562 return self._key_template % ",".join(criteria) 563 564 def get(self, filters): 565 """Return a cached list of unit ids which match the given filters dict. 
566 567 The ids returned will be a list of tuples of the form: 568 tuple([getattr(unit, name) for name in primary_keys[cls]]) 569 """ 570 cache_key = self.key(filters) 571 ids = self.store.client.get(cache_key) 572 if self.store.logflags & logflags.IO: 573 if ids is None: 574 idlen = None 575 else: 576 idlen = len(ids) 577 self.store.log(logflags.IO.message("INDEX GET (%s) len %r" % 578 (cache_key, idlen))) 579 return ids 580 581 def put(self, filters, ids, time=0): 582 """Cache a list of unit identifiers which match the given filters dict. 583 584 The ids provided MUST be a list of tuples of the form: 585 tuple([getattr(unit, name) for name in primary_keys[cls]]) 586 """ 587 cache_key = self.key(filters) 588 if self.store.logflags & logflags.IO: 589 self.store.log(logflags.IO.message("INDEX PUT (%s) len %r: %r" % 590 (cache_key, len(ids), ids))) 591 self.store.client.set(cache_key, ids, time=time) 592 593 def scan(self, ids): 594 """Return a dict of multiple units from the given list of ids. 595 596 The ids provided MUST be a list of tuples of the form: 597 tuple([getattr(unit, name) for name in primary_keys[cls]]) 598 599 The returned dict will not contain entries for missed ids. 600 """ 601 clsname = self.cls.__name__ 602 if ids: 603 keys = ["%s:%s:%s" % (self.store.name, clsname, self.store.hash(id)) 604 for id in ids] 605 data = self.store.client.get_multi(keys) 606 607 # Transform the dict back to id keys instead of cache keys. 608 units = {} 609 for i, k in zip(ids, keys): 610 unit = data.get(k, None) 611 if unit is not None: 612 units[i] = unit 613 else: 614 units = {} 615 616 if self.store.logflags & logflags.IO: 617 self.store.log(logflags.IO.message("INDEX SCAN %s (%r hits of %r)" % 618 (clsname, len(units), len(ids)))) 619 return units 620 621 def unit(self, index, filters): 622 """Return a unit from the index which matches the filters dict (or None). 
623 624 The filters argument must contain an entry for each key in the given 625 index, although it may and often should contain additional entries. 626 """ 627 if set(filters.keys()) > set(index): 628 for unit in self.xrecall(index, filters): 559 629 return unit 560 630 else: 561 clsname = cls.__name__631 clsname = self.cls.__name__ 562 632 # If the filters and index keys are equal, it should be faster 563 633 # to perform single gets against memcached, rather than the 564 # get_multi calls that _xrecall_from_indexperforms.565 ids = self.get _index(cls, dict([(k, filters[k]) for k in keys]))634 # get_multi calls that self.xrecall performs. 635 ids = self.get(dict([(k, filters[k]) for k in index])) 566 636 if ids: 567 637 for id in ids: 568 cache_key = "%s:%s:%s" % (self. name, clsname, self.hash(id))638 cache_key = "%s:%s:%s" % (self.store.name, clsname, self.hash(id)) 569 639 unit = self.client.get(cache_key) 570 640 if unit is None: 571 if self. logflags & logflags.IO:572 self. log(logflags.IO.message('INDEX MISS (%s) %s' %573 (cache_key, filters)))641 if self.store.logflags & logflags.IO: 642 self.store.log(logflags.IO.message( 643 'INDEX MISS (%s) %s' % (cache_key, filters))) 574 644 else: 575 if self. logflags & logflags.IO:576 self. log(logflags.IO.message('INDEX HIT (%s) %s' %577 (cache_key, filters)))645 if self.store.logflags & logflags.IO: 646 self.store.log(logflags.IO.message( 647 'INDEX HIT (%s) %s' % (cache_key, filters))) 578 648 unit.cleanse() 579 649 return unit 580 650 else: 581 if self. logflags & logflags.IO:582 self. 
log(logflags.IO.message('INDEX EMPTY (%s) %s' %583 (clsname, filters)))651 if self.store.logflags & logflags.IO: 652 self.store.log(logflags.IO.message( 653 'INDEX EMPTY (%s) %s' % (clsname, filters))) 584 654 return None 585 655 586 def _xrecall_from_index(self, cls, keys, filters):587 """Yield units from the (cls, keys)index which match the filters dict.588 589 The filters argument must contain an entry for each key in the 590 given list of keys, although it may and often should contain591 additional entries.592 """593 partial_index = set(filters.keys()) > set(keys)594 ids = self.get _index(cls, dict([(k, filters[k]) for k in keys]))656 def xrecall(self, index, filters): 657 """Yield units from the given index which match the filters dict. 658 659 The filters argument must contain an entry for each key in the given 660 index, although it may and often should contain additional entries. 661 """ 662 partial_index = set(filters.keys()) > set(index) 663 indexcriteria = dict([(k, filters[k]) for k in index]) 664 ids = self.get(indexcriteria) 595 665 if ids: 596 units = self.scan_index(cls, ids) 666 units = self.scan(ids) 667 668 removals = False 597 669 # Preserve order by iterating over the retrieved ids 598 670 # instead of the retrieved units. … … 600 672 unit = units.get(id, None) 601 673 if unit is not None: 602 matching = True 603 604 if partial_index: 605 # Filter in full now. 606 for k, v in filters.iteritems(): 607 if getattr(unit, k) != v: 608 matching = False 609 break 610 611 if matching: 674 for k, v in filters.iteritems(): 675 if getattr(unit, k) != v: 676 if k in index: 677 removals = True 678 del units[id] 679 ids.remove(id) 680 break 681 else: 612 682 unit.cleanse() 613 683 yield unit 614 684 685 if removals: 686 # Remove any idents from the index node that no longer 687 # satisfy the index criteria. This is how we update 688 # index nodes--eager adds but late discards. 
689 self.put(indexcriteria, ids, time=self.store.index_time) 690 691 def add(self, unit): 692 """Add the given unit to all indices.""" 693 ident = tuple([getattr(unit, name) 694 for name in self.store.primary_keys[self.cls]]) 695 for index in self._indices: 696 indexcriteria = dict([(k, getattr(unit, k)) for k in index]) 697 indexnode = self.get(indexcriteria) or [] 698 if ident not in indexnode: 699 indexnode.append(ident) 700 self.put(indexcriteria, indexnode) 701 702 def discard(self, unit): 703 """Discard the given unit from all indices.""" 704 ident = tuple([getattr(unit, name) 705 for name in self.store.primary_keys[self.cls]]) 706 for index in self._indices: 707 indexcriteria = dict([(k, getattr(unit, k)) for k in index]) 708 indexnode = self.get(indexcriteria) or [] 709 if ident in indexnode: 710 indexnode.remove(ident) 711 self.put(indexcriteria, indexnode) 712
