Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/engines.py

Revision 22 (checked in by fumanchu, 9 years ago)

1. Moved xray, recur into dejavu.
2. Doc updates.

Line 
1 """
2 Notice in particular that UnitCollection, UnitEngineRule, and UnitEngine
3 are all _temporary_ Units. Even when you memorize them, they won't be
4 persistent unless you set each instance's Expiration to None. If you
5 use UnitEngine.immortalize(), it will make all of its rules immortal
6 (no Expiration) as well.
7 """
8
9 import threading
10 import datetime
11 try:
12     import cPickle as pickle
13 except ImportError:
14     import pickle
15 import dejavu
16 from dejavu import logic, xray
17 import sets
18
19
class UnitCollection(dejavu.Unit):
    """A Set of Unit IDs, all referring to Units of one Type.

    Type: name of the Unit class shared by every Unit referenced here.

    A Unit Collection mainly serves as an index for Units. Unit Engines
    apply Expressions and other rules to a Collection as a whole; they both
    consume and produce Unit Collections. Access to the ID set goes through
    an internal re-entrant lock (see acquire/release) so that iteration and
    modification are less error-prone under multi-threaded use.

    UnitCollection subclasses Unit so that Sandboxes can manage it. Because
    of the shape of its data, Storage Managers are advised to store and
    retrieve Unit Collections differently from ordinary Units: only the IDs
    of the contained Units need to be stored, since the Units themselves are
    recalled on demand. Not every Storage Manager can handle this kind of
    dynamic storage; deployers should check that theirs can.
    """

    # Placeholder at class level; every instance gets its own sets.Set.
    _IDs = None
    EngineID = dejavu.UnitProperty(u'EngineID', int, index=True)
    Type = dejavu.UnitProperty(u'Type')
    Expiration = dejavu.UnitProperty(u'Expiration', datetime.datetime)
    Timestamp = dejavu.UnitProperty(u'Timestamp', datetime.datetime)

    def __init__(self, **kwargs):
        """Create an empty collection; keyword args are set as attributes."""
        dejavu.Unit.__init__(self)
        self._IDs = sets.Set()
        self._mutex = threading.RLock()

        for name in kwargs:
            setattr(self, name, kwargs[name])

    def __getstate__(self):
        """Return the picklable state (the lock is deliberately left out)."""
        return (self._properties, self.dirty, self._IDs)

    def __setstate__(self, state):
        """Restore pickled state, detached from any sandbox, with a new lock."""
        self.sandbox = None
        self._mutex = threading.RLock()
        properties, dirty, ids = state
        self._properties = properties
        self.dirty = dirty
        self._IDs = ids

    def acquire(self):
        """Block until this collection's re-entrant lock is held."""
        self._mutex.acquire()

    def release(self):
        """Release one level of this collection's re-entrant lock."""
        self._mutex.release()

    def __len__(self):
        """Return the number of IDs in the collection."""
        return len(self._IDs)

    def add(self, ID):
        """Add a single Unit ID to the collection, under the lock."""
        self.acquire()
        try:
            self._IDs.add(ID)
        finally:
            self.release()

    def unit_class(self):
        """Return the Unit class which the arena maps to self.Type."""
        return self.sandbox.arena.class_by_name(self.Type)

    def ids(self):
        """Return a copy of the ID set, taken under the lock."""
        self.acquire()
        try:
            return self._IDs.copy()
        finally:
            self.release()

    def units(self, quota=None):
        """Return a list of the Units which the sandbox finds for our IDs.

        quota: if true (non-zero), stop after examining that many IDs.
        IDs for which the sandbox returns a false value are skipped.
        """
        unit_cls = self.unit_class()
        found = []
        self.acquire()
        try:
            for position, unit_id in enumerate(self._IDs):
                if quota and position >= quota:
                    break
                candidate = self.sandbox.unit(unit_cls, ID=unit_id)
                if not candidate:
                    continue
                found.append(candidate)
        finally:
            self.release()
        return found

    def xdict(self, attr):
        """Return a dictionary of {Unit.attr: [Unit, Unit, ...]}."""
        grouped = {}
        # The lock is re-entrant, so the nested acquire in units() is safe.
        self.acquire()
        try:
            for member in self.units():
                value = getattr(member, attr)
                if value in grouped:
                    grouped[value].append(member)
                else:
                    grouped[value] = [member]
        finally:
            self.release()
        return grouped

    def __copy__(self):
        """Copy the Unit and give the copy its own duplicate of the ID set."""
        duplicate = dejavu.Unit.__copy__(self)
        duplicate._IDs = self._IDs.copy()
        return duplicate

    def on_recall(self):
        """Forget an expired collection; otherwise extend its life.

        Raises dejavu.UnrecallableError (after forgetting self) when the
        Expiration has passed. A collection with Expiration None is left
        untouched; any other gets a fresh 15-minute Expiration.
        """
        expires = self.Expiration
        if expires is None:
            return
        if expires <= datetime.datetime.now():
            self.forget()
            raise dejavu.UnrecallableError
        self.decay(minutes=15)

    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        lifetime = datetime.timedelta(**kw)
        self.Expiration = datetime.datetime.now() + lifetime
