Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/storage/__init__.py

Revision 65 (checked in by fumanchu, 8 years ago)

CachingProxy and BurnedProxy now do distinct().

Line 
1 """Storage Managers for Dejavu."""
2
3 import re
4 import datetime
5 import threading
6 import thread
7 try:
8     import cPickle as pickle
9 except ImportError:
10     import pickle
11
12
class StorageManager(object):
    """A Manager base class for storing and retrieving Units.

    The base StorageManager class doesn't actually store anything;
    it needs to be subclassed. Most operations raise NotImplementedError
    here; create_storage() and shutdown() are no-ops.
    """

    def __init__(self, name, arena, allOptions=None):
        """Record the manager's name, arena and configuration options.

        name: stored as self.name.
        arena: stored as self.arena.
        allOptions: optional mapping of option names to string values.
            'Units' is read as a comma-separated list of class names
            (whitespace is stripped and empty entries are dropped);
            'Shutdown Order' is converted with int() and defaults to 0.
        """
        # A None sentinel avoids sharing one mutable dict between calls.
        if allOptions is None:
            allOptions = {}
        self.name = name
        self.arena = arena
        self.classnames = [x.strip() for x
                           in allOptions.get('Units', '').split(",")
                           if x.strip()]
        self.shutdownOrder = int(allOptions.get('Shutdown Order', '0'))

    def recall(self, unitClass, expr=None):
        """Return an iterable object which will populate Units."""
        raise NotImplementedError

    def save(self, unit, forceSave=False):
        """Store the object's data{} dictionary."""
        raise NotImplementedError

    def destroy(self, unit):
        """Delete the object."""
        raise NotImplementedError

    def create_storage(self, unitClass):
        """Create storage for the given Unit class (no-op in the base)."""
        pass

    def reserve(self, unit):
        """Reserve storage space for the Unit."""
        raise NotImplementedError

    def shutdown(self):
        """Release any resources held by the manager (no-op in the base)."""
        pass

    def distinct(self, cls, fields, expr=None):
        """distinct(cls, fields, expr=None) -> Distinct values for given fields."""
        raise NotImplementedError

    def multirecall(self, *pairs):
        """multirecall(*pairs) -> Full inner join units for each (cls, expr) pair."""
        raise NotImplementedError
57
58
class ProxyStorage(StorageManager):
    """A Storage Manager which passes calls to another Storage Manager.

    Every operation is forwarded to self.nextstore when one is set. With
    no next store, recall() yields nothing and the other operations do
    nothing (distinct() and multirecall() then return None).
    """

    # The manager that calls are forwarded to; None means "no next store".
    nextstore = None

    def __init__(self, name, arena, allOptions=None):
        """Initialise the manager and look up the optional next store.

        If allOptions has a non-empty 'Next Store' entry, it is used as a
        key into arena.stores to find the manager to forward to.
        """
        # A None sentinel avoids sharing one mutable dict between calls;
        # normalise it here so the base class always receives a mapping.
        if allOptions is None:
            allOptions = {}
        StorageManager.__init__(self, name, arena, allOptions)

        nextstore = allOptions.get('Next Store')
        if nextstore:
            self.nextstore = arena.stores[nextstore]

    def recall(self, unitClass, expr=None):
        """Yield each Unit recalled by the next store (nothing if unset)."""
        if self.nextstore:
            for unit in self.nextstore.recall(unitClass, expr):
                yield unit

    def save(self, unit, forceSave=False):
        """Store the unit."""
        if self.nextstore:
            self.nextstore.save(unit, forceSave)

    def destroy(self, unit):
        """Delete the unit."""
        if self.nextstore:
            self.nextstore.destroy(unit)

    def create_storage(self, unitClass):
        """Ask the next store to create storage for the Unit class."""
        if self.nextstore:
            self.nextstore.create_storage(unitClass)

    def reserve(self, unit):
        """Reserve storage space for the Unit."""
        if self.nextstore:
            self.nextstore.reserve(unit)

    def distinct(self, cls, fields, expr=None):
        """distinct(cls, fields, expr=None) -> Distinct values for given fields."""
        if self.nextstore:
            return self.nextstore.distinct(cls, fields, expr)

    def multirecall(self, *pairs):
        """multirecall(*pairs) -> Full inner join units for each (cls, expr) pair."""
        if self.nextstore:
            return self.nextstore.multirecall(*pairs)
