Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

root/branches/crazycache/dejavu/storage/storeram.py

Revision 543 (checked in by fumanchu, 1 year ago)

Changed to unit(cls, **kwargs) sig throughout. Also changed sandbox.recall to include order, limit, and offset args; removed **kwargs from recall, but the 'expr' arg may now be a dict for all x/multi/recall methods throughout. Removed inheritance code. Added a 'sum' method to StorageManager.

  • Property svn:eol-style set to native
Line 
1 try:
2     import cPickle as pickle
3 except ImportError:
4     import pickle
5
6 import thread
7
8 import dejavu
9 from dejavu import errors, logic, logflags, storage
10
11
class RAMStorage(storage.StorageManager):
    """A Storage Manager which keeps all data in RAM.

    Every class with storage gets its own dict (key -> pickled Unit) in
    self._caches and its own lock in self._cache_locks. Units are stored
    pickled, so callers always receive fresh copies.
    """

    def __init__(self, allOptions=None):
        """Set up empty caches; allOptions is passed to StorageManager."""
        # Build the default per call: a mutable default argument would be
        # one dict shared by every RAMStorage created without options.
        if allOptions is None:
            allOptions = {}
        storage.StorageManager.__init__(self, allOptions)
        self._caches = {}       # cls: {key: pickled Unit}
        self._cache_locks = {}  # cls: lock guarding self._caches[cls]

    def _get_lock(self, cls):
        """Acquire (blocking) and return the lock for the given class.

        Raises KeyError if no storage has been created for cls.
        The caller must release the returned lock.
        """
        lock = self._cache_locks[cls]
        lock.acquire(True)
        return lock

    def _key_for(self, unit):
        """Return the cache key under which the given unit is stored."""
        if unit.identifiers:
            return unit.identity()
        # This class has no identifiers, so hash the whole dict.
        return pickle.dumps(unit._properties)

    def _acquire_cache(self, cls, conflicts):
        """Return (cache, lock) for cls with the lock held, else None.

        If cls has no storage, the problem is handed to errors.conflict.
        Should that call return instead of raising, None is returned so
        the caller can skip its work rather than touch unbound names.
        """
        try:
            cache = self._caches[cls]
            lock = self._get_lock(cls)
        except KeyError:
            # repr(cls) is the text str() of the KeyError would produce.
            errors.conflict(conflicts, repr(cls))
            return None
        return cache, lock

    def _rewrite_units(self, cls, conflicts, change):
        """Apply change(unit) to each stored unit of cls and re-pickle it."""
        held = self._acquire_cache(cls, conflicts)
        if held is None:
            return
        cache, lock = held
        try:
            for key, pickled in cache.items():
                unit = pickle.loads(pickled)
                change(unit)
                # Cleanse first because pickle state
                # includes _initial_property_hash.
                unit.cleanse()
                cache[key] = pickle.dumps(unit)
        finally:
            lock.release()

    def unit(self, cls, **kwargs):
        """A single Unit which matches the given kwargs, else None.

        The first Unit matching the kwargs is returned; if no Units match,
        None is returned.

        Raises KeyError if no storage has been created for cls.
        """
        if self.logflags & logflags.RECALL:
            self.log(logflags.RECALL.message(cls, kwargs))

        lock = self._get_lock(cls)
        try:
            cache = self._caches[cls]

            identifiers = cls.identifiers
            # Classes without identifiers are keyed by a pickle of their
            # properties (see save), never by a tuple, so the fast path
            # below only applies when there are identifiers to match.
            if identifiers and set(kwargs) == set(identifiers):
                # Looking up a Unit by its identifiers.
                # Skip grabbing the cached class index (a HUGE optimization).
                key = tuple([kwargs[k] for k in identifiers])
                pickled = cache.get(key)
                if pickled is None:
                    return None
                found = pickle.loads(pickled)
                found.cleanse()
                return found

            expr = logic.filter(**kwargs)
            # Each row is a 1-tuple; hand back the first match, if any.
            for row in self._xrecall_inner(cache, list(cache), expr):
                return row[0]
            return None
        finally:
            lock.release()

    def xrecall(self, classes, expr=None, order=None, limit=None, offset=None):
        """Yield units of the given cls which match the given expr.

        If classes is a dejavu.UnitJoin, the call is delegated to
        self._xmultirecall. Otherwise the matching units are passed
        through self._paginate with the given order, limit and offset.

        Raises KeyError if no storage has been created for the class.
        """
        if isinstance(classes, dejavu.UnitJoin):
            return self._xmultirecall(classes, expr, order=order,
                                      limit=limit, offset=offset)

        cls = classes
        if self.logflags & logflags.RECALL:
            self.log(logflags.RECALL.message(cls, expr))

        lock = self._get_lock(cls)
        try:
            cache = self._caches[cls]
            # Snapshot the keys while the lock is held. _xrecall_inner is
            # a generator, so unless _paginate consumes it eagerly the
            # units themselves are loaded after the lock is released.
            keys = list(cache)
            rows = self._xrecall_inner(cache, keys, expr)
            return self._paginate(rows, order, limit, offset, single=True)
        finally:
            lock.release()

    def _xrecall_inner(self, cache, ids, expr=None):
        """Private helper for self.xrecall.

        Yields a 1-tuple (unit,) for each id in ids which is still present
        in cache and whose unit satisfies expr (or for all, if expr is None).
        """
        for key in ids:
            pickled = cache.get(key)
            if pickled is None:
                # No longer in the cache; skip it.
                continue
            unit = pickle.loads(pickled)
            if expr is None or expr(unit):
                unit.cleanse()
                # Must yield a sequence for use in _paginate.
                yield (unit,)

    def save(self, unit, forceSave=False):
        """save(unit, forceSave=False). -> Update storage from unit's data.

        Nothing is written unless forceSave is true or unit.dirty() is.
        Raises KeyError if no storage has been created for the unit's class.
        """
        if self.logflags & logflags.SAVE:
            self.log(logflags.SAVE.message(unit, forceSave))

        if not (forceSave or unit.dirty()):
            return

        cls = unit.__class__
        lock = self._get_lock(cls)
        try:
            cache = self._caches[cls]
            # Replace the entire value to get around writeback issues.
            # See the docs on "shelve" for more info.
            key = self._key_for(unit)

            # Cleanse first because pickle state
            # includes _initial_property_hash.
            unit.cleanse()
            cache[key] = pickle.dumps(unit)
        finally:
            lock.release()

    def destroy(self, unit):
        """Delete the unit. A unit which is not stored is ignored.

        Raises KeyError if no storage has been created for the unit's class.
        """
        if self.logflags & logflags.DESTROY:
            self.log(logflags.DESTROY.message(unit))

        cls = unit.__class__
        lock = self._get_lock(cls)
        try:
            # pop with a default: a missing entry is deliberately not an error.
            self._caches[cls].pop(self._key_for(unit), None)
        finally:
            lock.release()

    def reserve(self, unit):
        """Reserve storage space for the Unit.

        If the unit's current identity is not valid according to its
        sequencer, the sequencer assigns one based on the existing keys.
        Units of classes without identifiers are not stored until save.
        """
        if unit.identifiers:
            cls = unit.__class__
            lock = self._get_lock(cls)
            try:
                cache = self._caches[cls]
                if not unit.sequencer.valid_id(unit.identity()):
                    unit.sequencer.assign(unit, list(cache))
                # Pickle the Unit to discard extraneous attributes,
                # and avoid identity issues.
                # Cleanse first because pickle state
                # includes _initial_property_hash.
                unit.cleanse()
                cache[unit.identity()] = pickle.dumps(unit)
            finally:
                lock.release()
        # else: this class has no identifiers, so skip reserve
        # and wait for save.

        # Usually we log ASAP, but here we log after
        # the unit has had a chance to get an auto ID.
        if self.logflags & logflags.RESERVE:
            self.log(logflags.RESERVE.message(unit))

    def shutdown(self, conflicts='error'):
        """Shut down all connections to internal storage.

        All caches and their locks are discarded.

        conflicts: see errors.conflict.
        """
        self._caches = {}
        self._cache_locks = {}

    def create_database(self, conflicts='error'):
        """Create internal structures for the entire database.

        Nothing to do for RAM storage.

        conflicts: see errors.conflict.
        """
        pass

    def drop_database(self, conflicts='error'):
        """Destroy internal structures for the entire database.

        conflicts: see errors.conflict.
        """
        self.shutdown(conflicts=conflicts)

    def create_storage(self, cls, conflicts='error'):
        """Create internal structures for the given class.

        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("create storage %s" % cls))

        if cls in self._caches or cls in self._cache_locks:
            errors.conflict(conflicts, "Class %r already has storage." % cls)

        self._caches[cls] = {}
        self._cache_locks[cls] = thread.allocate_lock()

    def has_storage(self, cls):
        """If storage structures exist for the given class, return True."""
        return cls in self._caches

    def drop_storage(self, cls, conflicts='error'):
        """Destroy internal structures for the given class.

        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("drop storage %s" % cls))

        try:
            del self._caches[cls]
            del self._cache_locks[cls]
        except KeyError:
            # repr(cls) is the text str() of the KeyError would produce.
            errors.conflict(conflicts, repr(cls))

    def add_property(self, cls, name, conflicts='error'):
        """Add internal structures for the given property.

        Every stored unit of cls gets the property, set to None.

        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("add property %s %s" % (cls, name)))

        def add(unit):
            unit._properties[name] = None

        self._rewrite_units(cls, conflicts, add)

    def has_property(self, cls, name):
        """If storage structures exist for the given property, return True.

        Only one stored unit is inspected. If the class has storage but
        no units, True is returned; if it has no storage, False.
        """
        try:
            cache = self._caches[cls]
            lock = self._get_lock(cls)
        except KeyError:
            # No storage for the class at all, hence none for the property.
            return False

        try:
            for key in cache:
                # Any one unit will do.
                return name in pickle.loads(cache[key])._properties
            # We don't have any items, so there's nothing to
            # declare as 'unprepared'.
            return True
        finally:
            lock.release()

    def drop_property(self, cls, name, conflicts='error'):
        """Destroy internal structures for the given property.

        Raises KeyError if a stored unit lacks the property.

        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("drop property %s %s" % (cls, name)))

        def drop(unit):
            del unit._properties[name]

        self._rewrite_units(cls, conflicts, drop)

    def rename_property(self, cls, oldname, newname, conflicts='error'):
        """Rename internal structures for the given property.

        Raises KeyError if a stored unit lacks the old property.

        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("rename property %s from %s to %s"
                                          % (cls, oldname, newname)))

        def rename(unit):
            props = unit._properties
            # pop before assigning, so renaming a property to its own
            # name leaves the value in place instead of deleting it.
            props[newname] = props.pop(oldname)

        self._rewrite_units(cls, conflicts, rename)

    #                   Extra methods for use as a cache                   #

    def cachelen(self, cls):
        """Return the number of stored units of cls (0 if no storage)."""
        return len(self._caches.get(cls, {}))

    def cached_units(self, cls):
        """Return a list of fresh copies of all stored units of cls."""
        return [pickle.loads(pickled)
                for pickled in self._caches.get(cls, {}).values()]

    def flush(self, cls):
        """Dump all objects of the given class.

        Raises KeyError if no storage has been created for cls.
        """
        lock = self._get_lock(cls)
        try:
            self._caches[cls] = {}
        finally:
            lock.release()
Note: See TracBrowser for help on using the browser.