Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/engines.py

Revision 27 (checked in by fumanchu, 9 years ago)

Inlined handling of universal sets for better strategies/performance.

Line 
1 """
2 Notice in particular that UnitCollection, UnitEngineRule, and UnitEngine
3 are all _temporary_ Units. Even when you memorize them, they won't be
4 persistent unless you set each instance's Expiration to None. If you
5 use UnitEngine.immortalize(), it will make all of its rules immortal
6 (no Expiration) as well.
7 """
8
9 import threading
10 import datetime
11 try:
12     import cPickle as pickle
13 except ImportError:
14     import pickle
15 import dejavu
16 from dejavu import logic
17 import xray
18 import sets
19
20
class UnitCollection(dejavu.Unit):
    """An ordered, duplicate-free collection of Unit IDs.
    
    Type: the name of the Unit class of every Unit referenced by this
    collection (resolved through sandbox.arena.class_by_name).
    
    Unit Collections mainly serve as indexes of Units. Unit Engines apply
    their rules (Expressions, set operations, associations) to a whole
    collection at a time, so collections are both what Engines consume and
    what they produce. Access to the member list is guarded by a reentrant
    lock (see acquire/release) so that threads reading or writing the
    members do not trip over each other.
    
    UnitCollection subclasses Unit so that Sandboxes can manage it.
    Storage Managers, however, only need to persist the member IDs (the
    referenced Units are recalled on demand), so they may want a dedicated
    strategy for storing collections. Not every Storage Manager can handle
    this kind of dynamic data -- deployers, check yours!
    """
    
    Members = dejavu.UnitProperty(u'Members', list)
    EngineID = dejavu.UnitProperty(u'EngineID', int, index=True)
    Type = dejavu.UnitProperty(u'Type')
    Expiration = dejavu.UnitProperty(u'Expiration', datetime.datetime)
    Timestamp = dejavu.UnitProperty(u'Timestamp', datetime.datetime)
    
    def __init__(self, **kwargs):
        dejavu.Unit.__init__(self)
        self.Members = []
        self._mutex = threading.RLock()
        # Any keyword (e.g. Type=...) is applied as an attribute/property.
        for name in kwargs:
            setattr(self, name, kwargs[name])
    
    def __getstate__(self):
        # Only the property values and the dirty flag are pickled; the
        # lock is left out and rebuilt by __setstate__.
        state = (self._properties, self.dirty)
        return state
    
    def __setstate__(self, state):
        self.sandbox = None
        self._mutex = threading.RLock()
        props, dirty = state
        self._properties = props
        self.dirty = dirty
    
    def acquire(self):
        """Block until this collection's reentrant lock is held."""
        self._mutex.acquire(True)
    
    def release(self):
        """Release one level of this collection's reentrant lock."""
        self._mutex.release()
    
    def __len__(self):
        return len(self.Members)
    
    def add(self, ID):
        """Append ID to the members unless it is already present."""
        self.acquire()
        try:
            members = self.Members
            if ID in members:
                return
            members.append(ID)
        finally:
            self.release()
    
    def unit_class(self):
        """Return the Unit class named by self.Type."""
        arena = self.sandbox.arena
        return arena.class_by_name(self.Type)
    
    def ids(self):
        """Return a copy of the member IDs, taken while holding the lock."""
        self.acquire()
        try:
            snapshot = list(self.Members)
        finally:
            self.release()
        return snapshot
    
    def units(self, quota=None):
        """Return the Units referenced by this collection, in member order.
        
        quota: if truthy, only member positions below `quota` are looked up.
            IDs whose Unit the sandbox does not return are skipped, so the
            result may hold fewer than `quota` Units.
        """
        cls = self.unit_class()
        found = []
        self.acquire()
        try:
            for position, unitID in enumerate(self.Members):
                if quota and position >= quota:
                    break
                candidate = self.sandbox.unit(cls, ID=unitID)
                if candidate:
                    found.append(candidate)
        finally:
            self.release()
        return found
    
    def xdict(self, attr):
        """Return a dictionary of {Unit.attr: [Unit, Unit, ...]}."""
        groups = {}
        self.acquire()
        try:
            for unit in self.units():
                key = getattr(unit, attr)
                if key in groups:
                    groups[key].append(unit)
                else:
                    groups[key] = [unit]
        finally:
            self.release()
        return groups
    
    def __copy__(self):
        # The base copy is given its own member list, not a shared one.
        duplicate = dejavu.Unit.__copy__(self)
        duplicate.Members = list(self.Members)
        return duplicate
    
    def on_recall(self):
        """Forget this collection if expired; otherwise extend it 15 minutes.
        
        A collection whose Expiration is None never expires. After
        forgetting an expired collection, dejavu.UnrecallableError is raised.
        """
        expires = self.Expiration
        if expires is None:
            return
        if expires > datetime.datetime.now():
            self.decay(minutes=15)
            return
        self.forget()
        raise dejavu.UnrecallableError
    
    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        start = datetime.datetime.now()
        self.Expiration = start + datetime.timedelta(**kw)
