Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

Changeset 572

Show
Ignore:
Timestamp:
11/05/07 09:14:11
Author:
fumanchu
Message:

Crazycache: working indexes!

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/crazycache/dejavu/storage/caching.py

    r571 r572  
    4747        self.fulljoin = allOptions.get("fulljoin", False) 
    4848        self.cache_recalls = [] 
    49         self.xspecial_stride = 50 
    5049         
    5150        self.cache = allOptions.get("cache") 
     
    7473        return u 
    7574     
    76     def xspecial(self, cls, key, value, order=None): 
    77         """Yield multiple units of the given cls where key=value.""" 
    78         keyattrs = self.cache.primary_keys[cls] 
    79          
    80         # Get a cached list of identifier-tuples, ordered if requested. 
    81         # TODO: add order to the idkey. 
    82         ids = self.cache.get_index(cls, {key: value}) 
    83         if ids is None: 
    84             # Not in the cache. Grab the list of id-tuples from nextstore. 
    85             ids = self.view((cls, keyattrs, {key: value}), order=order) 
    86             # Then cache the list result for next time. 
    87             self.cache.put_index(cls, {key: value}, ids, time = 5 * 60) 
    88          
    89         # Query the cache for multiple units (by id). 
    90         items = self.cache.scan_index(cls, ids) 
    91         misses = [k for k in ids if k not in items] 
    92          
    93         # Now query the DB for any items that the cache missed... 
    94         if self.xspecial_stride: 
    95             # ...in chunks of length: self.xspecial_stride. 
    96             for step in xrange(0, len(misses), self.xspecial_stride): 
    97                 # TODO: allow for multiple identifiers 
    98                 f = lambda x: ((getattr(x, keyattrs[0]), ) in 
    99                                misses[step:step + self.xspecial_stride]) 
    100                 for unit in self.recall(cls, f): 
    101                     items[tuple([getattr(unit, a) for a in keyattrs])] = unit 
    102         elif misses: 
    103             # ...or all in one chunk if desired. 
    104             f = lambda x: (getattr(x, keyattrs[0]), ) in misses 
    105             for unit in self.recall(cls, f): 
    106                 items[tuple([getattr(unit, a) for a in keyattrs])] = unit 
    107          
    108         # Return non-null items only, preserving order 
    109         for k in ids: 
    110             yield items[k] 
    111 ##            unit = items[k] 
    112 ##            if unit is not None: 
    113 ##                yield unit 
    114      
    11575    def xrecall(self, classes, expr=None, order=None, limit=None, offset=None): 
    11676        """Return a Unit iterator.""" 
     
    13797            return 
    13898         
     99        if offset and not order: 
     100            raise TypeError("Order argument expected when offset is provided.") 
     101         
     102        if cls.identifiers and cls in self.cache.classes: 
     103            fc = expr.func.func_code 
     104            compkeys = fc.co_names[1:] 
     105            # TODO: allow multiple keys (as long as one of them is indexed). 
     106            if (fc.co_code == simple_attr_lookup and len(compkeys) == 1 
     107                    # Hmmmmm. Should we allow on-the-fly indices? 
     108                    # Something bugs me about that idea, but I don't know what. 
     109                    and getattr(cls, compkeys[0]).index 
     110                    ): 
     111                filters = {compkeys[0]: fc.co_consts[1]} 
     112                try: 
     113                    units = self.cache.scan(self.nextstore, cls, filters, order) 
     114                except ValueError: 
     115                    pass 
     116                else: 
     117                    for unit in units: 
     118                        yield unit 
     119                    return 
     120         
    139121        seen = {} 
    140         if order: 
     122        if not order: 
    141123            # If an order is supplied, there's no point in running the 
    142124            # query against our cache (because we'd have to interleave 
    143             # the results with those from storage anyway). 
    144             pass 
    145         elif offset: 
    146             raise TypeError("Order argument expected when offset is provided.") 
    147         elif cls.identifiers and cls in self.cache.classes: 
    148             fc = expr.func.func_code 
    149             compkeys = fc.co_names[1:] 
    150             if (fc.co_code == simple_attr_lookup and len(compkeys) == 1 
    151                 # TODO: use getattr(cls, compkeys[0]).index to dispatch. 
    152 ##                and (cls, compkeys[0]) in self.cache_recalls 
    153                 ): 
    154                 for unit in self.xspecial(cls, compkeys[0], fc.co_consts[1], order): 
    155                     yield unit 
    156                 return 
    157              
     125            # the results with those from storage anyway). In fact, the 
     126            # only reason to hit the cache at all here is to either hit 
     127            # the limit or allow the caller to stop iterating before 
     128            # reaching the DB. 
    158129            if self.fullquery: 
    159130                # Query the cache. 
    160                 for unit in self.cache.xrecall(cls, expr, limit=limit, offset=offset): 
     131                for unit in self.cache.xrecall(cls, expr, limit=limit): 
    161132                    seen[unit.identity()] = None 
    162133                    yield unit 
  • branches/crazycache/dejavu/storage/storememcached.py

    r571 r572  
    4444            ObjectCache.nextstore to maintain the primary indexes--this 
    4545            allows the cache to run orders of magnitude faster. 
    46      
    47     Secondary Indexes: 
    48      
    49     We don't even try to invalidate any of the secondary indexes, because 
    50     to do so we'd either have to 1) keep around the old criteria for each 
    51     modified unit (to determine from which index to remove the unit), or 
    52     2) rely on an index of indexes, neither of which we want to do. 
    53     They're just there for x/recall() and consumers of ObjectCache; 
    54     we invalidate by timeout, not by event. 
     46         
     47        memcached.index_time: the timeout, in seconds, for any cached indexes. 
     48            Default is 300 seconds. 
    5549    """ 
    5650     
     
    6054        self.name = allOptions['name'] 
    6155        self.global_index = allOptions.pop("memcached.global_index", True) 
     56        self.index_time = allOptions.pop("memcached.index_time", 5 * 60) 
    6257        self.primary_keys = {} 
    63         self.indices = {} 
     58        self.indexsets = {} 
     59        self.index_stride = 50 
    6460         
    6561        cache_opts = dict([(k[10:], v) for k, v in allOptions.iteritems() 
     
    7773        ident = tuple([getattr(unit, name) for name in self.primary_keys[cls]]) 
    7874        key = "%s:%s:%s" % (self.name, cls.__name__, self.hash(ident)) 
    79         return ident, key 
     75        return key 
    8076     
    8177    def unit(self, cls, **kwargs): 
     
    9490         
    9591        # Try to retrieve a matching unit using an index. 
    96         # Sort them from most-specific (most properties) to least. 
    9792        # If self.global_index is True, the last one should 
    9893        # be an index with propnames == []. See self.register. 
    99         indices = list(self.indices[cls]) 
    100         indices.sort(lambda x, y: cmp(len(y), len(x))) 
    101         for propnames in indices: 
    102             if keyset >= set(propnames): 
    103                 unit = self._unit_from_index(cls, propnames, kwargs) 
     94        indexset = self.indexsets[cls] 
     95        for index in indexset: 
     96            if keyset >= set(index): 
     97                unit = indexset.unit(index, kwargs) 
    10498                if unit is not None: 
    10599                    if self.logflags & logflags.RECALL: 
     
    121115         
    122116        cls = classes 
     117        indexset = self.indexsets[cls] 
    123118         
    124119        if expr: 
     
    140135                 
    141136                # Try to retrieve matching units using an index. 
    142                 # Sort them from most-specific (most properties) to least. 
    143137                # If self.global_index is True, the last one should 
    144138                # be an index with propnames == []. See self.register. 
    145                 indices = list(self.indices[cls]) 
    146                 indices.sort(lambda x, y: cmp(len(y), len(x))) 
    147                 for propnames in indices: 
    148                     if set(compkeys) >= set(propnames): 
    149                         data = self._xrecall_from_index(cls, propnames, filters) 
     139                for index in indexset: 
     140                    if set(compkeys) >= set(index): 
     141                        data = indexset.xrecall(index, filters) 
    150142                        data = self._xrecall_inner(data, expr) 
    151143                        for unit in self._paginate(data, order, limit, offset, single=True): 
     
    154146         
    155147        if self.global_index: 
    156             data = self._xrecall_from_index(cls, [], {}) 
     148            data = indexset.xrecall([], {}) 
    157149            data = self._xrecall_inner(data, expr) 
    158150            for unit in self._paginate(data, order, limit, offset, single=True): 
     
    178170            # includes _initial_property_hash. 
    179171            unit.cleanse() 
    180              
    181             ident, key = self._unit_key(unit) 
    182             self.client.set(key, unit) 
    183              
    184             # See the class doc for more information about secondary indexes. 
    185             if self.global_index: 
    186                 cls = unit.__class__ 
    187                 index = self.get_index(cls, {}) or set() 
    188                 index.add(ident) 
    189                 self.put_index(cls, {}, index) 
     172            self.client.set(self._unit_key(unit), unit) 
     173            self.indexsets[unit.__class__].add(unit) 
    190174     
    191175    def destroy(self, unit): 
     
    194178            self.log(logflags.DESTROY.message(unit)) 
    195179         
    196         ident, key = self._unit_key(unit) 
    197         self.client.delete(key) 
    198          
    199         # See the class doc for more information about secondary indexes. 
    200         if self.global_index: 
    201             cls = unit.__class__ 
    202             index = self.get_index(cls, {}) or set() 
    203             index.discard(ident) 
    204             self.put_index(cls, {}, index) 
     180        self.client.delete(self._unit_key(unit)) 
     181        self.indexsets[unit.__class__].discard(unit) 
    205182     
    206183    def reserve(self, unit): 
     
    208185        if unit.identifiers: 
    209186            cls = unit.__class__ 
    210             if self.global_index: 
    211                 # See the class doc for more information about secondary indexes. 
    212                 index = self.get_index(cls, {}) or set() 
    213                  
    214                 if not unit.sequencer.valid_id(unit.identity()): 
    215                     ids = [u.identity() for u in 
    216                            self.scan_index(cls, index).itervalues()] 
     187            indexset = self.indexsets[cls] 
     188             
     189            if not unit.sequencer.valid_id(unit.identity()): 
     190                if self.global_index: 
     191                    # Try to generate an identifier by looking 
     192                    # up all units in the global index. 
     193                    index = indexset.get({}) or [] 
     194                    ids = [u.identity() 
     195                           for u in indexset.scan(index).itervalues()] 
    217196                    unit.sequencer.assign(unit, ids) 
    218                 unit.cleanse() 
    219                  
    220                 # Add the unit to the cache. 
    221                 ident, key = self._unit_key(unit) 
    222                 self.client.add(key, unit) 
    223                  
    224                 # Add the unit to the global index. 
    225                 index.add(ident) 
    226                 self.put_index(cls, {}, index) 
    227             else: 
    228                 if not unit.sequencer.valid_id(unit.identity()): 
     197                else: 
    229198                    raise NotImplementedError( 
    230199                        "Unindexed memcache cannot generate identifiers.") 
    231                  
    232                 unit.cleanse() 
    233                 ident, key = self._unit_key(unit) 
    234                 try: 
    235                     self.client.add(key, unit) 
    236                 except IOError, exc: 
    237                     if exc.args[0] == 'NOT_STORED': 
    238                         pass 
    239                     raise 
     200             
     201            unit.cleanse() 
     202             
     203            # Add the unit to the cache. 
     204            try: 
     205                self.client.add(self._unit_key(unit), unit) 
     206            except IOError, exc: 
     207                if exc.args[0] == 'NOT_STORED': 
     208                    pass 
     209                raise 
     210             
     211            # Add the unit to all indices. 
     212            indexset.add(unit) 
    240213        else: 
    241214            # This class has no identifiers, so skip reserve and wait for save. 
     
    279252        if self.global_index: 
    280253            try: 
    281                 self.client.add(self._index_key(cls, {}), set()
     254                self.client.add(self.indexsets[cls].key({}), []
    282255            except IOError, exc: 
    283256                if exc.args[0] == 'NOT STORED': 
     
    312285        if self.global_index: 
    313286            # TODO: recalculate if primary_keys changed 
    314             ci = self.client.get(self._index_key(cls, {})) or set() 
     287            ci = self.client.get(self.indexsets[cls].key({})) or [] 
    315288            for id in ci: 
    316289                key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 
     
    325298        if self.global_index: 
    326299            clsname = cls.__name__ 
    327             ci = self.client.get(self._index_key(cls, {})) 
     300            ci = self.client.get(self.indexsets[cls].key({})) 
    328301             
    329302            if not ci: 
     
    351324         
    352325        if self.global_index: 
    353             ci = self.client.get(self._index_key(cls, {})) or set() 
     326            ci = self.client.get(self.indexsets[cls].key({})) or [] 
    354327            for id in ci: 
    355328                key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 
     
    371344         
    372345        if self.global_index: 
    373             ci = self.client.get(self._index_key(cls, {})) or set() 
     346            ci = self.client.get(self.indexsets[cls].key({})) or [] 
    374347            for id in ci: 
    375348                key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 
     
    386359    def cachelen(self, cls): 
    387360        if self.global_index: 
    388             return len(self.client.get(self._index_key(cls, {}))) 
     361            return len(self.client.get(self.indexsets[cls].key({}))) 
    389362        else: 
    390363            return 0 
     
    393366        units = [] 
    394367        if self.global_index: 
    395             for key in self.client.get(self._index_key(cls, {})): 
     368            for key in self.client.get(self.indexsets[cls].key({})): 
    396369                unit = self.client.get(key) 
    397370                if unit is not None: 
     
    404377         
    405378        if self.global_index: 
     379            gi_key = self.indexsets[cls].key({}) 
    406380            # Delete all units in the global index. 
    407             for id in self.client.get(self._index_key(cls, {})) or set()
     381            for id in self.client.get(gi_key) or []
    408382                key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 
    409383                self.client.delete(key) 
    410384             
    411385            # Delete the global index. 
    412             self.client.delete(self._index_key(cls, {})
     386            self.client.delete(gi_key
    413387        # TODO: 
    414388        # else: 
    415389        #     self.increment_generation(cls) 
    416      
    417      
    418     #                               Indexing                               # 
    419390     
    420391    def register(self, cls): 
     
    425396         
    426397        # Add indices based on the .index attribute of each UnitProperty. 
    427         self.indices[cls] = i = set(
     398        self.indexsets[cls] = i = IndexSet(self, cls
    428399        for propname in cls.properties: 
    429400            prop = getattr(cls, propname) 
     
    431402                # No need for an index on the primary key; 
    432403                # we can just fetch each one directly by cache key. 
    433                 if not prop.key
    434                     i.add((prop,)
     404                if propname not in cls.identifiers
     405                    i.add_index(propname
    435406         
    436407        # Add an index with no propnames. This is a special 
    437408        # sentinel value for the global index that keeps us DRY. 
    438409        if self.global_index: 
    439             i.add(()
     410            i.add_index(
    440411         
    441412        storage.StorageManager.register(self, cls) 
    442      
    443     def _index_key(self, cls, filters): 
    444         """Return the cache key for the index of the given class and filters. 
    445          
    446         If filters is an empty dict, the 'global index' key is returned. 
    447         """ 
    448         criteria = ["%s=%s" % (k, str(v).replace(" ", "+")) 
    449                     for k, v in filters.iteritems()] 
    450         return '%s:%s:index(%s)' % (self.name, cls.__name__, 
    451                                     ",".join(criteria)) 
    452      
    453     def get_index(self, cls, filters): 
    454         """Return a cached list of unit ids which match the given filters dict. 
    455          
    456         The ids returned will be a list of tuples of the form: 
    457             tuple([getattr(unit, name) for name in self.primary_keys[cls]]) 
    458          
    459         In general, callers should use get_index, put_index, and scan_index 
    460         together: 
    461              
    462             ids = get_index(cls, ...) 
    463             if ids is None: 
    464                 ids = expensive_lookup(cls, ...) 
    465                 put_index(cls, ...) 
    466             items = scan_index(cls, ids) 
    467             misses = [k for k in ids if k not in items] 
    468         """ 
    469         cache_key = self._index_key(cls, filters) 
    470         ids = self.client.get(cache_key) 
    471         if self.logflags & logflags.IO: 
    472             if ids is None: 
    473                 idlen = None 
    474             else: 
    475                 idlen = len(ids) 
    476             self.log(logflags.IO.message("INDEX GET (%s) len %r" % 
    477                                          (cache_key, idlen))) 
    478         return ids 
    479      
    480     def put_index(self, cls, filters, ids, time=0): 
    481         """Cache a list of unit identifiers which match the given filters dict. 
    482          
    483         The ids provided MUST be a list of tuples of the form: 
    484             tuple([getattr(unit, name) for name in self.primary_keys[cls]]) 
    485         """ 
    486         cache_key = self._index_key(cls, filters) 
    487         if self.logflags & logflags.IO: 
    488             self.log(logflags.IO.message("INDEX PUT (%s) len %r" % 
    489                                          (cache_key, len(ids)))) 
    490         self.client.set(cache_key, ids, time=time) 
    491      
    492     def scan_index(self, cls, ids): 
    493         """Return a dict of multiple units from the given set of ids. 
    494          
    495         The ids provided MUST be a list of tuples of the form: 
    496             tuple([getattr(unit, name) for name in self.primary_keys[cls]]) 
    497          
    498         The returned dict will not contain entries for missed ids. 
    499         """ 
    500         if ids: 
    501             clsname = cls.__name__ 
    502             keys = ["%s:%s:%s" % (self.name, clsname, self.hash(id)) 
    503                     for id in ids] 
    504             data = self.client.get_multi(keys) 
    505              
    506             # Transform the dict back to id keys instead of cache keys. 
    507             units = {} 
    508             for i, k in zip(ids, keys): 
    509                 unit = data.get(k, None) 
    510                 if unit is not None: 
    511                     units[i] = unit 
    512         else: 
    513             units = {} 
    514          
    515         if self.logflags & logflags.IO: 
    516             self.log(logflags.IO.message("INDEX SCAN %s (%r hits of %r)" % 
    517                                          (cls.__name__, len(units), len(ids)))) 
    518         return units 
    519413     
    520414    def _unit_by_primary_key(self, cls, keys, filters): 
     
    548442        return None 
    549443     
    550     def _unit_from_index(self, cls, keys, filters): 
    551         """Scan the (cls, keys) index for a unit which matches the filters dict. 
    552          
    553         The filters argument must contain an entry for each key in the 
    554         given list of keys, although it may and often should contain 
    555         additional entries. 
    556         """ 
    557         if set(filters.keys()) > set(keys): 
    558             for unit in self._xrecall_from_index(cls, keys, filters): 
     444    def scan(self, mainstore, cls, filters, order): 
     445        """Return units from a cached index, if possible.""" 
     446        indexset = self.indexsets[cls] 
     447        keyattrs = self.primary_keys[cls] 
     448         
     449        # Get a cached list of identifier-tuples, ordered if requested. 
     450        # TODO: add order to the idkey. 
     451        for index in indexset: 
     452            if set(filters.keys()) >= set(index): 
     453                indexcriteria = dict([(k, filters[k]) for k in index]) 
     454                break 
     455        else: 
     456            raise ValueError("The given filters %r are not indexed for %r." % 
     457                             (tuple(filters.keys()), cls.__name__)) 
     458         
     459        ids = indexset.get(indexcriteria) 
     460        if ids is None: 
     461            # Not in the cache. Grab the list of id-tuples from nextstore. 
     462            ids = mainstore.view((cls, keyattrs, filters), order=order) 
     463            # Then cache the list result for next time. 
     464            indexset.put(indexcriteria, ids, time=self.index_time) 
     465             
     466            # Query the cache for multiple units (by id). 
     467            units = indexset.scan(ids) 
     468            print units 
     469            misses = [k for k in ids if k not in units] 
     470            print misses 
     471        else: 
     472            # Query the cache for multiple units (by id). 
     473            units = indexset.scan(ids) 
     474             
     475            # Remove any idents from the index node that no longer 
     476            # satisfy the index criteria. This is how we update 
     477            # index nodes--eager adds but late discards. 
     478            removals = False 
     479            for id, unit in units.items(): 
     480                for key, value in filters.iteritems(): 
     481                    if getattr(unit, key) != value: 
     482                        removals = True 
     483                        del units[id] 
     484                        ids.remove(id) 
     485                        break 
     486            if removals: 
     487                indexset.put(indexcriteria, ids, time=self.index_time) 
     488             
     489            misses = [k for k in ids if k not in units] 
     490         
     491        # Now query the nextstore for any units that the cache missed... 
     492        if self.index_stride: 
     493            # ...in chunks of length: self.index_stride. 
     494            for step in xrange(0, len(misses), self.index_stride): 
     495                # TODO: allow for multiple identifiers 
     496                misstep = zip(*misses[step:step + self.index_stride])[0] 
     497                f = lambda x: getattr(x, keyattrs[0]) in misstep 
     498                print f 
     499                for unit in mainstore.recall(cls, f): 
     500                    units[tuple([getattr(unit, a) for a in keyattrs])] = unit 
     501        elif misses: 
     502            # ...or all in one chunk if desired. 
     503            # TODO: allow for multiple identifiers 
     504            misstep = zip(*misses)[0] 
     505            f = lambda x: getattr(x, keyattrs[0]) in misstep 
     506            print f 
     507            for unit in mainstore.recall(cls, f): 
     508                units[tuple([getattr(unit, a) for a in keyattrs])] = unit 
     509         
     510        # Preserve order 
     511        for k in ids: 
     512            yield units[k] 
     513 
     514 
     515class IndexSet(object): 
     516    """A set of indices for a single class. 
     517     
     518    Each index covers a tuple of unit attributes. 
     519     
     520    Each leaf node of each index is stored in memcached under its own key; 
     521    each value is a list of tuple([unit.k for k in primary_keys[cls]]). 
     522    For example, given an index over ("age", ), each distinct recall 
     523    operation will produce its own index node: 
     524         
     525        recall(Person, {age: 31}) -> ns:Person:index(age=31) = [(1132, 663)] 
     526        recall(Person, {age: 25}) -> ns:Person:index(age=25) = [(12, 34, 22)] 
     527        recall(Person, {age: 64}) -> ns:Person:index(age=64) = [(7, 17, 27)] 
     528     
     529    In general, callers should use get, put, and scan together: 
     530         
     531        ids = index.get(filters) 
     532        if ids is None: 
     533            ids = expensive_lookup(cls, filters) 
     534            index.put(filters, ids) 
     535        units = index.scan(ids) 
     536        misses = [k for k in ids if k not in units] 
     537    """ 
     538     
     539    def __init__(self, store, cls): 
     540        self.store = store 
     541        self.cls = cls 
     542        self._key_template = '%s:%s:index(%%s)' % (store.name, cls.__name__) 
     543        self._indices = [] 
     544     
     545    def add_index(self, *attributes): 
     546        """Add an index over the given attributes.""" 
     547        # Sort them from most-specific (most properties) to least. 
     548        if attributes not in self._indices: 
     549            self._indices.append(attributes) 
     550            self._indices.sort(lambda x, y: cmp(len(y), len(x))) 
     551     
     552    def __iter__(self): 
     553        return iter(self._indices) 
     554     
     555    def key(self, filters): 
     556        """Return the cache key for the given filters. 
     557         
     558        If filters is an empty dict, the 'global index' key is returned. 
     559        """ 
     560        criteria = ["%s=%s" % (k, str(v).replace(" ", "+")) 
     561                    for k, v in filters.iteritems()] 
     562        return self._key_template % ",".join(criteria) 
     563     
     564    def get(self, filters): 
     565        """Return a cached list of unit ids which match the given filters dict. 
     566         
     567        The ids returned will be a list of tuples of the form: 
     568            tuple([getattr(unit, name) for name in primary_keys[cls]]) 
     569        """ 
     570        cache_key = self.key(filters) 
     571        ids = self.store.client.get(cache_key) 
     572        if self.store.logflags & logflags.IO: 
     573            if ids is None: 
     574                idlen = None 
     575            else: 
     576                idlen = len(ids) 
     577            self.store.log(logflags.IO.message("INDEX GET (%s) len %r" % 
     578                                               (cache_key, idlen))) 
     579        return ids 
     580     
     581    def put(self, filters, ids, time=0): 
     582        """Cache a list of unit identifiers which match the given filters dict. 
     583         
     584        The ids provided MUST be a list of tuples of the form: 
     585            tuple([getattr(unit, name) for name in primary_keys[cls]]) 
     586        """ 
     587        cache_key = self.key(filters) 
     588        if self.store.logflags & logflags.IO: 
     589            self.store.log(logflags.IO.message("INDEX PUT (%s) len %r: %r" % 
     590                                               (cache_key, len(ids), ids))) 
     591        self.store.client.set(cache_key, ids, time=time) 
     592     
     593    def scan(self, ids): 
     594        """Return a dict of multiple units from the given list of ids. 
     595         
     596        The ids provided MUST be a list of tuples of the form: 
     597            tuple([getattr(unit, name) for name in primary_keys[cls]]) 
     598         
     599        The returned dict will not contain entries for missed ids. 
     600        """ 
     601        clsname = self.cls.__name__ 
     602        if ids: 
     603            keys = ["%s:%s:%s" % (self.store.name, clsname, self.store.hash(id)) 
     604                    for id in ids] 
     605            data = self.store.client.get_multi(keys) 
     606             
     607            # Transform the dict back to id keys instead of cache keys. 
     608            units = {} 
     609            for i, k in zip(ids, keys): 
     610                unit = data.get(k, None) 
     611                if unit is not None: 
     612                    units[i] = unit 
     613        else: 
     614            units = {} 
     615         
     616        if self.store.logflags & logflags.IO: 
     617            self.store.log(logflags.IO.message("INDEX SCAN %s (%r hits of %r)" % 
     618                                               (clsname, len(units), len(ids)))) 
     619        return units 
     620     
     621    def unit(self, index, filters): 
     622        """Return a unit from the index which matches the filters dict (or None). 
     623         
     624        The filters argument must contain an entry for each key in the given 
     625        index, although it may and often should contain additional entries. 
     626        """ 
     627        if set(filters.keys()) > set(index): 
     628            for unit in self.xrecall(index, filters): 
    559629                return unit 
    560630        else: 
    561             clsname = cls.__name__ 
     631            clsname = self.cls.__name__ 
    562632            # If the filters and index keys are equal, it should be faster 
    563633            # to perform single gets against memcached, rather than the 
    564             # get_multi calls that _xrecall_from_index performs. 
    565             ids = self.get_index(cls, dict([(k, filters[k]) for k in keys])) 
     634            # get_multi calls that self.xrecall performs. 
     635            ids = self.get(dict([(k, filters[k]) for k in index])) 
    566636            if ids: 
    567637                for id in ids: 
    568                     cache_key = "%s:%s:%s" % (self.name, clsname, self.hash(id)) 
     638                    cache_key = "%s:%s:%s" % (self.store.name, clsname, self.hash(id)) 
    569639                    unit = self.client.get(cache_key) 
    570640                    if unit is None: 
    571                         if self.logflags & logflags.IO: 
    572                             self.log(logflags.IO.message('INDEX MISS (%s) %s' % 
    573                                                         (cache_key, filters))) 
     641                        if self.store.logflags & logflags.IO: 
     642                            self.store.log(logflags.IO.message( 
     643                                'INDEX MISS (%s) %s' % (cache_key, filters))) 
    574644                    else: 
    575                         if self.logflags & logflags.IO: 
    576                             self.log(logflags.IO.message('INDEX HIT (%s) %s' % 
    577                                                         (cache_key, filters))) 
     645                        if self.store.logflags & logflags.IO: 
     646                            self.store.log(logflags.IO.message( 
     647                                'INDEX HIT (%s) %s' % (cache_key, filters))) 
    578648                        unit.cleanse() 
    579649                        return unit 
    580650            else: 
    581                 if self.logflags & logflags.IO: 
    582                     self.log(logflags.IO.message('INDEX EMPTY (%s) %s' % 
    583                                                 (clsname, filters))) 
     651                if self.store.logflags & logflags.IO: 
     652                    self.store.log(logflags.IO.message( 
     653                        'INDEX EMPTY (%s) %s' % (clsname, filters))) 
    584654        return None 
    585655     
    586     def _xrecall_from_index(self, cls, keys, filters): 
    587         """Yield units from the (cls, keys) index which match the filters dict. 
    588          
    589         The filters argument must contain an entry for each key in the 
    590         given list of keys, although it may and often should contain 
    591         additional entries. 
    592         """ 
    593         partial_index = set(filters.keys()) > set(keys
    594         ids = self.get_index(cls, dict([(k, filters[k]) for k in keys])
     656    def xrecall(self, index, filters): 
     657        """Yield units from the given index which match the filters dict. 
     658         
     659        The filters argument must contain an entry for each key in the given 
     660        index, although it may and often should contain additional entries. 
     661        """ 
     662        partial_index = set(filters.keys()) > set(index) 
     663        indexcriteria = dict([(k, filters[k]) for k in index]
     664        ids = self.get(indexcriteria
    595665        if ids: 
    596             units = self.scan_index(cls, ids) 
     666            units = self.scan(ids) 
     667             
     668            removals = False 
    597669            # Preserve order by iterating over the retrieved ids 
    598670            # instead of the retrieved units. 
     
    600672                unit = units.get(id, None) 
    601673                if unit is not None: 
    602                     matching = True 
    603                      
    604                     if partial_index: 
    605                         # Filter in full now. 
    606                         for k, v in filters.iteritems(): 
    607                             if getattr(unit, k) != v: 
    608                                 matching = False 
    609                                 break 
    610                      
    611                     if matching: 
     674                    for k, v in filters.iteritems(): 
     675                        if getattr(unit, k) != v: 
     676                            if k in index: 
     677                                removals = True 
     678                                del units[id] 
     679                                ids.remove(id) 
     680                            break 
     681                    else: 
    612682                        unit.cleanse() 
    613683                        yield unit 
    614  
     684             
     685            if removals: 
     686                # Remove any idents from the index node that no longer 
     687                # satisfy the index criteria. This is how we update 
     688                # index nodes--eager adds but late discards. 
     689                self.put(indexcriteria, ids, time=self.store.index_time) 
     690     
     691    def add(self, unit): 
     692        """Add the given unit to all indices.""" 
     693        ident = tuple([getattr(unit, name) 
     694                       for name in self.store.primary_keys[self.cls]]) 
     695        for index in self._indices: 
     696            indexcriteria = dict([(k, getattr(unit, k)) for k in index]) 
     697            indexnode = self.get(indexcriteria) or [] 
     698            if ident not in indexnode: 
     699                indexnode.append(ident) 
     700            self.put(indexcriteria, indexnode) 
     701     
     702    def discard(self, unit): 
     703        """Discard the given unit from all indices.""" 
     704        ident = tuple([getattr(unit, name) 
     705                       for name in self.store.primary_keys[self.cls]]) 
     706        for index in self._indices: 
     707            indexcriteria = dict([(k, getattr(unit, k)) for k in index]) 
     708            indexnode = self.get(indexcriteria) or [] 
     709            if ident in indexnode: 
     710                indexnode.remove(ident) 
     711            self.put(indexcriteria, indexnode) 
     712