133
134
# Operation names a UnitEngineRule may carry. RuleProcessor dispatches each
# one to its 'visit_<NAME>' method. The note on each line says what the
# rule's Operand holds for that operation.
operations = [
    # Operand is the ID of another Set:
    'COPY',
    # Operand is a Unit class name (= class.__name__):
    'CREATE',
    # Operand is the ID of another Set:
    'DIFFERENCE',
    # Operand is a (pickled) logic.Expression:
    'FILTER',
    # Operand is a key into the arena.engine_functions dict:
    'FUNCTION',
    # Operand is the ID of another Set:
    'INTERSECTION',
    # Operand is not used:
    'RETURN',
    # Operand is a Unit class name (= class.__name__):
    'TRANSFORM',
    # Operand is the ID of another Set:
    'UNION',
    ]
146
147
class RuleProperty(dejavu.UnitProperty):
    """UnitProperty whose post() hook refreshes the owning Engine's class."""

    def post(self, unit, value):
        """Ask the UnitEngine owning `unit` to recompute its final class.

        The engine is looked up in the unit's sandbox by unit.EngineID;
        nothing happens if the lookup yields a false value. `value` is not
        used here.
        NOTE(review): presumably dejavu calls this after the property has
        been set -- confirm against dejavu.UnitProperty.
        """
        engine = unit.sandbox.unit(UnitEngine, ID=unit.EngineID)
        if not engine:
            return
        engine.update_final_class()