135
136
# The valid values of UnitEngineRule.Operation. RuleProcessor.process runs
# each rule by calling the RuleProcessor method named 'visit_' + Operation.
# The comment beside each name describes what that rule's Operand holds.
operations = [              #       OPERAND
    'COPY',                 # SetID of the destination set (copy target)
    'CREATE',               # New type (= class.__name__)
    'DIFFERENCE',           # SetID of mixin
    'FILTER',               # logic.Expression (stored pickled)
    'FUNCTION',             # key into arena.engine_functions dict
    'INTERSECTION',         # SetID of mixin
    'RETURN',               # (none)
    'TRANSFORM',            # New type (= class.__name__)
    'UNION',                # SetID of mixin
    ]
148
149
class RuleProperty(dejavu.UnitProperty):
    """A UnitProperty that refreshes the owning engine's FinalClassName.
    
    Used for UnitEngineRule's Operation, SetID, Operand and Sequence.
    post() is presumably the hook dejavu.UnitProperty calls after a value
    is assigned (confirm in dejavu); it asks the UnitEngine whose ID equals
    the rule's EngineID, if the sandbox returns one, to recompute its
    final class.
    """
    
    def post(self, unit, value):
        # A rule that is not attached to a sandbox (e.g. while its
        # constructor is still assigning properties, before memorize) has
        # nowhere to look its engine up; there is nothing to refresh then.
        # NOTE(review): assumes unattached Units carry sandbox None, as
        # UnitCollection.__setstate__ sets it -- confirm in dejavu.Unit.
        sandbox = getattr(unit, 'sandbox', None)
        if sandbox is None:
            return
        eng = sandbox.unit(UnitEngine, ID=unit.EngineID)
        if eng:
            eng.update_final_class()
