Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/storage/__init__.py

Revision 106 (checked in by fumanchu, 7 years ago)

Fixed a failing test for CachingProxy and BurnedProxy. Both were retrieving some units from storage that they should not have (because they were testing stale data).

  • Property svn:eol-style set to native
Line 
1 """Storage Managers for Dejavu."""
2
3 import re
4 import datetime
5 import threading
6 import thread
7 try:
8     import cPickle as pickle
9 except ImportError:
10     import pickle
11
12 from dejavu import logic
13
14
class StorageManager(object):
    """A Manager base class for storing and retrieving Units.

    The base StorageManager class doesn't actually store anything;
    it needs to be subclassed.

    Options read from allOptions (string values):
        'Units': comma-separated class names; surrounding whitespace and
            empty entries are ignored. Stored as self.classnames.
        'Shutdown Order': an integer string (default '0'), stored as
            self.shutdownOrder.
    """

    def __init__(self, name, arena, allOptions=None):
        # None instead of a shared mutable {} default.
        if allOptions is None:
            allOptions = {}
        self.name = name
        self.arena = arena
        units = allOptions.get('Units', '').split(",")
        self.classnames = [x.strip() for x in units if x.strip()]
        self.shutdownOrder = int(allOptions.get('Shutdown Order', '0'))

    def recall(self, unitClass, expr=None):
        """Return an iterable object which will populate Units."""
        raise NotImplementedError

    def save(self, unit, forceSave=False):
        """Store the object's data{} dictionary."""
        raise NotImplementedError

    def destroy(self, unit):
        """Delete the object."""
        raise NotImplementedError

    def create_storage(self, unitClass):
        """Create storage for unitClass. The base implementation does nothing."""
        pass

    def reserve(self, unit):
        """Reserve storage space for the Unit."""
        raise NotImplementedError

    def shutdown(self):
        """Release resources. The base implementation does nothing."""
        pass

    def view(self, cls, attrs, expr=None):
        """view(cls, attrs, expr=None) -> Iterator of all Property tuples."""
        raise NotImplementedError

    def distinct(self, cls, attrs, expr=None):
        """distinct(cls, attrs, expr=None) -> Distinct values for given attributes."""
        raise NotImplementedError

    def multirecall(self, *pairs):
        """multirecall(*pairs) -> Full inner join units for each (cls, expr) pair."""
        raise NotImplementedError
63
64
class ProxyStorage(StorageManager):
    """A Storage Manager which passes calls to another Storage Manager.

    The downstream store is named by the 'Next Store' option and looked up
    in arena.stores. With no next store configured, mutating calls do
    nothing and queries return empty results.
    """

    # The downstream StorageManager, or None when not configured.
    nextstore = None

    def __init__(self, name, arena, allOptions=None):
        # None instead of a shared mutable {} default.
        if allOptions is None:
            allOptions = {}
        StorageManager.__init__(self, name, arena, allOptions)

        nextstore = allOptions.get('Next Store')
        if nextstore:
            self.nextstore = arena.stores[nextstore]

    def recall(self, unitClass, expr=None):
        """Yield Units of unitClass matching expr from the next store."""
        if self.nextstore:
            for unit in self.nextstore.recall(unitClass, expr):
                yield unit

    def save(self, unit, forceSave=False):
        """Store the unit."""
        if self.nextstore:
            self.nextstore.save(unit, forceSave)

    def destroy(self, unit):
        """Delete the unit."""
        if self.nextstore:
            self.nextstore.destroy(unit)

    def create_storage(self, unitClass):
        """Ask the next store (if any) to create storage for unitClass."""
        if self.nextstore:
            self.nextstore.create_storage(unitClass)

    def reserve(self, unit):
        """Reserve storage space for the Unit."""
        if self.nextstore:
            self.nextstore.reserve(unit)

    def view(self, cls, attrs, expr=None):
        """view(cls, attrs, expr=None) -> Iterator of all Property tuples."""
        if self.nextstore:
            return self.nextstore.view(cls, attrs, expr)
        # Nothing downstream: return an empty iterator rather than None so
        # callers can always iterate the result.
        return iter([])

    def distinct(self, cls, attrs, expr=None):
        """distinct(cls, attrs, expr=None) -> Distinct values for given attributes."""
        if self.nextstore:
            return self.nextstore.distinct(cls, attrs, expr)
        return []

    def multirecall(self, *pairs):
        """multirecall(*pairs) -> Full inner join units for each (cls, expr) pair."""
        if self.nextstore:
            return self.nextstore.multirecall(*pairs)
        return iter([])
