Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/branches/crazycache/dejavu/storage/storejson.py

Revision 543 (checked in by fumanchu, 3 years ago)

Changed to unit(cls, **kwargs) sig throughout. Also changed sandbox.recall to include order, limit, and offset args; removed **kwargs from recall, but the 'expr' arg may now be a dict for all x/multi/recall methods throughout. Removed inheritance code. Added a 'sum' method to StorageManager?.

  • Property svn:eol-style set to native
Line 
1 try:
2     import cPickle as pickle
3 except ImportError:
4     import pickle
5
6 import os
7 import shutil
8 import threading
9 import time
10
11 import dejavu
12 from dejavu import errors, json, logflags, storage
13
14 from geniusql import logic
15
16
17 class StorageManagerJSON(storage.StorageManager):
18     """StoreManager to save and retrieve Units in flat files as JSON.
19     
20     This is slightly different from fs, in that this saves each Unit class
21     to its own folder, and each Unit instance to its own file.
22     
23     Be aware that this SM stores each Unit in its own file,
24     and your operating system may have hard limits or performance
25     issues when the number of Units of a given class (or the number
26     of classes) grows large.
27     """
28    
29     def __init__(self, allOptions={}):
30         storage.StorageManager.__init__(self, allOptions)
31        
32         root = allOptions['root']
33         if not os.path.isabs(root):
34             root = os.path.join(os.getcwd(), root)
35         self.root = root
36        
37         self.mode = int(allOptions.get('mode', '0777'), 8)
38        
39         # Character to use for joining multiple identifiers
40         # into a single file name.
41         self.idsepchar = allOptions.get('idsepchar', '_')
42        
43         encoding = allOptions.get('encoding', None)
44         self.decoder = json.Decoder(encoding=encoding)
45        
46         skipkeys = allOptions.get('skipkeys', False)
47         check_circular = allOptions.get('check_circular', True)
48         allow_nan = allOptions.get('allow_nan', False)
49         indent = allOptions.get('indent', None)
50         self.encoder = json.Encoder(skipkeys, ensure_ascii=True,
51                                     check_circular=True, allow_nan=True,
52                                     indent=indent)
53    
54     def shelf(self, cls):
55         """Return path for the given cls."""
56         return os.path.join(self.root, cls.__name__)
57    
58     def get_lock(self, cls):
59         clsname = cls.__name__
60         path = os.path.join(self.root, clsname, "class.lock")
61         while True:
62             try:
63                 lockfd = os.open(path, os.O_CREAT|os.O_WRONLY|os.O_EXCL)
64             except OSError:
65                 time.sleep(0.1)
66             else:
67                 os.close(lockfd)
68                 break
69    
70     def release_lock(self, cls):
71         clsname = cls.__name__
72         path = os.path.join(self.root, clsname, "class.lock")
73         os.unlink(path)
74    
75     def _push(self, root, fname, data):
76         """Persist a unit property dict into its file.
77         
78         This assumes the folder exists and we have a lock on it.
79         """
80         fname = os.path.join(root, fname)
81         f = open(fname, 'wb')
82         try:
83             f.write(self.encoder.encode(data))
84         finally:
85             f.close()
86    
87     def _pull(self, root, fname):
88         """Return a unit property dict from the given filename, or None."""
89         fname = os.path.join(root, fname)
90         v = open(fname, 'rb').read()
91         return self.decoder.decode(v)
92    
93     def unit(self, cls, **kwargs):
94         """A single Unit which matches the given kwargs, else None.
95         
96         The first Unit matching the kwargs is returned; if no Units match,
97         None is returned.
98         """
99         if set(kwargs.keys()) == set(cls.identifiers):
100             # Looking up a Unit by its identifiers.
101             # Skip walking the shelf.
102             if self.logflags & logflags.RECALL:
103                 self.log(logflags.RECALL.message(cls, kwargs))
104            
105             classdir = self.shelf(cls)
106             fname = self.idsepchar.join([str(kwargs[k])
107                                          for k in cls.identifiers])
108             if not fname:
109                 fname = "__blank__"
110            
111             try:
112                 data = self._pull(classdir, fname)
113                 unit = json.dict_to_unit(data, cls=cls)
114             except IOError:
115                 return None
116            
117             unit.cleanse()
118             return unit
119        
120         return storage.StorageManager.unit(self, cls, **kwargs)
121    
122     def xrecall(self, classes, expr=None, order=None, limit=None, offset=None):
123         if isinstance(classes, dejavu.UnitJoin):
124             return self._xmultirecall(classes, expr, order=order,
125                                       limit=limit, offset=offset)
126        
127         cls = classes
128         if self.logflags & logflags.RECALL:
129             self.log(logflags.RECALL.message(cls, expr))
130        
131         path = self.shelf(cls)
132         self.get_lock(cls)
133         try:
134             files = os.listdir(path)
135         finally:
136             self.release_lock(cls)
137        
138         data = self._xrecall_inner(cls, expr, path, files)
139         return self._paginate(data, order, limit, offset, single=True)
140    
141     def _xrecall_inner(self, cls, expr, root, files):
142         """Private helper for self.xrecall."""
143         for fname in files:
144             if fname == "class.lock":
145                 continue
146            
147             data = self._pull(root, fname)
148             unit = json.dict_to_unit(data, cls=cls)
149             if expr is None or expr(unit):
150                 unit.cleanse()
151                 # Must yield a sequence for use in _paginate.
152                 yield (unit,)
153    
154     def ids_from_file(self, cls, fname):
155         """Return a list of identifier (k, v) pairs for the named folder."""
156         if cls.identifiers:
157             if fname == "__blank__":
158                 return [(k, "") for k in cls.identifiers]
159            
160             # cls.identifiers is ordered, and should match
161             # the order of atoms inside fname.
162             return [(k, getattr(cls, k).coerce(None, v))
163                     for k, v in zip(cls.identifiers,
164                                     fname.split(self.idsepchar))
165                     ]
166         else:
167             return []
168    
169     def filename_from_unit(self, unit):
170         """Return the folder name for the given unit."""
171         if unit.identifiers:
172             folder = self.idsepchar.join([str(getattr(unit, k))
173                                           for k in unit.identifiers])
174         else:
175             folder = str(hash(unit))
176        
177         if not folder:
178             folder = "__blank__"
179         return folder
180    
181     def reserve(self, unit):
182         """Reserve a persistent slot for unit."""
183         if self.logflags & logflags.RESERVE:
184             self.log(logflags.RESERVE.message(unit))
185        
186         cls = unit.__class__
187         path = self.shelf(cls)
188         self.get_lock(cls)
189         try:
190             if not unit.sequencer.valid_id(unit.identity()):
191                 files = os.listdir(path)
192                 ids = [[v for k, v in self.ids_from_file(cls, fname)]
193                        for fname in files if fname != 'class.lock']
194                 unit.sequencer.assign(unit, ids)
195            
196             fname = self.filename_from_unit(unit)
197             self._push(path, fname, json.unit_to_dict(unit))
198         finally:
199             self.release_lock(cls)
200             unit.cleanse()
201    
202     def save(self, unit, forceSave=False):
203         """Update storage from unit's data."""
204         if self.logflags & logflags.SAVE:
205             self.log(logflags.SAVE.message(unit, forceSave))
206        
207         if forceSave or unit.dirty():
208             cls = unit.__class__
209             path = self.shelf(cls)
210             self.get_lock(cls)
211             try:
212                 fname = self.filename_from_unit(unit)
213                 self._push(path, fname, json.unit_to_dict(unit))
214             finally:
215                 self.release_lock(cls)
216                 unit.cleanse()
217    
218     def destroy(self, unit):
219         """Delete the unit."""
220         if self.logflags & logflags.DESTROY:
221             self.log(logflags.DESTROY.message(unit))
222        
223         cls = unit.__class__
224         path = self.shelf(cls)
225         self.get_lock(cls)
226         try:
227             fname = self.filename_from_unit(unit)
228             os.unlink(os.path.join(path, fname))
229         finally:
230             self.release_lock(cls)
231    
232     __version__ = "0.1"
233    
234     def version(self):
235         return "%s %s" % (self.__class__.__name__, self.__version__)
236    
237    
238     #                               Schemas                               #
239    
240     def create_database(self, conflicts='error'):
241         """Create internal structures for the entire database.
242         
243         conflicts: see errors.conflict.
244         """
245         if self.logflags & logflags.DDL:
246             self.log(logflags.DDL.message("create database"))
247        
248         try:
249             os.makedirs(self.root)
250         except Exception, x:
251             errors.conflict(conflicts, str(x))
252    
253     def drop_database(self, conflicts='error'):
254         """Destroy internal structures for the entire database.
255         
256         conflicts: see errors.conflict.
257         """
258         if self.logflags & logflags.DDL:
259             self.log(logflags.DDL.message("drop database"))
260        
261         try:
262             shutil.rmtree(self.root)
263         except Exception, x:
264             errors.conflict(conflicts, str(x))
265    
266     def create_storage(self, cls, conflicts='error'):
267         """Destroy internal structures for the given class.
268         
269         conflicts: see errors.conflict.
270         """
271         if self.logflags & logflags.DDL:
272             self.log(logflags.DDL.message("create storage %s" % cls))
273        
274         try:
275             path = self.shelf(cls)
276             os.mkdir(path, self.mode)
277         except Exception, x:
278             errors.conflict(conflicts, str(x))
279    
280     def has_storage(self, cls):
281         """If storage structures exist for the given class, return True."""
282         return os.path.exists(self.shelf(cls))
283    
284     def drop_storage(self, cls, conflicts='error'):
285         """Destroy internal structures for the given class.
286         
287         conflicts: see errors.conflict.
288         """
289         if self.logflags & logflags.DDL:
290             self.log(logflags.DDL.message("drop storage %s" % cls))
291        
292         try:
293             shutil.rmtree(self.shelf(cls))
294         except Exception, x:
295             errors.conflict(conflicts, str(x))
296    
297     def add_property(self, cls, name, conflicts='error'):
298         """Add internal structures for the given property.
299         
300         conflicts: see errors.conflict.
301         """
302         if self.logflags & logflags.DDL:
303             self.log(logflags.DDL.message("add property %s %s"
304                                           % (cls, name)))
305        
306         path = self.shelf(cls)
307         self.get_lock(cls)
308         try:
309             for fname in os.listdir(path):
310                 if fname == "class.lock":
311                     continue
312                 try:
313                     data = self._pull(path, fname)
314                     data[name] = None
315                     self._push(path, fname, data)
316                 except Exception, x:
317                     errors.conflict(conflicts, str(x))
318         finally:
319             self.release_lock(cls)
320    
321     def has_property(self, cls, name):
322         """If storage structures exist for the given property, return True."""
323         path = self.shelf(cls)
324         self.get_lock(cls)
325         try:
326             for fname in os.listdir(path):
327                 if fname == "class.lock":
328                     continue
329                 return name in self._pull(path, fname)
330         finally:
331             self.release_lock(cls)
332    
333     def drop_property(self, cls, name, conflicts='error'):
334         """Destroy internal structures for the given property.
335         
336         conflicts: see errors.conflict.
337         """
338         if self.logflags & logflags.DDL:
339             self.log(logflags.DDL.message("drop property %s %s"
340                                           % (cls, name)))
341        
342         path = self.shelf(cls)
343         self.get_lock(cls)
344         try:
345             for fname in os.listdir(path):
346                 if fname == "class.lock":
347                     continue
348                 try:
349                     data = self._pull(path, fname)
350                     data.pop(name, None)
351                     self._push(path, fname, data)
352                 except Exception, x:
353                     errors.conflict(conflicts, str(x))
354         finally:
355             self.release_lock(cls)
356    
357     def rename_property(self, cls, oldname, newname, conflicts='error'):
358         """Rename internal structures for the given property.
359         
360         conflicts: see errors.conflict.
361         """
362         if self.logflags & logflags.DDL:
363             self.log(logflags.DDL.message(
364                 "rename property %s from %s to %s"
365                 % (cls, oldname, newname)))
366        
367         path = self.shelf(cls)
368         self.get_lock(cls)
369         try:
370             for fname in os.listdir(path):
371                 if fname == "class.lock":
372                     continue
373                 try:
374                     data = self._pull(path, fname)
375                     data[newname] = data.pop(oldname, None)
376                     self._push(path, fname, data)
377                 except Exception, x:
378                     errors.conflict(conflicts, str(x))
379         finally:
380             self.release_lock(cls)
Note: See TracBrowser for help on using the browser.