Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/engines.py

Revision 88 (checked in by fumanchu, 8 years ago)

Set eol-style:native on all .py files.

  • Property svn:eol-style set to native
Line 
1 """
2 Notice in particular that UnitCollection, UnitEngineRule, and UnitEngine
3 are all _temporary_ Units. Even when you memorize them, they won't be
4 persistent unless you set each instance's Expiration to None. If you
5 use UnitEngine.immortalize(), it will make all of its rules immortal
6 (no Expiration) as well.
7 """
8
9 import threading
10 import datetime
11 try:
12     import cPickle as pickle
13 except ImportError:
14     import pickle
15 import dejavu
16 from dejavu import logic, associate
17 import sets
18
19
class UnitCollection(dejavu.Unit):
    """A collection of Unit IDs that all refer to one Unit type.

    Type: name of the Unit class shared by every ID in Members.

    Collections mainly serve as indexes over Units. Unit Engines apply
    Expressions and other rules to a whole collection at once, taking
    collections as input and yielding collections as output. The
    iteration helpers below (reading or writing) hold the collection's
    lock, to sidestep the usual multi-process/multi-thread pitfalls.

    Because this is a Unit subclass, Sandboxes can manage it. Its payload,
    though, is only a list of IDs (the referenced Units are recalled on
    demand), so Storage Managers are advised to store and retrieve
    collections with techniques suited to that. Not every Storage Manager
    can cope with this kind of dynamic storage; deployers should check
    that theirs can.
    """

    Members = dejavu.UnitProperty(list)
    EngineID = dejavu.UnitProperty(int, index=True)
    Type = dejavu.UnitProperty()
    Expiration = dejavu.UnitProperty(datetime.datetime)
    Timestamp = dejavu.UnitProperty(datetime.datetime)

    def __init__(self, **kwargs):
        """Start empty with a fresh lock, then apply kwargs as attributes."""
        dejavu.Unit.__init__(self)
        self.Members = []
        self._mutex = threading.RLock()

        for name, value in kwargs.iteritems():
            setattr(self, name, value)

    def __getstate__(self):
        """Pickle only the property data; the lock is left out."""
        state = (self._properties, self._initial_property_hash)
        return state

    def __setstate__(self, state):
        """Restore property data; sandbox is cleared and a new lock made."""
        self.sandbox = None
        self._mutex = threading.RLock()
        properties, initial_hash = state
        self._properties = properties
        self._initial_property_hash = initial_hash

    def acquire(self):
        """Block until this collection's (re-entrant) lock is held."""
        self._mutex.acquire(True)

    def release(self):
        """Release one hold on this collection's lock."""
        self._mutex.release()

    def __len__(self):
        """Number of IDs currently in Members."""
        return len(self.Members)

    def add(self, ID):
        """Append ID to Members, under the lock, unless already present."""
        self.acquire()
        try:
            members = self.Members
            if ID not in members:
                members.append(ID)
        finally:
            self.release()

    def unit_class(self):
        """Look up, through the sandbox's arena, the class named by Type."""
        arena = self.sandbox.arena
        return arena.class_by_name(self.Type)

    def ids(self):
        """Return a copy of Members, taken under the lock."""
        self.acquire()
        try:
            return list(self.Members)
        finally:
            self.release()

    def units(self, quota=None):
        """Return the Units for Members, in order, skipping missing ones.

        If quota is truthy, only the first `quota` IDs are looked up.
        """
        member_class = self.unit_class()
        found = []
        self.acquire()
        try:
            for position, member_id in enumerate(self.Members):
                if quota and position >= quota:
                    break
                candidate = self.sandbox.unit(member_class, ID=member_id)
                if candidate:
                    found.append(candidate)
        finally:
            self.release()
        return found

    def xdict(self, attr):
        """Return a dictionary of {Unit.attr: [Unit, Unit, ...]}."""
        grouped = {}
        self.acquire()
        try:
            for member in self.units():
                key = getattr(member, attr)
                bucket = grouped.get(key)
                if bucket is None:
                    # First Unit seen with this attribute value.
                    bucket = grouped[key] = []
                bucket.append(member)
        finally:
            self.release()
        return grouped

    def __copy__(self):
        """Copy via Unit.__copy__, giving the copy its own Members list."""
        duplicate = dejavu.Unit.__copy__(self)
        duplicate.Members = list(self.Members)
        return duplicate

    def on_recall(self):
        """Forget and refuse recall if expired; otherwise extend 15 minutes.

        A collection whose Expiration is None is left untouched.
        """
        expires = self.Expiration
        if expires is None:
            return
        if expires <= datetime.datetime.now():
            self.forget()
            raise dejavu.UnrecallableError
        self.decay(minutes=15)

    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        current = datetime.datetime.now()
        self.Expiration = current + datetime.timedelta(**kw)