115
116
class CachingProxy(ProxyStorage):
    """A Proxy Storage Manager which recalls and keeps Units in memory.

    Units are cached per class as pickled strings keyed by Unit ID. Pickling
    discards extraneous attributes and avoids sharing object identity with
    callers. save() only updates the cache; changes reach the next store
    when sweep() evicts a dirty Unit.

    If the 'Lifetime' option is set, it is parsed with the project's
    ``recur`` module. A threading.Timer then periodically calls sweep_all()
    to evict Units not loaded since the previous run.
    """

    def __init__(self, name, arena, allOptions=None):
        if allOptions is None:
            allOptions = {}
        ProxyStorage.__init__(self, name, arena, allOptions)

        self._caches = {}       # unitClass: {ID: pickled Unit}
        self._cache_locks = {}  # unitClass: lock guarding that class's cache
        # (unitClass, ID): time the Unit was loaded into the cache. Keyed by
        # class as well, since reserve() only allocates IDs uniquely within
        # one class.
        self._recallTimes = {}
        # Serializes creation of new per-class cache/lock pairs.
        self._registry_lock = threading.Lock()

        self.timer = None
        # Set by shutdown(); stops _cycle() from rescheduling itself.
        self._stopped = False

        # Create and motivate a worker to sweep out idle Units.
        lifetime = allOptions.get('Lifetime', '')
        if lifetime:
            import recur
            self.recurrence = recur.Recurrence(None, lifetime)
            if self.recurrence:
                # Throw away the first occurrence value,
                # which is almost always .now()
                self.recurrence.next()
            self.lastrun = datetime.datetime.now()
            self.active = True
            self.timer = threading.Timer(self.recurrence.interval(),
                                         self._cycle)
            self.timer.start()

    def _cycle(self):
        """Timer callback: schedule the next run, then sweep idle Units.

        After shutdown() this returns without rescheduling. While
        self.active is False the timer keeps ticking but nothing is swept.
        """
        if self._stopped:
            return
        self.timer = threading.Timer(self.recurrence.interval(), self._cycle)
        self.timer.start()
        if self.active:
            self.sweep_all(self.lastrun)
            self.lastrun = datetime.datetime.now()

    def cachelen(self, cls):
        """Return the number of cached Units of class cls."""
        return len(self._caches.get(cls, {}))

    def cached_units(self, cls):
        """Return freshly unpickled copies of every cached Unit of cls."""
        # Snapshot the values so a concurrent writer cannot resize the
        # dict while we iterate it.
        return [pickle.loads(data)
                for data in list(self._caches.get(cls, {}).values())]

    def _get_lock(self, unitClass):
        """Acquire and return the lock for unitClass's cache.

        The cache and lock for a class are created on first use. The caller
        must release the returned lock.
        """
        lock = self._cache_locks.get(unitClass)
        if lock is None:
            self._registry_lock.acquire()
            try:
                # Re-check: another thread may have registered the class
                # while we waited.
                lock = self._cache_locks.get(unitClass)
                if lock is None:
                    # Publish the cache before the lock, so any thread that
                    # finds the lock also finds the cache.
                    self._caches[unitClass] = {}
                    lock = threading.Lock()
                    self._cache_locks[unitClass] = lock
            finally:
                self._registry_lock.release()
        lock.acquire()
        return lock

    def _on_evict(self, unitClass):
        """Hook called by sweep(), with unitClass's lock held, after eviction."""
        pass

    def recall(self, unitClass, expr=None):
        """Return an iterator over Units of unitClass matching expr.

        A cached copy always wins over the next store's copy, because the
        stored data may be stale (save() defers writes until sweep()). Units
        the next store returns that are not yet cached are added to the cache.
        """
        currentTime = datetime.datetime.now()
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]
            matches = {}

            # Run through our cache first. Hopefully, this will save us
            # calling expr.evaluate twice for each unit.
            for unitID, pickledUnit in cache.items():
                unit = pickle.loads(pickledUnit)
                if expr is None or expr.evaluate(unit):
                    matches[unitID] = unit

            if self.nextstore:
                for unit in self.nextstore.recall(unitClass, expr):
                    # Skip Units we already cache. Their stored data may be
                    # stale, and any cached match is already in `matches`.
                    if unit.ID not in cache:
                        # Pickle the Unit to discard extraneous attributes,
                        # and avoid identity issues.
                        cache[unit.ID] = pickle.dumps(unit)
                        self._recallTimes[(unitClass, unit.ID)] = currentTime
                        matches[unit.ID] = unit

            return iter(matches.values())
        finally:
            lock.release()

    def save(self, unit, forceSave=False):
        """Store the unit in the cache only; sweep() writes it onward."""
        # Don't call nextstore.save(). Defer that to sweep().

        # Don't check .dirty()!! If a unit changes from state A to
        # state B, then back again to *exactly* state A, dirty()
        # will be False, and the cached data will stay in state B.
        lock = self._get_lock(unit.__class__)
        try:
            self._caches[unit.__class__][unit.ID] = pickle.dumps(unit)
        finally:
            lock.release()

    def destroy(self, unit):
        """Delete the unit from the next store (if any) and from the cache."""
        unitClass = unit.__class__
        lock = self._get_lock(unitClass)
        try:
            if self.nextstore:
                self.nextstore.destroy(unit)
            self._caches[unitClass].pop(unit.ID, None)
            self._recallTimes.pop((unitClass, unit.ID), None)
        finally:
            lock.release()

    def view(self, cls, attrs, expr=None):
        """view(cls, attrs, expr=None) -> Iterator of all Property tuples.

        Cached Units are evaluated first. The next store only supplies rows
        for Units that are not cached, since cached data takes precedence.
        """
        if expr is None:
            expr = logic.Expression(lambda x: True)

        lock = self._get_lock(cls)
        try:
            cache = self._caches[cls]
            seen = []

            # Run through our cache first. Hopefully, this will save us
            # calling expr.evaluate twice for each unit.
            for pickledUnit in cache.values():
                unit = pickle.loads(pickledUnit)
                if expr.evaluate(unit):
                    seen.append(tuple([getattr(unit, f) for f in attrs]))

            if self.nextstore:
                # Add the ID attribute if not present. This is necessary to
                # avoid duplicating objects which are already in our cache.
                fields = list(attrs)
                if "ID" not in fields:
                    fields.append("ID")
                index_of_id = fields.index("ID")

                for row in self.nextstore.view(cls, fields, expr):
                    if row[index_of_id] not in cache:
                        if "ID" not in attrs:
                            # Drop the ID column we appended above.
                            row = row[:-1]
                        seen.append(row)
            return iter(seen)
        finally:
            lock.release()

    def distinct(self, cls, attrs, expr=None):
        """distinct(cls, attrs, expr=None) -> Distinct values for given attributes."""
        if expr is None:
            expr = logic.Expression(lambda x: True)

        # Rather than repeat the logic in recall() where we mix cached
        # and uncached Units, just call recall itself.
        distvals = {}
        for unit in self.recall(cls, expr):
            distvals[tuple([getattr(unit, f) for f in attrs])] = None
        return distvals.keys()

    def reserve(self, unit):
        """Reserve storage space for the Unit and cache it.

        With a next store, that store performs the reservation. Without
        one, a missing unit.ID is drawn from unit.sequencer, given the IDs
        already cached for this class.
        """
        unitClass = unit.__class__
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]
            if self.nextstore:
                self.nextstore.reserve(unit)
            elif unit.ID is None:
                unit.ID = unit.sequencer.next(cache.keys())
            # Pickle the Unit to discard extraneous attributes,
            # and avoid identity issues.
            cache[unit.ID] = pickle.dumps(unit)
            self._recallTimes[(unitClass, unit.ID)] = datetime.datetime.now()
        finally:
            lock.release()

    def sweep(self, unitClass, lastSweepTime=None):
        """Flush (if dirty) and evict cached Units of unitClass.

        A Unit is evicted when it has no recorded load time, when
        lastSweepTime is None, or when it was loaded before lastSweepTime.
        Without a next store the cache holds the only copy, so nothing
        is evicted.
        """
        lock = self._get_lock(unitClass)
        try:
            if not self.nextstore:
                return
            cache = self._caches[unitClass]
            evicted = False
            # Copy the keys: we delete from `cache` while looping.
            for unitID in list(cache):
                key = (unitClass, unitID)
                lastRecall = self._recallTimes.get(key)
                if (lastRecall is None or lastSweepTime is None
                        or lastRecall < lastSweepTime):
                    unit = pickle.loads(cache[unitID])
                    if unit.dirty():
                        self.nextstore.save(unit)
                    del cache[unitID]
                    self._recallTimes.pop(key, None)
                    evicted = True
            if evicted:
                self._on_evict(unitClass)
        finally:
            lock.release()

    def sweep_all(self, lastSweepTime=None):
        """Call sweep() for every class that has a cache."""
        # Copy the keys: other threads may register new classes meanwhile.
        for cls in list(self._caches):
            self.sweep(cls, lastSweepTime)

    def shutdown(self):
        """Stop the sweep timer, then flush and evict every cached Unit."""
        self._stopped = True
        if self.timer:
            # .cancel does nothing if the thread is already finished.
            self.timer.cancel()
        self.sweep_all()


