Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

root/tags/1.4.0/storage/storeshelve.py

Revision 157 (checked in by fumanchu, 3 years ago)

Safer storeshelve.drop_database (using a unique file extension).

  • Property svn:eol-style set to native
Line 
1 try:
2     from bsddb._db import DBNoSuchFileError
3 except ImportError:
4     DBNoSuchFileError = object()
5
6 import os
7
8 try:
9     import cPickle as pickle
10 except ImportError:
11     import pickle
12
13 import shelve
14 import threading
15
16 import dejavu
17 from dejavu import storage, logic
18
19
20 class StorageManagerShelve(storage.StorageManager):
21     """StoreManager to save and retrieve Units via stdlib shelve."""
22    
23     def __init__(self, name, arena, allOptions={}):
24         storage.StorageManager.__init__(self, name, arena, allOptions)
25        
26         path = allOptions['Path']
27         if not os.path.isabs(path):
28             path = os.path.join(os.getcwd(), path)
29         if not os.path.exists(path):
30             raise IOError(2, "No such directory: '%s'" % path)
31         self.shelvepath = path
32        
33         # A dictionary whose keys are classnames and whose
34         # values are objects returned by shelve.open().
35         # Those values are dict-like objects with keys of type 'str'.
36         self.shelves = {}
37        
38         self.locks = {}
39    
40     def shutdown(self):
41         while self.shelves:
42             clsname, shelf = self.shelves.popitem()
43             shelf.close()
44    
45     def shelf(self, cls):
46         clsname = cls.__name__
47         if clsname not in self.locks:
48             lock = self.locks[clsname] = threading.Lock()
49         else:
50             lock = self.locks[clsname]
51         lock.acquire()
52        
53         s = self.shelves.get(clsname)
54         if s is None:
55             s = shelve.open(self.filename(clsname), 'w')
56             self.shelves[clsname] = s
57        
58         return s, lock
59    
60     def recall(self, cls, expr=None):
61         units = []
62         data, lock = self.shelf(cls)
63         try:
64             if data:
65                 for unitdict in data.itervalues():
66                     unit = cls()
67                     # Set the attribute directly to avoid __set__ overhead.
68                     unit._properties = unitdict
69                     if expr is None or expr(unit):
70                         unit.cleanse()
71                         units.append(unit)
72         finally:
73             lock.release()
74        
75         for unit in units:
76             yield unit
77    
78     def key(self, arg):
79         return pickle.dumps(arg)
80    
81     def reserve(self, unit):
82         """reserve(unit). -> Reserve a persistent slot for unit."""
83         data, lock = self.shelf(unit.__class__)
84         try:
85             if not unit.sequencer.valid_id(unit.identity()):
86                 ids = [[row[prop.key] for prop in unit.identifiers]
87                        for row in data.itervalues()]
88                 unit.sequencer.assign(unit, ids)
89             data[self.key(unit.identity())] = unit._properties
90         finally:
91             lock.release()
92             unit.cleanse()
93    
94     def save(self, unit, forceSave=False):
95         """save(unit, forceSave=False). -> Update storage from unit's data."""
96         if unit.dirty() or forceSave:
97             data, lock = self.shelf(unit.__class__)
98             try:
99                 # Replace the entire value to get around writeback issues.
100                 # See the docs on "shelve" for more info.
101                 data[self.key(unit.identity())] = unit._properties
102             finally:
103                 lock.release()
104                 unit.cleanse()
105    
106     def destroy(self, unit):
107         """Delete the unit."""
108         data, lock = self.shelf(unit.__class__)
109         try:
110             del data[self.key(unit.identity())]
111         finally:
112             lock.release()
113    
114     def version(self):
115         import sys
116         return "Shelve version: %s" % sys.version
117    
118     def view(self, cls, fields, expr=None):
119         """view(cls, fields, expr=None) -> All value-tuples for given fields."""
120         if expr is None:
121             expr = logic.Expression(lambda x: True)
122        
123         data, lock = self.shelf(cls)
124         try:
125             globs = []
126             for unitdict in data.itervalues():
127                 unit = cls()
128                 # Set the attributes directly to avoid __set__ overhead.
129                 unit._properties = unitdict
130                 if expr is None or expr(unit):
131                     globs.append(tuple([getattr(unit, field) for field in fields]))
132             return globs
133         finally:
134             lock.release()
135    
136     def distinct(self, cls, fields, expr=None):
137         """distinct(cls, fields, expr=None) -> Distinct values for given fields."""
138         if expr is None:
139             expr = logic.Expression(lambda x: True)
140        
141         data, lock = self.shelf(cls)
142         try:
143             globs = {}
144             for unitdict in data.itervalues():
145                 unit = cls()
146                 # Set the attributes directly to avoid __set__ overhead.
147                 unit._properties = unitdict
148                 if expr is None or expr(unit):
149                     key = tuple([getattr(unit, field) for field in fields])
150                     globs[key] = None
151             return globs.keys()
152         finally:
153             lock.release()
154    
155     def multirecall(self, classes, expr):
156         """multirecall(classes, expr) -> Full inner join units."""
157         if expr is None:
158             expr = logic.Expression(lambda *args: True)
159        
160         firstcls = list(classes)[0]
161         # TODO: deconstruct expr into a set of subexpr's, one for
162         # each class in classes.
163         filters = dict([(cls, None) for cls in classes])
164        
165         def combine(unitjoin):
166             cls1, cls2 = unitjoin.class1, unitjoin.class2
167            
168             if isinstance(cls1, dejavu.UnitJoin):
169                 table1 = combine(cls1)
170                 classlist1 = iter(cls1)
171             else:
172                 table1 = [[x] for x in self.recall(cls1, filters[cls1])]
173                 classlist1 = [cls1]
174            
175             if isinstance(cls2, dejavu.UnitJoin):
176                 table2 = combine(cls2)
177                 classlist2 = iter(cls2)
178             else:
179                 table2 = [[x] for x in self.recall(cls2, filters[cls2])]
180                 classlist2 = [cls2]
181            
182             # Find an association between the two halves.
183             ua = None
184             for indexA, clsA in enumerate(classlist1):
185                 for indexB, clsB in enumerate(classlist2):
186                     ua = clsA._associations.get(clsB.__name__, None)
187                     if ua:
188                         nearKey, farKey = ua.nearKey, ua.farKey
189                         break
190                     ua = clsB._associations.get(clsA.__name__, None)
191                     if ua:
192                         nearKey, farKey = ua.farKey, ua.nearKey
193                         break
194                 if ua: break
195             if ua is None:
196                 msg = ("No association found between %s and %s." % (cls1, cls2))
197                 raise dejavu.AssociationError(msg)
198            
199             unitrows = []
200             if unitjoin.leftbiased is None:
201                 # INNER JOIN
202                 for row1 in table1:
203                     nearVal = getattr(row1[indexA], nearKey)
204                     for row2 in table2:
205                         # Test against join constraint
206                         farVal = getattr(row2[indexB], farKey)
207                         if nearVal == farVal:
208                             unitrows.append(row1 + row2)
209             elif unitjoin.leftbiased is True:
210                 # LEFT JOIN
211                 for row1 in table1:
212                     nearVal = getattr(row1[indexA], nearKey)
213                     found = False
214                     for row2 in table2:
215                         # Test against join constraint
216                         farVal = getattr(row2[indexB], farKey)
217                         if nearVal == farVal:
218                             unitrows.append(row1 + row2)
219                             found = True
220                     if not found:
221                         unitrows.append(row1 + [unit.__class__() for unit in row2])
222             else:
223                 # RIGHT JOIN
224                 for row2 in table2:
225                     farVal = getattr(row2[indexB], farKey)
226                     found = False
227                     for row1 in table1:
228                         # Test against join constraint
229                         nearVal = getattr(row1[indexA], nearKey)
230                         if nearVal == farVal:
231                             unitrows.append(row1 + row2)
232                             found = True
233                     if not found:
234                         unitrows.append([unit.__class__() for unit in row1] + row2)
235             return unitrows
236        
237         for unitrow in combine(classes):
238             if expr(*unitrow):
239                 yield unitrow
240    
241     ext = ".djv"
242    
243     def filename(self, clsname):
244         if not isinstance(clsname, basestring):
245             clsname = clsname.__name__
246         return os.path.join(self.shelvepath, clsname + self.ext)
247        
248     #                               Schemas                               #
249    
250     def create_database(self):
251         if not os.path.exists(self.shelvepath):
252             os.makedirs(self.shelvepath)
253    
254     def drop_database(self):
255         while self.shelves:
256             clsname, shelf = self.shelves.popitem()
257             shelf.close()
258        
259         for name in os.listdir(self.shelvepath):
260             name = os.path.join(self.shelvepath, name)
261             if not os.path.isdir(name) and name.endswith(self.ext):
262                 os.remove(name)
263    
264     def create_storage(self, cls):
265         clsname = cls.__name__
266         if clsname not in self.locks:
267             lock = self.locks[clsname] = threading.Lock()
268         else:
269             lock = self.locks[clsname]
270         lock.acquire()
271        
272         try:
273             s = shelve.open(self.filename(clsname), 'n')
274             self.shelves[clsname] = s
275         finally:
276             lock.release()
277    
278     def has_storage(self, cls):
279         return os.path.exists(self.filename(cls))
280    
281     def drop_storage(self, cls):
282         clsname = cls.__name__
283         try:
284             shelf = self.shelves.pop(clsname)
285         except KeyError:
286             pass
287         else:
288             shelf.close()
289         os.remove(self.filename(clsname))
290    
291     def add_property(self, cls, name):
292         data, lock = self.shelf(cls)
293         try:
294             for id, props in data.items():
295                 props[name] = None
296                 data[id] = props
297         finally:
298             lock.release()
299    
300     def drop_property(self, cls, name):
301         data, lock = self.shelf(cls)
302         try:
303             for id, props in data.items():
304                 del props[name]
305                 data[id] = props
306         finally:
307             lock.release()
308    
309     def rename_property(self, cls, oldname, newname):
310         data, lock = self.shelf(cls)
311         try:
312             for id, props in data.items():
313                 props[newname] = props[oldname]
314                 del props[oldname]
315                 data[id] = props
316         finally:
317             lock.release()
Note: See TracBrowser for help on using the browser.