104
105
class CachingProxy(ProxyStorage):
    """A Proxy Storage Manager which recalls and keeps Units in memory.

    Units are cached as pickles, in one dict per Unit class. save() only
    updates the cache; a dirty Unit is written to the next store when
    sweep() evicts it. If a 'Lifetime' option is given, a threading.Timer
    calls sweep_all() periodically.
    """

    def __init__(self, name, arena, allOptions=None):
        """Set up empty caches and, if 'Lifetime' is configured, the sweeper.

        A non-empty 'Lifetime' option is handed to recur.Recurrence, whose
        interval() is used as the delay between sweeps.
        """
        # A None sentinel avoids sharing one mutable dict between calls;
        # normalise it here so the base class always receives a mapping.
        if allOptions is None:
            allOptions = {}
        ProxyStorage.__init__(self, name, arena, allOptions)

        self._caches = {}       # Unit class -> {Unit ID: pickled Unit}
        self._cache_locks = {}  # Unit class -> lock guarding that cache
        # Unit ID -> time the Unit entered the cache.
        # NOTE(review): keyed by ID alone, so Units of different classes
        # that share an ID also share a slot here -- confirm this is intended.
        self._recallTimes = {}

        self.timer = None

        # Create and motivate a worker to sweep out idle Units.
        lifetime = allOptions.get('Lifetime', '')
        if lifetime:
            import recur
            self.recurrence = recur.Recurrence(None, lifetime)
            if self.recurrence:
                # Throw away the first occurrence value,
                # which is almost always .now()
                self.recurrence.next()
            self.lastrun = datetime.datetime.now()
            self.active = True
            self.timer = threading.Timer(self.recurrence.interval(), self._cycle)
            self.timer.start()

    def _cycle(self):
        """Run the scheduled work."""
        # Schedule the next run before sweeping.
        self.timer = threading.Timer(self.recurrence.interval(), self._cycle)
        self.timer.start()
        if self.active:
            self.sweep_all(self.lastrun)
            self.lastrun = datetime.datetime.now()

    def cachelen(self, cls):
        """Return the number of Units currently cached for cls."""
        return len(self._caches.get(cls, {}))

    def cached_units(self, cls):
        """Return a list of freshly unpickled copies of the Units cached for cls."""
        return [pickle.loads(data) for data
                in self._caches.get(cls, {}).values()]

    def _get_lock(self, unitClass):
        """Return the ACQUIRED lock for unitClass; the caller must release it.

        The cache dict and lock for unitClass are created on first use.
        """
        if unitClass not in self._caches:
            self._caches[unitClass] = {}
            self._cache_locks[unitClass] = threading.Lock()
        lock = self._cache_locks[unitClass]
        lock.acquire()
        return lock

    def recall(self, unitClass, expr=None):
        """Return a Unit iterator.

        Cached Units matching expr (all of them when expr is None) come
        first in the lookup; Units from the next store are added only for
        IDs not already seen, and are cached if not already cached.
        """
        currentTime = datetime.datetime.now()
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]
            seen = {}

            # Run through our cache first. Hopefully, this will save us
            # calling expr.evaluate twice for each unit.
            for unit_id, pickledUnit in cache.items():
                unit = pickle.loads(pickledUnit)
                if expr is None or expr.evaluate(unit):
                    seen[unit_id] = unit

            if self.nextstore:
                for unit in self.nextstore.recall(unitClass, expr):
                    if unit.ID not in seen:
                        seen[unit.ID] = unit
                    if unit.ID not in cache:
                        # Pickle the Unit to discard extraneous attributes,
                        # and avoid identity issues.
                        cache[unit.ID] = pickle.dumps(unit)
                        # NOTE(review): the time is recorded only when a
                        # Unit first enters the cache, not on later cache
                        # hits -- confirm that is the intended idle policy.
                        self._recallTimes[unit.ID] = currentTime

            return iter(list(seen.values()))
        finally:
            lock.release()

    def save(self, unit, forceSave=False):
        """Store the unit."""
        # Don't call nextstore.save(). Defer that to sweep().

        # Don't check .dirty()!! If a unit changes from state A to
        # to state B, then back again to *exactly* state A, dirty()
        # will be False, and the cached unit will stay in state B.

        lock = self._get_lock(unit.__class__)
        try:
            cache = self._caches[unit.__class__]
            cache[unit.ID] = pickle.dumps(unit)
        finally:
            lock.release()

    def destroy(self, unit):
        """Delete the unit."""
        unitClass = unit.__class__
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]
            if self.nextstore:
                self.nextstore.destroy(unit)
            # The Unit may never have been cached; a missing key is fine.
            cache.pop(unit.ID, None)
            self._recallTimes.pop(unit.ID, None)
        finally:
            lock.release()

    def distinct(self, cls, fields, expr=None):
        """distinct(cls, fields, expr=None) -> Distinct values for given fields."""
        # Rather than repeat the logic in recall() where we mix cached
        # and uncached Units, just call recall itself. recall() already
        # treats expr=None as "match every Unit", so no always-true
        # expression needs to be built here.
        distvals = {}
        for unit in self.recall(cls, expr):
            # Must use inner tuples for hashability in Sandbox.distinct()
            val = tuple([getattr(unit, f) for f in fields])
            distvals[val] = None
        return list(distvals)

    def reserve(self, unit):
        """Reserve storage space for the Unit."""
        unitClass = unit.__class__
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]
            if self.nextstore:
                self.nextstore.reserve(unit)
            else:
                # No next store to assign an ID: ask the Unit's sequencer
                # for one, given the IDs already in the cache.
                if unit.ID is None:
                    unit.ID = unit.sequencer.next(list(cache))
            # Pickle the Unit to discard extraneous attributes,
            # and avoid identity issues.
            cache[unit.ID] = pickle.dumps(unit)
            self._recallTimes[unit.ID] = datetime.datetime.now()
        finally:
            lock.release()

    def sweep(self, unitClass, lastSweepTime=None):
        """Evict cached Units of unitClass, saving dirty ones first.

        A Unit is evicted when it has no recorded time, when lastSweepTime
        is None, or when its recorded time is earlier than lastSweepTime.
        Dirty Units are passed to the next store's save() before eviction,
        if a next store is configured.
        """
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]
            # Iterate over a snapshot of the keys; entries are deleted below.
            for unit_id in list(cache):
                lastRecall = self._recallTimes.get(unit_id)
                if (lastRecall is None or lastSweepTime is None
                    or lastRecall < lastSweepTime):
                    unit = pickle.loads(cache[unit_id])
                    # Without a next store there is nowhere to write to.
                    if self.nextstore and unit.dirty():
                        self.nextstore.save(unit)

                    del cache[unit_id]
                    self._recallTimes.pop(unit_id, None)
        finally:
            lock.release()

    def sweep_all(self, lastSweepTime=None):
        """Call sweep() for every Unit class that has a cache."""
        # Snapshot the classes so a cache created meanwhile by another
        # thread cannot invalidate the iteration.
        for cls in list(self._caches):
            self.sweep(cls, lastSweepTime)

    def shutdown(self):
        """Flush and empty every cache, then cancel the sweep timer if any."""
        self.sweep_all()
        if self.timer:
            # .cancel does nothing if the thread is already finished.
            self.timer.cancel()