class BurnedProxy(CachingProxy):
    """A Caching Proxy Storage Manager which recalls and caches ALL Units.

    The big performance difference for a burned cache is that, once _any_
    Units have been recalled, further recalls won't hit the next store.
    Notice we didn't say "performance _benefit_" ;) That would depend to
    a great extent on the proxied store.

    The full extent of a class is reloaded after a sweep evicts any of its
    Units. A reload never overwrites Units that are still cached.
    """

    def __init__(self, name, arena, allOptions=None):
        # Classes whose entire extent is currently cached. Set before the
        # base __init__ so it exists before any sweep timer starts.
        self._burned = set()
        CachingProxy.__init__(self, name, arena, allOptions)

    def _on_evict(self, unitClass):
        """Force a full reload of unitClass on its next recall."""
        self._burned.discard(unitClass)

    def _icached_units(self, cache, expr=None):
        """Yield unpickled Units from the dict cache, filtered by expr."""
        if expr is None:
            for data in cache.values():
                yield pickle.loads(data)
        else:
            for data in cache.values():
                unit = pickle.loads(data)
                if expr.evaluate(unit):
                    yield unit

    def recall(self, unitClass, expr=None):
        """Return a Unit iterator."""
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]

            if unitClass not in self._burned and self.nextstore:
                # Read ALL units from storage.
                now = datetime.datetime.now()
                for unit in self.nextstore.recall(unitClass, None):
                    # Never overwrite a cached copy; it may hold unsaved
                    # changes.
                    if unit.ID not in cache:
                        cache[unit.ID] = pickle.dumps(unit)
                        self._recallTimes[(unitClass, unit.ID)] = now
                self._burned.add(unitClass)

            # Snapshot while locked: the generator below runs after the
            # lock is released, when other threads may be mutating `cache`.
            snapshot = dict(cache)
        finally:
            lock.release()
        return self._icached_units(snapshot, expr)
