Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

root/tags/1.4.0/storage/__init__.py

Revision 165 (checked in by fumanchu, 3 years ago)

Now distributing recur module with dejavu.

  • Property svn:eol-style set to native
Line 
1 """Storage Managers for Dejavu."""
2
3 import re
4 import datetime
5 import threading
6 import thread
7 try:
8     import cPickle as pickle
9 except ImportError:
10     import pickle
11
12 from dejavu import logic, recur
13
14
class StorageManager(object):
    """A Manager base class for storing and retrieving Units.

    The base StorageManager class doesn't actually store anything;
    it needs to be subclassed. Every storage and schema operation here
    raises NotImplementedError, except shutdown(), which does nothing.
    """

    def __init__(self, name, arena, allOptions=None):
        """Record the store's name, its arena, and the common options.

        name: stored as self.name.
        arena: stored as self.arena.
        allOptions: optional dict of option strings. Two keys are read:
            'Units': comma-separated names, kept (stripped, blanks
                dropped) in self.classnames.
            'Shutdown Order': converted with int() into
                self.shutdownOrder; defaults to 0.
        """
        # Use None instead of a shared {} default so that no dict object
        # is shared between calls (and so None is an accepted value).
        if allOptions is None:
            allOptions = {}

        self.name = name
        self.arena = arena

        units = allOptions.get('Units', '')
        self.classnames = [x.strip() for x in units.split(",") if x.strip()]
        self.shutdownOrder = int(allOptions.get('Shutdown Order', '0'))

    def recall(self, unitClass, expr=None):
        """Return an iterable object which will populate Units."""
        raise NotImplementedError

    def save(self, unit, forceSave=False):
        """Store the object's data{} dictionary."""
        raise NotImplementedError

    def destroy(self, unit):
        """Delete the object."""
        raise NotImplementedError

    def reserve(self, unit):
        """Reserve storage space for the Unit."""
        raise NotImplementedError

    def shutdown(self):
        """Release resources held by the store. The base class holds none."""
        pass

    def view(self, cls, attrs, expr=None):
        """view(cls, attrs, expr=None) -> Iterator of all Property tuples."""
        raise NotImplementedError

    def distinct(self, cls, attrs, expr=None):
        """Distinct values for given attributes."""
        raise NotImplementedError

    def multirecall(self, classes, expr):
        """Full inner join units from each class."""
        raise NotImplementedError

    #                               Schemas                               #

    def create_database(self):
        """Schema hook; not provided by the base class."""
        raise NotImplementedError("%s has no create_database method."
                                  % self.__class__)

    def drop_database(self):
        """Schema hook; not provided by the base class."""
        raise NotImplementedError("%s has no drop_database method."
                                  % self.__class__)

    def create_storage(self, cls):
        """Schema hook; not provided by the base class."""
        raise NotImplementedError("%s has no create_storage method."
                                  % self.__class__)

    def has_storage(self, cls):
        """Schema hook; not provided by the base class."""
        raise NotImplementedError("%s has no has_storage method."
                                  % self.__class__)

    def drop_storage(self, cls):
        """Schema hook; not provided by the base class."""
        raise NotImplementedError("%s has no drop_storage method."
                                  % self.__class__)

    def add_property(self, cls, name):
        """Schema hook; not provided by the base class."""
        raise NotImplementedError("%s has no add_property method."
                                  % self.__class__)

    def drop_property(self, cls, name):
        """Schema hook; not provided by the base class."""
        raise NotImplementedError("%s has no drop_property method."
                                  % self.__class__)

    def rename_property(self, cls, oldname, newname):
        """Schema hook; not provided by the base class."""
        raise NotImplementedError("%s has no rename_property method."
                                  % self.__class__)
