Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/engines.py

Revision 20 (checked in by fumanchu, 9 years ago)

1. Added getstate, setstate to Unit and subclasses.
2. Now pickling Units in CachingProxy to avoid persistent sandbox references and other identity issues.

Line 
1 """
2 Notice in particular that UnitCollection, UnitEngineRule, and UnitEngine
3 are all _temporary_ Units. Even when you memorize them, they won't be
4 persistent unless you mark each instance as no longer temporary. If you
5 use UnitEngine.permanent(), it will make all of its rules permanent
6 (not temporary) as well.
7 """
8
9 import threading
10 import datetime
11 try:
12     import cPickle as pickle
13 except ImportError:
14     import pickle
15 import dejavu
16 from dejavu import logic
17 import sets
18 import xray
19
20
class UnitCollection(dejavu.Unit):
    """A Set of Unit IDs.
    
    Type: Unit Type of all Units referenced by this collection.
    
    The Unit Collection is primarily for use as an index for Units.
    Unit Engines use Expressions and other rules to transform a Collection
    as a whole. These classes consume and produce Unit Collections.
    The Unit Collection provides special methods for iteration, whether
    reading or writing, to avoid errors common with multi-process/
    multi-threaded access.
    
    UnitCollection is a subclass of Unit, so that it can be managed by
    Sandboxes. However, due to the structure of the data contained in a
    UnitCollection, it is recommended that Storage Managers use different
    techniques to store and retrieve Unit Collections. They do not need
    more than the ID's of their contained Units stored, since they will
    recall such Units as needed. Not every Storage Manager is going to be
    able to handle this kind of dynamic storage; deployers-- examine your
    Storage Managers and make sure they can!
    """
    
    # Class-level placeholder; every instance gets its own set in
    # __init__ or __setstate__.
    _IDs = None
    
    def __init__(self, **kwargs):
        """Create an empty collection; each keyword is set as an attribute."""
        dejavu.Unit.__init__(self)
        self._IDs = sets.Set()
        # Reentrant, so code holding the lock via acquire() can still call
        # the locking methods below (xdict() -> units(), for example).
        self._mutex = threading.RLock()
        
        for k, v in kwargs.iteritems():
            setattr(self, k, v)
    
    def __getstate__(self):
        """Pickle properties, dirty/temporary flags and the ID set only.
        
        The sandbox and the lock are deliberately left out.
        """
        return (self._properties, self.dirty, self.temporary, self._IDs)
    
    def __setstate__(self, state):
        """Restore from __getstate__: detached (no sandbox), with a new lock."""
        self.sandbox = None
        self._mutex = threading.RLock()
        self._properties, self.dirty, self.temporary, self._IDs = state
    
    def acquire(self):
        """Block until this collection's lock is held."""
        self._mutex.acquire(True)
    
    def release(self):
        """Release this collection's lock."""
        self._mutex.release()
    
    def __len__(self):
        """Return the number of IDs, read under the lock like other readers."""
        self.acquire()
        try:
            return len(self._IDs)
        finally:
            self.release()
    
    def add(self, ID):
        """Add a single Unit ID to the collection."""
        self.acquire()
        try:
            self._IDs.add(ID)
        finally:
            self.release()
    
    def unit_class(self):
        """Return the class named by self.Type, looked up in the arena."""
        return self.sandbox.arena.class_by_name(self.Type)
    
    def ids(self):
        """Return a copy of the ID set, safe to iterate without the lock."""
        self.acquire()
        try:
            return self._IDs.copy()
        finally:
            self.release()
    
    def units(self, quota=None):
        """Return a list of the Units whose IDs are in this collection.
        
        Units are fetched one by one through self.sandbox.unit(); IDs for
        which that returns a false value are skipped. If quota is given
        (and nonzero), at most that many IDs are examined, so the result
        may hold fewer than quota Units. Set iteration order is arbitrary.
        """
        cls = self.unit_class()
        output = []
        self.acquire()
        try:
            for i, eachID in enumerate(self._IDs):
                if quota and i >= quota:
                    break
                unit = self.sandbox.unit(cls, ID=eachID)
                if unit:
                    output.append(unit)
        finally:
            self.release()
        return output
    
    def xdict(self, attr):
        """Return a dictionary of {Unit.attr: [Unit, Unit, ...]}."""
        product = {}
        # units() takes the lock itself and returns a plain list, so the
        # grouping loop below does not need to hold it.
        for unit in self.units():
            product.setdefault(getattr(unit, attr), []).append(unit)
        return product
    
    def __copy__(self):
        """Return a copy that owns its own ID set and its own lock."""
        newUnit = dejavu.Unit.__copy__(self)
        self.acquire()
        try:
            newUnit._IDs = self._IDs.copy()
        finally:
            self.release()
        # Whatever dejavu.Unit.__copy__ does with instance attributes, make
        # sure the copy neither shares nor lacks a lock.
        newUnit._mutex = threading.RLock()
        return newUnit