365
366
class Adapter(object):
    """Transform values according to their type. Must be subclassed.

    In order for your subclass to work, you need to provide functions
    named 'coerce_' + type, where 'type' refers to the type you wish to
    support. Replace dots in the type name by underscores. For example,
    to coerce datetime.date objects, you must provide a function in your
    subclass named 'coerce_datetime_date'. For builtins, do not include
    the module name ('__builtin__' on Python 2, 'builtins' on Python 3),
    just use 'coerce_unicode', for example.

    If you try to coerce a value for whose type you have not provided a
    coercion function, a TypeError is raised.

    When writing Adapters for Storage Managers, you should at least
    provide coerce_* functions for: bool, dict, float, int, list, long,
    NoneType, str, tuple, and unicode. For most applications, you should
    also provide:
        datetime.datetime
                .date
                .time
                .timedelta
    and:
        fixedpoint.FixedPoint or decimal.Decimal (preferably both)
    """

    # Module names under which builtin types report themselves.
    _builtin_modules = ("__builtin__", "builtins")

    def coerce(self, value, valuetype=None):
        """coerce(value, valuetype=None) -> value, coerced by valuetype.

        valuetype defaults to type(value). Dispatches to this Adapter's
        method 'coerce_<module>_<typename>' (dots replaced by underscores),
        or 'coerce_<typename>' for builtin types.

        Raises TypeError if no matching coerce_* method exists.
        """
        if valuetype is None:
            valuetype = type(value)

        mod = valuetype.__module__
        if mod in self._builtin_modules:
            methodname = "coerce_%s" % valuetype.__name__
        else:
            methodname = "coerce_%s_%s" % (mod, valuetype.__name__)
        methodname = methodname.replace(".", "_")
        try:
            xform = getattr(self, methodname)
        except AttributeError:
            raise TypeError("'%s' is not handled by %s." %
                            (valuetype, self.__class__))
        return xform(value)