153
class UnitEngineRule(dejavu.Unit):
    """A Rule for Unit Engines.

    A rule names an Operation, the Set (SetID) it modifies, and an Operand
    whose meaning depends on the Operation. Rules belong to the UnitEngine
    whose ID equals EngineID and are ordered by Sequence.
    """

    Operation = RuleProperty(u'Operation', str)
    SetID = RuleProperty(u'SetID', int)
    Operand = RuleProperty(u'Operand', str, False, hints = {u'Size': 0})
    Sequence = RuleProperty(u'Sequence', int)
    EngineID = dejavu.UnitProperty(u'EngineID', int, index=True)
    Expiration = dejavu.UnitProperty(u'Expiration', datetime.datetime)

    def __init__(self, **kwargs):
        """kw: Operation, SetID, Operand=(Type | logic.Expression | otherSet)

        FILTER:
            If the Operation is 'FILTER', the Operand shall be a
            logic.Expression, and the snapshot will consist of the IDs of
            Units which match the Expression. An Operand which is not
            already a string is pickled before being stored.

        Everything else:
            transforms: the snapshot will consist of IDs of all units
                which are associated with the current snapshot.
            union, difference, and intersection: these all take a setID.

        So, a typical Engine might have a set of rules which look like:
            --Operation--   --Set-- --Operand--
            CREATE          1       Invoice         # Full set
            FILTER          1       (Expression)    # modifies Set 1
            CREATE          2       Inventory       # Full set
            FILTER          2       (Expression)    # modifies Set 2
            FILTER          2       (Expression)    # modifies Set 2
            TRANSFORM       2       Invoice         # modifies Set 2
            DIFFERENCE      1       2               # Set1 -= Set2
            RETURN          1                       # This is optional.

        The last RETURN statement is optional. If omitted, the last Set
        touched will be returned.

        For all operations, the Set ID indicates which Set will be
            modified by the operation. Using the above example, you can
            see that for the DIFFERENCE operation, the Set which is modified
            is Set 1.
        """
        dejavu.Unit.__init__(self)

        # Only touch the Operand if the caller actually supplied one; a
        # FILTER rule built without an Operand keyword must not raise
        # KeyError here (the Operand may be assigned later).
        if kwargs.get('Operation', '') == 'FILTER' and 'Operand' in kwargs:
            operand = kwargs['Operand']
            if not isinstance(operand, (str, unicode)):
                kwargs['Operand'] = pickle.dumps(operand)

        for k, v in kwargs.iteritems():
            setattr(self, k, v)

    def __repr__(self):
        """Return a debugging string; FILTER operands are shown unpickled."""
        if self.Operation == 'FILTER':
            op = self.expr()
        else:
            op = self.Operand
        return ("dejavu.engines.UnitEngineRule(%s, %s, %s)"
                % (self.Operation, self.SetID, repr(op)))

    def expr(self):
        """expr() -> If a FILTER rule, return the Expression, else None."""
        if self.Operation == 'FILTER':
            # NOTE(security): unpickling executes arbitrary code if the
            # stored Operand can be written by an untrusted party.
            return pickle.loads(self.Operand)
        return None

    def on_recall(self):
        """Forget an expired rule; otherwise extend its life.

        Raises dejavu.UnrecallableError (after forgetting self) when the
        Expiration has passed. A rule with Expiration None is left
        untouched; any other gets a fresh 15-minute Expiration.
        """
        if self.Expiration is not None:
            if self.Expiration <= datetime.datetime.now():
                self.forget()
                raise dejavu.UnrecallableError
            else:
                self.decay(minutes=15)

    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw)
