Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/branches/crazycache/dejavu/storage/storeshelve.py

Revision 547 (checked in by fumanchu, 4 years ago)

Multithreading fix for shelve.

  • Property svn:eol-style set to native
Line 
1 try:
2     from bsddb._db import DBNoSuchFileError
3 except ImportError:
4     DBNoSuchFileError = object()
5
6 import anydbm
7 import os
8
9 try:
10     import cPickle as pickle
11 except ImportError:
12     import pickle
13
14 import shelve
15 import threading
16
17 import dejavu
18 from dejavu import errors, logic, logflags, storage
19
20
class StorageManagerShelve(storage.StorageManager):
    """StoreManager to save and retrieve Units via stdlib shelve.
    
    Each Unit class is stored in its own shelf file (see filename()).
    Rows are keyed by self.key() of the Unit's identity tuple, or of its
    whole property dict for classes without identifiers. Every access to
    a class's shelf is serialized through that class's lock (get_lock()).
    """
    
    def __init__(self, allOptions=None):
        """Initialize from an options dict.
        
        allOptions: dict of options.
            'Path' (required): directory holding the shelf files. A
                relative path is resolved against the current working
                directory. IOError is raised if the directory is missing.
            'recall_stride' (default 100): number of keys read per lock
                acquisition during recall.
        """
        # Avoid a shared mutable default; the dict is handed on to the
        # base class, which might keep a reference to it.
        if allOptions is None:
            allOptions = {}
        storage.StorageManager.__init__(self, allOptions)
        
        self.recall_stride = allOptions.get('recall_stride', 100)
        
        path = allOptions['Path']
        if not os.path.isabs(path):
            path = os.path.join(os.getcwd(), path)
        if not os.path.exists(path):
            raise IOError(2, "No such directory: '%s'" % path)
        self.shelvepath = path
        
        # A dictionary whose keys are classes and whose
        # values are objects returned by shelve.open().
        # Those values are dict-like objects with keys of type 'str'.
        self.shelves = {}
        
        # Per-class threading.Lock objects (see get_lock), and a lock that
        # guards the creation of those per-class locks.
        self.locks = {}
        self._locks_lock = threading.Lock()
    
    def shutdown(self, conflicts='error'):
        """Shut down all connections to internal storage.
        
        Each open shelf is closed while holding its class lock; the
        per-class locks are then discarded.
        
        conflicts: see errors.conflict (not used by this implementation).
        """
        while self.shelves:
            cls, shelf = self.shelves.popitem()
            lock = self.get_lock(cls)
            try:
                shelf.close()
            finally:
                lock.release()
        self._locks_lock.acquire()
        try:
            self.locks = {}
        finally:
            self._locks_lock.release()
    
    def _make_unit(self, cls, unitdict):
        """Return a new, not-yet-cleansed instance of cls holding unitdict.
        
        Properties are set directly to avoid __set__ and default overhead.
        """
        unit = cls.__new__(cls)
        unit._zombie = True
        unit.__init__()
        unit._properties = unitdict
        return unit
    
    def _shelf_keys(self, cls):
        """Return the keys of the shelf for cls, read under its lock.
        
        Raises KeyError if cls has no open shelf (see map()).
        """
        lock = self.get_lock(cls)
        try:
            data = self.shelves[cls] or {}
            return data.keys()
        finally:
            lock.release()
    
    def unit(self, cls, **kwargs):
        """A single Unit which matches the given kwargs, else None.
        
        The first Unit matching the kwargs is returned; if no Units match,
        None is returned.
        """
        if self.logflags & logflags.RECALL:
            self.log(logflags.RECALL.message(cls, kwargs))
        
        if set(kwargs.keys()) == set(cls.identifiers):
            # Looking up a Unit by its identifiers.
            # Skip grabbing the entire shelf (a HUGE optimization).
            key = self.key(tuple([kwargs[k] for k in cls.identifiers]))
            
            lock = self.get_lock(cls)
            try:
                data = self.shelves[cls] or {}
                unitdict = data.get(key, None)
            finally:
                lock.release()
            
            if unitdict is None:
                return None
            unit = self._make_unit(cls, unitdict)
            unit.cleanse()
            return unit
        
        keys = self._shelf_keys(cls)
        expr = logic.filter(**kwargs)
        try:
            return next(self._xrecall_inner(cls, expr, keys))[0]
        except StopIteration:
            return None
    
    def xrecall(self, classes, expr=None, order=None, limit=None, offset=None):
        """Yield units of the given cls which match the given expr.
        
        A dejavu.UnitJoin is delegated to self._xmultirecall.
        """
        if isinstance(classes, dejavu.UnitJoin):
            return self._xmultirecall(classes, expr, order=order,
                                      limit=limit, offset=offset)
        
        cls = classes
        if self.logflags & logflags.RECALL:
            self.log(logflags.RECALL.message(cls, expr))
        
        keys = self._shelf_keys(cls)
        inner = self._xrecall_inner(cls, expr, keys)
        return self._paginate(inner, order, limit, offset, single=True)
    
    def _xrecall_inner(self, cls, expr, keys):
        """Yield (unit,) for each stored unit in keys which matches expr.
        
        keys are read recall_stride at a time, so the class lock is held
        only while one chunk is fetched.
        """
        stride = self.recall_stride
        for cursor in xrange(0, len(keys), stride):
            keyset = keys[cursor:cursor + stride]
            for unit in self._xrecall_inner_inner(cls, keyset):
                if expr is None or expr(unit):
                    unit.cleanse()
                    # Must yield a sequence for use in _paginate.
                    yield (unit,)
    
    def _xrecall_inner_inner(self, cls, keyset):
        """Grab a chunk of units: a list of uncleansed units for keyset.
        
        Keys no longer present in the shelf (e.g. destroyed after the key
        list was read) are skipped.
        """
        units = []
        lock = self.get_lock(cls)
        try:
            data = self.shelves[cls]
            if data:
                for key in keyset:
                    unitdict = data.get(key, None)
                    if unitdict is not None:
                        units.append(self._make_unit(cls, unitdict))
        finally:
            lock.release()
        return units
    
    def key(self, arg):
        """Return the shelf key (a pickled str) for arg."""
        return pickle.dumps(arg)
    
    def reserve(self, unit):
        """Reserve a persistent slot for unit.
        
        If the unit's identity is not valid per its sequencer, an ID is
        assigned from the identifiers of all stored rows before saving.
        """
        if unit.identifiers:
            cls = unit.__class__
            lock = self.get_lock(cls)
            try:
                data = self.shelves[cls]
                if not unit.sequencer.valid_id(unit.identity()):
                    ids = [[row[key] for key in unit.identifiers]
                           for row in data.itervalues()]
                    unit.sequencer.assign(unit, ids)
                data[self.key(unit.identity())] = unit._properties
                unit.cleanse()
            finally:
                lock.release()
        else:
            # This class has no identifiers, so skip reserve and wait for save.
            pass
        
        # Usually we log ASAP, but here we log after
        # the unit has had a chance to get an auto ID.
        if self.logflags & logflags.RESERVE:
            self.log(logflags.RESERVE.message(unit))
    
    def save(self, unit, forceSave=False):
        """Update storage from unit's data (only if dirty, unless forceSave)."""
        if self.logflags & logflags.SAVE:
            self.log(logflags.SAVE.message(unit, forceSave))
        
        if forceSave or unit.dirty():
            cls = unit.__class__
            lock = self.get_lock(cls)
            try:
                data = self.shelves[cls]
                if unit.identifiers:
                    key = self.key(unit.identity())
                else:
                    # This class has no identifiers, so hash the whole dict.
                    key = self.key(unit._properties)
                # Replace the entire value to get around writeback issues.
                # See the docs on "shelve" for more info.
                data[key] = unit._properties
                unit.cleanse()
            finally:
                lock.release()
    
    def destroy(self, unit):
        """Delete the unit (KeyError if it is not stored)."""
        if self.logflags & logflags.DESTROY:
            self.log(logflags.DESTROY.message(unit))
        
        cls = unit.__class__
        lock = self.get_lock(cls)
        try:
            data = self.shelves[cls]
            if unit.identifiers:
                del data[self.key(unit.identity())]
            else:
                # This class has no identifiers, so hash the whole dict.
                del data[self.key(unit._properties)]
        finally:
            lock.release()
    
    def version(self):
        """Return a description of the storage version (the Python version)."""
        import sys
        return "Shelve version: %s" % sys.version
    
    ext = ".djv"
    
    def filename(self, cls):
        """Return the full path for the given class."""
        return os.path.join(self.shelvepath, cls.__name__ + self.ext)
    
    def insert_into(self, name, query, distinct=False):
        """INSERT matching data INTO a new class and return the class.
        
        Each row of self.xview(query, distinct) becomes one stored row of
        the new class. The row is keyed like save() keys it: by the
        identifier values when the class has identifiers, else by the
        whole property dict. Each row must supply every identifier column.
        """
        if not isinstance(query, dejavu.Query):
            query = dejavu.Query(*query)
        
        newclass = self.make_class(name)
        # Creates the shelf file and registers the open shelf.
        self._create_named_storage(newclass)
        
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("create storage %r" % newclass))
        
        source = self.xview(query, distinct)
        identifiers = newclass.identifiers
        for row in source:
            props = dict(zip(source.descr, row))
            if identifiers:
                # Same key scheme as unit(): the pickled identity tuple.
                key = self.key(tuple([props[k] for k in identifiers]))
            else:
                key = self.key(props)
            # Lock per row so it is never held while xview reads other classes.
            lock = self.get_lock(newclass)
            try:
                self.shelves[newclass][key] = props
            finally:
                lock.release()
        
        return newclass
    
    #                               Schemas                               #
    
    def create_database(self, conflicts='error'):
        """Create internal structures for the entire database.
        
        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("create database"))
        
        try:
            if not os.path.exists(self.shelvepath):
                os.makedirs(self.shelvepath)
        except Exception as x:
            errors.conflict(conflicts, str(x))
    
    def drop_database(self, conflicts='error'):
        """Destroy internal structures for the entire database.
        
        Closes every open shelf (under its class lock, as shutdown() does),
        then removes every non-directory file in shelvepath ending in ext.
        
        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("drop database"))
        
        while self.shelves:
            cls, shelf = self.shelves.popitem()
            lock = self.get_lock(cls)
            try:
                shelf.close()
            finally:
                lock.release()
        
        for name in os.listdir(self.shelvepath):
            name = os.path.join(self.shelvepath, name)
            if not os.path.isdir(name) and name.endswith(self.ext):
                try:
                    os.remove(name)
                except Exception as x:
                    errors.conflict(conflicts, str(x))
    
    def create_storage(self, cls, conflicts='error'):
        """Create internal structures for the given class.
        
        The new (empty) shelf is left open and registered for cls.
        
        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("create storage %r" % cls))
        try:
            self._create_named_storage(cls)
        except anydbm.error as x:
            errors.conflict(conflicts, str(x))
    
    def get_lock(self, cls):
        """Acquire and return the lock for cls; the caller must release it.
        
        The per-class lock is created on first use. Creation is guarded by
        self._locks_lock; without it, two threads could each create and
        acquire a different Lock for the same class. The lock is a plain
        threading.Lock (not reentrant), so a thread must not call this for
        a class whose lock it already holds.
        """
        self._locks_lock.acquire()
        try:
            lock = self.locks.get(cls)
            if lock is None:
                lock = self.locks[cls] = threading.Lock()
        finally:
            self._locks_lock.release()
        lock.acquire()
        return lock
    
    def _create_named_storage(self, cls):
        """Create (or truncate) the shelf file for cls and register it open.
        
        Any shelf already open for cls is closed first, so the file is
        never truncated underneath a live handle.
        """
        lock = self.get_lock(cls)
        try:
            old = self.shelves.pop(cls, None)
            if old is not None:
                old.close()
            self.shelves[cls] = shelve.open(self.filename(cls), 'n')
        finally:
            lock.release()
    
    def has_storage(self, cls):
        """If storage structures exist for the given class, return True."""
        return os.path.exists(self.filename(cls))
    
    def drop_storage(self, cls, conflicts='error'):
        """Destroy internal structures for the given class.
        
        Closes the open shelf for cls (if any), then removes its file.
        
        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("drop storage %r" % cls))
        
        lock = self.get_lock(cls)
        try:
            try:
                shelf = self.shelves.pop(cls)
            except KeyError:
                pass
            else:
                shelf.close()
            
            try:
                os.remove(self.filename(cls))
            except Exception as x:
                errors.conflict(conflicts, str(x))
        finally:
            lock.release()
    
    def _alter_rows(self, cls, conflicts, alter):
        """Call alter(props) on every stored row of cls and write it back.
        
        If cls has no open shelf, report via errors.conflict and return,
        so that non-raising conflict modes do not fall through to an
        unbound shelf.
        """
        lock = self.get_lock(cls)
        try:
            try:
                data = self.shelves[cls]
            except KeyError as x:
                errors.conflict(conflicts, str(x))
                return
            
            # Reassign whole values: in-place changes to objects fetched
            # from a shelf opened without writeback are not persisted.
            for id_, props in data.items():
                alter(props)
                data[id_] = props
        finally:
            lock.release()
    
    def add_property(self, cls, name, conflicts='error'):
        """Create internal structures for the given property.
        
        Every stored row of cls gets the property set to None.
        
        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("add property %r %r" % (cls, name)))
        
        def add(props):
            props[name] = None
        self._alter_rows(cls, conflicts, add)
    
    def drop_property(self, cls, name, conflicts='error'):
        """Destroy internal structures for the given property.
        
        The property is deleted from every stored row of cls (KeyError if
        a row lacks it).
        
        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("drop property %r %r" % (cls, name)))
        
        def drop(props):
            del props[name]
        self._alter_rows(cls, conflicts, drop)
    
    def rename_property(self, cls, oldname, newname, conflicts='error'):
        """Rename internal structures for the given property.
        
        conflicts: see errors.conflict.
        """
        if self.logflags & logflags.DDL:
            self.log(logflags.DDL.message("rename property %r from %r to %r"
                                          % (cls, oldname, newname)))
        
        def rename(props):
            # pop-then-set keeps the value even when oldname == newname.
            props[newname] = props.pop(oldname)
        self._alter_rows(cls, conflicts, rename)
    
    def map(self, classes, conflicts='error'):
        """Map classes to internal storage.
        
        For each class, missing storage is created when conflicts is
        'repair' and otherwise reported via errors.conflict (and skipped).
        The class's shelf is then opened ('w') and registered unless it is
        already open.
        
        conflicts: see errors.conflict.
        """
        for cls in classes:
            if not self.has_storage(cls):
                if conflicts == 'repair':
                    self.create_storage(cls)
                else:
                    errors.conflict(conflicts,
                                    "%s: no storage found." % cls.__name__)
                    # conflict() did not raise; there is no file to open,
                    # so don't report a second, spurious open failure.
                    continue
            
            lock = self.get_lock(cls)
            try:
                if cls not in self.shelves:
                    try:
                        s = shelve.open(self.filename(cls), 'w')
                    except anydbm.error as x:
                        errors.conflict(conflicts, str(x))
                    else:
                        self.shelves[cls] = s
            finally:
                lock.release()
427
Note: See TracBrowser for help on using the browser.