134
135
# Names of the rule operations. RuleProcessor has one visit_<NAME> method
# for each entry. The trailing comment says what the rule's Operand holds.
operations = [
    'COPY',          # operand: SetID of mixin
    'CREATE',        # operand: New type (= class.__name__)
    'DIFFERENCE',    # operand: SetID of mixin
    'FILTER',        # operand: logic.Expression
    'FUNCTION',      # operand: key into arena.engine_functions dict
    'INTERSECTION',  # operand: SetID of mixin
    'RETURN',        # operand: (none)
    'TRANSFORM',     # operand: New type (= class.__name__)
    'UNION',         # operand: SetID of mixin
]
147
148
class RuleProperty(dejavu.UnitProperty):
    """UnitProperty for rule fields that affect an engine's final class.

    When an assignment actually changes the stored value and the rule
    lives in a sandbox, the owning UnitEngine (looked up by the rule's
    EngineID) is asked to recompute its final class.
    """

    def __set__(self, unit, value):
        if self.coerce:
            value = self.coerce(unit, value)
        previous = unit._properties[self.key]
        if previous != value:
            unit._properties[self.key] = value
            box = unit.sandbox
            if box:
                engine = box.unit(UnitEngine, ID=unit.EngineID)
                if engine:
                    engine.update_final_class()
160
class UnitEngineRule(dejavu.Unit):
    """One step in the rule list of a Unit Engine."""

    Operation = RuleProperty(str)
    SetID = RuleProperty(int)
    Operand = RuleProperty(str, False, hints={u'bytes': 0})
    Sequence = RuleProperty(int)
    EngineID = dejavu.UnitProperty(int, index=True)
    Expiration = dejavu.UnitProperty(datetime.datetime)

    def __init__(self, **kwargs):
        """kw: Operation, SetID, Operand=(Type | logic.Expression | otherSet)

        Every keyword is applied as an attribute. For a FILTER rule whose
        Operand is not already a string, the Operand is pickled first.

        TRANSFORM:
            The Operand names a Unit type. The snapshot becomes the IDs of
            all units of that type which are associated with the units in
            the current snapshot.

        FILTER:
            The Operand is a logic.Expression. The snapshot becomes the
            IDs of the Units which match that Expression.

        A typical Engine might carry rules like these:
            --Operation--   --Set-- --Operand--
            CREATE          1       Invoice         # Full set
            FILTER          1       (Expression)    # modifies Set 1
            CREATE          2       Inventory       # Full set
            FILTER          2       (Expression)    # modifies Set 2
            FILTER          2       (Expression)    # modifies Set 2
            TRANSFORM       2       Invoice         # modifies Set 2
            DIFFERENCE      1       2               # Set1 -= Set2
            RETURN          1                       # This is optional.

        The closing RETURN may be left out, in which case the last Set
        touched is the one returned.

        In every operation the Set ID names the Set that the operation
        modifies. In the example, DIFFERENCE therefore modifies Set 1.
        """
        dejavu.Unit.__init__(self)

        is_filter = kwargs.get('Operation', '') == 'FILTER'
        if is_filter and not isinstance(kwargs.get('Operand'), (str, unicode)):
            # Expressions are stored in pickled form in the str Operand.
            kwargs['Operand'] = pickle.dumps(kwargs['Operand'])

        for name, value in kwargs.iteritems():
            setattr(self, name, value)

    def __repr__(self):
        """Show Operation, SetID and Operand (unpickled for FILTER rules)."""
        shown = self.Operand
        if self.Operation == 'FILTER':
            shown = pickle.loads(shown)
        template = "dejavu.engines.UnitEngineRule(%s, %s, %s)"
        return template % (self.Operation, self.SetID, repr(shown))

    def expr(self):
        """expr() -> If a FILTER rule, return the Expression, else None."""
        if self.Operation != 'FILTER':
            return None
        return pickle.loads(self.Operand)

    def on_recall(self):
        """Forget and refuse recall if expired; otherwise extend 15 minutes.

        A rule whose Expiration is None is left untouched.
        """
        expires = self.Expiration
        if expires is None:
            return
        if expires <= datetime.datetime.now():
            self.forget()
            raise dejavu.UnrecallableError
        self.decay(minutes=15)

    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        current = datetime.datetime.now()
        self.Expiration = current + datetime.timedelta(**kw)