409
410
try:
    _string_types = basestring
except NameError:  # Python 3 has no basestring.
    _string_types = str


class Version(object):
    """A version number compared atom by atom.

    Built from a string (split on every non-word character, so "1.2-b3"
    gives atoms ["1", "2", "b3"]) or from a sequence (each item passed
    through str()). When two atoms are both all-digit they compare as
    integers; otherwise they compare as strings. If one atom list is a
    prefix of the other, the shorter Version is the lesser
    ("1.2" < "1.2.0").
    """

    def __init__(self, atoms):
        if isinstance(atoms, _string_types):
            self.atoms = re.split(r'\W', atoms)
        else:
            self.atoms = [str(x) for x in atoms]

    def __str__(self):
        return ".".join([str(x) for x in self.atoms])

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, str(self))

    def _compare(self, other):
        """Return -1, 0 or 1 as self is less than, equal to or above other."""
        index = 0
        while index < len(self.atoms) and index < len(other.atoms):
            mine, theirs = self.atoms[index], other.atoms[index]
            if mine.isdigit() and theirs.isdigit():
                mine, theirs = int(mine), int(theirs)
            if mine < theirs:
                return -1
            if mine > theirs:
                return 1
            index += 1
        if index < len(other.atoms):
            return -1
        if index < len(self.atoms):
            return 1
        return 0

    def __cmp__(self, other):
        # Python 2 ordering hook; Python 3 uses the rich comparisons below.
        if not isinstance(other, Version):
            return NotImplemented
        return self._compare(other)

    def _rich(self, other, test):
        """Apply test to _compare(other), or defer for non-Version operands."""
        if not isinstance(other, Version):
            return NotImplemented
        return test(self._compare(other))

    def __eq__(self, other):
        return self._rich(other, lambda c: c == 0)

    def __ne__(self, other):
        return self._rich(other, lambda c: c != 0)

    def __lt__(self, other):
        return self._rich(other, lambda c: c < 0)

    def __le__(self, other):
        return self._rich(other, lambda c: c <= 0)

    def __gt__(self, other):
        return self._rich(other, lambda c: c > 0)

    def __ge__(self, other):
        return self._rich(other, lambda c: c >= 0)

    def __hash__(self):
        # Must agree with __eq__: "1.02" == "1.2", so all-digit atoms are
        # hashed by their integer value.
        key = []
        for atom in self.atoms:
            if atom.isdigit():
                try:
                    atom = int(atom)
                except ValueError:
                    pass
            key.append(atom)
        return hash(tuple(key))
438
Note: See TracBrowser for help on using the browser.