94
95
class ProxyStorage(StorageManager):
    """A Storage Manager which passes calls to another Storage Manager.

    The target store is named by the 'Next Store' option and looked up in
    arena.stores. When no next store is configured, every operation is a
    no-op: queries produce empty results and has_storage() returns False.
    """

    # The proxied store; stays None when no 'Next Store' option is given.
    nextstore = None

    def __init__(self, name, arena, allOptions=None):
        """Initialize the base manager and resolve the next store, if any.

        allOptions: optional dict of option strings; in addition to the
            base-class options, 'Next Store' names the entry of
            arena.stores to forward calls to.
        """
        # Normalise here (rather than relying on the base class) so a None
        # default is never handed to code that calls .get() on it.
        if allOptions is None:
            allOptions = {}
        StorageManager.__init__(self, name, arena, allOptions)

        nextstore = allOptions.get('Next Store')
        if nextstore:
            self.nextstore = arena.stores[nextstore]

    def recall(self, unitClass, expr=None):
        """Yield each Unit recalled by the next store (nothing if none)."""
        if self.nextstore:
            for unit in self.nextstore.recall(unitClass, expr):
                yield unit

    def save(self, unit, forceSave=False):
        """Store the unit."""
        if self.nextstore:
            self.nextstore.save(unit, forceSave)

    def destroy(self, unit):
        """Delete the unit."""
        if self.nextstore:
            self.nextstore.destroy(unit)

    def reserve(self, unit):
        """Reserve storage space for the Unit."""
        if self.nextstore:
            self.nextstore.reserve(unit)

    def view(self, cls, attrs, expr=None):
        """view(cls, attrs, expr=None) -> Iterator of all Property tuples."""
        if self.nextstore:
            return self.nextstore.view(cls, attrs, expr)
        # Keep the documented "iterator" contract (and match recall(),
        # which is empty here) instead of returning None.
        return iter([])

    def distinct(self, cls, attrs, expr=None):
        """distinct(cls, attrs, expr=None) -> Distinct values for given attributes."""
        if self.nextstore:
            return self.nextstore.distinct(cls, attrs, expr)
        # No store means no values; return an empty list rather than None.
        return []

    def multirecall(self, classes, expr):
        """multirecall(classes, expr) -> Full inner join units from each class."""
        if self.nextstore:
            return self.nextstore.multirecall(classes, expr)
        # No store means nothing to join; stay iterable rather than None.
        return iter([])

    #                               Schemas                               #

    def create_database(self):
        """Forward create_database() to the next store, if any."""
        if self.nextstore:
            self.nextstore.create_database()

    def drop_database(self):
        """Forward drop_database() to the next store, if any."""
        if self.nextstore:
            self.nextstore.drop_database()

    def create_storage(self, cls):
        """Forward create_storage(cls) to the next store, if any."""
        if self.nextstore:
            self.nextstore.create_storage(cls)

    def has_storage(self, cls):
        """Return the next store's has_storage(cls), or False without one."""
        if self.nextstore:
            return self.nextstore.has_storage(cls)
        return False

    def drop_storage(self, cls):
        """Forward drop_storage(cls) to the next store, if any."""
        if self.nextstore:
            self.nextstore.drop_storage(cls)

    def add_property(self, cls, name):
        """Forward add_property(cls, name) to the next store, if any."""
        if self.nextstore:
            self.nextstore.add_property(cls, name)

    def drop_property(self, cls, name):
        """Forward drop_property(cls, name) to the next store, if any."""
        if self.nextstore:
            self.nextstore.drop_property(cls, name)

    def rename_property(self, cls, oldname, newname):
        """Forward rename_property(...) to the next store, if any."""
        if self.nextstore:
            self.nextstore.rename_property(cls, oldname, newname)