118
# Persistent properties of UnitCollection. EngineID is registered with
# index=True, presumably because UnitEngine.snapshots() recalls
# collections by EngineID.
UnitCollection.set_property(u'EngineID', int, index=True)
UnitCollection.set_properties({
    u'Type': unicode,
    u'Timestamp': datetime.datetime,
})
123
124
# Every Operation a UnitEngineRule may carry, and the Operand it expects.
# RuleProcessor.process dispatches each rule to its visit_<OPERATION> method.
#
#   COPY          SetID of the destination Set (Set SetID is copied into it)
#   CREATE        New type (= class.__name__)
#   DIFFERENCE    SetID of mixin
#   FILTER        logic.Expression
#   FUNCTION      key into arena.engine_functions dict
#   INTERSECTION  SetID of mixin
#   RETURN        (none)
#   TRANSFORM     New type (= class.__name__)
#   UNION         SetID of mixin
operations = ('COPY CREATE DIFFERENCE FILTER FUNCTION '
              'INTERSECTION RETURN TRANSFORM UNION').split()
136
137
class UnitEngineRule(dejavu.Unit):
    """A Rule for Unit Engines.
    
    FILTER rules keep their logic.Expression pickled in the (str) Operand
    property; use expr() to get the Expression back.
    """
    
    def __init__(self, **kwargs):
        """kw: Operation, SetID, Operand=(Type | logic.Expression | otherSet)
        
        FILTER:
            If the Operation is 'FILTER', the Operand shall be a
            logic.Expression, and the snapshot will consist of the IDs of
            Units which match the Expression.
        
        Everything else:
            transforms: the snapshot will consist of IDs of all units
                which are associated with the current snapshot.
            union, difference, and intersection: these all take a setID.
        
        So, a typical Engine might have a set of rules which look like:
            --Operation--   --Set-- --Operand--
            CREATE          1       Invoice         # Full set
            FILTER          1       (Expression)    # modifies Set 1
            CREATE          2       Inventory       # Full set
            FILTER          2       (Expression)    # modifies Set 2
            FILTER          2       (Expression)    # modifies Set 2
            TRANSFORM       2       Invoice         # modifies Set 2
            DIFFERENCE      1       2               # Set1 -= Set2
            RETURN          1                       # This is optional.
        
        The last RETURN statement is optional. If omitted, the last Set
        touched will be returned.
        
        For all operations except COPY, the Set ID indicates which Set
            will be modified by the operation. Using the above example, you
            can see that for the DIFFERENCE operation, the Set which is
            modified is Set 1. COPY instead copies Set SetID into the Set
            whose ID is the Operand.
        """
        dejavu.Unit.__init__(self)
        
        # FILTER Expressions are stored pickled. A str/unicode Operand is
        # assumed to be pickled already and is kept as-is. Only convert an
        # Operand the caller actually supplied, so a FILTER rule may be
        # built first and given its Operand later.
        if kwargs.get('Operation', '') == 'FILTER' and 'Operand' in kwargs:
            if not isinstance(kwargs['Operand'], (str, unicode)):
                kwargs['Operand'] = pickle.dumps(kwargs['Operand'])
        
        for k, v in kwargs.iteritems():
            setattr(self, k, v)
    
    def __repr__(self):
        # Show FILTER rules with their unpickled Expression.
        if self.Operation == 'FILTER':
            op = self.expr()
        else:
            op = self.Operand
        return ("dejavu.engines.UnitEngineRule(%s, %s, %s)"
                % (self.Operation, self.SetID, repr(op)))
    
    def expr(self):
        """expr() -> If a FILTER rule, return the Expression, else None.
        
        NOTE: this unpickles the stored Operand. Unpickling can execute
        arbitrary code, so rules must only come from trusted storage.
        """
        if self.Operation != 'FILTER':
            return None
        return pickle.loads(self.Operand)