155
class UnitEngineRule(dejavu.Unit):
    """A Rule for Unit Engines."""
    
    Operation = RuleProperty(u'Operation', str)
    SetID = RuleProperty(u'SetID', int)
    Operand = RuleProperty(u'Operand', str, False, hints = {u'Size': 0})
    Sequence = RuleProperty(u'Sequence', int)
    EngineID = dejavu.UnitProperty(u'EngineID', int, index=True)
    Expiration = dejavu.UnitProperty(u'Expiration', datetime.datetime)
    
    def __init__(self, **kwargs):
        """kw: Operation, SetID, Operand=(Type | logic.Expression | otherSet)
        
        FILTER:
            If the Operation is 'FILTER', the Operand shall be a
            logic.Expression, and the snapshot will consist of the IDs of
            Units which match the Expression. A non-string Operand is
            pickled before it is stored; a string Operand is assumed to be
            an already-pickled Expression.
        
        Everything else:
            CREATE and TRANSFORM: the Operand is a class name. TRANSFORM
                replaces the set's members with the IDs of all units
                associated with the current members.
            UNION, DIFFERENCE, INTERSECTION and COPY: the Operand is a
                SetID.
            FUNCTION: the Operand is a key into arena.engine_functions.
        
        So, a typical Engine might have a set of rules which look like:
            --Operation--   --Set-- --Operand--
            CREATE          1       Invoice         # Full set
            FILTER          1       (Expression)    # modifies Set 1
            CREATE          2       Inventory       # Full set
            FILTER          2       (Expression)    # modifies Set 2
            FILTER          2       (Expression)    # modifies Set 2
            TRANSFORM       2       Invoice         # modifies Set 2
            DIFFERENCE      1       2               # Set1 -= Set2
            RETURN          1                       # This is optional.
        
        The last RETURN statement is optional. If omitted, the last Set
        touched will be returned.
        
        For all operations except COPY, the Set ID indicates which Set will
            be modified by the operation. Using the above example, you can
            see that for the DIFFERENCE operation, the Set which is modified
            is Set 1. COPY is the exception: it leaves Set `SetID` as it is
            and writes a copy of it into the Set named by its Operand.
        """
        dejavu.Unit.__init__(self)
        
        # FILTER Expressions are stored pickled (Operand is a str property).
        # An absent Operand is simply left unset instead of raising KeyError.
        if kwargs.get('Operation', '') == 'FILTER' and 'Operand' in kwargs:
            if not isinstance(kwargs['Operand'], (str, unicode)):
                kwargs['Operand'] = pickle.dumps(kwargs['Operand'])
        
        for k, v in kwargs.iteritems():
            setattr(self, k, v)
    
    def __repr__(self):
        # Show FILTER rules with their Expression, not the pickled string.
        if self.Operation == 'FILTER':
            op = self.expr()
        else:
            op = self.Operand
        return ("dejavu.engines.UnitEngineRule(%s, %s, %s)"
                % (self.Operation, self.SetID, repr(op)))
    
    def expr(self):
        """expr() -> If a FILTER rule, return the Expression, else None."""
        if self.Operation != 'FILTER':
            return None
        # NOTE(review): this unpickles the stored Operand; that is only safe
        # if the storage holding engine rules is trusted.
        return pickle.loads(self.Operand)
    
    def on_recall(self):
        """Forget this rule if expired; otherwise extend it 15 minutes.
        
        A rule whose Expiration is None never expires. After forgetting an
        expired rule, dejavu.UnrecallableError is raised.
        """
        if self.Expiration is not None:
            if self.Expiration <= datetime.datetime.now():
                self.forget()
                raise dejavu.UnrecallableError
            else:
                self.decay(minutes=15)
    
    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw)
