Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/storage/__init__.py

Revision 21 (checked in by fumanchu, 9 years ago)

1. Removed Unit.temporary
2. Added Unit.on_recall() with Sandbox support.
3. Engines etc replaced temporary with Expiration, permanent() with immortalize().

Line 
1 """Storage Managers for Dejavu."""
2
3 import datetime
4 import threading
5 import thread
6 import recur
7 try:
8     import cPickle as pickle
9 except ImportError:
10     import pickle
11
12
class StorageManager(object):
    """A Manager base class for storing and retrieving Units.

    The base StorageManager class doesn't actually store anything;
    it needs to be subclassed. Every storage operation defined here
    raises, except shutdown(), which does nothing.
    """

    def __init__(self, name, arena, allOptions=None):
        """Record the store's name, its arena and its shutdown order.

        name: stored unchanged as self.name.
        arena: stored unchanged as self.arena.
        allOptions: optional mapping of option names to values. Only
            'Shutdown Order' is read here; it is converted with int()
            and defaults to '0'. None means "no options".
        """
        # Use None as the default instead of a shared mutable dict.
        if allOptions is None:
            allOptions = {}
        self.name = name
        self.arena = arena
        self.shutdownOrder = int(allOptions.get('Shutdown Order', '0'))

    def recall(self, unitClass, expr=None):
        """Return an iterable object which will populate Units."""
        raise NotImplementedError

    def save(self, unit, forceSave=False):
        """Store the object's data{} dictionary."""
        raise NotImplementedError

    def destroy(self, unit):
        """Delete the object."""
        raise NotImplementedError

    def create_storage(self, unitClass):
        """Create storage for the given Unit class (subclass hook)."""
        raise NotImplementedError

    def reserve(self, unit):
        """Reserve storage space for the Unit."""
        # NOTE(review): unlike the sibling methods this raises
        # AttributeError, not NotImplementedError. Left unchanged because
        # callers may be catching AttributeError -- confirm before changing.
        raise AttributeError

    def shutdown(self):
        """Release any resources held by the store; a no-op here."""
        pass
46
47
class ProxyStorage(StorageManager):
    """Storage Manager that forwards every operation to another store.

    The target store is looked up in arena.stores under the name given by
    the 'Next Store' option. While no target is set, every operation is a
    silent no-op and recall() yields nothing.
    """

    # Store that calls are delegated to; stays None unless configured.
    nextstore = None

    def __init__(self, name, arena, allOptions={}):
        StorageManager.__init__(self, name, arena, allOptions)

        target = allOptions.get('Next Store')
        if not target:
            return
        self.nextstore = arena.stores[target]

    def recall(self, unitClass, expr=None):
        """Yield each Unit that the next store recalls."""
        if not self.nextstore:
            return
        for unit in self.nextstore.recall(unitClass, expr):
            yield unit

    def save(self, unit, forceSave=False):
        """Ask the next store to save the unit."""
        if not self.nextstore:
            return
        self.nextstore.save(unit, forceSave)

    def destroy(self, unit):
        """Ask the next store to delete the unit."""
        if not self.nextstore:
            return
        self.nextstore.destroy(unit)

    def create_storage(self, unitClass):
        """Ask the next store to create storage for unitClass."""
        if not self.nextstore:
            return
        self.nextstore.create_storage(unitClass)

    def reserve(self, unit):
        """Ask the next store to reserve storage space for the Unit."""
        if not self.nextstore:
            return
        self.nextstore.reserve(unit)