230
231
class UnitEngine(dejavu.Unit):
    """A factory for Unit Collections.

    An Engine owns the UnitEngineRules whose EngineID equals its ID, runs
    them through a RuleProcessor to produce UnitCollection snapshots, and
    forgets its rules and snapshots when it is itself forgotten.
    """

    Owner = dejavu.UnitProperty(u'Owner')
    Name = dejavu.UnitProperty(u'Name')
    Created = dejavu.UnitProperty(u'Created', datetime.datetime)
    FinalClassName = dejavu.UnitProperty(u'FinalClassName')
    Expiration = dejavu.UnitProperty('Expiration', datetime.datetime)

    def __init__(self, **kwargs):
        """Create an Engine; keyword args are set as attributes.

        Created defaults to the current time and Owner to the empty string;
        either may be overridden through kwargs.
        """
        dejavu.Unit.__init__(self)
        self.Created = datetime.datetime.today()
        self.Owner = u''

        for k, v in kwargs.iteritems():
            setattr(self, k, v)

    def on_forget(self):
        """Forget this Engine's rules and snapshots along with it."""
        # Rules and Snapshots shouldn't persist past
        # the life of their Engines. Forget them.
        for rule in self.rules():
            rule.forget()
        for snap in self.snapshots():
            snap.forget()

    def on_recall(self):
        """Forget an expired Engine; otherwise extend its life.

        Raises dejavu.UnrecallableError (after logging and forgetting self)
        when the Expiration has passed. An Engine with Expiration None is
        left untouched; any other gets a fresh 15-minute Expiration.
        """
        if self.Expiration is not None:
            # Read the clock once so the comparison and the log line agree.
            now = datetime.datetime.now()
            if self.Expiration <= now:
                msg = ("Forgetting Engine %s. Exp = %s. Now = %s" %
                       (self.ID, self.Expiration, now))
                self.sandbox.arena.application.logger.info(msg)
                self.forget()
                raise dejavu.UnrecallableError
            else:
                self.decay(minutes=15)

    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw)

    def update_final_class(self):
        """Recompute FinalClassName from the current rules.

        Walks the rules in order, remembering for each Set the Operand of
        its latest CREATE or TRANSFORM rule, and stops at the first RETURN.
        If the Set of the last rule visited has such a class name, it
        becomes FinalClassName; otherwise FinalClassName is left alone.
        """
        results = {}
        last_set = 1
        for rule in self.rules():
            last_set = rule.SetID
            operation = rule.Operation
            if operation in ('CREATE', 'TRANSFORM'):
                results[last_set] = rule.Operand
            if operation == 'RETURN':
                break
        if last_set in results:
            self.FinalClassName = results[last_set]

    def final_class(self):
        """Return the class which the arena maps to FinalClassName."""
        return self.sandbox.arena.class_by_name(self.FinalClassName)

    def rules(self):
        """An ordered list of Rules for this Engine."""
        f = logic.filter(EngineID=self.ID)
        allrules = list(self.sandbox.recall(UnitEngineRule, f))
        allrules.sort(dejavu.sort(u'Sequence'))
        return allrules

    def add_rule(self, Operation, SetID=None, Operand=None):
        """Append a rule to this Engine and memorize it.

        Operation: either an operation name, or a ready-made UnitEngineRule
            (in which case SetID and Operand are ignored).
        SetID: defaults to the SetID of the current last rule, or 1 if the
            Engine has no rules yet.

        The rule is given the next Sequence number, this Engine's ID and
        Expiration, and FinalClassName is then recomputed.
        """
        allrules = self.rules()
        if allrules:
            last = allrules[-1]
        else:
            last = None

        if isinstance(Operation, UnitEngineRule):
            newRule = Operation
        else:
            if SetID is None:
                if last is None:
                    SetID = 1
                else:
                    SetID = last.SetID
            newRule = UnitEngineRule(Operation=Operation, SetID=SetID,
                                     Operand=Operand)

        if last is None:
            newRule.Sequence = 0
        else:
            newRule.Sequence = last.Sequence + 1

        newRule.EngineID = self.ID
        newRule.Expiration = self.Expiration
        self.sandbox.memorize(newRule)
        self.update_final_class()

    def snapshots(self):
        """Unit Collections obtained by executing the rules sometime in the past."""
        f = logic.filter(EngineID=self.ID)
        allSnap = list(self.sandbox.recall(UnitCollection, f))
        allSnap.sort(dejavu.sort(u'Timestamp'))
        return allSnap

    def take_snapshot(self, args=None):
        """Execute the rules and return a Unit Collection (or None).

        args: optional dict of arguments handed to the RuleProcessor (which
            binds them to FILTER expressions); defaults to an empty dict.

        A resulting collection is stamped with this Engine's ID and the
        current time, given a 15-minute Expiration, and memorized.
        """
        # A None default avoids sharing one dict object between calls.
        if args is None:
            args = {}
        allrules = self.rules()
        snap = RuleProcessor(self.sandbox).process(allrules, args)
        if snap is not None:
            snap.EngineID = self.ID
            snap.Timestamp = datetime.datetime.now()
            snap.decay(minutes=15)
            self.sandbox.memorize(snap)
        return snap

    def last_snapshot(self, args=None):
        """Return the newest existing snapshot, taking one if none exist.

        args is only used when a new snapshot has to be taken.
        """
        if args is None:
            args = {}
        allSnaps = self.snapshots()
        if not allSnaps:
            return self.take_snapshot(args)
        return allSnaps[-1]

    def immortalize(self):
        """Remove the Expiration from this Engine and all of its rules."""
        self.Expiration = None
        for rule in self.rules():
            rule.Expiration = None

    def __copy__(self):
        """Copy the Unit, renaming it "Copy of ..." and resetting Created."""
        newUnit = dejavu.Unit.__copy__(self)
        newUnit.Name = "Copy of %s" % newUnit.Name
        newUnit.Created = datetime.datetime.now()
        return newUnit

    def clone(self, user, temporary=True):
        """Copy self and all Rules of self. Memorize automatically.

        user: becomes the Owner of the copy.
        temporary: if true the copy expires in 15 minutes, else never.
        The copied rules share the copy's Expiration. Returns the copy.
        """
        newUnit = self.__copy__()
        newUnit.Owner = user
        if temporary:
            newUnit.decay(minutes=15)
        else:
            newUnit.Expiration = None
        self.sandbox.memorize(newUnit)
        for rule in self.rules():
            newRule = rule.__copy__()
            newRule.EngineID = newUnit.ID
            newRule.Expiration = newUnit.Expiration
            self.sandbox.memorize(newRule)
        return newUnit

    def permit(self, user, write_access=True):
        """Return whether `user` may write (default) or read this Engine.

        Writing is allowed to the Owner and on 'Public' Engines; reading is
        additionally allowed on 'System' Engines.
        """
        if write_access:
            return self.Owner in (u'Public', user)
        else:
            return self.Owner in ('System', 'Public', user)