232
233
class UnitEngine(dejavu.Unit):
    """A factory for Unit Collections.
    
    An engine's rules are the UnitEngineRules whose EngineID equals its ID.
    take_snapshot() runs those rules through a RuleProcessor and memorizes
    the resulting UnitCollection as a snapshot of this engine.
    """
    
    Owner = dejavu.UnitProperty(u'Owner')
    Name = dejavu.UnitProperty(u'Name')
    Created = dejavu.UnitProperty(u'Created', datetime.datetime)
    FinalClassName = dejavu.UnitProperty(u'FinalClassName')
    Expiration = dejavu.UnitProperty('Expiration', datetime.datetime)
    
    def __init__(self, **kwargs):
        dejavu.Unit.__init__(self)
        self.Created = datetime.datetime.today()
        self.Owner = u''
        
        for k, v in kwargs.iteritems():
            setattr(self, k, v)
    
    def on_forget(self):
        """Forget this engine's rules and snapshots as well."""
        # Rules and Snapshots shouldn't persist past
        # the life of their Engines. Forget them.
        for rule in self.rules():
            rule.forget()
        for snap in self.snapshots():
            snap.forget()
    
    def on_recall(self):
        """Forget this engine if expired; otherwise extend it 15 minutes.
        
        An engine whose Expiration is None never expires. An expired engine
        is logged, forgotten, and dejavu.UnrecallableError is raised.
        """
        if self.Expiration is not None:
            if self.Expiration <= datetime.datetime.now():
                msg = ("Forgetting Engine %s. Exp = %s. Now = %s" %
                       (self.ID, self.Expiration, datetime.datetime.now()))
                self.sandbox.arena.application.logger.info(msg)
                self.forget()
                raise dejavu.UnrecallableError
            else:
                self.decay(minutes=15)
    
    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw)
    
    def update_final_class(self):
        """Set FinalClassName to the Type of the set the rules return.
        
        Rules are scanned in Sequence order, stopping at the first RETURN.
        The returned set is that of the last rule scanned. Its type comes
        from the latest CREATE or TRANSFORM of that set, or from a COPY
        into it (a copy takes its source's Type). If no such rule exists,
        FinalClassName is left unchanged.
        """
        results = {}
        last_set = 1
        for rule in self.rules():
            last_set = rule.SetID
            operation = rule.Operation
            if operation in ('CREATE', 'TRANSFORM'):
                results[last_set] = rule.Operand
            elif operation == 'COPY':
                # COPY writes set SetID into the set named by its Operand.
                try:
                    target = int(rule.Operand)
                except (TypeError, ValueError):
                    target = None
                if target is not None and last_set in results:
                    results[target] = results[last_set]
            elif operation == 'RETURN':
                break
        if last_set in results:
            self.FinalClassName = results[last_set]
    
    def final_class(self):
        """Return the Unit class named by FinalClassName."""
        return self.sandbox.arena.class_by_name(self.FinalClassName)
    
    def rules(self):
        """An ordered list of Rules for this Engine."""
        f = logic.filter(EngineID=self.ID)
        allrules = [x for x in self.sandbox.recall(UnitEngineRule, f)]
        allrules.sort(dejavu.sort(u'Sequence'))
        return allrules
    
    def add_rule(self, Operation, SetID=None, Operand=None):
        """Append a rule to this engine and memorize it.
        
        Operation: an operation name (see `operations`), or a ready-made
            UnitEngineRule, in which case SetID and Operand are ignored.
        SetID: defaults to the SetID of the current last rule, or 1 if the
            engine has no rules yet.
        
        The new rule is sequenced after the current last rule (or gets
        Sequence 0), is tied to this engine, and shares its Expiration.
        """
        allrules = self.rules()
        if isinstance(Operation, UnitEngineRule):
            newRule = Operation
        else:
            if SetID is None:
                try:
                    SetID = allrules[-1].SetID
                except IndexError:
                    SetID = 1
            newRule = UnitEngineRule(Operation=Operation, SetID=SetID,
                                     Operand=Operand)
        
        try:
            nextSeq = allrules[-1].Sequence + 1
        except IndexError:
            nextSeq = 0
        newRule.Sequence = nextSeq
        
        newRule.EngineID = self.ID
        newRule.Expiration = self.Expiration
        self.sandbox.memorize(newRule)
        self.update_final_class()
    
    def snapshots(self):
        """Unit Collections obtained by executing the rules sometime in the past."""
        f = logic.filter(EngineID=self.ID)
        allSnap = [x for x in self.sandbox.recall(UnitCollection, f)]
        allSnap.sort(dejavu.sort(u'Timestamp'))
        return allSnap
    
    def take_snapshot(self, args=None):
        """Execute the rules and return a Unit Collection (or None).
        
        args: optional dict of keyword arguments bound into each FILTER
            Expression; defaults to no arguments.
        
        A non-None result is tied to this engine, timestamped, set to
        expire 15 minutes from now, and memorized.
        """
        if args is None:
            # A fresh dict per call rather than one shared mutable default.
            args = {}
        allrules = self.rules()
        snap = RuleProcessor(self.sandbox).process(allrules, args)
        if snap is not None:
            snap.EngineID = self.ID
            snap.Timestamp = datetime.datetime.now()
            snap.decay(minutes=15)
            self.sandbox.memorize(snap)
        return snap
    
    def last_snapshot(self, args=None):
        """Return the newest snapshot, taking one (with args) if none exist."""
        allSnaps = self.snapshots()
        if allSnaps:
            return allSnaps[-1]
        return self.take_snapshot(args)
    
    def immortalize(self):
        """Clear Expiration on this engine and on each of its rules."""
        self.Expiration = None
        for rule in self.rules():
            rule.Expiration = None
    
    def __copy__(self):
        newUnit = dejavu.Unit.__copy__(self)
        newUnit.Name = "Copy of %s" % newUnit.Name
        newUnit.Created = datetime.datetime.now()
        return newUnit
    
    def clone(self, user, temporary=True):
        """Copy self and all Rules of self. Memorize automatically.
        
        user: becomes the Owner of the copy.
        temporary: if true, the copy expires in 15 minutes; otherwise it
            never expires. The copied rules share the copy's Expiration.
        """
        newUnit = self.__copy__()
        newUnit.Owner = user
        if temporary:
            newUnit.decay(minutes=15)
        else:
            newUnit.Expiration = None
        self.sandbox.memorize(newUnit)
        for rule in self.rules():
            newRule = rule.__copy__()
            newRule.EngineID = newUnit.ID
            newRule.Expiration = newUnit.Expiration
            self.sandbox.memorize(newRule)
        return newUnit
    
    def permit(self, user, write_access=True):
        """Return True if `user` may access this engine.
        
        Write access is granted for 'Public' engines and those owned by
        `user`; read access (write_access=False) also covers 'System'.
        """
        if write_access:
            return self.Owner in (u'Public', user)
        else:
            return self.Owner in ('System', 'Public', user)
