Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/storage/storeram.py

Revision 327 (checked in by fumanchu, 6 years ago)

New RAMStorage module.

  • Property svn:eol-style set to native
Line 
1 try:
2     import cPickle as pickle
3 except ImportError:
4     import pickle
5
6 import thread
7
8 from dejavu import logic, UnitJoin
9 from dejavu.storage import StorageManager
10
11
12 class RAMStorage(StorageManager):
13     """A Storage Manager which keeps all data in RAM."""
14    
15     def __init__(self, arena, allOptions={}):
16         StorageManager.__init__(self, arena, allOptions)
17         self._caches = {}       # id: pickled Unit
18         self._cache_locks = {}
19    
20     def cachelen(self, cls):
21         return len(self._caches.get(cls, {}))
22    
23     def cached_units(self, cls):
24         return [pickle.loads(data) for data
25                 in self._caches.get(cls, {}).itervalues()]
26    
27     def _get_lock(self, cls):
28         lock = self._cache_locks[cls]
29         lock.acquire(True)
30         return lock
31    
32     def recall(self, unitClass, expr=None):
33         """Return a Unit iterator."""
34         lock = self._get_lock(unitClass)
35         try:
36             cache = self._caches[unitClass]
37             matches = {}
38            
39             for id, pickledUnit in cache.iteritems():
40                 unit = pickle.loads(pickledUnit)
41                 if expr is None or expr(unit):
42                     matches[id] = unit
43            
44             return iter(matches.values())
45         finally:
46             lock.release()
47    
48     def multirecall(self, classes, expr):
49         """multirecall(classes, expr) -> Full inner join units from each class."""
50         if expr is None:
51             expr = logic.Expression(lambda *args: True)
52        
53         firstcls = list(classes)[0]
54         # TODO: deconstruct expr into a set of subexpr's, one for
55         # each class in classes.
56         filters = dict([(cls, None) for cls in classes])
57        
58         def combine(unitjoin):
59             cls1, cls2 = unitjoin.class1, unitjoin.class2
60            
61             if isinstance(cls1, UnitJoin):
62                 table1 = combine(cls1)
63                 classlist1 = iter(cls1)
64             else:
65                 table1 = [[x] for x in self.recall(cls1, filters[cls1])]
66                 classlist1 = [cls1]
67            
68             if isinstance(cls2, UnitJoin):
69                 table2 = combine(cls2)
70                 classlist2 = iter(cls2)
71             else:
72                 table2 = [[x] for x in self.recall(cls2, filters[cls2])]
73                 classlist2 = [cls2]
74            
75             # Find an association between the two halves.
76             ua = None
77             for indexA, clsA in enumerate(classlist1):
78                 for indexB, clsB in enumerate(classlist2):
79                     path = unitjoin.path or clsB.__name__
80                     ua = clsA._associations.get(path, None)
81                     if ua:
82                         nearKey, farKey = ua.nearKey, ua.farKey
83                         break
84                     path = unitjoin.path or clsA.__name__
85                     ua = clsB._associations.get(path, None)
86                     if ua:
87                         nearKey, farKey = ua.farKey, ua.nearKey
88                         break
89                 if ua: break
90             if ua is None:
91                 msg = ("No association found between %s and %s." % (cls1, cls2))
92                 raise errors.AssociationError(msg)
93            
94             unitrows = []
95             if unitjoin.leftbiased is None:
96                 # INNER JOIN
97                 for row1 in table1:
98                     nearVal = getattr(row1[indexA], nearKey)
99                     for row2 in table2:
100                         # Test against join constraint
101                         farVal = getattr(row2[indexB], farKey)
102                         if nearVal == farVal:
103                             unitrows.append(row1 + row2)
104             elif unitjoin.leftbiased is True:
105                 # LEFT JOIN
106                 for row1 in table1:
107                     nearVal = getattr(row1[indexA], nearKey)
108                     found = False
109                     for row2 in table2:
110                         # Test against join constraint
111                         farVal = getattr(row2[indexB], farKey)
112                         if nearVal == farVal:
113                             unitrows.append(row1 + row2)
114                             found = True
115                     if not found:
116                         unitrows.append(row1 + [unit.__class__() for unit in row2])
117             else:
118                 # RIGHT JOIN
119                 for row2 in table2:
120                     farVal = getattr(row2[indexB], farKey)
121                     found = False
122                     for row1 in table1:
123                         # Test against join constraint
124                         nearVal = getattr(row1[indexA], nearKey)
125                         if nearVal == farVal:
126                             unitrows.append(row1 + row2)
127                             found = True
128                     if not found:
129                         unitrows.append([unit.__class__() for unit in row1] + row2)
130             return unitrows
131        
132         for unitrow in combine(classes):
133             if expr(*unitrow):
134                 yield unitrow
135    
136     def save(self, unit, forceSave=False):
137         """save(unit, forceSave=False). -> Update storage from unit's data."""
138         if unit.dirty() or forceSave:
139             lock = self._get_lock(unit.__class__)
140             try:
141                 unit.cleanse()
142                 cache = self._caches[unit.__class__]
143                 cache[unit.identity()] = pickle.dumps(unit)
144             finally:
145                 lock.release()
146    
147     def destroy(self, unit):
148         """Delete the unit."""
149         unitClass = unit.__class__
150         lock = self._get_lock(unitClass)
151         try:
152             id = unit.identity()
153             cache = self._caches[unitClass]
154             try:
155                 del cache[id]
156             except KeyError:
157                 pass
158         finally:
159             lock.release()
160    
161     def view(self, cls, attrs, expr=None):
162         """view(cls, attrs, expr=None) -> Iterator of all Property tuples."""
163         if expr is None:
164             expr = logic.Expression(lambda x: True)
165        
166         lock = self._get_lock(cls)
167         try:
168             cache = self._caches[cls]
169             seen = []
170            
171             for id, pickledUnit in cache.iteritems():
172                 unit = pickle.loads(pickledUnit)
173                 if expr is None or expr(unit):
174                     seen.append(tuple([getattr(unit, f) for f in attrs]))
175            
176             return iter(seen)
177         finally:
178             lock.release()
179    
180     def distinct(self, cls, attrs, expr=None):
181         """distinct(cls, attrs, expr=None) -> Distinct values for given attributes."""
182         if expr is None:
183             expr = logic.Expression(lambda x: True)
184        
185         # Rather than repeat the logic in recall() where we mix cached
186         # and uncached Units, just call recall itself.
187         distvals = {}
188         for unit in self.recall(cls, expr):
189             val = tuple([getattr(unit, f) for f in attrs])
190             distvals[val] = None
191         return distvals.keys()
192    
193     def reserve(self, unit):
194         """Reserve storage space for the Unit."""
195         unitClass = unit.__class__
196         lock = self._get_lock(unitClass)
197         try:
198             cache = self._caches[unitClass]
199             if not unit.sequencer.valid_id(unit.identity()):
200                 unit.sequencer.assign(unit, cache.keys())
201             # Pickle the Unit to discard extraneous attributes,
202             # and avoid identity issues.
203             cache[unit.identity()] = pickle.dumps(unit)
204         finally:
205             lock.release()
206             unit.cleanse()
207    
208     def shutdown(self):
209         self._caches = {}
210         self._cache_locks = {}
211    
212     def create_database(self):
213         pass
214    
215     drop_database = shutdown
216    
217     def create_storage(self, cls):
218         self._caches[cls] = {}
219         self._cache_locks[cls] = thread.allocate_lock()
220    
221     def has_storage(self, cls):
222         return cls in self._caches
223    
224     def drop_storage(self, cls):
225         self._caches.pop(cls, None)
226         self._cache_locks.pop(cls, None)
227    
228     def add_property(self, cls, name):
229         lock = self._get_lock(cls)
230         try:
231             cache = self._caches[cls]
232             for id, pickledUnit in cache.items():
233                 unit = pickle.loads(pickledUnit)
234                 unit._properties[name] = None
235                 unit.cleanse()
236                 cache[id] = pickle.dumps(unit)
237         finally:
238             lock.release()
239    
240     def drop_property(self, cls, name):
241         lock = self._get_lock(cls)
242         try:
243             cache = self._caches[cls]
244             for id, pickledUnit in cache.items():
245                 unit = pickle.loads(pickledUnit)
246                 del unit._properties[name]
247                 unit.cleanse()
248                 cache[id] = pickle.dumps(unit)
249         finally:
250             lock.release()
251    
252     def rename_property(self, cls, oldname, newname):
253         lock = self._get_lock(cls)
254         try:
255             cache = self._caches[cls]
256             for id, pickledUnit in cache.items():
257                 unit = pickle.loads(pickledUnit)
258                 unit._properties[newname] = unit._properties[oldname]
259                 del unit._properties[oldname]
260                 unit.cleanse()
261                 cache[id] = pickle.dumps(unit)
262         finally:
263             lock.release()
264
Note: See TracBrowser for help on using the browser.