237
238
class UnitEngine(dejavu.Unit):
    """A factory for Unit Collections.

    An engine owns an ordered list of UnitEngineRules (see rules()) and
    produces UnitCollection snapshots by running those rules through a
    RuleProcessor (see take_snapshot()).
    """

    Owner = dejavu.UnitProperty()
    Name = dejavu.UnitProperty()
    Created = dejavu.UnitProperty(datetime.datetime)
    FinalClassName = dejavu.UnitProperty()
    Expiration = dejavu.UnitProperty(datetime.datetime)

    def __init__(self, **kwargs):
        """Stamp Created, default Owner to u'', then apply kwargs."""
        dejavu.Unit.__init__(self)
        self.Created = datetime.datetime.today()
        self.Owner = u''

        for k, v in kwargs.iteritems():
            setattr(self, k, v)

    def on_forget(self):
        """Forget this engine's rules and snapshots along with it."""
        # Rules and Snapshots shouldn't persist past
        # the life of their Engines. Forget them.
        for rule in self.rules():
            rule.forget()
        for snap in self.snapshots():
            snap.forget()

    def on_recall(self):
        """Forget and refuse recall if expired; otherwise extend 15 minutes.

        An engine whose Expiration is None is left untouched.
        Raises dejavu.UnrecallableError when the engine has expired.
        """
        if self.Expiration is not None:
            if self.Expiration <= datetime.datetime.now():
                self.forget()
                raise dejavu.UnrecallableError
            else:
                self.decay(minutes=15)

    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw)

    def update_final_class(self):
        """Recompute FinalClassName from the current rules.

        Walks the rules in order, remembering for each Set ID the type
        named by its latest CREATE or TRANSFORM, and stops at the first
        RETURN. FinalClassName is updated only if the last Set touched
        has such a type recorded.
        """
        # NOTE(review): COPY rules are not tracked here, so a set that
        # only came into being through COPY has no entry in `results`.
        results = {}
        last_set = 1
        for rule in self.rules():
            last_set = rule.SetID
            operation = rule.Operation
            if operation in ('CREATE', 'TRANSFORM'):
                results[last_set] = rule.Operand
            if operation == 'RETURN':
                break
        if last_set in results:
            self.FinalClassName = results[last_set]

    def final_class(self):
        """Return the class named by FinalClassName, via the arena."""
        return self.sandbox.arena.class_by_name(self.FinalClassName)

    def rules(self):
        """An ordered list of Rules for this Engine."""
        allrules = list(self.UnitEngineRule())
        allrules.sort(dejavu.sort(u'Sequence'))
        return allrules

    def add_rule(self, Operation, SetID=None, Operand=None):
        """Append a rule to this engine and memorize it.

        Operation: an operation name, or a ready-made UnitEngineRule
            (in which case SetID and Operand are ignored).
        SetID: the Set to modify; defaults to the SetID of the current
            last rule, or 1 if there are no rules yet.
        Operand: the operation's argument.

        The rule is given the next Sequence number, this engine's ID and
        this engine's Expiration; FinalClassName is then recomputed.
        """
        allrules = self.rules()
        if isinstance(Operation, UnitEngineRule):
            newRule = Operation
        else:
            if SetID is None:
                try:
                    SetID = allrules[-1].SetID
                except IndexError:
                    SetID = 1
            newRule = UnitEngineRule(Operation=Operation, SetID=SetID,
                                     Operand=Operand)

        try:
            nextSeq = allrules[-1].Sequence + 1
        except IndexError:
            nextSeq = 0
        newRule.Sequence = nextSeq

        newRule.EngineID = self.ID
        newRule.Expiration = self.Expiration
        self.sandbox.memorize(newRule)
        self.update_final_class()

    def snapshots(self):
        """Unit Collections obtained by executing the rules sometime in the past."""
        f = logic.filter(EngineID=self.ID)
        allSnap = self.sandbox.recall(UnitCollection, f)
        allSnap.sort(dejavu.sort(u'Timestamp'))
        return allSnap

    def take_snapshot(self, args=None):
        """Execute the rules and return a Unit Collection (or None).

        args: optional dict handed to the RuleProcessor, which passes it
            as keyword arguments to each FILTER Expression's bind_args.
            Omitted or None means an empty dict.

        A non-None result is stamped with this engine's ID and the
        current time, set to expire in 15 minutes, and memorized.
        """
        if args is None:
            # Build a fresh dict per call rather than sharing one mutable
            # default object between every call of this method.
            args = {}
        allrules = self.rules()
        snap = RuleProcessor(self.sandbox).process(allrules, args)
        if snap is not None:
            snap.EngineID = self.ID
            snap.Timestamp = datetime.datetime.now()
            snap.decay(minutes=15)
            self.sandbox.memorize(snap)
        return snap

    def last_snapshot(self, args=None):
        """Return the newest existing snapshot, taking one if none exist.

        args is only used when a new snapshot has to be taken; see
        take_snapshot().
        """
        allSnaps = self.snapshots()
        if allSnaps:
            return allSnaps[-1]
        return self.take_snapshot(args)

    def immortalize(self):
        """Clear Expiration on this engine and on each of its rules."""
        self.Expiration = None
        for rule in self.rules():
            rule.Expiration = None

    def __copy__(self):
        """Copy via Unit.__copy__, renaming and re-stamping the copy."""
        newUnit = dejavu.Unit.__copy__(self)
        newUnit.Name = "Copy of %s" % newUnit.Name
        newUnit.Created = datetime.datetime.now()
        return newUnit

    def clone(self, user, temporary=True):
        """Copy self and all Rules of self. Memorize automatically.

        user: becomes the Owner of the copy.
        temporary: if true the copy expires in 15 minutes, otherwise it
            has no Expiration. The copied rules share that Expiration.

        Returns the new engine.
        """
        newUnit = self.__copy__()
        newUnit.Owner = user
        if temporary:
            newUnit.decay(minutes=15)
        else:
            newUnit.Expiration = None
        self.sandbox.memorize(newUnit)
        for rule in self.rules():
            newRule = rule.__copy__()
            newRule.EngineID = newUnit.ID
            newRule.Expiration = newUnit.Expiration
            self.sandbox.memorize(newRule)
        return newUnit

    def permit(self, user, write_access=True):
        """Return whether `user` may use this engine.

        Writing requires the Owner to be u'Public' or the user; reading
        is additionally allowed when the Owner is 'System'.
        """
        if write_access:
            return self.Owner in (u'Public', user)
        else:
            return self.Owner in ('System', 'Public', user)