278
279
class BurnedProxy(CachingProxy):
    """A Caching Proxy Storage Manager which recalls and caches ALL Units.

    The big performance difference for a burned cache is that, once _any_
    Units have been recalled, further recalls won't hit the next store.
    Notice we didn't say "performance _benefit_" ;) That would depend to
    a great extent on the proxied store.
    """

    def _icached_units(self, cache, expr=None):
        """Yield an unpickled copy of each cached Unit that matches expr.

        Every Unit is yielded when expr is None. This generator reads the
        cache dict lazily, so it must be fully consumed while the cache's
        lock is held.
        """
        for data in cache.values():
            unit = pickle.loads(data)
            if expr is None or expr.evaluate(unit):
                yield unit

    def recall(self, unitClass, expr=None):
        """Return a Unit iterator.

        If the cache for unitClass is empty and there is a next store,
        every Unit of that class is first loaded from the next store into
        the cache. The result is then built from the cache alone.
        """
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]

            if (not cache) and self.nextstore:
                # Read ALL units from storage.
                now = datetime.datetime.now()
                for unit in self.nextstore.recall(unitClass, None):
                    cache[unit.ID] = pickle.dumps(unit)
                    self._recallTimes[unit.ID] = now

            # Build the result while the lock is still held. Handing back
            # the lazy generator would iterate the live cache dict after
            # release, which fails if the cache changes size meanwhile.
            return iter(list(self._icached_units(cache, expr)))
        finally:
            lock.release()
