Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

Changeset 574

Show
Ignore:
Timestamp:
11/05/07 17:32:36
Author:
fumanchu
Message:

Crazycache: Mostly working; gotta fix ordering now. Removed ObjectCache?.fullquery.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/crazycache/dejavu/storage/caching.py

    r572 r574  
    55from dejavu import logic, logflags, recur 
    66from dejavu.storage import ProxyStorage, resolve 
    7  
    8  
    9 simple_attr_lookup = logic.Expression(lambda x: x.Thing == 4).func.func_code.co_code 
    107 
    118 
     
    3027            an instance of storeram.RAMStorage. 
    3128         
    32         fullquery: if True, run recall queries against the cache before 
    33             checking storage. When using key-value caches (like memcached), 
    34             this can be slow and should be turned off. If False (the default), 
    35             recall will still place recalled units into the cache. 
    36          
    3729        fulljoin: if True, perform recalls involving multiple classes using 
    3830            the cache. This can be quite slow when the involved classes are 
     
    4436        ProxyStorage.__init__(self, allOptions) 
    4537         
    46         self.fullquery = allOptions.get("fullquery", False) 
    4738        self.fulljoin = allOptions.get("fulljoin", False) 
    4839        self.cache_recalls = [] 
     
    8273         
    8374        cls = classes 
     75         
    8476        # Units which have no identifiers are not cached 
    8577        if not cls.identifiers: 
     
    10092            raise TypeError("Order argument expected when offset is provided.") 
    10193         
     94        # Try to retrieve units using a cached index. 
    10295        if cls.identifiers and cls in self.cache.classes: 
    103             fc = expr.func.func_code 
    104             compkeys = fc.co_names[1:] 
    105             # TODO: allow multiple keys (as long as one of them is indexed). 
    106             if (fc.co_code == simple_attr_lookup and len(compkeys) == 1 
    107                     # Hmmmmm. Should we allow on-the-fly indices? 
    108                     # Something bugs me about that idea, but I don't know what. 
    109                     and getattr(cls, compkeys[0]).index 
    110                     ): 
    111                 filters = {compkeys[0]: fc.co_consts[1]} 
    112                 try: 
    113                     units = self.cache.scan(self.nextstore, cls, filters, order) 
    114                 except ValueError: 
    115                     pass 
    116                 else: 
    117                     for unit in units: 
    118                         yield unit 
    119                     return 
    120          
    121         seen = {} 
    122         if not order: 
    123             # If an order is supplied, there's no point in running the 
    124             # query against our cache (because we'd have to interleave 
    125             # the results with those from storage anyway). In fact, the 
    126             # only reason to hit the cache at all here is to either hit 
    127             # the limit or allow the caller to stop iterating before 
    128             # reaching the DB. 
    129             if self.fullquery: 
    130                 # Query the cache. 
    131                 for unit in self.cache.xrecall(cls, expr, limit=limit): 
    132                     seen[unit.identity()] = None 
     96            units = self.cache.scan(self.nextstore, cls, expr, order) 
     97            if units is not None: 
     98                for unit in units: 
    13399                    yield unit 
    134                 limit = limit - len(keys) 
     100                return 
    135101         
    136102        # Query storage. 
     
    138104            for unit in self.nextstore.xrecall(cls, expr, order=order, 
    139105                                               limit=limit, offset=offset): 
    140                 id = unit.identity() 
    141                 # Don't offer up a unit we already yielded from the cache. 
    142                 if id not in seen: 
    143                     try: 
    144                         self.cache.save(unit, forceSave=True) 
    145                     except KeyError: 
    146                         # The cache refused to save the unit (possibly full). 
    147                         pass 
    148                     seen[id] = None 
    149                     yield unit 
     106                try: 
     107                    self.cache.save(unit, forceSave=True) 
     108                except KeyError: 
     109                    # The cache refused to save the unit (possibly full). 
     110                    pass 
     111                yield unit 
    150112        else: 
    151113            for unit in self.nextstore.xrecall(cls, expr, order=order, 
     
    170132                classes, expr, order=order, limit=limit, offset=offset): 
    171133                for i, unit in enumerate(unitrow): 
    172                     ident = unit.identity() 
    173                     if not unit.sequencer.valid_id(ident): 
    174                         # This is a 'dummy unit' from an outer join. 
    175                         continue 
    176                     if ident not in seen[i]: 
    177                         try: 
    178                             self.cache.save(unit, forceSave=True) 
    179                         except KeyError: 
    180                             # The cache refused to save the unit (possibly full). 
    181                             pass 
     134                    if unit.__class__ in self.cache.classes: 
     135                        ident = unit.identity() 
     136                        if not unit.sequencer.valid_id(ident): 
     137                            # This is a 'dummy unit' from an outer join. 
     138                            continue 
     139                         
     140                        if ident not in seen[i]: 
     141                            try: 
     142                                self.cache.save(unit, forceSave=True) 
     143                            except KeyError: 
     144                                # The cache refused to save the unit (possibly full). 
     145                                pass 
    182146                        seen[i][ident] = None 
    183147                yield unitrow 
  • branches/crazycache/dejavu/storage/storememcached.py

    r573 r574  
    11import md5 
    22import memcache 
     3import re 
     4import sys 
    35 
    46try: 
     
    1214 
    1315 
    14 simple_attr_lookup = logic.Expression(lambda x: x.Thing == 4 
    15                                       ).func.func_code.co_code 
     16def bytecode_regex(bits): 
     17    """Make a regular expression out of the given mixed bytecode bits. 
     18     
     19    If any bit is an integer, it will be replaced with re.escape(chr(bit)). 
     20    Any bits which are already strings will be added as-is. 
     21    """ 
     22    s = [] 
     23    for bit in bits: 
     24        if not isinstance(bit, basestring): 
     25            bit = re.escape(chr(bit)) 
     26        s.append(bit) 
     27    return "".join(s) 
     28 
     29simple_compare = bytecode_regex([124, 0, 0, 105, ".", ".", 100, ".", ".", 106, 2, 0]) 
     30simple_and = bytecode_regex([111, ".", ".", 1]) 
     31indexable_regex = re.compile("^(%s(%s)?)+S$" % (simple_compare, simple_and)) 
    1632 
    1733 
     
    117133        indexset = self.indexsets[cls] 
    118134         
    119         if expr: 
    120             if not isinstance(expr, logic.Expression): 
    121                 expr = logic.Expression(expr) 
    122             fc = expr.func.func_code 
    123             compkeys = fc.co_names[1:] 
    124             # TODO: allow multiple filter keys. 
    125             if (fc.co_code == simple_attr_lookup and len(compkeys) == 1): 
    126                 compvals = fc.co_consts[1:] 
    127                 filters = dict([(k, v) for k, v in zip(compkeys, compvals)]) 
    128                  
    129                 # Try to retrieve a matching unit using its primary_keys. 
    130                 # This will skip grabbing any indices (a HUGE optimization). 
    131                 pk = self.primary_keys[cls] 
    132                 if set(compkeys) >= set(pk): 
    133                     yield self._unit_by_primary_key(cls, pk, filters) 
    134                     return 
    135                  
    136                 # Try to retrieve matching units using an index. 
    137                 # If self.global_index is True, the last one should 
    138                 # be an index with propnames == []. See self.register. 
    139                 for index in indexset: 
    140                     if set(compkeys) >= set(index): 
    141                         data = indexset.xrecall(index, filters) 
    142                         data = self._xrecall_inner(data, expr) 
    143                         for unit in self._paginate(data, order, limit, offset, single=True): 
    144                             yield unit 
    145                         return 
    146          
    147         if self.global_index: 
    148             data = indexset.xrecall([], {}) 
    149             data = self._xrecall_inner(data, expr) 
    150             for unit in self._paginate(data, order, limit, offset, single=True): 
    151                 yield unit 
    152         else: 
    153             # Yield nothing since we have no access paths. 
    154             pass 
     135        if not isinstance(expr, logic.Expression): 
     136            expr = logic.Expression(expr) 
     137        if self.logflags & logflags.RECALL: 
     138            self.log(logflags.RECALL.message(cls, expr)) 
     139         
     140        if limit == 0: 
     141            return 
     142         
     143        if offset and not order: 
     144            raise TypeError("Order argument expected when offset is provided.") 
     145         
     146        filters = self.extract_filters(expr) 
     147         
     148        # Try to retrieve a single matching unit using its primary_keys. 
     149        # This will skip grabbing any indices (a HUGE optimization). 
     150        pk = self.primary_keys[cls] 
     151        if set(filters.keys()) >= set(pk): 
     152            yield self._unit_by_primary_key(cls, pk, filters) 
     153            return 
     154         
     155        # Try to retrieve matching units using an index. 
     156        # If self.global_index is True, the last one should 
     157        # be an index with propnames == []. See self.register. 
     158        for index in indexset: 
     159            if set(filters.keys()) >= set(index): 
     160                data = indexset.xrecall(index, filters) 
     161                data = self._xrecall_inner(data, expr) 
     162                for unit in self._paginate(data, order, limit, offset, single=True): 
     163                    yield unit 
     164                return 
    155165     
    156166    def _xrecall_inner(self, units, expr=None): 
     
    188198             
    189199            if not unit.sequencer.valid_id(unit.identity()): 
    190                 if self.global_index
     200                if () in indexset.indices
    191201                    # Try to generate an identifier by looking 
    192202                    # up all units in the global index. 
     
    250260            self.log(logflags.DDL.message("create storage %s" % cls)) 
    251261         
    252         if self.global_index: 
     262        indexset = self.indexsets[cls] 
     263        if () in indexset.indices: 
    253264            try: 
    254                 self.client.add(self.indexsets[cls].key({}), []) 
     265                self.client.add(indexset.key({}), []) 
    255266            except IOError, exc: 
    256267                if exc.args[0] == 'NOT STORED': 
     
    283294                                          (clsname, name))) 
    284295         
    285         if self.global_index: 
     296        indexset = self.indexsets[cls] 
     297        if () in indexset.indices: 
    286298            # TODO: recalculate if primary_keys changed 
    287             ci = self.client.get(self.indexsets[cls].key({})) or [] 
     299            ci = self.client.get(indexset.key({})) or [] 
    288300            for id in ci: 
    289301                key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 
     
    296308    def has_property(self, cls, name): 
    297309        """If storage structures exist for the given property, return True.""" 
    298         if self.global_index: 
     310        indexset = self.indexsets[cls] 
     311        if () in indexset.indices: 
    299312            clsname = cls.__name__ 
    300             ci = self.client.get(self.indexsets[cls].key({})) 
     313            ci = self.client.get(indexset.key({})) 
    301314             
    302315            if not ci: 
     
    323336                                          (clsname, name))) 
    324337         
    325         if self.global_index: 
    326             ci = self.client.get(self.indexsets[cls].key({})) or [] 
     338        indexset = self.indexsets[cls] 
     339        if () in indexset.indices: 
     340            ci = self.client.get(indexset.key({})) or [] 
    327341            for id in ci: 
    328342                key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 
     
    343357                                          % (cls, oldname, newname))) 
    344358         
    345         if self.global_index: 
    346             ci = self.client.get(self.indexsets[cls].key({})) or [] 
     359        indexset = self.indexsets[cls] 
     360        if () in indexset.indices: 
     361            ci = self.client.get(indexset.key({})) or [] 
    347362            for id in ci: 
    348363                key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 
     
    358373     
    359374    def cachelen(self, cls): 
    360         if self.global_index: 
    361             return len(self.client.get(self.indexsets[cls].key({}))) 
     375        indexset = self.indexsets[cls] 
     376        if () in indexset.indices: 
     377            return len(self.client.get(indexset.key({}))) 
    362378        else: 
    363379            return 0 
     
    365381    def cached_units(self, cls): 
    366382        units = [] 
    367         if self.global_index: 
    368             for key in self.client.get(self.indexsets[cls].key({})): 
     383        indexset = self.indexsets[cls] 
     384        if () in indexset.indices: 
     385            for key in self.client.get(indexset.key({})): 
    369386                unit = self.client.get(key) 
    370387                if unit is not None: 
     
    376393        clsname = cls.__name__ 
    377394         
    378         if self.global_index: 
    379             gi_key = self.indexsets[cls].key({}) 
     395        indexset = self.indexsets[cls] 
     396        if () in indexset.indices: 
     397            gi_key = indexset.key({}) 
    380398            # Delete all units in the global index. 
    381399            for id in self.client.get(gi_key) or []: 
     
    400418            prop = getattr(cls, propname) 
    401419            if prop.index: 
    402                 # No need for an index on the primary key; 
     420                # There's usually no need for an index on the primary key; 
    403421                # we can just fetch each one directly by cache key. 
     422                # Callers are free to add one in explicitly if needed, 
     423                # for example, if wanting to retrive all units without 
     424                # any filtering criteria. 
    404425                if propname not in cls.identifiers: 
    405426                    i.add_index(propname) 
     
    442463        return None 
    443464     
    444     def scan(self, mainstore, cls, filters, order): 
    445         """Return units from a cached index, if possible.""" 
     465    def extract_filters(self, expr): 
     466        """Return a dict of (key == value) pairs from the given expr. 
     467         
     468        If the given Expression contains operators other than ==, or if a 
     469        set of filters cannot be obtained for some other reason, returns {}. 
     470        In theory, we should be able to ignore other operators but the 
     471        simple regex we use isn't that smart; we'd have to do a full parse 
     472        of the expr and then functionally decompose it. 
     473         
     474        This function is only designed to work on Expressions for a single 
     475        class (i.e. - no joins). 
     476        """ 
     477        if expr.is_constant(True): 
     478            return {} 
     479         
     480        fc = expr.func.func_code 
     481        if indexable_regex.match(fc.co_code): 
     482            if sys.version_info >= (2, 5): 
     483                # Python 2.5 stopped including args in co_names. 
     484                compkeys = fc.co_names 
     485            else: 
     486                # The first co_names will be the positional args for the class. 
     487                compkeys = fc.co_names[fc.co_argcount:] 
     488             
     489            # "If a code object represents a function, the first item 
     490            # in co_consts is the documentation string of the function, 
     491            # or None if undefined." 
     492            compvals = fc.co_consts[1:] 
     493             
     494            return dict(zip(compkeys, compvals)) 
     495         
     496        return {} 
     497     
     498    def scan(self, mainstore, cls, expr, order): 
     499        """Return an ordered list of units from a cached index (or None). 
     500         
     501        The class and expression will be used to find a cached index; 
     502        if not found, the mainstore will be used to create one, and it 
     503        will be cached. 
     504         
     505        Once an index has been obtained, it will be iterated over against 
     506        the cache. Each unit in the index which is not available in the 
     507        cache will be pulled from mainstore. 
     508         
     509        If no index intersects with the given expression, None is returned. 
     510        """ 
     511        filters = self.extract_filters(expr) 
    446512        indexset = self.indexsets[cls] 
    447513        keyattrs = self.primary_keys[cls] 
     
    454520                break 
    455521        else: 
    456             raise ValueError("The given filters %r are not indexed for %r." % 
    457                              (tuple(filters.keys()), cls.__name__)) 
     522            # Signal the caller that no index scan was performed. 
     523            return None 
    458524         
    459525        ids = indexset.get(indexcriteria) 
     
    466532            # Query the cache for multiple units (by id). 
    467533            units = indexset.scan(ids) 
    468             print units 
    469534            misses = [k for k in ids if k not in units] 
    470             print misses 
    471535        else: 
    472536            # Query the cache for multiple units (by id). 
     
    496560                misstep = zip(*misses[step:step + self.index_stride])[0] 
    497561                f = lambda x: getattr(x, keyattrs[0]) in misstep 
    498                 print f 
    499562                for unit in mainstore.recall(cls, f): 
    500563                    units[tuple([getattr(unit, a) for a in keyattrs])] = unit 
     564                    try: 
     565                        self.save(unit, forceSave=True) 
     566                    except KeyError: 
     567                        # The cache refused to save the unit (possibly full). 
     568                        pass 
    501569        elif misses: 
    502570            # ...or all in one chunk if desired. 
     
    504572            misstep = zip(*misses)[0] 
    505573            f = lambda x: getattr(x, keyattrs[0]) in misstep 
    506             print f 
    507574            for unit in mainstore.recall(cls, f): 
    508575                units[tuple([getattr(unit, a) for a in keyattrs])] = unit 
     576                try: 
     577                    self.save(unit, forceSave=True) 
     578                except KeyError: 
     579                    # The cache refused to save the unit (possibly full). 
     580                    pass 
    509581         
    510582        # Preserve order 
    511         for k in ids: 
    512             yield units[k] 
     583        return [units[k] for k in ids] 
    513584 
    514585 
     
    526597        recall(Person, {age: 25}) -> ns:Person:index(age=25) = [(12, 34, 22)] 
    527598        recall(Person, {age: 64}) -> ns:Person:index(age=64) = [(7, 17, 27)] 
    528      
    529     In general, callers should use get, put, and scan together: 
    530          
    531         ids = index.get(filters) 
    532         if ids is None: 
    533             ids = expensive_lookup(cls, filters) 
    534             index.put(filters, ids) 
    535         units = index.scan(ids) 
    536         misses = [k for k in ids if k not in units] 
    537599    """ 
    538600     
     
    633695            # to perform single gets against memcached, rather than the 
    634696            # get_multi calls that self.xrecall performs. 
    635             ids = self.get(dict([(k, filters[k]) for k in index])) 
     697            indexcriteria = dict([(k, filters[k]) for k in index]) 
     698            ids = self.get(indexcriteria) 
    636699            if ids: 
    637700                for id in ids: 
     
    698761            if ident not in indexnode: 
    699762                indexnode.append(ident) 
    700             self.put(indexcriteria, indexnode) 
     763                self.put(indexcriteria, indexnode) 
    701764     
    702765    def discard(self, unit): 
     
    709772            if ident in indexnode: 
    710773                indexnode.remove(ident) 
    711             self.put(indexcriteria, indexnode) 
    712  
     774                self.put(indexcriteria, indexnode) 
     775