Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

root/branches/crazycache/dejavu/storage/caching.py

Revision 577 (checked in by fumanchu, 1 year ago)

Crazycache: Test and fix for some indexing errors.

  • Property svn:eol-style set to native
Line 
1 """Caching Storage Managers for Dejavu."""
2
3 import datetime
4 import dejavu
5 from dejavu import logic, logflags, recur
6 from dejavu.storage import ProxyStorage, resolve
7
8
class ObjectCache(ProxyStorage):
    """A Proxy Storage Manager which recalls and keeps Units in memory.
    
    The recall, reserve, and save methods place units in the cache;
    the destroy method invalidates those entries. Units which exist in the
    cache perfectly reflect the units in the next store; any modifications
    are immediately written to the next store. Unit classes which have
    no identifiers are not cached.
    
    This is primarily designed to be used in OLTP environments, and then
    only for a small subset of classes for which a sizable number of
    objects are usually recalled by ID but rarely modified.
    
    Options:
        
        cache: if given, this should be a StorageManager instance for the
            store which will actually retain cached data (the ObjectCache
            is just a dispatcher). If not given, the default cache will be
            whatever resolve("ram") returns.
        
        fulljoin: if True, perform recalls involving multiple classes using
            the cache. This can be quite slow when the involved classes are
            large. If False (the default), multirecall will skip reading
            the cache and query self.nextstore directly.
    """
    
    def __init__(self, allOptions=None):
        """Set up the proxy and its cache store.
        
        allOptions: a dict of options (see the class docstring). It is
            also passed through to ProxyStorage.__init__. If omitted, a
            fresh empty dict is used (rather than a shared mutable default).
        """
        if allOptions is None:
            allOptions = {}
        ProxyStorage.__init__(self, allOptions)
        
        self.fulljoin = allOptions.get("fulljoin", False)
        self.cache_recalls = []
        
        self.cache = allOptions.get("cache")
        if self.cache is None:
            self.cache = resolve("ram")
    
    def _try_cache_save(self, unit):
        """Force-save the unit into self.cache; ignore a KeyError refusal."""
        try:
            self.cache.save(unit, forceSave=True)
        except KeyError:
            # The cache refused to save the unit (possibly full).
            pass
    
    def unit(self, cls, **kwargs):
        """A single Unit which matches the given kwargs, else None.
        
        The cache is consulted first (only when cls is one of the cache's
        classes). On a miss, the next store is asked, and any Unit it
        returns is offered to the cache before being returned.
        If no Units match, None is returned.
        """
        if cls in self.cache.classes:
            u = self.cache.unit(cls, **kwargs)
            if u is not None:
                return u
        
        u = self.nextstore.unit(cls, **kwargs)
        if u is not None:
            self._try_cache_save(u)
        
        return u
    
    def xrecall(self, classes, expr=None, order=None, limit=None, offset=None):
        """Return a Unit iterator.
        
        classes: either a single Unit class or a dejavu.UnitJoin. For a
            UnitJoin, the rows produced by self._xmultirecall are yielded.
        expr: a logic.Expression, or something logic.Expression() accepts.
        order, limit, offset: passed through to the underlying recall
            (or to self._paginate when the cache's scan() answers).
        
        Raises TypeError if offset is given without order.
        
        This is a generator, so nothing (including the TypeError) happens
        until iteration begins.
        """
        if isinstance(classes, dejavu.UnitJoin):
            for unitrow in self._xmultirecall(classes, expr, order=order,
                                              limit=limit, offset=offset):
                yield unitrow
            # Return only AFTER the loop is exhausted; returning inside
            # the loop would truncate the results to the first row.
            return
        
        cls = classes
        
        # Units which have no identifiers are not cached
        if not cls.identifiers:
            for unit in ProxyStorage.xrecall(self, cls, expr, order=order,
                                             limit=limit, offset=offset):
                yield unit
            return
        
        if not isinstance(expr, logic.Expression):
            expr = logic.Expression(expr)
        if self.logflags & logflags.RECALL:
            self.log(logflags.RECALL.message(cls, expr))
        
        if limit == 0:
            return
        
        if offset and not order:
            raise TypeError("Order argument expected when offset is provided.")
        
        cached_class = cls in self.cache.classes
        
        # Try to retrieve units using a cached index. Not every cache
        # store provides scan(), and scan() may return None to decline.
        if cached_class and hasattr(self.cache, 'scan'):
            units = self.cache.scan(self.nextstore, cls, expr)
            if units is not None:
                # _paginate is handed one-element rows; single=True is
                # passed along with them.
                units = [(unit,) for unit in units]
                for unit in self._paginate(units, order, limit, offset,
                                           single=True):
                    yield unit
                return
        
        # Query storage, populating the cache as we go (for cached classes).
        for unit in self.nextstore.xrecall(cls, expr, order=order,
                                           limit=limit, offset=offset):
            if cached_class:
                self._try_cache_save(unit)
            yield unit
    
    def _xmultirecall(self, classes, expr=None, order=None, limit=None, offset=None):
        """Yield lists of units of the given classes which match expr."""
        if self.fulljoin:
            # Use the superclass' _combine method. This can be VERY slow,
            # since ALL objects of each class will be read into memory,
            # combined, and tested against the expression.
            for unitrow in ProxyStorage._xmultirecall(
                self, classes, expr, order=order, limit=limit, offset=offset):
                yield unitrow
        else:
            # Skip reading from the cache (since it probably has poor
            # performance for this kind of operation). But we'll still
            # *write* to the cache.
            # One "already seen" map per position in the row, so the same
            # unit appearing in many joined rows is only saved once.
            seen = [{} for cls in classes]
            for unitrow in self.nextstore._xmultirecall(
                classes, expr, order=order, limit=limit, offset=offset):
                for i, unit in enumerate(unitrow):
                    if unit.__class__ in self.cache.classes:
                        ident = unit.identity()
                        if not unit.sequencer.valid_id(ident):
                            # This is a 'dummy unit' from an outer join.
                            continue
                        
                        if ident not in seen[i]:
                            self._try_cache_save(unit)
                        seen[i][ident] = None
                yield unitrow
    
    def save(self, unit, forceSave=False):
        """Store the unit in the next store, then refresh the cached copy.
        
        Units without identifiers are delegated to ProxyStorage.save.
        The cache is only updated when the unit's class is one of the
        cache's classes and the unit was dirty (or forceSave was given).
        """
        if not unit.identifiers:
            return ProxyStorage.save(self, unit, forceSave)
        
        if self.logflags & logflags.SAVE:
            self.log(logflags.SAVE.message(unit, forceSave))
        
        # Decide BEFORE saving: nextstore might call unit.cleanse(),
        # after which unit.dirty() would no longer tell us anything.
        update_cache = (unit.__class__ in self.cache.classes
                        and (forceSave or unit.dirty()))
        self.nextstore.save(unit, forceSave)
        if update_cache:
            self._try_cache_save(unit)
    
    def destroy(self, unit):
        """Delete the unit from the next store and invalidate its cache entry.
        
        Units without identifiers are delegated to ProxyStorage.destroy.
        """
        if not unit.identifiers:
            return ProxyStorage.destroy(self, unit)
        
        if self.logflags & logflags.DESTROY:
            self.log(logflags.DESTROY.message(unit))
        
        self.nextstore.destroy(unit)
        self.invalidate(unit)
    
    def reserve(self, unit):
        """Reserve storage space for the Unit.
        
        Units without identifiers are delegated to ProxyStorage.reserve.
        Otherwise the next store reserves first; the cache then reserves
        too, but only for cached classes and only if the unit is not dirty.
        """
        if not unit.identifiers:
            return ProxyStorage.reserve(self, unit)
        
        # Allow the proxied store to set any auto-ID's
        self.nextstore.reserve(unit)
        
        if unit.__class__ in self.cache.classes and not unit.dirty():
            try:
                self.cache.reserve(unit)
            except KeyError:
                # The cache refused to save the unit (possibly full).
                pass
        
        if self.logflags & logflags.RESERVE:
            self.log(logflags.RESERVE.message(unit))
    
    def invalidate(self, unit):
        """Remove the unit from the cache (the next store is not touched)."""
        if unit.identifiers and unit.__class__ in self.cache.classes:
            self.cache.destroy(unit)
    
    def shutdown(self, conflicts='error'):
        """Shut down all connections to internal storage.
        
        conflicts: see errors.conflict.
        
        The cache is shut down first, then the next store.
        """
        self.cache.shutdown(conflicts=conflicts)
        self.nextstore.shutdown(conflicts=conflicts)
    
    def create_database(self, conflicts='error'):
        """Create internal structures for the entire database.
        
        conflicts: see errors.conflict.
        
        This method will NOT create storage for each class, nor will
        it create any dependent properties or indexes.
        """
        ProxyStorage.create_database(self, conflicts=conflicts)
        self.cache.create_database(conflicts=conflicts)
    
    def drop_database(self, conflicts='error'):
        """Destroy internal structures for the entire database.
        
        conflicts: see errors.conflict.
        
        This method will also drop storage for each class, including
        all properties and indexes.
        """
        ProxyStorage.drop_database(self, conflicts=conflicts)
        self.cache.drop_database(conflicts=conflicts)
    
    def create_storage(self, cls, conflicts='error'):
        """Create internal structures for the given class.
        
        conflicts: see errors.conflict.
        
        This method will also create all dependent properties and indexes.
        The cache is only touched if cls is one of the cache's classes.
        """
        ProxyStorage.create_storage(self, cls, conflicts=conflicts)
        if cls in self.cache.classes:
            self.cache.create_storage(cls, conflicts=conflicts)
    
    def drop_storage(self, cls, conflicts='error'):
        """Destroy internal structures for the given class.
        
        conflicts: see errors.conflict.
        
        This method will also drop all dependent properties and indexes.
        The cache is only touched if cls is one of the cache's classes.
        """
        ProxyStorage.drop_storage(self, cls, conflicts=conflicts)
        if cls in self.cache.classes:
            self.cache.drop_storage(cls, conflicts=conflicts)
    
    def add_property(self, cls, name, conflicts='error'):
        """Add internal structures for the given property.
        
        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("add property %r %r" % (cls, name)))
        self.nextstore.add_property(cls, name, conflicts=conflicts)
        if cls in self.cache.classes:
            self.cache.add_property(cls, name, conflicts=conflicts)
    
    def drop_property(self, cls, name, conflicts='error'):
        """Destroy internal structures for the given property.
        
        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("drop property %r %r" % (cls, name)))
        self.nextstore.drop_property(cls, name, conflicts=conflicts)
        if cls in self.cache.classes:
            self.cache.drop_property(cls, name, conflicts=conflicts)
    
    def rename_property(self, cls, oldname, newname, conflicts='error'):
        """Rename internal structures for the given property.
        
        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("rename property %r from %r to %r"
                                 % (cls, oldname, newname)))
        self.nextstore.rename_property(cls, oldname, newname, conflicts=conflicts)
        if cls in self.cache.classes:
            self.cache.rename_property(cls, oldname, newname, conflicts=conflicts)
    
    def map(self, classes, conflicts='error'):
        """Map classes to internal storage.
        
        conflicts: see errors.conflict.
        
        All classes are mapped via ProxyStorage.map; only those already
        present in self.cache.classes are also mapped in the cache.
        """
        ProxyStorage.map(self, classes, conflicts=conflicts)
        
        classes = set(classes).intersection(self.cache.classes)
        if classes:
            self.cache.map(classes, conflicts=conflicts)
    
    # NOTE(review): the three transaction methods below test the cache's
    # method attribute for truthiness before calling it; presumably some
    # stores set start/rollback/commit to None when they do not support
    # transactions -- confirm against the StorageManager base class.
    
    def start(self, isolation=None):
        """Start a transaction in the proxied store, then in the cache."""
        ProxyStorage.start(self, isolation)
        if self.cache.start:
            self.cache.start(isolation)
    
    def rollback(self):
        """Roll back the proxied store's transaction, then the cache's."""
        ProxyStorage.rollback(self)
        if self.cache.rollback:
            self.cache.rollback()
    
    def commit(self):
        """Commit the proxied store's transaction, then the cache's."""
        ProxyStorage.commit(self)
        if self.cache.commit:
            self.cache.commit()
315
316
class AgedCache(ObjectCache):
    """A Proxy Storage Manager which recalls and keeps Units in memory.
    
    The recall, reserve, and save methods place units in the cache;
    the destroy method invalidates those entries. Units which exist in the
    cache perfectly reflect the units in the next store; any modifications
    are immediately written to the next store. Unit classes which have
    no identifiers are not cached. DDL methods empty all involved caches.
    
    This is primarily designed to be used in OLTP environments, and then
    only for a small subset of classes for which a sizable number of
    objects are usually recalled by ID but rarely modified.
    
    The 'Lifetime' option (the lowercase spelling 'lifetime' is accepted
    too) should be the number of seconds (a float) between "sweeps".
    Each sweep checks the last recall date of all objects in the cache,
    invalidating any that have been idle longer than the given lifetime.
    Note that the sweeper Worker is not started for you; you must either
    call sm.sweeper.start() at regular intervals, or use a recur.Scheduler
    to cycle it for you. The 'sweeper' attribute only exists when a
    lifetime was supplied.
    """
    
    def __init__(self, allOptions=None):
        """Set up the cache and, if a lifetime was given, its sweeper.
        
        allOptions: a dict of options; see the class docstring and
            ObjectCache. If omitted, a fresh empty dict is used (rather
            than a shared mutable default).
        """
        if allOptions is None:
            allOptions = {}
        ObjectCache.__init__(self, allOptions)
        
        self._recallTimes = {}  # {cls: {id: datetime.datetime}}
        # Create and motivate a worker to sweep out idle Units.
        # 'Lifetime' is the historical key and takes precedence; 'lifetime'
        # matches the spelling used in the class docstring.
        lifetime = allOptions.get('Lifetime') or allOptions.get('lifetime', '')
        if lifetime:
            
            class IdleSweeper(recur.Worker):
                """A worker to sweep out idle Units."""
                def work(me):
                    """Start a cycle of scheduled work."""
                    # Note that 'self' refers to the Proxy, not the Worker.
                    # NOTE(review): no lastSweepTime is passed, so sweep()
                    # invalidates EVERY cached unit on each cycle, not just
                    # idle ones -- confirm this is the intended policy.
                    self.sweep_all()
            self.sweeper = IdleSweeper(lifetime)
    
    def map(self, classes, conflicts='error'):
        """Map classes to internal storage.
        
        conflicts: see errors.conflict.
        
        Classes which are also mapped in the cache get an (initially
        empty) recall-time table.
        """
        ProxyStorage.map(self, classes, conflicts=conflicts)
        
        classes = set(classes).intersection(self.cache.classes)
        if classes:
            self.cache.map(classes, conflicts=conflicts)
            for cls in classes:
                self._recallTimes.setdefault(cls, {})
    
    def xrecall(self, classes, expr=None, order=None, limit=None, offset=None):
        """Return a Unit iterator, stamping each cached unit's recall time.
        
        Every unit of a cached class yielded by one call receives the same
        timestamp: the moment iteration of this recall began. UnitJoin
        recalls are passed to self._xmultirecall and are not stamped.
        """
        if isinstance(classes, dejavu.UnitJoin):
            for unitrow in self._xmultirecall(classes, expr, order=order,
                                              limit=limit, offset=offset):
                yield unitrow
            return
        
        cls = classes
        if cls in self.cache.classes:
            start = datetime.datetime.now()
            # setdefault: the class may be known to the cache without ever
            # having passed through this proxy's map().
            recallTimes = self._recallTimes.setdefault(cls, {})
            for unit in ObjectCache.xrecall(self, cls, expr,
                                            order, limit, offset):
                recallTimes[unit.identity()] = start
                yield unit
        else:
            for unit in ObjectCache.xrecall(self, cls, expr,
                                            order, limit, offset):
                yield unit
    
    def reserve(self, unit):
        """Reserve storage space for the Unit and stamp its recall time."""
        ObjectCache.reserve(self, unit)
        cls = unit.__class__
        if cls in self.cache.classes:
            recallTimes = self._recallTimes.setdefault(cls, {})
            recallTimes[unit.identity()] = datetime.datetime.now()
    
    def invalidate(self, unit):
        """Remove the unit from the cache and forget its recall time."""
        if unit.identifiers:
            cls = unit.__class__
            if cls in self.cache.classes:
                ObjectCache.invalidate(self, unit)
                try:
                    del self._recallTimes[cls][unit.identity()]
                except KeyError:
                    # No recall time was ever recorded for this unit.
                    pass
    
    def sweep(self, cls, lastSweepTime=None):
        """Sweep idle units out of the cache for the given class.
        
        lastSweepTime: a datetime.datetime, or None. A cached unit is
            invalidated if it has no recorded recall time, or if its last
            recall was before lastSweepTime. If lastSweepTime is None,
            every cached unit of the class is invalidated.
        """
        recallTimes = self._recallTimes.setdefault(cls, {})
        # Snapshot the cached units first: invalidate() destroys units in
        # the very cache we would otherwise still be iterating over.
        for unit in list(self.cache.xrecall(cls)):
            lastRecall = recallTimes.get(unit.identity())
            if (lastRecall is None or lastSweepTime is None
                or lastRecall < lastSweepTime):
                self.invalidate(unit)
    
    def sweep_all(self, lastSweepTime=None):
        """Sweep idle units out of the cache for all classes.
        
        lastSweepTime: passed to sweep() for each of the cache's classes.
        """
        for cls in self.cache.classes:
            self.sweep(cls, lastSweepTime)
419
420
421 class BurnedCache(ObjectCache):
422     """An Object Cache which recalls and caches ALL Units.
423     
424     The big performance difference for a burned cache is that, once _any_
425     Units have been recalled, all Units for that class are placed in the
426     cache, so further recalls won't hit the next store unless the cache
427     is completely emptied.
428     
429     Notice we didn't say "performance _benefit_" ;) That would depend to
430     a great extent on the proxied store.
431     
432     This should NOT be used with lossy caches like memcached, since it
433     depends on always having a complete cache of a given class.
434     """
435    
436     def xrecall(self, classes, expr=None, order=None, limit=None, offset=None):
437         """Return a Unit iterator."""
438         if isinstance(classes, dejavu.UnitJoin):
439             return self._xmultirecall(classes, expr, order=order,
440                                       limit=limit, offset=offset)
441        
442         cls = classes
443         # Units which have no identifiers are not cached
444         if not cls.identifiers:
445             return ProxyStorage.xrecall(self, cls, expr, order=order,
446                                         limit=limit, offset=offset)
447         else:
448             if