177
178
class CachingProxy(ProxyStorage):
    """A Proxy Storage Manager which recalls and keeps Units in memory.

    Units are cached as pickles, one dict per Unit class, each guarded by
    its own lock. save() only updates the cache; dirty Units are written
    to the next store when they are swept out (see sweep()).
    """

    def __init__(self, name, arena, allOptions=None):
        """Set up empty caches and, if requested, an idle-Unit sweeper.

        allOptions: optional dict of option strings. In addition to the
            options read by the base classes, a non-empty 'Lifetime'
            value is passed to a recur.Worker subclass whose work()
            calls self.sweep_all().
        """
        # Normalise here so a None default is never passed to code that
        # calls .get() on it.
        if allOptions is None:
            allOptions = {}
        ProxyStorage.__init__(self, name, arena, allOptions)

        self._caches = {}       # unit class -> {identity: pickled Unit}
        self._cache_locks = {}  # unit class -> lock guarding that cache
        # identity -> datetime at which the Unit entered the cache
        # (written by recall() and reserve()). Keyed by identity only,
        # NOT by class.
        self._recallTimes = {}

        self.timer = None

        # Create and motivate a worker to sweep out idle Units.
        lifetime = allOptions.get('Lifetime', '')
        if lifetime:

            class IdleSweeper(recur.Worker):
                """A worker to sweep out idle Units."""
                def work(me):
                    """Start a cycle of scheduled work."""
                    # Note that 'self' refers to the Proxy, not the Worker.
                    self.sweep_all()
            self.sweeper = IdleSweeper(lifetime)

    def cachelen(self, cls):
        """Return the number of Units of class cls currently cached."""
        return len(self._caches.get(cls, {}))

    def cached_units(self, cls):
        """Return a list of freshly unpickled copies of all cached cls Units."""
        return [pickle.loads(data) for data
                in self._caches.get(cls, {}).values()]

    def _get_lock(self, unitClass):
        """Return the lock for unitClass's cache, ALREADY ACQUIRED.

        Creates the cache dict and its lock on first use. The caller must
        release() the returned lock (use try/finally).
        """
        lock = self._cache_locks.get(unitClass)
        if lock is None:
            # setdefault keeps whichever lock was stored first, so threads
            # racing on first use end up sharing one lock object (relies
            # on dict.setdefault being a single operation in CPython).
            lock = self._cache_locks.setdefault(unitClass, threading.Lock())
        # The lock is registered before the cache so that any class found
        # in self._caches always has a lock available.
        self._caches.setdefault(unitClass, {})
        lock.acquire(True)
        return lock

    def recall(self, unitClass, expr=None):
        """Return a Unit iterator.

        Cached Units matching expr come first from the cache; Units from
        the next store are added (and cached) only if their identity is
        not already cached, because stored copies may be stale.
        """
        currentTime = datetime.datetime.now()
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]
            matches = {}

            # Run through our cache first. Hopefully, this will save us
            # calling expr(unit) twice for each unit.
            for ident, pickledUnit in cache.items():
                unit = pickle.loads(pickledUnit)
                if expr is None or expr(unit):
                    matches[ident] = unit

            if self.nextstore:
                for unit in self.nextstore.recall(unitClass, expr):
                    ident = unit.identity()
                    if ident not in cache:
                        # Pickle the Unit to discard extraneous attributes,
                        # and avoid identity issues.
                        cache[ident] = pickle.dumps(unit)
                        self._recallTimes[ident] = currentTime
                        # Not previously cached, so it cannot already be
                        # among the matches.
                        matches[ident] = unit

            return iter(matches.values())
        finally:
            lock.release()

    def save(self, unit, forceSave=False):
        """Store the unit (in the cache only)."""
        # Don't call nextstore.save(). Defer that to sweep().

        # Don't check unit.dirty() here!! If a unit changes from state A
        # to state B, then back again to *exactly* state A, dirty()
        # will be False, and the cached data would stay in state B.

        lock = self._get_lock(unit.__class__)
        try:
            cache = self._caches[unit.__class__]
            cache[unit.identity()] = pickle.dumps(unit)
        finally:
            lock.release()

    def destroy(self, unit):
        """Delete the unit from the next store (if any) and from the cache."""
        unitClass = unit.__class__
        lock = self._get_lock(unitClass)
        try:
            ident = unit.identity()
            cache = self._caches[unitClass]
            if self.nextstore:
                self.nextstore.destroy(unit)
            # The unit may never have been cached; that's fine.
            cache.pop(ident, None)
            self._recallTimes.pop(ident, None)
        finally:
            lock.release()

    def view(self, cls, attrs, expr=None):
        """view(cls, attrs, expr=None) -> Iterator of all Property tuples."""

        if expr is None:
            expr = logic.Expression(lambda x: True)

        lock = self._get_lock(cls)
        try:
            cache = self._caches[cls]
            seen = []

            # Run through our cache first. Hopefully, this will save us
            # calling expr(unit) twice for each unit.
            for pickledUnit in cache.values():
                unit = pickle.loads(pickledUnit)
                if expr(unit):
                    seen.append(tuple([getattr(unit, f) for f in attrs]))

            if self.nextstore:

                # Add the identifier attributes if not present. This is
                # necessary to avoid duplicating objects which are
                # already in our cache.
                fields = list(attrs)
                indices = []
                added_fields = 0
                for prop in cls.identifiers:
                    if prop.key not in fields:
                        fields.append(prop.key)
                        added_fields += 1
                    indices.append(fields.index(prop.key))

                for row in self.nextstore.view(cls, fields, expr):
                    ident = tuple([row[x] for x in indices])
                    if ident not in cache:
                        if added_fields:
                            # Remove the identifier columns from the row.
                            row = row[:len(row) - added_fields]
                        seen.append(row)
            return iter(seen)
        finally:
            lock.release()

    def distinct(self, cls, attrs, expr=None):
        """distinct(cls, attrs, expr=None) -> Distinct values for given attributes."""

        if expr is None:
            expr = logic.Expression(lambda x: True)

        # Rather than repeat the logic in recall() where we mix cached
        # and uncached Units, just call recall itself.
        distvals = {}
        for unit in self.recall(cls, expr):
            val = tuple([getattr(unit, f) for f in attrs])
            distvals[val] = None
        return list(distvals)

    def reserve(self, unit):
        """Reserve storage space for the Unit, then cache it.

        With a next store, reservation is delegated to it. Without one,
        the unit's sequencer assigns an identity (based on the cached
        identities) unless the unit already has a valid one.
        """
        unitClass = unit.__class__
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]
            if self.nextstore:
                self.nextstore.reserve(unit)
            else:
                if not unit.sequencer.valid_id(unit.identity()):
                    unit.sequencer.assign(unit, list(cache))
            # Pickle the Unit to discard extraneous attributes,
            # and avoid identity issues.
            ident = unit.identity()
            cache[ident] = pickle.dumps(unit)
            self._recallTimes[ident] = datetime.datetime.now()
        finally:
            lock.release()

    def sweep(self, unitClass, lastSweepTime=None):
        """Flush and evict cached Units of unitClass.

        A Unit is evicted when it has no recorded recall time, when
        lastSweepTime is None, or when its recall time is earlier than
        lastSweepTime. Dirty Units are saved to the next store (when
        there is one) before being dropped from the cache.
        """
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]
            # Iterate over a snapshot: entries are deleted inside the loop.
            for ident in list(cache):
                lastRecall = self._recallTimes.get(ident)
                if (lastRecall is None or lastSweepTime is None
                    or lastRecall < lastSweepTime):
                    unit = pickle.loads(cache[ident])
                    # A proxy without a next store has nowhere to write
                    # to; the entry is simply dropped instead of failing
                    # with AttributeError on None.
                    if self.nextstore and unit.dirty():
                        self.nextstore.save(unit)

                    del cache[ident]
                    self._recallTimes.pop(ident, None)
        finally:
            lock.release()

    def sweep_all(self, lastSweepTime=None):
        """Call sweep() for every Unit class that has a cache."""
        # Snapshot the keys: another thread may register a new class (via
        # _get_lock) while we are sweeping.
        for cls in list(self._caches):
            self.sweep(cls, lastSweepTime)

    def shutdown(self):
        """Sweep everything out of the cache and cancel self.timer if set."""
        self.sweep_all()
        # NOTE(review): self.sweeper (created when 'Lifetime' is set) is not
        # stopped here -- confirm whether recur.Worker needs cancelling.
        if self.timer:
            # .cancel does nothing if the thread is already finished.
            self.timer.cancel()

    def add_property(self, cls, name):
        """Sweep out cached cls Units, then forward to the next store."""
        self.sweep(cls)
        if self.nextstore:
            self.nextstore.add_property(cls, name)

    def drop_property(self, cls, name):
        """Sweep out cached cls Units, then forward to the next store."""
        self.sweep(cls)
        if self.nextstore:
            self.nextstore.drop_property(cls, name)

    def rename_property(self, cls, oldname, newname):
        """Sweep out cached cls Units, then forward to the next store."""
        self.sweep(cls)
        if self.nextstore:
            self.nextstore.rename_property(cls, oldname, newname)