380
381
382 class RuleProcessor(object):
383     """Processor for the Rules of a Unit Engine."""
384    
385     def __init__(self, sandbox):
386         self.sandbox = sandbox
387         self.arena = sandbox.arena
388    
389     def process(self, rules, args):
390         """Execute the rules and return a Unit Collection (or None)."""
391         self.sets = {}
392         self.args = args
393         final = None
394         for rule in rules:
395             operation = rule.Operation
396             func = getattr(self, 'visit_' + operation)
397             final = rule.SetID
398             func(final, rule.Operand)
399         if final is None:
400             return None
401         else:
402             return self.sets[final]
403    
404     def visit_COPY(self, setID, operand):
405         """Copy the set whose ID = operand into another set, whose ID = setID."""
406         A = self.sets[setID]
407         setID2 = int(operand)
408         if setID2 in self.sets:
409             # Overwrite the existing set.
410             B = self.sets[setID2]
411         else:
412             # Create a new set.
413             B = UnitCollection(Type=A.Type)
414             self.sets[setID2] = B
415         B.universal = A.universal
416         A.acquire()
417         B.acquire()
418         try:
419             B.Members = A.Members[:]
420         finally:
421             A.release()
422             B.release()
423    
424     def visit_CREATE(self, setID, operand):
425         """Create a universal set. Actual population may be deferred."""
426         newset = UnitCollection(Type=operand)
427         newset.universal = True
428         self.sets[setID] = newset
429    
430     def visit_DIFFERENCE(self, setID, operand):
431         A = self.sets[setID]
432        
433         setID2 = int(operand)
434         self.realize_universal(setID2)
435         B = self.sets[setID2]
436        
437         A.acquire()
438         B.acquire()
439         try:
440             if B.universal:
441                 # B should be every Unit, which means the difference
442                 # will always be an empty set.
443                 A.Members = []
444             else:
445                 # B is a subset of A.
446                 if A.universal:
447                     cls = self.arena.class_by_name(A.Type)
448                     mem = A.Members
449                     for unit in self.sandbox.recall(cls):
450                         id = unit.ID
451                         if id not in B.Members:
452                             mem.append(id)
453                 else:
454                     A.Members = [x for x in A.Members if x not in B.Members]
455             A.universal = False
456         finally:
457             A.release()
458             B.release()
459    
460     def visit_FILTER(self, setID, operand):
461         expr = pickle.loads(operand)
462         expr.bind_args(**self.args)
463         A = self.sets[setID]
464         A.acquire()
465         try:
466             cls = self.arena.class_by_name(A.Type)
467             mem = A.Members
468             if A.universal:
469                 A.universal = False
470                 for unit in self.sandbox.recall(cls, expr):
471                     id = unit.ID
472                     if id not in mem:
473                         mem.append(id)
474             else:
475                 newset = []
476                 for id in mem:
477                     unit = self.sandbox.unit(cls, ID=id)
478                     if unit and expr.evaluate(unit):
479                         newset.append(id)
480                 A.Members = newset
481         finally:
482             A.release()
483    
484     def visit_FUNCTION(self, setID, operand):
485         func = self.arena.engine_functions[operand]
486        
487         A = self.sets[setID]
488         A.acquire()
489         try:
490             # Notice we do not populate universals before passing to func.
491             func(self.sandbox, A)
492         finally:
493             A.release()
494    
495     def visit_INTERSECTION(self, setID, operand):
496         A = self.sets[setID]
497        
498         setID2 = int(operand)
499         B = self.sets[setID2]
500        
501         A.acquire()
502         B.acquire()
503         try:
504             if B.universal:
505                 # If A is universal, (A and B) = universal. Defer.
506                 # If A is not universal, (A and B) = A. Pass.
507                 pass
508             else:
509                 if A.universal:
510                     # B is a subset of A. (A and B) = B. Copy B to A.
511                     A.Members = B.Members[:]
512                     A.universal = False
513                 else:
514                     A.Members = [x for x in A.Members if x in B.Members]
515         finally:
516             A.release()
517             B.release()
518    
519     def visit_RETURN(self, setID, operand):
520         self.realize_universal(setID)
521    
522     def realize_universal(self, setID):
523         A = self.sets[setID]
524         if A.universal:
525             A.acquire()
526             try:
527                 A.universal = False
528                 cls = self.arena.class_by_name(A.Type)
529                 mem = A.Members
530                 for unit in self.sandbox.recall(cls):
531                     mem.append(unit.ID)
532             finally:
533                 A.release()
534    
535     def visit_TRANSFORM(self, setID, operand):
536         """operand=far class name. Multiple hops are supported."""
537         A = self.sets[setID]
538         start = self.arena.class_by_name(A.Type)
539         end = self.arena.class_by_name(operand)
540         nodes = self.arena.associations.shortest_path(start, end)
541         if nodes is None:
542             raise KeyError("No association found between '%s' and '%s'"
543                            % (start, end))
544        
545         # Skip the first node, which should be A.Type
546         nodes.pop(0)
547         A.acquire()
548         try:
549             for eachType in nodes:
550                 # Add all associated Units to the collection A.
551                 oppfunc = getattr(start, eachType.__name__)
552                 cls = self.arena.class_by_name(A.Type)
553                 newset = []
554                 if A.universal:
555                     for unit in self.sandbox.recall(cls):
556                         for farUnit in oppfunc(unit):
557                             farid = farUnit.ID
558                             if farid not in newset:
559                                 newset.append(farid)
560                     A.universal = False
561                 else:
562                     for id in A.Members:
563                         unit = self.sandbox.unit(cls, ID=id)
564                         if unit:
565                             for farUnit in oppfunc(unit):
566                                 farid = farUnit.ID
567                                 if farid not in newset:
568                                     newset.append(farid)
569                 A.Members = newset
570                 start = eachType
571                 A.Type = eachType.__name__
572         finally:
573             A.release()
574    
575     def visit_UNION(self, setID, operand):
576         A = self.sets[setID]
577         setID2 = int(operand)
578         B = self.sets[setID2]
579        
580         A.acquire()
581         B.acquire()
582         try:
583             if B.universal:
584                 # (A or B) = Universal set. Make A universal.
585                 A.universal = True
586                 A.Members = []
587             else:
588                 if A.universal:
589                     pass
590                 else:
591                     amem = A.Members
592                     for id in B.Members:
593                         if id not in amem:
594                             amem.append(id)
595         finally:
596             A.release()
597             B.release()
598
Note: See TracBrowser for help on using the browser.