315
316
class Adapter(object):
    """Transform values according to their type. Must be subclassed.

    In order for your subclass to work, you need to provide functions
    named 'coerce_' + type, where 'type' refers to the type you wish to
    support. Replace dots in the type name by underscores. For example,
    to coerce datetime.date objects, you must provide a function in your
    subclass named 'coerce_datetime_date'. For builtins, do not include
    the module name ('__builtin__' or 'builtins'), just use
    'coerce_unicode', for example.

    If you try to coerce a value for whose type you have not provided a
    coercion function, a TypeError is raised.

    When writing Adapters for Storage Managers, you should at least
    provide coerce_* functions for: bool, dict, float, int, list, long,
    NoneType, str, tuple, and unicode. For most applications, you should
    also provide:
        datetime.datetime
                .date
                .time
                .timedelta
    and:
        fixedpoint.FixedPoint or decimal.Decimal (preferably both)
    """

    def coerce(self, value, valuetype=None):
        """coerce(value, valuetype=None) -> value, coerced by valuetype.

        valuetype defaults to type(value). The handler is looked up by name
        on self and called with value; a TypeError is raised if this
        Adapter has no handler for valuetype.
        """
        if valuetype is None:
            valuetype = type(value)

        mod = valuetype.__module__
        # Builtin types report '__builtin__' on Python 2 and 'builtins'
        # on Python 3; neither prefix belongs in the handler name.
        if mod in ("__builtin__", "builtins"):
            xform = "coerce_%s" % valuetype.__name__
        else:
            xform = "coerce_%s_%s" % (mod, valuetype.__name__)
        xform = xform.replace(".", "_")
        try:
            xform = getattr(self, xform)
        except AttributeError:
            raise TypeError("'%s' is not handled by %s." %
                            (valuetype, self.__class__))
        return xform(value)
359
360
class Version(object):
    """A version identifier made of atoms, e.g. "1.2.3" -> ['1', '2', '3'].

    Versions compare atom by atom: two all-digit atoms compare as
    integers, any other pair compares as strings. If every shared atom is
    equal, the Version with more atoms is the greater one.
    """

    def __init__(self, atoms):
        """Build a Version from a string or from an iterable of atoms.

        A string is split on every non-word character; the items of any
        other iterable are converted with str().
        """
        # 'basestring' only exists on Python 2; fall back to str elsewhere.
        try:
            string_types = basestring
        except NameError:
            string_types = str
        if isinstance(atoms, string_types):
            self.atoms = re.split(r'\W', atoms)
        else:
            self.atoms = [str(x) for x in atoms]

    def __str__(self):
        """Return the atoms joined by dots."""
        return ".".join([str(x) for x in self.atoms])

    def __cmp__(self, other):
        """Return -1, 0 or 1 as self is less than, equal to or greater than other."""
        index = 0
        while index < len(self.atoms) and index < len(other.atoms):
            mine, theirs = self.atoms[index], other.atoms[index]
            if mine.isdigit() and theirs.isdigit():
                mine, theirs = int(mine), int(theirs)
            if mine < theirs:
                return -1
            if mine > theirs:
                return 1
            index += 1
        # All shared atoms are equal: the longer Version wins.
        if index < len(other.atoms):
            return -1
        if index < len(self.atoms):
            return 1
        return 0

    # Python 3 ignores __cmp__, so the rich comparisons below delegate to
    # it. Objects without an 'atoms' attribute are left to the other
    # operand by returning NotImplemented.

    def __eq__(self, other):
        if not hasattr(other, "atoms"):
            return NotImplemented
        return self.__cmp__(other) == 0

    def __ne__(self, other):
        if not hasattr(other, "atoms"):
            return NotImplemented
        return self.__cmp__(other) != 0

    def __lt__(self, other):
        if not hasattr(other, "atoms"):
            return NotImplemented
        return self.__cmp__(other) < 0

    def __le__(self, other):
        if not hasattr(other, "atoms"):
            return NotImplemented
        return self.__cmp__(other) <= 0

    def __gt__(self, other):
        if not hasattr(other, "atoms"):
            return NotImplemented
        return self.__cmp__(other) > 0

    def __ge__(self, other):
        if not hasattr(other, "atoms"):
            return NotImplemented
        return self.__cmp__(other) >= 0

    def __hash__(self):
        """Hash so that Versions which compare equal hash alike."""
        # Mirror __cmp__: all-digit atoms count by their integer value.
        key = []
        for atom in self.atoms:
            if atom.isdigit():
                key.append(int(atom))
            else:
                key.append(atom)
        return hash(tuple(key))
388
Note: See TracBrowser for help on using the browser.