381
# Relate each engine (by its ID) to its rules and to its snapshots (by
# their EngineID). UnitEngine.rules() calls self.UnitEngineRule(), which
# presumably is the accessor created by the first association -- TODO
# confirm against dejavu.associate.
associate(UnitEngine, 'ID', UnitEngineRule, 'EngineID')
associate(UnitEngine, 'ID', UnitCollection, 'EngineID')
384
385
class RuleProcessor(object):
    """Processor for the Rules of a Unit Engine.

    Working sets live in self.sets, keyed by Set ID. A set whose
    `universal` attribute is true stands for "every Unit of its Type";
    its Members list is only filled in when an operation, or
    realize_universal(), needs the actual IDs.
    """

    def __init__(self, sandbox):
        self.sandbox = sandbox
        self.arena = sandbox.arena

    def process(self, rules, args):
        """Execute the rules and return a Unit Collection (or None).

        rules: iterable of rules, each with Operation, SetID and Operand.
        args: dict of keyword arguments bound into FILTER Expressions
            (None is treated as an empty dict).

        Each rule is dispatched to the matching visit_<Operation> method.
        Processing stops at the first RETURN rule. The set named by the
        last rule executed is returned, or None if there were no rules.
        """
        if args is None:
            args = {}
        self.sets = {}
        self.args = args
        final = None
        for rule in rules:
            operation = rule.Operation
            func = getattr(self, 'visit_' + operation)
            final = rule.SetID
            func(final, rule.Operand)
            if operation == 'RETURN':
                # Same cut-off as UnitEngine.update_final_class, so the
                # returned set and FinalClassName describe the same rule.
                break
        if final is None:
            return None
        # RETURN is optional, so populate the result here as well. The
        # `universal` flag is a plain attribute, not a UnitProperty, so a
        # still-universal result would be left with empty Members.
        self.realize_universal(final)
        return self.sets[final]

    def visit_COPY(self, setID, operand):
        """Copy set `setID` into the set whose ID is `operand`.

        The target is created if it does not exist; otherwise its Type,
        universal flag and Members are overwritten with the source's.
        """
        # NOTE(review): the rule documentation says the Set ID names the
        # set being modified, but this code has always treated setID as
        # the source and operand as the target. Direction left as-is.
        A = self.sets[setID]
        setID2 = int(operand)
        if setID2 in self.sets:
            # Overwrite the existing set. The copied IDs belong to A's
            # Type, so the target must take that Type too.
            B = self.sets[setID2]
            B.Type = A.Type
        else:
            # Create a new set.
            B = UnitCollection(Type=A.Type)
            self.sets[setID2] = B
        B.universal = A.universal
        A.acquire()
        B.acquire()
        try:
            B.Members = A.Members[:]
        finally:
            A.release()
            B.release()

    def visit_CREATE(self, setID, operand):
        """Create a universal set. Actual population may be deferred."""
        newset = UnitCollection(Type=operand)
        newset.universal = True
        self.sets[setID] = newset

    def visit_DIFFERENCE(self, setID, operand):
        """Remove the members of set `operand` from set `setID`."""
        A = self.sets[setID]

        setID2 = int(operand)
        # B is populated up front, so it is never universal below.
        self.realize_universal(setID2)
        B = self.sets[setID2]

        A.acquire()
        B.acquire()
        try:
            excluded = B.Members
            if A.universal:
                # A is "everything": recall every Unit of its Type and
                # keep the IDs which are not in B.
                cls = self.arena.class_by_name(A.Type)
                mem = A.Members
                for unit in self.sandbox.recall(cls):
                    unit_id = unit.ID
                    if unit_id not in excluded:
                        mem.append(unit_id)
            else:
                A.Members = [x for x in A.Members if x not in excluded]
            A.universal = False
        finally:
            A.release()
            B.release()

    def visit_FILTER(self, setID, operand):
        """Keep only the Units of set `setID` which match an Expression.

        operand is the pickled logic.Expression; self.args is bound into
        it before use.
        """
        # NOTE(security): pickle.loads executes whatever the stored
        # Operand encodes; rule Operands must come from trusted sources.
        expr = pickle.loads(operand)
        expr.bind_args(**self.args)

        A = self.sets[setID]
        A.acquire()
        try:
            cls = self.arena.class_by_name(A.Type)
            mem = A.Members
            if A.universal:
                # Let the sandbox do the filtering over all Units.
                A.universal = False
                for unit in self.sandbox.recall(cls, expr):
                    unit_id = unit.ID
                    if unit_id not in mem:
                        mem.append(unit_id)
            else:
                # Re-test each current member; IDs whose Unit can no
                # longer be found are dropped.
                newset = []
                for unit_id in mem:
                    unit = self.sandbox.unit(cls, ID=unit_id)
                    if unit and expr.evaluate(unit):
                        newset.append(unit_id)
                A.Members = newset
        finally:
            A.release()

    def visit_FUNCTION(self, setID, operand):
        """Call arena.engine_functions[operand](sandbox, set) under lock."""
        func = self.arena.engine_functions[operand]

        A = self.sets[setID]
        A.acquire()
        try:
            # Notice we do not populate universals before passing to func.
            func(self.sandbox, A)
        finally:
            A.release()

    def visit_INTERSECTION(self, setID, operand):
        """Reduce set `setID` to the members it shares with set `operand`."""
        A = self.sets[setID]

        setID2 = int(operand)
        B = self.sets[setID2]

        A.acquire()
        B.acquire()
        try:
            if B.universal:
                # If A is universal, (A and B) = universal. Defer.
                # If A is not universal, (A and B) = A. Pass.
                pass
            else:
                if A.universal:
                    # B is a subset of A. (A and B) = B. Copy B to A.
                    A.Members = B.Members[:]
                    A.universal = False
                else:
                    A.Members = [x for x in A.Members if x in B.Members]
        finally:
            A.release()
            B.release()

    def visit_RETURN(self, setID, operand):
        """Make sure set `setID` is populated; operand is ignored."""
        self.realize_universal(setID)

    def realize_universal(self, setID):
        """If set `setID` is universal, fill it with every Unit's ID.

        Does nothing for a set which is not universal.
        """
        A = self.sets[setID]
        if A.universal:
            A.acquire()
            try:
                A.universal = False
                cls = self.arena.class_by_name(A.Type)
                mem = A.Members
                for unit in self.sandbox.recall(cls):
                    mem.append(unit.ID)
            finally:
                A.release()

    def visit_TRANSFORM(self, setID, operand):
        """visit_TRANSFORM(setID, operand=farClass name). Multiple hops OK.

        Replaces the members of set `setID` with the IDs of the
        associated Units of the class named by operand, following the
        arena's shortest association path one hop at a time.

        Raises KeyError if the arena knows no path between the classes.
        """
        A = self.sets[setID]
        start = self.arena.class_by_name(A.Type)
        end = self.arena.class_by_name(operand)
        nodes = self.arena.associations.shortest_path(start, end)
        if nodes is None:
            raise KeyError("No association found between '%s' and '%s'"
                           % (start, end))

        # Skip the first node, which should be A.Type. Slice instead of
        # pop(0) so the list returned by shortest_path is left untouched.
        nodes = nodes[1:]
        A.acquire()
        try:
            for eachType in nodes:
                # Add all associated Units to the collection A.
                oppfunc = getattr(start, eachType.__name__)
                cls = self.arena.class_by_name(A.Type)
                newset = []
                if A.universal:
                    for unit in self.sandbox.recall(cls):
                        for farUnit in oppfunc(unit):
                            farid = farUnit.ID
                            if farid not in newset:
                                newset.append(farid)
                    A.universal = False
                else:
                    for unit_id in A.Members:
                        unit = self.sandbox.unit(cls, ID=unit_id)
                        if unit:
                            for farUnit in oppfunc(unit):
                                farid = farUnit.ID
                                if farid not in newset:
                                    newset.append(farid)
                A.Members = newset
                start = eachType
                A.Type = eachType.__name__
        finally:
            A.release()

    def visit_UNION(self, setID, operand):
        """Add the members of set `operand` to set `setID`."""
        A = self.sets[setID]
        setID2 = int(operand)
        B = self.sets[setID2]

        A.acquire()
        B.acquire()
        try:
            if B.universal:
                # (A or B) = Universal set. Make A universal.
                A.universal = True
                A.Members = []
            else:
                if A.universal:
                    # (Universal or B) = Universal. Nothing to add.
                    pass
                else:
                    amem = A.Members
                    for unit_id in B.Members:
                        if unit_id not in amem:
                            amem.append(unit_id)
        finally:
            A.release()
            B.release()
603
604
def register_classes(arena):
    """Register the three engine Unit classes with the given arena."""
    for unit_class in (UnitCollection, UnitEngine, UnitEngineRule):
        arena.register(unit_class)
609
Note: See TracBrowser for help on using the browser.