83
84
class CachingProxy(ProxyStorage):
    """A Proxy Storage Manager which recalls and keeps Units in memory.

    Units are kept as pickled strings in one dict per Unit class, each
    guarded by its own lock. When a 'Lifetime' option is supplied, a
    threading.Timer repeatedly fires _cycle(), which sweeps entries that
    were cached before the previous run. shutdown() stops that timer and
    sweeps everything.
    """

    # Class-level defaults so shutdown() works even when no 'Lifetime'
    # option was given and no timer was ever started.
    _timer = None
    _stopped = False

    def __init__(self, name, arena, allOptions=None):
        """Set up empty caches and, if 'Lifetime' is set, start sweeping.

        allOptions: optional mapping; 'Lifetime' (default '') is handed to
            recur.Recurrence to define the sweep interval. None means
            "no options".
        """
        if allOptions is None:
            allOptions = {}
        ProxyStorage.__init__(self, name, arena, allOptions)

        self._caches = {}       # unitClass -> {unit ID: pickled Unit}
        self._cache_locks = {}  # unitClass -> lock guarding that cache
        self._recallTimes = {}  # unit ID -> datetime the Unit was cached
        # Guards creation of per-class caches/locks and timer bookkeeping.
        self._master_lock = threading.Lock()

        # Create and motivate a worker to sweep out idle Units.
        lifetime = allOptions.get('Lifetime', '')
        if lifetime:
            self.recurrence = recur.Recurrence(None, lifetime)
            if self.recurrence:
                # Throw away the first occurrence value,
                # which is almost always .now()
                self.recurrence.next()
            self.lastrun = datetime.datetime.now()
            self.active = True
            self._schedule()

    def _schedule(self):
        """Arm a one-shot timer that calls _cycle() after one interval.

        Does nothing once shutdown() has been called. The timer is kept
        in self._timer so that shutdown() can cancel it.
        """
        self._master_lock.acquire()
        try:
            if self._stopped:
                return
            timer = threading.Timer(self.recurrence.interval(), self._cycle)
            self._timer = timer
        finally:
            self._master_lock.release()
        timer.start()

    def _cycle(self):
        """Run the scheduled work."""
        # Re-arm first so the schedule keeps going even while inactive;
        # self.active only decides whether this run actually sweeps.
        self._schedule()
        if self.active:
            self.sweep_all(self.lastrun)
            self.lastrun = datetime.datetime.now()

    def cachelen(self, cls):
        """Return the number of Units currently cached for cls."""
        return len(self._caches.get(cls, {}))

    def cached_units(self, cls):
        """Return a list of freshly unpickled copies of cls's cached Units."""
        # Snapshot the values first: this method does not take the
        # per-class lock, so the dict may change while we unpickle.
        pickled = list(self._caches.get(cls, {}).values())
        return [pickle.loads(data) for data in pickled]

    def _get_lock(self, unitClass):
        """Return the ACQUIRED lock for unitClass, creating it if needed.

        The caller must release the returned lock. After this call
        self._caches[unitClass] is guaranteed to exist.
        """
        # Creation is serialized so two threads cannot each build a lock
        # (or replace an existing cache) for the same class.
        self._master_lock.acquire()
        try:
            lock = self._cache_locks.get(unitClass)
            if lock is None:
                lock = threading.Lock()
                self._cache_locks[unitClass] = lock
            self._caches.setdefault(unitClass, {})
        finally:
            self._master_lock.release()
        lock.acquire()
        return lock

    def recall(self, unitClass, expr=None):
        """Return a Unit iterator.

        Cached Units matching expr come first in precedence: a Unit from
        the next store is only returned if its ID was not already matched
        from the cache. Units from the next store that are not yet cached
        are pickled into the cache.
        """
        currentTime = datetime.datetime.now()
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]
            seen = {}

            # Run through our cache first. Hopefully, this will save us
            # calling expr.evaluate twice for each unit.
            for unit_id, pickledUnit in cache.items():
                unit = pickle.loads(pickledUnit)
                if expr is None or expr.evaluate(unit):
                    seen[unit_id] = unit

            if self.nextstore:
                for unit in self.nextstore.recall(unitClass, expr):
                    if unit.ID not in seen:
                        seen[unit.ID] = unit
                    if unit.ID not in cache:
                        # Pickle the Unit to discard extraneous attributes,
                        # and avoid identity issues.
                        cache[unit.ID] = pickle.dumps(unit)
                        # NOTE(review): the time is recorded only when a
                        # Unit is first cached, not on later cache hits.
                        self._recallTimes[unit.ID] = currentTime

            return iter(seen.values())
        finally:
            lock.release()

    def save(self, unit, forceSave=False):
        """Store the unit."""
        # Defer saves to sweep().
        # NOTE(review): nothing here copies the caller's unit back into
        # the cache, so sweep() only ever sees the copy pickled by
        # recall()/reserve() -- confirm this is intended.
        pass

    def destroy(self, unit):
        """Delete the unit from the next store and from the cache."""
        unitClass = unit.__class__
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]
            if self.nextstore:
                self.nextstore.destroy(unit)
            if unit.ID in cache:
                del cache[unit.ID]
                # _recallTimes is keyed by ID alone, so the entry may
                # already have been removed on behalf of another class.
                self._recallTimes.pop(unit.ID, None)
        finally:
            lock.release()

    def reserve(self, unit):
        """Reserve storage space for the Unit and cache it."""
        unitClass = unit.__class__
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]
            if self.nextstore:
                self.nextstore.reserve(unit)
            # Pickle the Unit to discard extraneous attributes,
            # and avoid identity issues.
            cache[unit.ID] = pickle.dumps(unit)
            self._recallTimes[unit.ID] = datetime.datetime.now()
        finally:
            lock.release()

    def sweep(self, unitClass, lastSweepTime=None):
        """Evict cached Units of unitClass, saving the dirty ones.

        A Unit is evicted when it has no recorded time, when lastSweepTime
        is None (evict everything), or when its recorded time is earlier
        than lastSweepTime. Evicted Units whose 'dirty' attribute is true
        are passed to the next store's save(), if there is a next store.
        """
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]
            # Iterate over a copy of the keys; entries are deleted below.
            for unit_id in list(cache):
                lastRecall = self._recallTimes.get(unit_id)
                if (lastRecall is None or lastSweepTime is None
                    or lastRecall < lastSweepTime):
                    unit = pickle.loads(cache[unit_id])
                    if unit.dirty and self.nextstore:
                        self.nextstore.save(unit)

                    del cache[unit_id]
                    self._recallTimes.pop(unit_id, None)
        finally:
            lock.release()

    def sweep_all(self, lastSweepTime=None):
        """Call sweep() for every Unit class that has a cache."""
        # Snapshot the classes: other threads may add caches meanwhile,
        # which would break iteration over the live dict.
        self._master_lock.acquire()
        try:
            classes = list(self._caches)
        finally:
            self._master_lock.release()
        for cls in classes:
            self.sweep(cls, lastSweepTime)

    def shutdown(self):
        """Stop the periodic sweeper and flush the whole cache."""
        self._master_lock.acquire()
        try:
            self._stopped = True
            timer = self._timer
            self._timer = None
        finally:
            self._master_lock.release()
        if timer is not None:
            timer.cancel()
        self.sweep_all()
