Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/engines.py

Revision 21 (checked in by fumanchu, 9 years ago)

1. Removed Unit.temporary
2. Added Unit.on_recall() with Sandbox support.
3. Engines etc replaced temporary with Expiration, permanent() with immortalize().

Line 
1 """
2 Notice in particular that UnitCollection, UnitEngineRule, and UnitEngine
3 are all _temporary_ Units. Even when you memorize them, they won't be
4 persistent unless you set each instance's Expiration to None. If you
5 use UnitEngine.immortalize(), it will make all of its rules immortal
6 (no Expiration) as well.
7 """
8
9 import threading
10 import datetime
11 try:
12     import cPickle as pickle
13 except ImportError:
14     import pickle
15 import dejavu
16 from dejavu import logic
17 import sets
18 import xray
19
20
21 class UnitCollection(dejavu.Unit):
22     """A Set of Unit IDs.
23     
24     Type: Unit Type of all Units referenced by this collection.
25     
26     The Unit Collection is primarily for use as an index for Units.
27     Unit Engines use Expressions and other rules to transform a Collection
28     as a whole. These classes consume and produce Unit Collections.
29     The Unit Collection provides special methods for iteration, whether
30     reading or writing, to avoid errors common with multi-process/
31     multi-threaded access.
32     
33     UnitCollection is a subclass of Unit, so that it can be managed by
34     Sandboxes. However, due to the structure of the data contained in a
35     UnitCollection, it is recommended that Storage Managers use different
36     techniques to store and retrieve Unit Collections. They do not need
37     more than the ID's of their contained Units stored, since they will
38     recall such Units as needed. Not every Storage Manager is going to be
39     able to handle this kind of dynamic storage; deployers-- examine your
40     Storage Managers and make sure they can!
41     """
42    
43     _IDs = None
44     EngineID = dejavu.UnitProperty(u'EngineID', int, index=True)
45     Type = dejavu.UnitProperty(u'Type')
46     Expiration = dejavu.UnitProperty(u'Expiration', datetime.datetime)
47     Timestamp = dejavu.UnitProperty(u'Timestamp', datetime.datetime)
48    
49     def __init__(self, **kwargs):
50         dejavu.Unit.__init__(self)
51         self._IDs = sets.Set()
52         self._mutex = threading.RLock()
53        
54         for k, v in kwargs.iteritems():
55             setattr(self, k, v)
56    
57     def __getstate__(self):
58         return (self._properties, self.dirty, self._IDs)
59    
60     def __setstate__(self, state):
61         self.sandbox = None
62         self._mutex = threading.RLock()
63         self._properties, self.dirty, self._IDs = state
64    
65     def acquire(self):
66         self._mutex.acquire(True)
67    
68     def release(self):
69         self._mutex.release()
70    
71     def __len__(self):
72         return len(self._IDs)
73    
74     def add(self, ID):
75         self.acquire()
76         try:
77             self._IDs.add(ID)
78         finally:
79             self.release()
80    
81     def unit_class(self):
82         return self.sandbox.arena.class_by_name(self.Type)
83    
84     def ids(self):
85         self.acquire()
86         try:
87             return self._IDs.copy()
88         finally:
89             self.release()
90    
91     def units(self, quota=None):
92         cls = self.unit_class()
93         output = []
94         self.acquire()
95         try:
96             for i, eachID in enumerate(self._IDs):
97                 if quota and i >= quota:
98                     break
99                 unit = self.sandbox.unit(cls, ID=eachID)
100                 if unit:
101                     output.append(unit)
102         finally:
103             self.release()
104         return output
105    
106     def xdict(self, attr):
107         """Return a dictionary of {Unit.attr: [Unit, Unit, ...]}."""
108         product = {}
109         self.acquire()
110         try:
111             for unit in self.units():
112                 key = getattr(unit, attr)
113                 product.setdefault(key, []).append(unit)
114         finally:
115             self.release()
116         return product
117    
118     def __copy__(self):
119         newUnit = dejavu.Unit.__copy__(self)
120         newUnit._IDs = self._IDs.copy()
121         return newUnit
122    
123     def on_recall(self):
124         if self.Expiration is not None:
125             if self.Expiration <= datetime.datetime.now():
126                 self.forget()
127                 raise dejavu.UnrecallableError
128             else:
129                 self.decay(minutes=15)
130    
131     def decay(self, **kw):
132         """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
133         self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw)
134
135
136 operations = [              #       OPERAND
137     'COPY',                 # SetID of mixin
138     'CREATE',               # New type (= class.__name__)
139     'DIFFERENCE',           # SetID of mixin
140     'FILTER',               # logic.Expression
141     'FUNCTION',             # key into arena.engine_functions dict
142     'INTERSECTION',         # SetID of mixin
143     'RETURN',               #
144     'TRANSFORM',            # New type (= class.__name__)
145     'UNION',                # SetID of mixin
146     ]
147
148
149 class RuleProperty(dejavu.UnitProperty):
150     def post(self, unit, value):
151         eng = unit.sandbox.unit(UnitEngine, ID=unit.EngineID)
152         if eng:
153             eng.update_final_class()
154
155 class UnitEngineRule(dejavu.Unit):
156     """A Rule for Unit Engines."""
157    
158     Operation = RuleProperty(u'Operation', str)
159     SetID = RuleProperty(u'SetID', int)
160     Operand = RuleProperty(u'Operand', str, False, hints = {u'Size': 0})
161     Sequence = RuleProperty(u'Sequence', int)
162     EngineID = dejavu.UnitProperty(u'EngineID', int, index=True)
163     Expiration = dejavu.UnitProperty(u'Expiration', datetime.datetime)
164    
165     def __init__(self, **kwargs):
166         """kw: Operation, SetID, Operand=(Type | logic.Expression | otherSet)
167         
168         FILTER:
169             If the Operation is 'FILTER', the Operand shall be a
170             logic.Expression, and the snapshot will consist of the IDs of
171             Units which match the Expression.
172         
173         Everything else:
174             transforms: the snapshot will consist of IDs of all units
175                 which are associated with the current snapshot.
176             union, difference, and intersection: these all take a setID.
177         
178         So, a typical Engine might have a set of rules which look like:
179             --Operation--   --Set-- --Operand--
180             CREATE          1       Invoice         # Full set
181             FILTER          1       (Expression)    # modifies Set 1
182             CREATE          2       Inventory       # Full set
183             FILTER          2       (Expression)    # modifies Set 2
184             FILTER          2       (Expression)    # modifies Set 2
185             TRANSFORM       2       Invoice         # modifies Set 2
186             DIFFERENCE      1       2               # Set1 -= Set2
187             RETURN          1                       # This is optional.
188         
189         The last RETURN statement is optional. If omitted, the last Set
190         touched will be returned.
191         
192         For all operations, the Set ID indicates which Set will be
193             modified by the operation. Using the above example, you can
194             see that for the DIFFERENCE operation, the Set which is modified
195             is Set 1.
196         """
197         dejavu.Unit.__init__(self)
198        
199         if kwargs.get('Operation', '') == 'FILTER':
200             if not isinstance(kwargs.get('Operand'), (str, unicode)):
201                 kwargs['Operand'] = pickle.dumps(kwargs['Operand'])
202        
203         for k, v in kwargs.iteritems():
204             setattr(self, k, v)
205    
206     def __repr__(self):
207         op = self.Operand
208         if self.Operation == 'FILTER':
209             op = pickle.loads(op)
210         return ("dejavu.engines.UnitEngineRule(%s, %s, %s)"
211                 % (self.Operation, self.SetID, repr(op)))
212    
213     def expr(self):
214         """expr() -> If a FILTER rule, return the Expression, else None."""
215         if self.Operation == 'FILTER':
216             op = self.Operand
217             return pickle.loads(op)
218         return None
219    
220     def on_recall(self):
221         if self.Expiration is not None:
222             if self.Expiration <= datetime.datetime.now():
223                 self.forget()
224                 raise dejavu.UnrecallableError
225             else:
226                 self.decay(minutes=15)
227    
228     def decay(self, **kw):
229         """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
230         self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw)
231
232
233 class UnitEngine(dejavu.Unit):
234     """A factory for Unit Collections."""
235    
236     Owner = dejavu.UnitProperty(u'Owner')
237     Name = dejavu.UnitProperty(u'Name')
238     Created = dejavu.UnitProperty(u'Created', datetime.datetime)
239     FinalClassName = dejavu.UnitProperty(u'FinalClassName')
240     Expiration = dejavu.UnitProperty('Expiration', datetime.datetime)
241    
242     def __init__(self, **kwargs):
243         dejavu.Unit.__init__(self)
244         self.Created = datetime.datetime.today()
245         self.Owner = u''
246        
247         for k, v in kwargs.iteritems():
248             setattr(self, k, v)
249    
250     def on_forget(self):
251         # Rules and Snapshots shouldn't persist past
252         # the life of their Engines. Forget them.
253         for rule in self.rules():
254             rule.forget()
255         for snap in self.snapshots():
256             snap.forget()
257    
258     def on_recall(self):
259         if self.Expiration is not None:
260             if self.Expiration <= datetime.datetime.now():
261                 msg = ("Forgetting Engine %s. Exp = %s. Now = %s" %
262                        (self.ID, self.Expiration, datetime.datetime.now()))
263                 self.sandbox.arena.application.logger.info(msg)
264                 self.forget()
265                 raise dejavu.UnrecallableError
266             else:
267                 self.decay(minutes=15)
268    
269     def decay(self, **kw):
270         """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
271         self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw)
272    
273     def update_final_class(self):
274         results = {}
275         last_set = 1
276         for rule in self.rules():
277             last_set = rule.SetID
278             operation = rule.Operation
279             if operation in ('CREATE', 'TRANSFORM'):
280                 results[last_set] = rule.Operand
281             if operation == 'RETURN':
282                 break
283         if last_set in results:
284             self.FinalClassName = results[last_set]
285    
286     def final_class(self):
287         return self.sandbox.arena.class_by_name(self.FinalClassName)
288    
289     def rules(self):
290         """An ordered list of Rules for this Engine."""
291         f = logic.filter(EngineID=self.ID)
292         allrules = [x for x in self.sandbox.recall(UnitEngineRule, f)]
293         allrules.sort(dejavu.sort(u'Sequence'))
294         return allrules
295    
296     def add_rule(self, Operation, SetID=None, Operand=None):
297         allrules = self.rules()
298         if isinstance(Operation, UnitEngineRule):
299             newRule = Operation
300         else:
301             if SetID is None:
302                 try:
303                     SetID = allrules[-1].SetID
304                 except IndexError:
305                     SetID = 1
306             newRule = UnitEngineRule(Operation=Operation, SetID=SetID,
307                                      Operand=Operand)
308        
309         try:
310             nextSeq = allrules[-1].Sequence + 1
311         except IndexError:
312             nextSeq = 0
313         newRule.Sequence = nextSeq
314        
315         newRule.EngineID = self.ID
316         newRule.Expiration = self.Expiration
317         self.sandbox.memorize(newRule)
318         self.update_final_class()
319    
320     def snapshots(self):
321         """Unit Collections obtained by executing the rules sometime in the past."""
322         f = logic.filter(EngineID=self.ID)
323         allSnap = [x for x in self.sandbox.recall(UnitCollection, f)]
324         allSnap.sort(dejavu.sort(u'Timestamp'))
325         return allSnap
326    
327     def take_snapshot(self, args={}):
328         """Execute the rules and return a Unit Collection (or None)."""
329         allrules = self.rules()
330         snap = RuleProcessor(self.sandbox).process(allrules, args)
331         if snap is not None:
332             snap.EngineID = self.ID
333             now = datetime.datetime.now()
334             snap.Timestamp = now
335             snap.decay(minutes=15)
336             self.sandbox.memorize(snap)
337         return snap
338    
339     def last_snapshot(self, args={}):
340         allSnaps = self.snapshots()
341         if len(allSnaps) == 0:
342             aSnap = self.take_snapshot(args)
343         else:
344             aSnap = allSnaps[-1]
345         return aSnap
346    
347     def immortalize(self):
348         self.Expiration = None
349         for rule in self.rules():
350             rule.Expiration = None
351    
352     def __copy__(self):
353         newUnit = dejavu.Unit.__copy__(self)
354         newUnit.Name = "Copy of %s" % newUnit.Name
355         newUnit.Created = datetime.datetime.now()
356         return newUnit
357    
358     def clone(self, user, temporary=True):
359         """Copy self and all Rules of self. Memorize automatically."""
360         newUnit = self.__copy__()
361         newUnit.Owner = user
362         if temporary:
363             newUnit.decay(minutes=15)
364         else:
365             newUnit.Expiration = None
366         self.sandbox.memorize(newUnit)
367         for rule in self.rules():
368             newRule = rule.__copy__()
369             newRule.EngineID = newUnit.ID
370             newRule.Expiration = newUnit.Expiration
371             self.sandbox.memorize(newRule)
372         return newUnit
373    
374     def permit(self, user, write_access=True):
375         if write_access:
376             return self.Owner in (u'Public', user)
377         else:
378             return self.Owner in ('System', 'Public', user)
379
380
381 class RuleProcessor(object):
382     """Processor for the Rules of a Unit Engine."""
383    
384     def __init__(self, sandbox):
385         self.sandbox = sandbox
386         self.arena = sandbox.arena
387    
388     def process(self, rules, args):
389         """Execute the rules and return a Unit Collection (or None)."""
390         self.sets = {}
391         self.args = args
392         final = None
393         for rule in rules:
394             operation = rule.Operation
395             func = getattr(self, 'visit_' + operation)
396             final = rule.SetID
397             func(final, rule.Operand)
398         if final is None:
399             return None
400         else:
401             return self.sets[final]
402    
403     def visit_COPY(self, setID, operand):
404         """Copy the set whose ID = operand into another set, whose ID = setID."""
405         A = self.sets[setID]
406         operand = int(operand)
407         if operand in self.sets:
408             # Overwrite the existing set.
409             B = self.sets[operand]
410         else:
411             # Create a new set.
412             B = UnitCollection(Type=A.Type)
413             self.sets[operand] = B
414         B.empty = A.empty
415         A.acquire()
416         B.acquire()
417         try:
418             B._IDs = A._IDs.copy()
419         finally:
420             A.release()
421             B.release()
422    
423     def visit_CREATE(self, setID, operand):
424         """Create an empty set. The next instruction is responsible to fill it."""
425         newset = UnitCollection(Type=operand)
426         newset.empty = True
427         self.sets[setID] = newset
428    
429     def realize_empty(self, setID):
430         """realize_empty(setID). Populate the specified set only if empty."""
431         A = self.sets[setID]
432         if hasattr(A, 'empty') and A.empty:
433             A.empty = False
434             A.acquire()
435             try:
436                 for unit in self.sandbox.recall(self.arena.class_by_name(A.Type)):
437                     A._IDs.add(unit.ID)
438             finally:
439                 A.release()
440    
441     def visit_DIFFERENCE(self, setID, operand):
442         self.realize_empty(setID)
443         A = self.sets[setID]
444         B = self.sets[int(operand)]
445         A.acquire()
446         B.acquire()
447         try:
448             A._IDs = A._IDs.difference(B._IDs)
449         finally:
450             A.release()
451             B.release()
452    
453     def visit_FILTER(self, setID, operand):
454         expr = pickle.loads(operand)
455         expr.bind_args(**self.args)
456         A = self.sets[setID]
457         if hasattr(A, 'empty') and A.empty:
458             A.empty = False
459             A.acquire()
460             try:
461                 cls = self.arena.class_by_name(A.Type)
462                 for unit in self.sandbox.recall(cls, expr):
463                     A._IDs.add(unit.ID)
464             finally:
465                 A.release()
466         else:
467             A.acquire()
468             try:
469                 cls = self.arena.class_by_name(A.Type)
470                 newset = sets.Set()
471                 for id in A._IDs:
472                     unit = self.sandbox.unit(cls, ID=id)
473                     if unit and expr.evaluate(unit):
474                         newset.add(id)
475                 A._IDs = newset
476             finally:
477                 A.release()
478    
479     def visit_FUNCTION(self, setID, operand):
480         func = self.arena.engine_functions[operand]
481        
482         A = self.sets[setID]
483         A.acquire()
484         try:
485             func(self.sandbox, A)
486         finally:
487             A.release()
488    
489     def visit_INTERSECTION(self, setID, operand):
490         self.realize_empty(setID)
491         A = self.sets[setID]
492         B = self.sets[int(operand)]
493         A.acquire()
494         B.acquire()
495         try:
496             A._IDs = A._IDs.intersection(B._IDs)
497         finally:
498             A.release()
499             B.release()
500    
501     def visit_RETURN(self, setID, operand):
502         self.realize_empty(setID)
503    
504     def visit_TRANSFORM(self, setID, operand):
505         """operand=far class name. Multiple hops are supported."""
506         self.realize_empty(setID)
507         A = self.sets[setID]
508         start = self.arena.class_by_name(A.Type)
509         end = self.arena.class_by_name(operand)
510         nodes = self.arena.associations.shortest_path(start, end)
511         if nodes is None:
512             raise KeyError("No association found between '%s' and '%s'"
513                            % (start, end))
514        
515         # Skip the first node, which should be A.Type
516         nodes.pop(0)
517         A.acquire()
518         try:
519             for eachType in nodes:
520                 # Add all associated Units to the collection A.
521                 oppfunc = getattr(start, eachType.__name__)
522                 cls = self.arena.class_by_name(A.Type)
523                 newset = sets.Set()
524                 for id in A._IDs:
525                     unit = self.sandbox.unit(cls, ID=id)
526                     if unit:
527                         for farUnit in oppfunc(unit):
528                             newset.add(farUnit.ID)
529                 A._IDs = newset
530                 start = eachType
531                 A.Type = eachType.__name__
532         finally:
533             A.release()
534    
535     def visit_UNION(self, setID, operand):
536         self.realize_empty(setID)
537         A = self.sets[setID]
538         B = self.sets[int(operand)]
539         A.acquire()
540         B.acquire()
541         try:
542             A._IDs = A._IDs.union(B._IDs)
543         finally:
544             A.release()
545             B.release()
546
Note: See TracBrowser for help on using the browser.