195
class RuleProperty(dejavu.UnitProperty):
    """Property for UnitEngineRule fields that determine the engine's
    FinalClassName.
    
    Its post() hook (presumably invoked by dejavu after the value is set;
    confirm against dejavu.UnitProperty) asks the owning UnitEngine, if it
    can be found, to recompute its final class.
    """
    
    def post(self, unit, value):
        # A rule that is not memorized yet (e.g. while UnitEngineRule.__init__
        # assigns its kwargs) has no sandbox, so there is no engine to
        # look up or update.
        sandbox = getattr(unit, 'sandbox', None)
        if sandbox is None:
            return
        eng = sandbox.unit(UnitEngine, ID=unit.EngineID)
        if eng:
            eng.update_final_class()
# Rule fields that decide the engine's final class use RuleProperty, whose
# post() hook asks the owning UnitEngine to update_final_class().
UnitEngineRule.set_property(u'Operation', str, descriptor=RuleProperty)
UnitEngineRule.set_property(u'SetID', int, descriptor=RuleProperty)
UnitEngineRule.set_property(u'Operand', str, descriptor=RuleProperty)
# Operand can hold a pickled Expression; a Size hint of 0 presumably tells
# the storage manager "no fixed size limit" -- confirm against dejavu docs.
UnitEngineRule.Operand.hints = {u'Size': 0}
UnitEngineRule.set_property(u'Sequence', int, descriptor=RuleProperty)
# Indexed, presumably because UnitEngine.rules() recalls rules by EngineID.
UnitEngineRule.set_property(u'EngineID', int, index=True)
207
208
class UnitEngine(dejavu.Unit):
    """A factory for Unit Collections.
    
    An engine owns an ordered list of UnitEngineRules (linked by EngineID)
    and the UnitCollection snapshots produced by running them.
    """
    
    def __init__(self, **kwargs):
        """Create an engine; each keyword is set as an attribute."""
        dejavu.Unit.__init__(self)
        self.Created = datetime.datetime.today()
        self.Owner = u''
        
        for k, v in kwargs.iteritems():
            setattr(self, k, v)
    
    def on_forget(self):
        """Forget this engine's rules and snapshots as well.
        
        Presumably called by dejavu when the engine itself is forgotten;
        confirm against dejavu.Unit.
        """
        # Rules and Snapshots shouldn't persist past
        # the life of their Engines. Forget them.
        for rule in self.rules():
            rule.forget()
        for snap in self.snapshots():
            snap.forget()
    
    def update_final_class(self):
        """Set FinalClassName to the class the rules will produce.
        
        The rules' type changes are replayed without touching any data.
        CREATE and TRANSFORM set a Set's type. COPY gives the destination
        Set (its Operand) the source Set's type. Replay stops at RETURN.
        The result is the type of the last Set touched. If that type is not
        known, FinalClassName is left unchanged.
        """
        results = {}
        last_set = 1
        for rule in self.rules():
            last_set = rule.SetID
            operation = rule.Operation
            if operation in ('CREATE', 'TRANSFORM'):
                results[last_set] = rule.Operand
            elif operation == 'COPY':
                # RuleProcessor.visit_COPY copies Set SetID into Set
                # int(Operand); the destination takes on the source's type.
                # A rule still being edited may lack a valid Operand.
                if last_set in results:
                    try:
                        results[int(rule.Operand)] = results[last_set]
                    except (TypeError, ValueError):
                        pass
            elif operation == 'RETURN':
                break
        if last_set in results:
            self.FinalClassName = results[last_set]
    
    def final_class(self):
        """Return the class named by FinalClassName, looked up in the arena."""
        return self.sandbox.arena.class_by_name(self.FinalClassName)
    
    def rules(self):
        """An ordered list of Rules for this Engine."""
        f = logic.filter(EngineID=self.ID)
        allrules = list(self.sandbox.recall(UnitEngineRule, f))
        allrules.sort(dejavu.sort(u'Sequence'))
        return allrules
    
    def add_rule(self, Operation, SetID=None, Operand=None):
        """Append a rule after the current last rule and memorize it.
        
        Operation may be a ready-made UnitEngineRule, in which case SetID
        and Operand are ignored. Otherwise a new rule is built; SetID
        defaults to the last rule's SetID, or 1 if there are no rules.
        The rule gets Sequence = last Sequence + 1 (0 for the first rule),
        this engine's ID and this engine's temporary flag.
        """
        allrules = self.rules()
        if isinstance(Operation, UnitEngineRule):
            newRule = Operation
        else:
            if SetID is None:
                try:
                    SetID = allrules[-1].SetID
                except IndexError:
                    SetID = 1
            newRule = UnitEngineRule(Operation=Operation, SetID=SetID,
                                     Operand=Operand)
        
        try:
            nextSeq = allrules[-1].Sequence + 1
        except IndexError:
            nextSeq = 0
        newRule.Sequence = nextSeq
        
        newRule.EngineID = self.ID
        newRule.temporary = self.temporary
        self.sandbox.memorize(newRule)
        self.update_final_class()
    
    def snapshots(self):
        """Unit Collections obtained by executing the rules sometime in the past."""
        f = logic.filter(EngineID=self.ID)
        allSnap = list(self.sandbox.recall(UnitCollection, f))
        allSnap.sort(dejavu.sort(u'Timestamp'))
        return allSnap
    
    def take_snapshot(self, args=None):
        """Execute the rules and return a Unit Collection (or None).
        
        args: optional dict of keyword arguments bound into each FILTER
            Expression while the rules run (None means no arguments).
        A resulting collection is stamped with this engine's ID and the
        current time, marked temporary, and memorized.
        """
        # None instead of a shared mutable {} default.
        if args is None:
            args = {}
        allrules = self.rules()
        snap = RuleProcessor(self.sandbox).process(allrules, args)
        if snap is not None:
            snap.EngineID = self.ID
            snap.Timestamp = datetime.datetime.now()
            snap.temporary = True
            self.sandbox.memorize(snap)
        return snap
    
    def last_snapshot(self, args=None):
        """Return the newest snapshot, taking one (with args) if none exist."""
        allSnaps = self.snapshots()
        if len(allSnaps) == 0:
            aSnap = self.take_snapshot(args)
        else:
            aSnap = allSnaps[-1]
        return aSnap
    
    def permanent(self):
        """Mark this engine and all its rules as not temporary.
        
        Snapshots are not changed.
        """
        self.temporary = False
        for rule in self.rules():
            rule.temporary = False
    
    def __copy__(self):
        """Return a copy named "Copy of <Name>" with a fresh Created time."""
        newUnit = dejavu.Unit.__copy__(self)
        newUnit.Name = "Copy of %s" % newUnit.Name
        newUnit.Created = datetime.datetime.now()
        return newUnit
    
    def clone(self, user, temporary=True):
        """Copy self and all Rules of self. Memorize automatically.
        
        The copy is owned by user. The temporary flag is applied to the
        copy and to each copied rule. Snapshots are not copied.
        """
        newUnit = self.__copy__()
        newUnit.Owner = user
        newUnit.temporary = temporary
        self.sandbox.memorize(newUnit)
        for rule in self.rules():
            newRule = rule.__copy__()
            newRule.EngineID = newUnit.ID
            newRule.temporary = temporary
            self.sandbox.memorize(newRule)
        return newUnit
    
    def permit(self, user, write_access=True):
        """Return True if user may use this engine.
        
        Write access requires an Owner of 'Public' or user. Read access
        additionally allows engines owned by 'System'.
        """
        if write_access:
            return self.Owner in (u'Public', user)
        else:
            return self.Owner in ('System', 'Public', user)
