Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

Changeset 571

Show
Ignore:
Timestamp:
11/03/07 18:54:19
Author:
fumanchu
Message:

Crazycache: Lots of memcached changes to support secondary indices. Changed 'indexed' option to 'global_index'. Changed keyattrs to primary_keys.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/crazycache/dejavu/storage/caching.py

    r569 r571  
    7676    def xspecial(self, cls, key, value, order=None): 
    7777        """Yield multiple units of the given cls where key=value.""" 
    78         keyattrs = self.cache._keyattrs[cls] 
     78        keyattrs = self.cache.primary_keys[cls] 
    7979         
    8080        # Get a cached list of identifier-tuples, ordered if requested. 
    8181        # TODO: add order to the idkey. 
    82         ids = self.cache.get_index(cls, key, value
     82        ids = self.cache.get_index(cls, {key: value}
    8383        if ids is None: 
    8484            # Not in the cache. Grab the list of id-tuples from nextstore. 
    8585            ids = self.view((cls, keyattrs, {key: value}), order=order) 
    8686            # Then cache the list result for next time. 
    87             self.cache.put_index(cls, key, value, ids, time = 5 * 60) 
     87            self.cache.put_index(cls, {key: value}, ids, time = 5 * 60) 
    8888         
    8989        # Query the cache for multiple units (by id). 
     
    155155                    yield unit 
    156156                return 
    157             elif self.fullquery: 
     157             
     158            if self.fullquery: 
    158159                # Query the cache. 
    159160                for unit in self.cache.xrecall(cls, expr, limit=limit, offset=offset): 
  • branches/crazycache/dejavu/storage/storememcached.py

    r570 r571  
    11import md5 
    22import memcache 
    3  
    4 try: 
    5     import cPickle as pickle 
    6 except ImportError: 
    7     import pickle 
    83 
    94try: 
     
    1712 
    1813 
     14simple_attr_lookup = logic.Expression(lambda x: x.Thing == 4 
     15                                      ).func.func_code.co_code 
     16 
     17 
    1918class MemcachedStorageManager(storage.StorageManager): 
    2019    """A Storage Manager which keeps all data in memcached. 
     
    3837            These will be passed directly into the memcache.Client instance. 
    3938         
    40         memcached.indexed: if True (the default), this store will maintain 
    41             an index of all stored objects in memcached itself. This is the 
    42             'safe' choice, and necessary if your only store is memcached. 
    43             If you run this store as an ObjectCache.cache, however, you 
    44             should turn this off, allowing ObjectCache.nextstore to maintain 
    45             the indexes--this allows the cache to run orders of magnitude 
    46             faster. 
     39        memcached.global_index: if True (the default), this store will 
     40            maintain a index over the identifiers of all stored objects in 
     41            memcached itself. This is the 'safe' choice, and necessary if 
     42            your only store is memcached. However, if you run this store as 
     43            an ObjectCache.cache, you should turn this off, allowing 
     44            ObjectCache.nextstore to maintain the primary indexes--this 
     45            allows the cache to run orders of magnitude faster. 
     46     
     47    Secondary Indexes: 
     48     
     49    We don't even try to invalidate any of the secondary indexes, because 
     50    to do so we'd either have to 1) keep around the old criteria for each 
     51    modified unit (to determine from which index to remove the unit), or 
     52    2) rely on an index of indexes, neither of which we want to do. 
     53    They're just there for x/recall() and consumers of ObjectCache; 
     54    we invalidate by timeout, not by event. 
    4755    """ 
    4856     
     
    5159         
    5260        self.name = allOptions['name'] 
    53         self.indexed = allOptions.pop("memcached.indexed", True) 
     61        self.global_index = allOptions.pop("memcached.global_index", True) 
     62        self.primary_keys = {} 
     63        self.indices = {} 
    5464         
    5565        cache_opts = dict([(k[10:], v) for k, v in allOptions.iteritems() 
    5666                           if k.startswith("memcached.")]) 
    5767        self.client = memcache.Client(**cache_opts) 
    58          
    59         self._keyattrs = {} 
    6068     
    6169    def hash(self, object): 
    6270        """Return a consistent hash for object (for use in a memcached key).""" 
    63         return md5.new(pickle.dumps(object)).hexdigest() 
    64      
    65     def _index_key(self, cls): 
    66         """Return the key for the cached index of the given class.""" 
    67         return "%s:%s(%s)" % (self.name, cls.__name__, 
    68                               ",".join(self._keyattrs[cls])) 
    69      
    70     def register(self, cls): 
    71         """Assert that Units of class 'cls' will be handled.""" 
    72         self._keyattrs[cls] = tuple(cls.identifiers or cls.properties) 
    73         storage.StorageManager.register(self, cls) 
     71        # TODO: can we add overflow support for collisions? 
     72        return md5.new(repr(object)).hexdigest() 
    7473     
    7574    def _unit_key(self, unit): 
    76         """Return the memcached key for the given unit.""" 
     75        """Return (ident, memcached key) for the given unit.""" 
    7776        cls = unit.__class__ 
    78         ident = tuple([getattr(unit, name) for name in self._keyattrs[cls]]) 
    79         return "%s:%s:%s" % (self.name, cls.__name__, self.hash(ident)) 
     77        ident = tuple([getattr(unit, name) for name in self.primary_keys[cls]]) 
     78        key = "%s:%s:%s" % (self.name, cls.__name__, self.hash(ident)) 
     79        return ident, key 
    8080     
    8181    def unit(self, cls, **kwargs): 
     
    8585        None is returned. 
    8686        """ 
    87         filters = set(kwargs.keys()) 
    88         keyattrs = self._keyattrs[cls] 
    89         if filters >= set(keyattrs): 
    90             # Looking up a Unit by its identifiers. 
    91             # Skip grabbing the cached class index (a HUGE optimization). 
    92             key = tuple([kwargs[k] for k in keyattrs]) 
    93             key = "%s:%s:%s" % (self.name, cls.__name__, self.hash(key)) 
    94             unit = self.client.get(key) 
    95             if unit is not None: 
    96                 matching = True 
    97                 if filters > set(keyattrs): 
    98                     # We retrieved the Unit using a subset of the filters. 
    99                     # Filter in full now. 
    100                     for k, v in kwargs.iteritems(): 
    101                         if getattr(unit, k) != v: 
    102                             matching = False 
    103                             break 
    104                  
    105                 if matching: 
    106                     unit.cleanse() 
     87        keyset = set(kwargs.keys()) 
     88         
     89        # Try to retrieve a matching unit using its primary_keys. 
     90        # This will skip grabbing any indices (a HUGE optimization). 
     91        pk = self.primary_keys[cls] 
     92        if keyset >= set(pk): 
     93            return self._unit_by_primary_key(cls, pk, kwargs) 
     94         
     95        # Try to retrieve a matching unit using an index. 
     96        # Sort them from most-specific (most properties) to least. 
     97        # If self.global_index is True, the last one should 
     98        # be an index with propnames == []. See self.register. 
     99        indices = list(self.indices[cls]) 
     100        indices.sort(lambda x, y: cmp(len(y), len(x))) 
     101        for propnames in indices: 
     102            if keyset >= set(propnames): 
     103                unit = self._unit_from_index(cls, propnames, kwargs) 
     104                if unit is not None: 
    107105                    if self.logflags & logflags.RECALL: 
    108106                        self.log(logflags.RECALL.message(cls, ('HIT', kwargs))) 
    109107                    return unit 
    110              
    111             if self.logflags & logflags.RECALL: 
    112                 self.log(logflags.RECALL.message(cls, ('MISS', kwargs))) 
    113             return None 
    114          
    115         if self.indexed: 
    116             if self.logflags & logflags.RECALL: 
    117                 self.log(logflags.RECALL.message(cls, ('INDEX', kwargs))) 
    118              
    119             ci = self.client.get(self._index_key(cls)) or set() 
    120             try: 
    121                 expr = logic.filter(**kwargs) 
    122                 return self._xrecall_inner(ci, expr).next()[0] 
    123             except StopIteration: 
    124                 return None 
    125         else: 
    126             if self.logflags & logflags.RECALL: 
    127                 self.log(logflags.RECALL.message(cls, ('DEFER', kwargs))) 
    128             return None 
     108         
     109        # Return None since we have no more access paths. 
     110        if self.logflags & logflags.RECALL: 
     111            self.log(logflags.RECALL.message(cls, ('DEFER', kwargs))) 
     112        return None 
    129113     
    130114    def xrecall(self, classes, expr=None, order=None, limit=None, offset=None): 
    131115        """Yield units of the given cls which match the given expr.""" 
    132         if not self.indexed: 
    133             return iter([]) 
    134          
    135116        if isinstance(classes, dejavu.UnitJoin): 
    136             return self._xmultirecall(classes, expr, order=order, 
    137                                       limit=limit, offset=offset) 
     117            for units in self._xmultirecall(classes, expr, order=order, 
     118                                            limit=limit, offset=offset): 
     119                yield units 
     120            return 
    138121         
    139122        cls = classes 
    140         if self.logflags & logflags.RECALL: 
    141             self.log(logflags.RECALL.message(cls, expr)) 
    142          
    143         ci = self.client.get(self._index_key(cls)) or set() 
    144         if ci: 
    145             data = self._xrecall_inner(ci, expr) 
    146             return self._paginate(data, order, limit, offset, single=True) 
     123         
     124        if expr: 
     125            if not isinstance(expr, logic.Expression): 
     126                expr = logic.Expression(expr) 
     127            fc = expr.func.func_code 
     128            compkeys = fc.co_names[1:] 
     129            # TODO: allow multiple filter keys. 
     130            if (fc.co_code == simple_attr_lookup and len(compkeys) == 1): 
     131                compvals = fc.co_consts[1:] 
     132                filters = dict([(k, v) for k, v in zip(compkeys, compvals)]) 
     133                 
     134                # Try to retrieve a matching unit using its primary_keys. 
     135                # This will skip grabbing any indices (a HUGE optimization). 
     136                pk = self.primary_keys[cls] 
     137                if set(compkeys) >= set(pk): 
     138                    yield self._unit_by_primary_key(cls, pk, filters) 
     139                    return 
     140                 
     141                # Try to retrieve matching units using an index. 
     142                # Sort them from most-specific (most properties) to least. 
     143                # If self.global_index is True, the last one should 
     144                # be an index with propnames == []. See self.register. 
     145                indices = list(self.indices[cls]) 
     146                indices.sort(lambda x, y: cmp(len(y), len(x))) 
     147                for propnames in indices: 
     148                    if set(compkeys) >= set(propnames): 
     149                        data = self._xrecall_from_index(cls, propnames, filters) 
     150                        data = self._xrecall_inner(data, expr) 
     151                        for unit in self._paginate(data, order, limit, offset, single=True): 
     152                            yield unit 
     153                        return 
     154         
     155        if self.global_index: 
     156            data = self._xrecall_from_index(cls, [], {}) 
     157            data = self._xrecall_inner(data, expr) 
     158            for unit in self._paginate(data, order, limit, offset, single=True): 
     159                yield unit 
    147160        else: 
    148             return iter([]) 
    149      
    150     def _xrecall_inner(self, keys, expr=None): 
     161            # Yield nothing since we have no access paths. 
     162            pass 
     163     
     164    def _xrecall_inner(self, units, expr=None): 
    151165        """Private helper for self.xrecall.""" 
    152         units = self.client.get_multi(keys) 
    153         # Iterate over the keys in the same order we were given. 
    154         for key in keys: 
    155             unit = units.get(key, None) 
    156             if unit is not None and expr is None or expr(unit): 
    157                 unit.cleanse() 
     166        for unit in units: 
     167            if expr is None or expr(unit): 
    158168                # Must yield a sequence for use in _paginate. 
    159169                yield (unit,) 
     
    169179            unit.cleanse() 
    170180             
    171             key = self._unit_key(unit) 
     181            ident, key = self._unit_key(unit) 
    172182            self.client.set(key, unit) 
    173183             
    174             if self.indexed: 
     184            # See the class doc for more information about secondary indexes. 
     185            if self.global_index: 
    175186                cls = unit.__class__ 
    176                 ci = self.client.get(self._index_key(cls)) or set() 
    177                 ci.add(key
    178                 self.client.set(self._index_key(cls), ci
     187                index = self.get_index(cls, {}) or set() 
     188                index.add(ident
     189                self.put_index(cls, {}, index
    179190     
    180191    def destroy(self, unit): 
     
    183194            self.log(logflags.DESTROY.message(unit)) 
    184195         
    185         key = self._unit_key(unit) 
     196        ident, key = self._unit_key(unit) 
    186197        self.client.delete(key) 
    187198         
    188         if self.indexed: 
     199        # See the class doc for more information about secondary indexes. 
     200        if self.global_index: 
    189201            cls = unit.__class__ 
    190             ci = self.client.get(self._index_key(cls)) or set() 
    191             ci.discard(key
    192             self.client.set(self._index_key(cls), ci
     202            index = self.get_index(cls, {}) or set() 
     203            index.discard(ident
     204            self.put_index(cls, {}, index
    193205     
    194206    def reserve(self, unit): 
     
    196208        if unit.identifiers: 
    197209            cls = unit.__class__ 
    198             if self.indexed: 
    199                 ci = self.client.get(self._index_key(cls)) or set() 
     210            if self.global_index: 
     211                # See the class doc for more information about secondary indexes. 
     212                index = self.get_index(cls, {}) or set() 
    200213                 
    201214                if not unit.sequencer.valid_id(unit.identity()): 
    202                     ids = [] 
    203                     for key in ci: 
    204                         otherunit = self.client.get(key) 
    205                         if otherunit is not None: 
    206                             ids.append(otherunit.identity()) 
     215                    ids = [u.identity() for u in 
     216                           self.scan_index(cls, index).itervalues()] 
    207217                    unit.sequencer.assign(unit, ids) 
    208218                unit.cleanse() 
    209219                 
    210                 key = self._unit_key(unit) 
     220                # Add the unit to the cache. 
     221                ident, key = self._unit_key(unit) 
    211222                self.client.add(key, unit) 
    212223                 
    213                 ci.add(key) 
    214                 self.client.set(self._index_key(cls), ci) 
     224                # Add the unit to the global index. 
     225                index.add(ident) 
     226                self.put_index(cls, {}, index) 
    215227            else: 
    216228                if not unit.sequencer.valid_id(unit.identity()): 
     
    219231                 
    220232                unit.cleanse() 
    221                 key = self._unit_key(unit) 
     233                ident, key = self._unit_key(unit) 
    222234                try: 
    223235                    self.client.add(key, unit) 
     
    265277            self.log(logflags.DDL.message("create storage %s" % cls)) 
    266278         
    267         try: 
    268             self.client.add(self._index_key(cls), []) 
    269         except IOError, exc: 
    270             if exc.args[0] == 'NOT STORED': 
    271                 errors.conflict(conflicts, "Class %r already has storage." 
    272                                 % cls) 
    273             else: 
    274                 raise 
     279        if self.global_index: 
     280            try: 
     281                self.client.add(self._index_key(cls, {}), set()) 
     282            except IOError, exc: 
     283                if exc.args[0] == 'NOT STORED': 
     284                    errors.conflict(conflicts, "Class %r already has storage." 
     285                                    % cls) 
     286                else: 
     287                    raise 
    275288     
    276289    def has_storage(self, cls): 
     
    292305        conflicts: see errors.conflict. 
    293306        """ 
     307        clsname = cls.__name__ 
    294308        if self.logflags & logflags.DDL: 
    295             self.log(logflags.DDL.message("add property %s %s" % (cls, name))) 
    296          
    297         if self.indexed: 
    298             ci = self.client.get(self._index_key(cls)) or [] 
    299             for key in ci: 
     309            self.log(logflags.DDL.message("add property %s %s" % 
     310                                          (clsname, name))) 
     311         
     312        if self.global_index: 
     313            # TODO: recalculate if primary_keys changed 
     314            ci = self.client.get(self._index_key(cls, {})) or set() 
     315            for id in ci: 
     316                key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 
    300317                unit = self.client.get(key) 
    301318                if unit is not None: 
    302319                    unit._properties[name] = None 
    303320                    unit.cleanse() 
    304                     self.client.set(self._unit_key(unit), unit) 
     321                    self.client.set(key, unit) 
    305322     
    306323    def has_property(self, cls, name): 
    307324        """If storage structures exist for the given property, return True.""" 
    308         if self.indexed: 
    309             ci = self.client.get(self._index_key(cls)) 
     325        if self.global_index: 
     326            clsname = cls.__name__ 
     327            ci = self.client.get(self._index_key(cls, {})) 
    310328             
    311329            if not ci: 
     
    314332                return True 
    315333             
    316             for key in ci: 
     334            for id in ci: 
     335                key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 
    317336                unit = self.client.get(key) 
    318337                if unit is not None: 
     
    326345        conflicts: see errors.conflict. 
    327346        """ 
     347        clsname = cls.__name__ 
    328348        if self.logflags & logflags.DDL: 
    329             self.log(logflags.DDL.message("drop property %s %s" % (cls, name))) 
    330          
    331         if self.indexed: 
    332             ci = self.client.get(self._index_key(cls)) or [] 
    333             for key in ci: 
     349            self.log(logflags.DDL.message("drop property %s %s" % 
     350                                          (clsname, name))) 
     351         
     352        if self.global_index: 
     353            ci = self.client.get(self._index_key(cls, {})) or set() 
     354            for id in ci: 
     355                key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 
    334356                unit = self.client.get(key) 
    335357                if unit is not None: 
    336358                    del unit._properties[name] 
    337359                    unit.cleanse() 
    338                     self.client.set(self._unit_key(unit), unit) 
     360                    self.client.set(key, unit) 
    339361     
    340362    def rename_property(self, cls, oldname, newname, conflicts='error'): 
     
    343365        conflicts: see errors.conflict. 
    344366        """ 
     367        clsname = cls.__name__ 
    345368        if self.logflags & logflags.DDL: 
    346369            self.log(logflags.DDL.message("rename property %s from %s to %s" 
    347370                                          % (cls, oldname, newname))) 
    348371         
    349         if self.indexed: 
    350             ci = self.client.get(self._index_key(cls)) or [] 
    351             for key in ci: 
     372        if self.global_index: 
     373            ci = self.client.get(self._index_key(cls, {})) or set() 
     374            for id in ci: 
     375                key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 
    352376                unit = self.client.get(key) 
    353377                if unit is not None: 
     
    355379                    del unit._properties[oldname] 
    356380                    unit.cleanse() 
    357                     self.client.set(self._unit_key(unit), unit) 
     381                    self.client.set(key, unit) 
    358382     
    359383     
     
    361385     
    362386    def cachelen(self, cls): 
    363         if self.indexed
    364             return len(self.client.get(self._index_key(cls))) 
     387        if self.global_index
     388            return len(self.client.get(self._index_key(cls, {}))) 
    365389        else: 
    366390            return 0 
     
    368392    def cached_units(self, cls): 
    369393        units = [] 
    370         if self.indexed
    371             for key in self.client.get(self._index_key(cls)): 
     394        if self.global_index
     395            for key in self.client.get(self._index_key(cls, {})): 
    372396                unit = self.client.get(key) 
    373397                if unit is not None: 
     
    377401    def flush(self, cls): 
    378402        """Dump all objects of the given class.""" 
    379         if self.indexed: 
    380             # Delete all units in the class index. 
    381             for key in self.client.get(self._index_key(cls)) or []: 
     403        clsname = cls.__name__ 
     404         
     405        if self.global_index: 
     406            # Delete all units in the global index. 
     407            for id in self.client.get(self._index_key(cls, {})) or set(): 
     408                key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 
    382409                self.client.delete(key) 
    383410             
    384             # Delete the class index. 
    385             self.client.delete(self._index_key(cls)) 
     411            # Delete the global index. 
     412            self.client.delete(self._index_key(cls, {})) 
    386413        # TODO: 
    387414        # else: 
    388415        #     self.increment_generation(cls) 
    389416     
    390     def get_index(self, cls, key, value): 
    391         """Return a cached list of unit identifiers where unit.key == value. 
     417     
     418    #                               Indexing                               # 
     419     
     420    def register(self, cls): 
     421        """Assert that Units of class 'cls' will be handled.""" 
     422        # Set a default primary key for the class. Consumers are free to 
     423        # change this if another unique property is looked up more often. 
     424        self.primary_keys[cls] = tuple(cls.identifiers or cls.properties) 
     425         
     426        # Add indices based on the .index attribute of each UnitProperty. 
     427        self.indices[cls] = i = set() 
     428        for propname in cls.properties: 
     429            prop = getattr(cls, propname) 
     430            if prop.index: 
     431                # No need for an index on the primary key; 
     432                # we can just fetch each one directly by cache key. 
     433                if not prop.key: 
     434                    i.add((prop,)) 
     435         
     436        # Add an index with no propnames. This is a special 
     437        # sentinel value for the global index that keeps us DRY. 
     438        if self.global_index: 
     439            i.add(()) 
     440         
     441        storage.StorageManager.register(self, cls) 
     442     
     443    def _index_key(self, cls, filters): 
     444        """Return the cache key for the index of the given class and filters. 
     445         
     446        If filters is an empty dict, the 'global index' key is returned. 
     447        """ 
     448        criteria = ["%s=%s" % (k, str(v).replace(" ", "+")) 
     449                    for k, v in filters.iteritems()] 
     450        return '%s:%s:index(%s)' % (self.name, cls.__name__, 
     451                                    ",".join(criteria)) 
     452     
     453    def get_index(self, cls, filters): 
     454        """Return a cached list of unit ids which match the given filters dict. 
    392455         
    393456        The ids returned will be a list of tuples of the form: 
    394             tuple([getattr(unit, name) for name in self._keyattrs[cls]]) 
     457            tuple([getattr(unit, name) for name in self.primary_keys[cls]]) 
    395458         
    396459        In general, callers should use get_index, put_index, and scan_index 
     
    404467            misses = [k for k in ids if k not in items] 
    405468        """ 
    406         cachekey = '%s:%s:ids(%s:%s)' % (self.name, cls.__name__, key, 
    407                                          str(value).replace(" ", "+")) 
    408         ids = self.client.get(cachekey) 
     469        cache_key = self._index_key(cls, filters) 
     470        ids = self.client.get(cache_key) 
    409471        if self.logflags & logflags.IO: 
    410472            if ids is None: 
     
    412474            else: 
    413475                idlen = len(ids) 
    414             self.log(logflags.IO.message("INDEX GET %s (%r == %r) len %r" % 
    415                                          (cls.__name__, key, value, idlen))) 
     476            self.log(logflags.IO.message("INDEX GET (%s) len %r" % 
     477                                         (cache_key, idlen))) 
    416478        return ids 
    417479     
    418     def put_index(self, cls, key, value, ids, time=None): 
    419         """Cache a list of unit identifiers where unit.key == value
     480    def put_index(self, cls, filters, ids, time=0): 
     481        """Cache a list of unit identifiers which match the given filters dict
    420482         
    421483        The ids provided MUST be a list of tuples of the form: 
    422             tuple([getattr(unit, name) for name in self._keyattrs[cls]]) 
    423         """ 
     484            tuple([getattr(unit, name) for name in self.primary_keys[cls]]) 
     485        """ 
     486        cache_key = self._index_key(cls, filters) 
    424487        if self.logflags & logflags.IO: 
    425             self.log(logflags.IO.message("INDEX PUT %s (%r == %r) len %r" % 
    426                                          (cls.__name__, key, value, len(ids)))) 
    427         cachekey = '%s:%s:ids(%s:%s)' % (self.name, cls.__name__, key, 
    428                                          str(value).replace(" ", "+")) 
    429         self.client.set(cachekey, ids, time=time) 
     488            self.log(logflags.IO.message("INDEX PUT (%s) len %r" % 
     489                                         (cache_key, len(ids)))) 
     490        self.client.set(cache_key, ids, time=time) 
    430491     
    431492    def scan_index(self, cls, ids): 
    432         """Yield multiple units from the given set of ids. 
     493        """Return a dict of multiple units from the given set of ids. 
    433494         
    434495        The ids provided MUST be a list of tuples of the form: 
    435             tuple([getattr(unit, name) for name in self._keyattrs[cls]]) 
    436         """ 
    437         # Shortcut the case of no ids provided. 
    438         if not ids: 
    439             return {} 
    440          
    441         clsname = cls.__name__ 
    442         keys = ["%s:%s:%s" % (self.name, clsname, self.hash(i)) for i in ids] 
    443         units = self.client.get_multi(keys) 
     496            tuple([getattr(unit, name) for name in self.primary_keys[cls]]) 
     497         
     498        The returned dict will not contain entries for missed ids. 
     499        """ 
     500        if ids: 
     501            clsname = cls.__name__ 
     502            keys = ["%s:%s:%s" % (self.name, clsname, self.hash(id)) 
     503                    for id in ids] 
     504            data = self.client.get_multi(keys) 
     505             
     506            # Transform the dict back to id keys instead of cache keys. 
     507            units = {} 
     508            for i, k in zip(ids, keys): 
     509                unit = data.get(k, None) 
     510                if unit is not None: 
     511                    units[i] = unit 
     512        else: 
     513            units = {} 
     514         
    444515        if self.logflags & logflags.IO: 
    445516            self.log(logflags.IO.message("INDEX SCAN %s (%r hits of %r)" % 
    446517                                         (cls.__name__, len(units), len(ids)))) 
    447518        return units 
     519     
     520    def _unit_by_primary_key(self, cls, keys, filters): 
     521        """Return a unit (or None) by primary keys which matches the filters dict. 
     522         
     523        The filters argument must contain an entry for each key in the 
     524        given list of keys, although it may and often should contain 
     525        additional entries. 
     526        """ 
     527        ident = tuple([filters[k] for k in keys]) 
     528        key = "%s:%s:%s" % (self.name, cls.__name__, self.hash(ident)) 
     529        unit = self.client.get(key) 
     530        if unit is not None: 
     531            matching = True 
     532            if set(filters.keys()) > set(keys): 
     533                # We retrieved the Unit using a subset of the filters. 
     534                # Filter in full now. 
     535                for k, v in filters.iteritems(): 
     536                    if getattr(unit, k) != v: 
     537                        matching = False 
     538                        break 
     539             
     540            if matching: 
     541                if self.logflags & logflags.IO: 
     542                    self.log(logflags.IO.message('PK HIT (%s) %s' % (key, filters))) 
     543                unit.cleanse() 
     544                return unit 
     545         
     546        if self.logflags & logflags.IO: 
     547            self.log(logflags.IO.message('PK MISS (%s) %s' % (key, filters))) 
     548        return None 
     549     
     550    def _unit_from_index(self, cls, keys, filters): 
     551        """Scan the (cls, keys) index for a unit which matches the filters dict. 
     552         
     553        The filters argument must contain an entry for each key in the 
     554        given list of keys, although it may and often should contain 
     555        additional entries. 
     556        """ 
     557        if set(filters.keys()) > set(keys): 
     558            for unit in self._xrecall_from_index(cls, keys, filters): 
     559                return unit 
     560        else: 
     561            clsname = cls.__name__ 
     562            # If the filters and index keys are equal, it should be faster 
     563            # to perform single gets against memcached, rather than the 
     564            # get_multi calls that _xrecall_from_index performs. 
     565            ids = self.get_index(cls, dict([(k, filters[k]) for k in keys])) 
     566            if ids: 
     567                for id in ids: 
     568                    cache_key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 
     569                    unit = self.client.get(cache_key) 
     570                    if unit is None: 
     571                        if self.logflags & logflags.IO: 
     572                            self.log(logflags.IO.message('INDEX MISS (%s) %s' % 
     573                                                         (cache_key, filters))) 
     574                    else: 
     575                        if self.logflags & logflags.IO: 
     576                            self.log(logflags.IO.message('INDEX HIT (%s) %s' % 
     577                                                         (cache_key, filters))) 
     578                        unit.cleanse() 
     579                        return unit 
     580            else: 
     581                if self.logflags & logflags.IO: 
     582                    self.log(logflags.IO.message('INDEX EMPTY (%s) %s' % 
     583                                                 (clsname, filters))) 
     584        return None 
     585     
     586    def _xrecall_from_index(self, cls, keys, filters): 
     587        """Yield units from the (cls, keys) index which match the filters dict. 
     588         
     589        The filters argument must contain an entry for each key in the 
     590        given list of keys, although it may and often should contain 
     591        additional entries. 
     592        """ 
     593        partial_index = set(filters.keys()) > set(keys) 
     594        ids = self.get_index(cls, dict([(k, filters[k]) for k in keys])) 
     595        if ids: 
     596            units = self.scan_index(cls, ids) 
     597            # Preserve order by iterating over the retrieved ids 
     598            # instead of the retrieved units. 
     599            for id in ids: 
     600                unit = units.get(id, None) 
     601                if unit is not None: 
     602                    matching = True 
     603                     
     604                    if partial_index: 
     605                        # Filter in full now. 
     606                        for k, v in filters.iteritems(): 
     607                            if getattr(unit, k) != v: 
     608                                matching = False 
     609                                break 
     610                     
     611                    if matching: 
     612                        unit.cleanse() 
     613                        yield unit 
    448614 
  • branches/crazycache/dejavu/test/stores.py

    r538 r571  
    126126def memcached(fixture, mediated): 
    127127    opts = {'memcached.servers': ['127.0.0.1:11211'], 
    128             'memcached.indexed': True, 
     128            'memcached.global_index': True, 
    129129            'name': 'djvtest', 
    130130            } 
     
    133133def memcached2(fixture, mediated): 
    134134    opts = {'memcached.servers': ['127.0.0.1:11211'], 
    135             'memcached.indexed': False, 
     135            'memcached.global_index': False, 
    136136            'name': 'djvtest', 
    137137            }