404
405
class BurnedProxy(CachingProxy):
    """A Caching Proxy Storage Manager which recalls and caches ALL Units.

    The big performance difference for a burned cache is that, once _any_
    Units have been recalled, further recalls won't hit the next store.
    Notice we didn't say "performance _benefit_" ;) That would depend to
    a great extent on the proxied store.

    Note that the next store is consulted again whenever the cache for a
    class is empty, so a class with no stored Units is re-read each time.
    """

    def _icached_units(self, cache, expr=None):
        """Yield unpickled Units from cache, filtered by expr when given.

        cache: a dict of identity -> pickled Unit.
        expr: optional callable; only Units for which it is true are
            yielded.
        """
        for data in cache.values():
            unit = pickle.loads(data)
            if expr is None or expr(unit):
                yield unit

    def recall(self, unitClass, expr=None):
        """Return a Unit iterator.

        If nothing is cached for unitClass yet, ALL of its Units are first
        read from the next store (ignoring expr) and cached; expr is then
        applied to the cached Units only.
        """
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]

            if (not cache) and self.nextstore:
                # Read ALL units from storage.
                now = datetime.datetime.now()
                for unit in self.nextstore.recall(unitClass, None):
                    ident = unit.identity()
                    cache[ident] = pickle.dumps(unit)
                    self._recallTimes[ident] = now

            # Build the result while the lock is still held. Returning the
            # lazy generator would walk the live cache dict after release,
            # so a save()/destroy() during iteration could break it with
            # "dictionary changed size during iteration".
            units = list(self._icached_units(cache, expr))
        finally:
            lock.release()
        return iter(units)