330
# Persistent properties of UnitEngine. FinalClassName is kept up to date
# by UnitEngine.update_final_class().
UnitEngine.set_properties({
    u'Owner': unicode,
    u'Name': unicode,
    u'Created': datetime.datetime,
    u'FinalClassName': unicode,
})
336
337
class RuleProcessor(object):
    """Processor for the Rules of a Unit Engine.
    
    During process(), self.sets maps each SetID to its working
    UnitCollection. A Set made by CREATE carries empty=True until it is
    populated: an "empty" Set stands for all Units of its Type.
    """
    
    def __init__(self, sandbox):
        self.sandbox = sandbox
        self.arena = sandbox.arena
    
    def process(self, rules, args):
        """Execute the rules and return a Unit Collection (or None).
        
        rules: UnitEngineRules in execution order.
        args: dict of keyword arguments bound into FILTER Expressions.
        
        Each rule is dispatched to visit_<Operation>; an unknown Operation
        raises AttributeError. Processing stops after a RETURN rule. The
        Set named by the last executed rule is realized (filled with every
        Unit of its Type if never populated) and returned. None is returned
        when there are no rules.
        """
        self.sets = {}
        self.args = args
        final = None
        for rule in rules:
            operation = rule.Operation
            func = getattr(self, 'visit_' + operation)
            final = rule.SetID
            func(final, rule.Operand)
            if operation == 'RETURN':
                # Matches UnitEngine.update_final_class(), which also stops
                # at RETURN.
                break
        if final is None:
            return None
        # Omitting RETURN must behave like ending with one: a Set that was
        # only CREATEd still stands for "all Units of its Type".
        self.realize_empty(final)
        return self.sets[final]
    
    def visit_COPY(self, setID, operand):
        """Copy the set whose ID = setID into another set, whose ID = operand."""
        A = self.sets[setID]
        operand = int(operand)
        if operand in self.sets:
            # Overwrite the existing set.
            B = self.sets[operand]
        else:
            # Create a new set.
            B = UnitCollection(Type=A.Type)
            self.sets[operand] = B
        A.acquire()
        B.acquire()
        try:
            # B becomes an exact stand-in for A: same type (even if B
            # already existed with another Type), same "not yet populated"
            # flag, same IDs.
            B.Type = A.Type
            B.empty = getattr(A, 'empty', False)
            B._IDs = A._IDs.copy()
        finally:
            A.release()
            B.release()
    
    def visit_CREATE(self, setID, operand):
        """Create an empty set. The next instruction is responsible to fill it."""
        newset = UnitCollection(Type=operand)
        newset.empty = True
        self.sets[setID] = newset
    
    def realize_empty(self, setID):
        """realize_empty(setID). Populate the specified set only if empty.
        
        An unpopulated Set is filled with the IDs of every Unit of its Type
        recalled from the sandbox; its empty flag is cleared first.
        """
        A = self.sets[setID]
        if hasattr(A, 'empty') and A.empty:
            A.empty = False
            A.acquire()
            try:
                for unit in self.sandbox.recall(self.arena.class_by_name(A.Type)):
                    A._IDs.add(unit.ID)
            finally:
                A.release()
    
    def _combine(self, setID, operand, opname):
        """Set A._IDs = A._IDs.<opname>(B._IDs); A = Set setID, B = Set operand."""
        otherID = int(operand)
        # Either side may still be an unpopulated CREATE ("all Units of
        # its Type"), so realize both before combining.
        self.realize_empty(setID)
        self.realize_empty(otherID)
        A = self.sets[setID]
        B = self.sets[otherID]
        A.acquire()
        B.acquire()
        try:
            A._IDs = getattr(A._IDs, opname)(B._IDs)
        finally:
            A.release()
            B.release()
    
    def visit_DIFFERENCE(self, setID, operand):
        """Set setID -= Set operand."""
        self._combine(setID, operand, 'difference')
    
    def visit_FILTER(self, setID, operand):
        """Keep only the IDs in Set setID whose Units match an Expression.
        
        operand is a pickled logic.Expression, and self.args is bound into
        it. An unpopulated Set is filled straight from a filtered recall.
        Otherwise each member Unit is loaded and evaluated, and IDs whose
        Unit cannot be loaded are dropped.
        """
        # NOTE: unpickles the stored rule Operand; rules must come from
        # trusted storage.
        expr = pickle.loads(operand)
        expr.bind_args(**self.args)
        A = self.sets[setID]
        if hasattr(A, 'empty') and A.empty:
            A.empty = False
            A.acquire()
            try:
                cls = self.arena.class_by_name(A.Type)
                for unit in self.sandbox.recall(cls, expr):
                    A._IDs.add(unit.ID)
            finally:
                A.release()
        else:
            A.acquire()
            try:
                cls = self.arena.class_by_name(A.Type)
                newset = sets.Set()
                for unitID in A._IDs:
                    unit = self.sandbox.unit(cls, ID=unitID)
                    if unit and expr.evaluate(unit):
                        newset.add(unitID)
                A._IDs = newset
            finally:
                A.release()
    
    def visit_FUNCTION(self, setID, operand):
        """Call arena.engine_functions[operand](sandbox, Set setID) under its lock."""
        func = self.arena.engine_functions[operand]
        
        A = self.sets[setID]
        A.acquire()
        try:
            func(self.sandbox, A)
        finally:
            A.release()
    
    def visit_INTERSECTION(self, setID, operand):
        """Set setID &= Set operand."""
        self._combine(setID, operand, 'intersection')
    
    def visit_RETURN(self, setID, operand):
        """Make sure Set setID is populated; process() then returns it."""
        self.realize_empty(setID)
    
    def visit_TRANSFORM(self, setID, operand):
        """operand=far class name. Multiple hops are supported.
        
        Set setID is replaced, hop by hop along the arena's shortest
        association path, by the IDs of the associated Units. Its Type
        becomes each hop's class name in turn. KeyError is raised when no
        association path exists.
        """
        self.realize_empty(setID)
        A = self.sets[setID]
        start = self.arena.class_by_name(A.Type)
        end = self.arena.class_by_name(operand)
        nodes = self.arena.associations.shortest_path(start, end)
        if nodes is None:
            raise KeyError("No association found between '%s' and '%s'"
                           % (start, end))
        
        A.acquire()
        try:
            # Skip the first node, which should be A.Type. Slice instead of
            # pop() so the list returned by shortest_path is not mutated.
            for eachType in nodes[1:]:
                # Add all associated Units to the collection A.
                oppfunc = getattr(start, eachType.__name__)
                cls = self.arena.class_by_name(A.Type)
                newset = sets.Set()
                for unitID in A._IDs:
                    unit = self.sandbox.unit(cls, ID=unitID)
                    if unit:
                        for farUnit in oppfunc(unit):
                            newset.add(farUnit.ID)
                A._IDs = newset
                start = eachType
                A.Type = eachType.__name__
        finally:
            A.release()
    
    def visit_UNION(self, setID, operand):
        """Set setID |= Set operand."""
        self._combine(setID, operand, 'union')
503
504
Note: See TracBrowser for help on using the browser.