Changeset 574
- Timestamp:
- 11/05/07 17:32:36
- Files:
-
- branches/crazycache/dejavu/storage/caching.py (modified) (7 diffs)
- branches/crazycache/dejavu/storage/storememcached.py (modified) (22 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/crazycache/dejavu/storage/caching.py
r572 r574 5 5 from dejavu import logic, logflags, recur 6 6 from dejavu.storage import ProxyStorage, resolve 7 8 9 simple_attr_lookup = logic.Expression(lambda x: x.Thing == 4).func.func_code.co_code10 7 11 8 … … 30 27 an instance of storeram.RAMStorage. 31 28 32 fullquery: if True, run recall queries against the cache before33 checking storage. When using key-value caches (like memcached),34 this can be slow and should be turned off. If False (the default),35 recall will still place recalled units into the cache.36 37 29 fulljoin: if True, perform recalls involving multiple classes using 38 30 the cache. This can be quite slow when the involved classes are … … 44 36 ProxyStorage.__init__(self, allOptions) 45 37 46 self.fullquery = allOptions.get("fullquery", False)47 38 self.fulljoin = allOptions.get("fulljoin", False) 48 39 self.cache_recalls = [] … … 82 73 83 74 cls = classes 75 84 76 # Units which have no identifiers are not cached 85 77 if not cls.identifiers: … … 100 92 raise TypeError("Order argument expected when offset is provided.") 101 93 94 # Try to retrieve units using a cached index. 102 95 if cls.identifiers and cls in self.cache.classes: 103 fc = expr.func.func_code 104 compkeys = fc.co_names[1:] 105 # TODO: allow multiple keys (as long as one of them is indexed). 106 if (fc.co_code == simple_attr_lookup and len(compkeys) == 1 107 # Hmmmmm. Should we allow on-the-fly indices? 108 # Something bugs me about that idea, but I don't know what. 109 and getattr(cls, compkeys[0]).index 110 ): 111 filters = {compkeys[0]: fc.co_consts[1]} 112 try: 113 units = self.cache.scan(self.nextstore, cls, filters, order) 114 except ValueError: 115 pass 116 else: 117 for unit in units: 118 yield unit 119 return 120 121 seen = {} 122 if not order: 123 # If an order is supplied, there's no point in running the 124 # query against our cache (because we'd have to interleave 125 # the results with those from storage anyway). In fact, the 126 # only reason to hit the cache at all here is to either hit 127 # the limit or allow the caller to stop iterating before 128 # reaching the DB. 129 if self.fullquery: 130 # Query the cache. 131 for unit in self.cache.xrecall(cls, expr, limit=limit): 132 seen[unit.identity()] = None 96 units = self.cache.scan(self.nextstore, cls, expr, order) 97 if units is not None: 98 for unit in units: 133 99 yield unit 134 limit = limit - len(keys)100 return 135 101 136 102 # Query storage. … … 138 104 for unit in self.nextstore.xrecall(cls, expr, order=order, 139 105 limit=limit, offset=offset): 140 id = unit.identity() 141 # Don't offer up a unit we already yielded from the cache. 142 if id not in seen: 143 try: 144 self.cache.save(unit, forceSave=True) 145 except KeyError: 146 # The cache refused to save the unit (possibly full). 147 pass 148 seen[id] = None 149 yield unit 106 try: 107 self.cache.save(unit, forceSave=True) 108 except KeyError: 109 # The cache refused to save the unit (possibly full). 110 pass 111 yield unit 150 112 else: 151 113 for unit in self.nextstore.xrecall(cls, expr, order=order, … … 170 132 classes, expr, order=order, limit=limit, offset=offset): 171 133 for i, unit in enumerate(unitrow): 172 ident = unit.identity() 173 if not unit.sequencer.valid_id(ident): 174 # This is a 'dummy unit' from an outer join. 175 continue 176 if ident not in seen[i]: 177 try: 178 self.cache.save(unit, forceSave=True) 179 except KeyError: 180 # The cache refused to save the unit (possibly full). 181 pass 134 if unit.__class__ in self.cache.classes: 135 ident = unit.identity() 136 if not unit.sequencer.valid_id(ident): 137 # This is a 'dummy unit' from an outer join. 138 continue 139 140 if ident not in seen[i]: 141 try: 142 self.cache.save(unit, forceSave=True) 143 except KeyError: 144 # The cache refused to save the unit (possibly full). 145 pass 182 146 seen[i][ident] = None 183 147 yield unitrow branches/crazycache/dejavu/storage/storememcached.py
r573 r574 1 1 import md5 2 2 import memcache 3 import re 4 import sys 3 5 4 6 try: … … 12 14 13 15 14 simple_attr_lookup = logic.Expression(lambda x: x.Thing == 4 15 ).func.func_code.co_code 16 def bytecode_regex(bits): 17 """Make a regular expression out of the given mixed bytecode bits. 18 19 If any bit is an integer, it will be replaced with re.escape(chr(bit)). 20 Any bits which are already strings will be added as-is. 21 """ 22 s = [] 23 for bit in bits: 24 if not isinstance(bit, basestring): 25 bit = re.escape(chr(bit)) 26 s.append(bit) 27 return "".join(s) 28 29 simple_compare = bytecode_regex([124, 0, 0, 105, ".", ".", 100, ".", ".", 106, 2, 0]) 30 simple_and = bytecode_regex([111, ".", ".", 1]) 31 indexable_regex = re.compile("^(%s(%s)?)+S$" % (simple_compare, simple_and)) 16 32 17 33 … … 117 133 indexset = self.indexsets[cls] 118 134 119 if expr: 120 if not isinstance(expr, logic.Expression): 121 expr = logic.Expression(expr) 122 fc = expr.func.func_code 123 compkeys = fc.co_names[1:] 124 # TODO: allow multiple filter keys. 125 if (fc.co_code == simple_attr_lookup and len(compkeys) == 1): 126 compvals = fc.co_consts[1:] 127 filters = dict([(k, v) for k, v in zip(compkeys, compvals)]) 128 129 # Try to retrieve a matching unit using its primary_keys. 130 # This will skip grabbing any indices (a HUGE optimization). 131 pk = self.primary_keys[cls] 132 if set(compkeys) >= set(pk): 133 yield self._unit_by_primary_key(cls, pk, filters) 134 return 135 136 # Try to retrieve matching units using an index. 137 # If self.global_index is True, the last one should 138 # be an index with propnames == []. See self.register. 139 for index in indexset: 140 if set(compkeys) >= set(index): 141 data = indexset.xrecall(index, filters) 142 data = self._xrecall_inner(data, expr) 143 for unit in self._paginate(data, order, limit, offset, single=True): 144 yield unit 145 return 146 147 if self.global_index: 148 data = indexset.xrecall([], {}) 149 data = self._xrecall_inner(data, expr) 150 for unit in self._paginate(data, order, limit, offset, single=True): 151 yield unit 152 else: 153 # Yield nothing since we have no access paths. 154 pass 135 if not isinstance(expr, logic.Expression): 136 expr = logic.Expression(expr) 137 if self.logflags & logflags.RECALL: 138 self.log(logflags.RECALL.message(cls, expr)) 139 140 if limit == 0: 141 return 142 143 if offset and not order: 144 raise TypeError("Order argument expected when offset is provided.") 145 146 filters = self.extract_filters(expr) 147 148 # Try to retrieve a single matching unit using its primary_keys. 149 # This will skip grabbing any indices (a HUGE optimization). 150 pk = self.primary_keys[cls] 151 if set(filters.keys()) >= set(pk): 152 yield self._unit_by_primary_key(cls, pk, filters) 153 return 154 155 # Try to retrieve matching units using an index. 156 # If self.global_index is True, the last one should 157 # be an index with propnames == []. See self.register. 158 for index in indexset: 159 if set(filters.keys()) >= set(index): 160 data = indexset.xrecall(index, filters) 161 data = self._xrecall_inner(data, expr) 162 for unit in self._paginate(data, order, limit, offset, single=True): 163 yield unit 164 return 155 165 156 166 def _xrecall_inner(self, units, expr=None): … … 188 198 189 199 if not unit.sequencer.valid_id(unit.identity()): 190 if self.global_index:200 if () in indexset.indices: 191 201 # Try to generate an identifier by looking 192 202 # up all units in the global index. … … 250 260 self.log(logflags.DDL.message("create storage %s" % cls)) 251 261 252 if self.global_index: 262 indexset = self.indexsets[cls] 263 if () in indexset.indices: 253 264 try: 254 self.client.add( self.indexsets[cls].key({}), [])265 self.client.add(indexset.key({}), []) 255 266 except IOError, exc: 256 267 if exc.args[0] == 'NOT STORED': … … 283 294 (clsname, name))) 284 295 285 if self.global_index: 296 indexset = self.indexsets[cls] 297 if () in indexset.indices: 286 298 # TODO: recalculate if primary_keys changed 287 ci = self.client.get( self.indexsets[cls].key({})) or []299 ci = self.client.get(indexset.key({})) or [] 288 300 for id in ci: 289 301 key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) … … 296 308 def has_property(self, cls, name): 297 309 """If storage structures exist for the given property, return True.""" 298 if self.global_index: 310 indexset = self.indexsets[cls] 311 if () in indexset.indices: 299 312 clsname = cls.__name__ 300 ci = self.client.get( self.indexsets[cls].key({}))313 ci = self.client.get(indexset.key({})) 301 314 302 315 if not ci: … … 323 336 (clsname, name))) 324 337 325 if self.global_index: 326 ci = self.client.get(self.indexsets[cls].key({})) or [] 338 indexset = self.indexsets[cls] 339 if () in indexset.indices: 340 ci = self.client.get(indexset.key({})) or [] 327 341 for id in ci: 328 342 key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) … … 343 357 % (cls, oldname, newname))) 344 358 345 if self.global_index: 346 ci = self.client.get(self.indexsets[cls].key({})) or [] 359 indexset = self.indexsets[cls] 360 if () in indexset.indices: 361 ci = self.client.get(indexset.key({})) or [] 347 362 for id in ci: 348 363 key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) … … 358 373 359 374 def cachelen(self, cls): 360 if self.global_index: 361 return len(self.client.get(self.indexsets[cls].key({}))) 375 indexset = self.indexsets[cls] 376 if () in indexset.indices: 377 return len(self.client.get(indexset.key({}))) 362 378 else: 363 379 return 0 … … 365 381 def cached_units(self, cls): 366 382 units = [] 367 if self.global_index: 368 for key in self.client.get(self.indexsets[cls].key({})): 383 indexset = self.indexsets[cls] 384 if () in indexset.indices: 385 for key in self.client.get(indexset.key({})): 369 386 unit = self.client.get(key) 370 387 if unit is not None: … … 376 393 clsname = cls.__name__ 377 394 378 if self.global_index: 379 gi_key = self.indexsets[cls].key({}) 395 indexset = self.indexsets[cls] 396 if () in indexset.indices: 397 gi_key = indexset.key({}) 380 398 # Delete all units in the global index. 381 399 for id in self.client.get(gi_key) or []: … … 400 418 prop = getattr(cls, propname) 401 419 if prop.index: 402 # No need for an index on the primary key;420 # There's usually no need for an index on the primary key; 403 421 # we can just fetch each one directly by cache key. 422 # Callers are free to add one in explicitly if needed, 423 # for example, if wanting to retrive all units without 424 # any filtering criteria. 404 425 if propname not in cls.identifiers: 405 426 i.add_index(propname) … … 442 463 return None 443 464 444 def scan(self, mainstore, cls, filters, order): 445 """Return units from a cached index, if possible.""" 465 def extract_filters(self, expr): 466 """Return a dict of (key == value) pairs from the given expr. 467 468 If the given Expression contains operators other than ==, or if a 469 set of filters cannot be obtained for some other reason, returns {}. 470 In theory, we should be able to ignore other operators but the 471 simple regex we use isn't that smart; we'd have to do a full parse 472 of the expr and then functionally decompose it. 473 474 This function is only designed to work on Expressions for a single 475 class (i.e. - no joins). 476 """ 477 if expr.is_constant(True): 478 return {} 479 480 fc = expr.func.func_code 481 if indexable_regex.match(fc.co_code): 482 if sys.version_info >= (2, 5): 483 # Python 2.5 stopped including args in co_names. 484 compkeys = fc.co_names 485 else: 486 # The first co_names will be the positional args for the class. 487 compkeys = fc.co_names[fc.co_argcount:] 488 489 # "If a code object represents a function, the first item 490 # in co_consts is the documentation string of the function, 491 # or None if undefined." 492 compvals = fc.co_consts[1:] 493 494 return dict(zip(compkeys, compvals)) 495 496 return {} 497 498 def scan(self, mainstore, cls, expr, order): 499 """Return an ordered list of units from a cached index (or None). 500 501 The class and expression will be used to find a cached index; 502 if not found, the mainstore will be used to create one, and it 503 will be cached. 504 505 Once an index has been obtained, it will be iterated over against 506 the cache. Each unit in the index which is not available in the 507 cache will be pulled from mainstore. 508 509 If no index intersects with the given expression, None is returned. 510 """ 511 filters = self.extract_filters(expr) 446 512 indexset = self.indexsets[cls] 447 513 keyattrs = self.primary_keys[cls] … … 454 520 break 455 521 else: 456 raise ValueError("The given filters %r are not indexed for %r." %457 (tuple(filters.keys()), cls.__name__))522 # Signal the caller that no index scan was performed. 523 return None 458 524 459 525 ids = indexset.get(indexcriteria) … … 466 532 # Query the cache for multiple units (by id). 467 533 units = indexset.scan(ids) 468 print units469 534 misses = [k for k in ids if k not in units] 470 print misses471 535 else: 472 536 # Query the cache for multiple units (by id). … … 496 560 misstep = zip(*misses[step:step + self.index_stride])[0] 497 561 f = lambda x: getattr(x, keyattrs[0]) in misstep 498 print f499 562 for unit in mainstore.recall(cls, f): 500 563 units[tuple([getattr(unit, a) for a in keyattrs])] = unit 564 try: 565 self.save(unit, forceSave=True) 566 except KeyError: 567 # The cache refused to save the unit (possibly full). 568 pass 501 569 elif misses: 502 570 # ...or all in one chunk if desired. … … 504 572 misstep = zip(*misses)[0] 505 573 f = lambda x: getattr(x, keyattrs[0]) in misstep 506 print f507 574 for unit in mainstore.recall(cls, f): 508 575 units[tuple([getattr(unit, a) for a in keyattrs])] = unit 576 try: 577 self.save(unit, forceSave=True) 578 except KeyError: 579 # The cache refused to save the unit (possibly full). 580 pass 509 581 510 582 # Preserve order 511 for k in ids: 512 yield units[k] 583 return [units[k] for k in ids] 513 584 514 585 … … 526 597 recall(Person, {age: 25}) -> ns:Person:index(age=25) = [(12, 34, 22)] 527 598 recall(Person, {age: 64}) -> ns:Person:index(age=64) = [(7, 17, 27)] 528 529 In general, callers should use get, put, and scan together:530 531 ids = index.get(filters)532 if ids is None:533 ids = expensive_lookup(cls, filters)534 index.put(filters, ids)535 units = index.scan(ids)536 misses = [k for k in ids if k not in units]537 599 """ 538 600 … … 633 695 # to perform single gets against memcached, rather than the 634 696 # get_multi calls that self.xrecall performs. 635 ids = self.get(dict([(k, filters[k]) for k in index])) 697 indexcriteria = dict([(k, filters[k]) for k in index]) 698 ids = self.get(indexcriteria) 636 699 if ids: 637 700 for id in ids: … … 698 761 if ident not in indexnode: 699 762 indexnode.append(ident) 700 self.put(indexcriteria, indexnode)763 self.put(indexcriteria, indexnode) 701 764 702 765 def discard(self, unit): … … 709 772 if ident in indexnode: 710 773 indexnode.remove(ident) 711 self.put(indexcriteria, indexnode)712 774 self.put(indexcriteria, indexnode) 775