442
443
444 class Adapter(object):
445     """Transform values according to their type. Must be subclassed.
446     
447     In order for your subclass to work, you need to provide functions
448     named 'coerce_' + type, where 'type' refers to the type you wish to
449     support. Replace dots in the type name by underscores. For example,
450     to coerce datetime.date objects, you must provide a function in your
451     subclass named 'coerce_datetime_date'. For builtins, do not include
452     the module name '__builtin__', just use 'coerce_unicode', for example.
453     
454     If you try to coerce a value for whose type you have not provided a
455     coercion function, a TypeError is raised.
456     
457     When writing Adapters for Storage Managers, you should at least
458     provide coerce_* functions for: bool, dict, float, int, list, long,
459     NoneType, str, tuple, and unicode. For most applications, you should
460     also provide:
461         datetime.datetime
462                 .date
463                 .time
464                 .timedelta
465     and:
466         fixedpoint.FixedPoint or decimal.Decimal (preferably both)
467     """
468    
469     def coerce(self, value, valuetype=None):
470         """coerce(value, valuetype=None) -> value, coerced by valuetype."""
471         if valuetype is None:
472             valuetype = type(value)
473        
474         mod = valuetype.__module__
475         if mod == "__builtin__":
476             xform = "coerce_%s" % valuetype.__name__
477         else:
478             xform = "coerce_%s_%s" % (mod, valuetype.__name__)
479         xform = xform.replace(".", "_")
480         try:
481