378
379
class RuleProcessor(object):
    """Processor for the Rules of a Unit Engine.

    Each rule is dispatched to the method named 'visit_' + rule.Operation,
    which is called with the rule's SetID and Operand. The working sets are
    kept in self.sets (a dict keyed by Set ID) and are reset at the start of
    every process() call.

    A set produced by CREATE is marked `empty`: it stands for "every Unit of
    its Type" but is not populated until an operation needs its contents.
    """

    def __init__(self, sandbox):
        """Remember the sandbox, and its arena, used for lookups."""
        self.sandbox = sandbox
        self.arena = sandbox.arena

    def process(self, rules, args):
        """Execute the rules and return a Unit Collection (or None).

        rules: iterable of objects with Operation, SetID and Operand.
        args: dict of keyword arguments bound to FILTER expressions
            (None is treated as an empty dict).

        Returns the set named by the last rule's SetID, or None if there
        were no rules. An unknown Operation raises AttributeError.
        """
        if args is None:
            args = {}
        self.sets = {}
        self.args = args
        final = None
        # NOTE(review): rules following a RETURN are still executed here,
        # whereas UnitEngine.update_final_class stops at the first RETURN.
        for rule in rules:
            visit = getattr(self, 'visit_' + rule.Operation)
            final = rule.SetID
            visit(final, rule.Operand)
        if final is None:
            return None
        return self.sets[final]

    def _combine(self, setID, operand, method_name):
        """Replace set `setID` with set.<method_name>(set `operand`)."""
        other = int(operand)
        self.realize_empty(setID)
        # A CREATE'd operand stands for the full set of its Type, so it has
        # to be populated too before its IDs can take part in set algebra.
        self.realize_empty(other)
        A = self.sets[setID]
        B = self.sets[other]
        # Nested try/finally: the first lock is released even if taking or
        # releasing the second one fails.
        A.acquire()
        try:
            B.acquire()
            try:
                A._IDs = getattr(A._IDs, method_name)(B._IDs)
            finally:
                B.release()
        finally:
            A.release()

    def visit_COPY(self, setID, operand):
        """Copy the IDs of the set whose ID = setID into the set whose
        ID = operand, creating that set if it does not exist yet.

        NOTE(review): this direction (setID is the source, operand the
        destination) is the reverse of the other operations, where setID
        names the set being modified -- confirm it is intended.
        """
        A = self.sets[setID]
        target = int(operand)
        if target in self.sets:
            # Overwrite the existing set.
            B = self.sets[target]
        else:
            # Create a new set.
            B = UnitCollection(Type=A.Type)
            self.sets[target] = B
        # Carry over the "not yet populated" marker along with the IDs.
        B.empty = getattr(A, 'empty', False)
        A.acquire()
        try:
            B.acquire()
            try:
                B._IDs = A._IDs.copy()
            finally:
                B.release()
        finally:
            A.release()

    def visit_CREATE(self, setID, operand):
        """Create an empty set. The next instruction is responsible to fill it."""
        newset = UnitCollection(Type=operand)
        newset.empty = True
        self.sets[setID] = newset

    def realize_empty(self, setID):
        """realize_empty(setID). Populate the specified set only if empty."""
        A = self.sets[setID]
        if not getattr(A, 'empty', False):
            return
        A.acquire()
        try:
            cls = self.arena.class_by_name(A.Type)
            for unit in self.sandbox.recall(cls):
                A._IDs.add(unit.ID)
            # Clear the marker only once the set has really been filled.
            A.empty = False
        finally:
            A.release()

    def visit_DIFFERENCE(self, setID, operand):
        """Remove from set `setID` every ID found in set `operand`."""
        self._combine(setID, operand, 'difference')

    def visit_FILTER(self, setID, operand):
        """Keep in set `setID` only the Units matching a pickled Expression.

        operand: pickled logic.Expression; self.args are bound to it.
        An unpopulated (CREATE'd) set is filled directly from a filtered
        recall; otherwise each current member is evaluated in turn.
        """
        # NOTE(security): unpickling executes arbitrary code if the stored
        # Operand can be written by an untrusted party.
        expr = pickle.loads(operand)
        expr.bind_args(**self.args)
        A = self.sets[setID]
        A.acquire()
        try:
            cls = self.arena.class_by_name(A.Type)
            if getattr(A, 'empty', False):
                for unit in self.sandbox.recall(cls, expr):
                    A._IDs.add(unit.ID)
                A.empty = False
            else:
                matching = sets.Set()
                for unit_id in A._IDs:
                    unit = self.sandbox.unit(cls, ID=unit_id)
                    if unit and expr.evaluate(unit):
                        matching.add(unit_id)
                A._IDs = matching
        finally:
            A.release()

    def visit_FUNCTION(self, setID, operand):
        """Call arena.engine_functions[operand](sandbox, set) under the lock."""
        func = self.arena.engine_functions[operand]

        A = self.sets[setID]
        A.acquire()
        try:
            func(self.sandbox, A)
        finally:
            A.release()

    def visit_INTERSECTION(self, setID, operand):
        """Keep in set `setID` only the IDs also found in set `operand`."""
        self._combine(setID, operand, 'intersection')

    def visit_RETURN(self, setID, operand):
        """Make sure set `setID` is populated; the operand is not used."""
        self.realize_empty(setID)

    def visit_TRANSFORM(self, setID, operand):
        """operand=far class name. Multiple hops are supported.

        Replaces the members of set `setID` by the Units associated with
        them, hop by hop along the arena's shortest association path from
        the set's current Type to the operand class; the set's Type is
        updated at each hop. Raises KeyError if no path exists.
        """
        self.realize_empty(setID)
        A = self.sets[setID]
        start = self.arena.class_by_name(A.Type)
        end = self.arena.class_by_name(operand)
        nodes = self.arena.associations.shortest_path(start, end)
        if nodes is None:
            raise KeyError("No association found between '%s' and '%s'"
                           % (start, end))

        # Skip the first node, which should be A.Type. Slice rather than
        # pop so that the list handed back by shortest_path is not altered.
        hops = nodes[1:]
        A.acquire()
        try:
            for eachType in hops:
                # Add all associated Units to the collection A.
                oppfunc = getattr(start, eachType.__name__)
                cls = self.arena.class_by_name(A.Type)
                newset = sets.Set()
                for unit_id in A._IDs:
                    unit = self.sandbox.unit(cls, ID=unit_id)
                    if unit:
                        for farUnit in oppfunc(unit):
                            newset.add(farUnit.ID)
                A._IDs = newset
                start = eachType
                A.Type = eachType.__name__
        finally:
            A.release()

    def visit_UNION(self, setID, operand):
        """Add to set `setID` every ID found in set `operand`."""
        self._combine(setID, operand, 'union')
545
Note: See TracBrowser for help on using the browser.