215
216
class BurnedProxy(CachingProxy):
    """A Caching Proxy Storage Manager which recalls and caches ALL Units.

    The big performance difference for a burned cache is that, once _any_
    Units have been recalled, further recalls won't hit the next store.
    Notice we didn't say "performance _benefit_" ;) That would depend to
    a great extent on the proxied store.
    """

    def _icached_units(self, cls, expr=None):
        """Yield unpickled cached Units of cls, filtered by expr if given."""
        # Work from a snapshot of the pickled values so that changes to
        # the cache during iteration cannot break the loop.
        pickled = list(self._caches.get(cls, {}).values())
        for data in pickled:
            unit = pickle.loads(data)
            if expr is None or expr.evaluate(unit):
                yield unit

    def recall(self, unitClass, expr=None):
        """Return a Unit iterator.

        If the cache for unitClass is empty and there is a next store,
        every Unit of that class is first loaded from the next store
        (expr is not passed on). The result is then filtered from the
        cache only.
        """
        lock = self._get_lock(unitClass)
        try:
            cache = self._caches[unitClass]

            # NOTE(review): "empty cache" is the only trigger for the full
            # load, so a cache that is non-empty for any other reason
            # skips it -- confirm that is acceptable.
            if (not cache) and self.nextstore:
                # Read ALL units from storage.
                now = datetime.datetime.now()
                for unit in self.nextstore.recall(unitClass, None):
                    cache[unit.ID] = pickle.dumps(unit)
                    self._recallTimes[unit.ID] = now

            # Build the result while the lock is still held. Returning
            # the bare generator would defer all cache reads until after
            # the 'finally' below has released the lock.
            units = list(self._icached_units(unitClass, expr))
            return iter(units)
        finally:
            lock.release()
253
254
class Adapter(object):
    """Transform values according to their type. Must be subclassed.

    In order for your subclass to work, you need to provide functions
    named 'coerce_' + type, where 'type' refers to the type you wish to
    support. Replace dots in the type name by underscores. For example,
    to coerce datetime.date objects, you must provide a function in your
    subclass named 'coerce_datetime_date'. For builtins, do not include
    the module name ('__builtin__' on Python 2, 'builtins' on Python 3),
    just use 'coerce_unicode', for example.

    If you try to coerce a value for whose type you have not provided a
    coercion function, a TypeError is raised.

    When writing Adapters for Storage Managers, you should at least
    provide coerce_* functions for: bool, dict, float, int, list, long,
    NoneType, str, tuple, and unicode. For most applications, you should
    also provide:
        datetime.datetime
                .date
                .time
                .timedelta
    and:
        fixedpoint.FixedPoint or decimal.Decimal (preferably both)
    """

    # Module names under which the interpreter reports builtin types.
    _builtin_modules = ("__builtin__", "builtins")

    def coerce(self, value, valuetype=None):
        """coerce(value, valuetype=None) -> value, coerced by valuetype.

        value: the object to transform.
        valuetype: the type that selects the coerce_* method; defaults to
            type(value).

        Returns whatever the selected coerce_* method returns.
        Raises TypeError if this adapter has no method for valuetype.
        """
        if valuetype is None:
            valuetype = type(value)

        mod = valuetype.__module__
        if mod in self._builtin_modules:
            method_name = "coerce_%s" % valuetype.__name__
        else:
            method_name = "coerce_%s_%s" % (mod, valuetype.__name__)
        # Dotted module paths become underscores: a.b.C -> coerce_a_b_C.
        method_name = method_name.replace(".", "_")
        try:
            xform = getattr(self, method_name)
        except AttributeError:
            raise TypeError("'%s' is not handled by %s." %
                            (valuetype, self.__class__))
        return xform(value)
297
Note: See TracBrowser for help on using the browser.