Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/branches/ldap/engines.py

Revision 404 (checked in by fumanchu, 5 years ago)

Better superclass calls inside engine classes.

  • Property svn:eol-style set to native
Line 
1 """
2 Notice in particular that UnitCollection, UnitEngineRule, and UnitEngine
3 are all _temporary_ Units. Even when you memorize them, they won't be
4 persistent unless you set each instance's Expiration to None. If you
5 use UnitEngine.immortalize(), it will make all of its rules immortal
6 (no Expiration) as well.
7 """
8
9 import threading
10 import datetime
11 try:
12     import cPickle as pickle
13 except ImportError:
14     import pickle
15
16 import dejavu
17 from dejavu import errors, logic, recur
18
19
class TemporaryUnit(dejavu.Unit):
    """A Unit with an optional expiry time.

    Expiration: the moment after which the unit counts as expired, or
        None for a unit that never expires.
    """

    Expiration = dejavu.UnitProperty(datetime.datetime)

    def on_recall(self):
        """Forget the unit if it has expired; otherwise extend its life.

        A unit whose Expiration is None is left untouched. An expired
        unit is forgotten and errors.UnrecallableError is raised. A unit
        that has not yet expired gets a fresh 15-minute lease.
        """
        if self.Expiration is None:
            # Immortal unit: nothing to check, nothing to extend.
            return
        if self.Expiration <= datetime.datetime.now():
            self.forget()
            raise errors.UnrecallableError
        self.decay(minutes=15)

    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        lifetime = datetime.timedelta(**kw)
        self.Expiration = datetime.datetime.now() + lifetime
35
36
class TemporarySweeper(recur.Worker):
    """A worker that sweeps expired TemporaryUnit instances out of the arena."""

    def work(self):
        """Run one sweep over every registered TemporaryUnit subclass."""
        now = datetime.datetime.now()
        # NOTE(review): this lambda is handed to sandbox.recall as the
        # query; presumably dejavu inspects it, so it is kept verbatim
        # (including the "!= None" comparison).
        expired = lambda x: x.Expiration != None and x.Expiration <= now
        sandbox = self.arena.new_sandbox()

        for cls in self.arena._registered_classes:
            if not issubclass(cls, TemporaryUnit):
                continue
            # Recalling the units runs TemporaryUnit.on_recall on each,
            # which should forget the expired ones.
            sandbox.recall(cls, expired)
        sandbox.flush_all()
52
53
class UnitCollection(TemporaryUnit):
    """A Set of Unit identifiers, kept as a list in Members.

    Type: Unit Type (class name) of all Units referenced by this
        collection.
    Members: the identifiers themselves.
    EngineID: ID of the UnitEngine this collection belongs to.
    Timestamp: when the collection was produced.

    The Unit Collection is primarily an index for Units. Unit Engines
    use Expressions and other rules to transform a Collection as a
    whole; they both consume and produce Unit Collections. Access to
    Members goes through acquire()/release() (a re-entrant lock) to
    avoid the errors common with multi-process/multi-threaded access.

    UnitCollection is a subclass of Unit so that Sandboxes can manage
    it. Because of the structure of its data, Storage Managers are
    encouraged to store and retrieve Unit Collections with different
    techniques than ordinary Units: only the IDs of the contained Units
    need to be stored, since the Units themselves are recalled on
    demand. Not every Storage Manager can handle this kind of dynamic
    storage; deployers -- examine your Storage Managers and make sure
    they can!
    """

    Members = dejavu.UnitProperty(list)
    EngineID = dejavu.UnitProperty(int, index=True)
    Type = dejavu.UnitProperty()
    Timestamp = dejavu.UnitProperty(datetime.datetime)

    def __init__(self, Members=None, **kwargs):
        """Create a collection; Members defaults to a new empty list."""
        # The lock must exist before the base initializer runs.
        self._mutex = threading.RLock()
        if Members is None:
            Members = []
        TemporaryUnit.__init__(self, Members=Members, **kwargs)

    def __setstate__(self, state):
        """Restore state, giving the instance a fresh lock first."""
        self._mutex = threading.RLock()
        TemporaryUnit.__setstate__(self, state)

    def acquire(self):
        """Block until this collection's re-entrant lock is held."""
        self._mutex.acquire(True)

    def release(self):
        """Release one hold on this collection's lock."""
        self._mutex.release()

    def __len__(self):
        """Return the number of identifiers in Members."""
        return len(self.Members)

    def add(self, ID):
        """Append ID to Members unless it is already present."""
        self.acquire()
        try:
            already_present = ID in self.Members
            if not already_present:
                self.Members.append(ID)
        finally:
            self.release()

    def unit_class(self):
        """Return the Unit class that the arena registers under Type."""
        arena = self.sandbox.arena
        return arena.class_by_name(self.Type)

    def ids(self):
        """Return a copy of Members, taken while holding the lock."""
        self.acquire()
        try:
            snapshot = self.Members[:]
        finally:
            self.release()
        return snapshot

    def units(self, quota=None):
        """Return the Units named by Members, looked up in the sandbox.

        quota: if truthy, only the first `quota` identifiers are
            examined. Identifiers for which the sandbox yields no unit
            are skipped, so fewer than `quota` units may come back.
        """
        unit_cls = self.unit_class()

        found = []
        self.acquire()
        try:
            for position, ident in enumerate(self.Members):
                if quota and position >= quota:
                    break
                # Pair each identifier name with its value for lookup.
                lookup = dict(zip(unit_cls.identifiers, ident))
                unit = self.sandbox.unit(unit_cls, **lookup)
                if unit:
                    found.append(unit)
        finally:
            self.release()
        return found

    def xdict(self, attr):
        """Return a dictionary of {Unit.attr: [Unit, Unit, ...]}."""
        grouped = {}
        self.acquire()
        try:
            for unit in self.units():
                key = getattr(unit, attr)
                bucket = grouped.setdefault(key, [])
                bucket.append(unit)
        finally:
            self.release()
        return grouped

    def __copy__(self):
        """Copy the unit, giving the copy its own Members list."""
        duplicate = TemporaryUnit.__copy__(self)
        duplicate.Members = self.Members[:]
        return duplicate
151
152
# Operation names a UnitEngineRule may carry. RuleProcessor.process
# dispatches each rule to the method named 'visit_' + operation.
# The comment on each entry describes what the rule's Operand holds.
operations = [
    'COPY',           # Operand: SetID of the other set
    'CREATE',         # Operand: new type (= class.__name__)
    'DIFFERENCE',     # Operand: SetID of mixin
    'FILTER',         # Operand: logic.Expression
    'FUNCTION',       # Operand: key into arena.engine_functions dict
    'INTERSECTION',   # Operand: SetID of mixin
    'RETURN',         # Operand: (none)
    'TRANSFORM',      # Operand: new type (= class.__name__)
    'UNION',          # Operand: SetID of mixin
]
164
165
class RuleProperty(dejavu.TriggerProperty):
    """Trigger property for rule fields that can change an engine's result."""

    def on_set(self, unit, oldvalue):
        """Ask the rule's engine, if any, to recompute its final class."""
        engine = unit.UnitEngine()
        if not engine:
            return
        engine.update_final_class()
171
class EngIDProperty(dejavu.TriggerProperty):
    """Trigger property for a rule's EngineID."""

    def on_set(self, unit, oldvalue):
        """Refresh the final class of both the old and the new engine.

        The engine identified by `oldvalue` is refreshed first, then
        the engine the rule is associated with now.
        """
        previous = unit.sandbox.unit(UnitEngine, ID=oldvalue)
        if previous:
            previous.update_final_class()

        current = unit.UnitEngine()
        if current:
            current.update_final_class()
181
class UnitEngineRule(TemporaryUnit):
    """A Rule for Unit Engines."""

    Operation = RuleProperty(str)
    SetID = RuleProperty(int)
    Operand = RuleProperty(str, False, hints={u'bytes': 0})
    Sequence = RuleProperty(int)
    EngineID = EngIDProperty(int, index=True)

    def __init__(self, Operation=None, SetID=None, Operand=None, **kwargs):
        """kw: Operation, SetID, Operand=(Type | logic.Expression | otherSet)

        TRANSFORM:
            If the Operation is 'TRANSFORM', the Operand shall be the name of
            a Unit type. The snapshot will consist of the identifiers of all
            units of that Type which are associated with the current snapshot.

        FILTER:
            If the Operation is 'FILTER', the Operand shall be a
            logic.Expression, and the snapshot will consist of the
            identifiers of Units which match the Expression. A FILTER
            Operand that is not already a string is wrapped in a
            logic.Expression if necessary and stored pickled.

        So, a typical Engine might have a set of rules which look like:
            --Operation--   --Set-- --Operand--
            CREATE          1       Invoice         # Full set
            FILTER          1       (Expression)    # modifies Set 1
            CREATE          2       Inventory       # Full set
            FILTER          2       (Expression)    # modifies Set 2
            FILTER          2       (Expression)    # modifies Set 2
            TRANSFORM       2       Invoice         # modifies Set 2
            DIFFERENCE      1       2               # Set1 -= Set2
            RETURN          1                       # This is optional.

        The last RETURN statement is optional. If omitted, the last Set
        touched will be returned.

        For all operations, the Set ID indicates which Set will be
            modified by the operation. Using the above example, you can
            see that for the DIFFERENCE operation, the Set which is modified
            is Set 1.
        """
        must_pickle = (Operation == 'FILTER'
                       and not isinstance(Operand, basestring))
        if must_pickle:
            if not isinstance(Operand, logic.Expression):
                # The operand may be a plain function; wrap it.
                Operand = logic.Expression(Operand)
            Operand = pickle.dumps(Operand)
        TemporaryUnit.__init__(self, Operation=Operation, SetID=SetID,
                               Operand=Operand, **kwargs)

    def __repr__(self):
        """Show the operation, set ID and (unpickled, for FILTER) operand."""
        if self.Operation == 'FILTER':
            # NOTE(security): unpickles stored data; only safe if the
            # store is trusted.
            shown = pickle.loads(self.Operand)
        else:
            shown = self.Operand
        return ("dejavu.engines.UnitEngineRule(%s, %s, %s)"
                % (self.Operation, self.SetID, repr(shown)))

    def on_memorize(self):
        """Have the owning engine, if any, recompute its final class."""
        engine = self.UnitEngine()
        if not engine:
            return
        engine.update_final_class()

    def on_forget(self):
        """Have the owning engine, if any, recompute its final class."""
        engine = self.UnitEngine()
        if not engine:
            return
        engine.update_final_class()

    def expr(self):
        """expr() -> If a FILTER rule, return the Expression, else None."""
        if self.Operation != 'FILTER':
            return None
        # NOTE(security): unpickles stored data; only safe if the store
        # is trusted.
        return pickle.loads(self.Operand)
255
256
class UnitEngine(TemporaryUnit):
    """A factory for Unit Collections.

    An engine owns an ordered list of UnitEngineRule units (matched on
    EngineID) and produces UnitCollection snapshots by running those
    rules through a RuleProcessor.

    Owner: the owning user; permit() gives the values 'Public' and
        'System' special meaning.
    Name: name of the engine.
    Created: creation timestamp.
    FinalClassName: name of the Unit class the rules end up producing,
        maintained by update_final_class().
    """

    Owner = dejavu.UnitProperty()
    Name = dejavu.UnitProperty()
    Created = dejavu.UnitProperty(datetime.datetime)
    FinalClassName = dejavu.UnitProperty()

    def __init__(self, Created=None, Owner=u'', **kwargs):
        """Create an engine; Created defaults to the current time."""
        if Created is None:
            Created = datetime.datetime.today()
        TemporaryUnit.__init__(self, Created=Created, Owner=Owner, **kwargs)

    def on_forget(self):
        """Forget this engine's rules and snapshots along with it."""
        # Rules and Snapshots shouldn't persist past
        # the life of their Engines. Forget them.
        for rule in self.rules():
            rule.forget()
        for snap in self.snapshots():
            snap.forget()

    def update_final_class(self):
        """Recompute FinalClassName from the current rules.

        Walks the rules in order, remembering for each set the operand
        of its latest CREATE or TRANSFORM, and stops at the first
        RETURN. The type recorded for the last set touched becomes
        FinalClassName; if that set has no recorded type,
        FinalClassName is left unchanged.
        """
        results = {}
        last_set = 1
        for rule in self.rules():
            last_set = rule.SetID
            operation = rule.Operation
            if operation in ('CREATE', 'TRANSFORM'):
                results[last_set] = rule.Operand
            if operation == 'RETURN':
                break
        if last_set in results:
            self.FinalClassName = results[last_set]

    def final_class(self):
        """Return the Unit class registered under FinalClassName."""
        return self.sandbox.arena.class_by_name(self.FinalClassName)

    def rules(self):
        """An ordered list of Rules for this Engine."""
        allrules = list(self.UnitEngineRule())
        allrules.sort(dejavu.sort(u'Sequence'))
        return allrules

    def add_rule(self, Operation, SetID=None, Operand=None):
        """Append a rule to this engine and memorize it.

        Operation: either an operation name, or a ready-made
            UnitEngineRule (in which case SetID and Operand are
            ignored).
        SetID: set the rule works on; defaults to the SetID of the
            current last rule, or 1 if there are no rules yet.
        Operand: operand for the new rule.

        The rule is given the next Sequence number (0 for the first
        rule), this engine's ID, and this engine's Expiration.
        """
        allrules = self.rules()
        if isinstance(Operation, UnitEngineRule):
            newRule = Operation
        else:
            if SetID is None:
                if allrules:
                    SetID = allrules[-1].SetID
                else:
                    SetID = 1
            newRule = UnitEngineRule(Operation=Operation, SetID=SetID,
                                     Operand=Operand)

        if allrules:
            nextSeq = allrules[-1].Sequence + 1
        else:
            nextSeq = 0
        newRule.Sequence = nextSeq

        newRule.EngineID = self.ID
        newRule.Expiration = self.Expiration
        self.sandbox.memorize(newRule)

    def snapshots(self):
        """Unit Collections obtained by executing the rules sometime in the past."""
        allSnap = self.sandbox.recall(UnitCollection, EngineID=self.ID)
        allSnap.sort(dejavu.sort(u'Timestamp'))
        return allSnap

    def take_snapshot(self, args=None):
        """Execute the rules and return a Unit Collection (or None).

        args: optional dict of keyword arguments handed to the rule
            processor (it binds them into FILTER expressions). Defaults
            to an empty dict.

        A non-None result is stamped with this engine's ID and the
        current time, given a 15-minute lease, and memorized.
        """
        # A None sentinel avoids sharing one dict between all calls.
        if args is None:
            args = {}
        allrules = self.rules()
        snap = RuleProcessor(self.sandbox).process(allrules, args)
        if snap is not None:
            snap.EngineID = self.ID
            snap.Timestamp = datetime.datetime.now()
            snap.decay(minutes=15)
            self.sandbox.memorize(snap)
        return snap

    def last_snapshot(self, args=None):
        """Return the most recent snapshot, taking one if none exists.

        args: passed to take_snapshot() when a new snapshot is needed.
        """
        allSnaps = self.snapshots()
        if not allSnaps:
            return self.take_snapshot(args)
        return allSnaps[-1]

    def immortalize(self):
        """Clear the Expiration of this engine and of all its rules."""
        self.Expiration = None
        for rule in self.rules():
            rule.Expiration = None

    def __copy__(self):
        """Copy the engine, renaming the copy and resetting Created."""
        newUnit = TemporaryUnit.__copy__(self)
        newUnit.Name = "Copy of %s" % newUnit.Name
        newUnit.Created = datetime.datetime.now()
        return newUnit

    def clone(self, user, temporary=True):
        """Copy self and all Rules of self. Memorize automatically.

        user: Owner of the new engine.
        temporary: if true the clone expires in 15 minutes, otherwise
            it never expires. The cloned rules share the clone's
            Expiration.
        """
        newUnit = self.__copy__()
        newUnit.Owner = user
        if temporary:
            newUnit.decay(minutes=15)
        else:
            newUnit.Expiration = None
        self.sandbox.memorize(newUnit)
        for rule in self.rules():
            newRule = rule.__copy__()
            newRule.EngineID = newUnit.ID
            newRule.Expiration = newUnit.Expiration
            self.sandbox.memorize(newRule)
        return newUnit

    def permit(self, user, write_access=True):
        """Return True if `user` may access this engine.

        Writing requires the Owner to be 'Public' or `user`; reading is
        additionally allowed when the Owner is 'System'.
        """
        if write_access:
            return self.Owner in (u'Public', user)
        else:
            return self.Owner in ('System', 'Public', user)

# Associate each engine with its rules and its snapshots, matching
# UnitEngine.ID against the far class's EngineID.
# NOTE(review): presumably these declarations provide the
# UnitEngine.UnitEngineRule() and UnitEngineRule.UnitEngine() accessors
# used in this module -- confirm against dejavu.
UnitEngine.one_to_many('ID', UnitEngineRule, 'EngineID')
UnitEngine.one_to_many('ID', UnitCollection, 'EngineID')
384
385
class RuleProcessor(object):
    """Processor for the Rules of a Unit Engine.

    Each rule is dispatched to the method named 'visit_' + Operation,
    which receives the rule's SetID and Operand and updates the working
    sets in self.sets (a dict of {SetID: UnitCollection}).

    A set flagged `universal` stands for "every Unit of its Type"
    without listing them; its Members are only filled in when an
    operation needs them or when the set is returned.
    """

    def __init__(self, sandbox):
        """Remember the sandbox (and its arena) used for all lookups."""
        self.sandbox = sandbox
        self.arena = sandbox.arena

    def process(self, rules, args):
        """Execute the rules and return a Unit Collection (or None).

        rules: iterable of rule objects exposing Operation, SetID and
            Operand, in execution order.
        args: dict of keyword arguments bound into FILTER expressions;
            None is treated as an empty dict.

        Processing stops at the first RETURN rule, matching
        UnitEngine.update_final_class. The set of the last rule
        executed is returned, with its members realized if it is still
        universal (so omitting the optional RETURN gives the same
        result). Returns None when there are no rules.

        Raises AttributeError for an unknown Operation and KeyError
        when a rule refers to a set that does not exist.
        """
        if args is None:
            args = {}
        self.sets = {}
        self.args = args
        final = None
        for rule in rules:
            operation = rule.Operation
            func = getattr(self, 'visit_' + operation)
            final = rule.SetID
            func(final, rule.Operand)
            if operation == 'RETURN':
                break
        if final is None:
            return None
        # A universal set has no Members yet; fill them in so that the
        # caller never receives an apparently empty collection.
        self.realize_universal(final)
        return self.sets[final]

    def _unit_for(self, cls, ident):
        """Look up the unit of class `cls` with identity `ident` (or None)."""
        lookup = dict(zip(cls.identifiers, ident))
        return self.sandbox.unit(cls, **lookup)

    def _existing_units(self, cls, idents):
        """Yield the unit for each identity in `idents`, skipping misses."""
        for ident in idents:
            unit = self._unit_for(cls, ident)
            if unit:
                yield unit

    def _far_identities(self, association, units):
        """Return the distinct identities reachable from `units`.

        Follows `association` from each unit and collects the far
        units' identities in first-seen order.
        """
        collected = []
        for unit in units:
            far_units = association.__get__(unit)()
            if not association.to_many:
                # A to-one association yields a single unit or None.
                if far_units is None:
                    far_units = []
                else:
                    far_units = [far_units]
            for far_unit in far_units:
                far_id = far_unit.identity()
                if far_id not in collected:
                    collected.append(far_id)
        return collected

    def visit_COPY(self, setID, operand):
        """Copy the set whose ID = setID into the set whose ID = operand.

        The destination set is created if it does not exist; otherwise
        its Type, universal flag and Members are overwritten.

        NOTE(review): the direction (setID is the source) is what the
        code has always done, although the rule documentation says the
        SetID names the set that is modified.
        """
        A = self.sets[setID]
        setID2 = int(operand)
        if setID2 in self.sets:
            # Overwrite the existing set, including its Type, so that
            # the copied identifiers are not interpreted as another class.
            B = self.sets[setID2]
            B.Type = A.Type
        else:
            # Create a new set.
            B = UnitCollection(Type=A.Type)
            self.sets[setID2] = B
        B.universal = A.universal
        A.acquire()
        B.acquire()
        try:
            B.Members = A.Members[:]
        finally:
            A.release()
            B.release()

    def visit_CREATE(self, setID, operand):
        """Create a universal set. Actual population may be deferred."""
        newset = UnitCollection(Type=operand)
        newset.universal = True
        self.sets[setID] = newset

    def visit_DIFFERENCE(self, setID, operand):
        """Remove from set `setID` every member of set `operand`."""
        A = self.sets[setID]
        B = self.sets[int(operand)]

        A.acquire()
        B.acquire()
        try:
            if B.universal:
                # B is every Unit, so the difference is always empty;
                # there is no need to populate B to find that out.
                A.Members = []
            elif A.universal:
                # A is every Unit: keep those not listed in B.
                cls = self.arena.class_by_name(A.Type)
                mem = A.Members
                for unit in self.sandbox.recall(cls):
                    ident = unit.identity()
                    if ident not in B.Members:
                        mem.append(ident)
            else:
                A.Members = [x for x in A.Members if x not in B.Members]
            A.universal = False
        finally:
            A.release()
            B.release()

    def visit_FILTER(self, setID, operand):
        """Keep only the members of set `setID` that match the expression.

        operand: the pickled logic.Expression; self.args are bound into
            it before use.
        """
        # NOTE(security): unpickles the stored operand; only safe if
        # the rule store is trusted.
        expr = pickle.loads(operand)
        expr.bind_args(**self.args)

        A = self.sets[setID]
        A.acquire()
        try:
            cls = self.arena.class_by_name(A.Type)
            mem = A.Members
            if A.universal:
                # Let the sandbox evaluate the expression over all Units.
                A.universal = False
                for unit in self.sandbox.recall(cls, expr):
                    ident = unit.identity()
                    if ident not in mem:
                        mem.append(ident)
            else:
                newset = []
                for ident in mem:
                    unit = self._unit_for(cls, ident)
                    if unit and expr(unit):
                        newset.append(ident)
                A.Members = newset
        finally:
            A.release()

    def visit_FUNCTION(self, setID, operand):
        """Call arena.engine_functions[operand](sandbox, set) under the lock."""
        func = self.arena.engine_functions[operand]

        A = self.sets[setID]
        A.acquire()
        try:
            # Notice we do not populate universals before passing to func.
            func(self.sandbox, A)
        finally:
            A.release()

    def visit_INTERSECTION(self, setID, operand):
        """Keep in set `setID` only the members also in set `operand`."""
        A = self.sets[setID]
        B = self.sets[int(operand)]

        A.acquire()
        B.acquire()
        try:
            # If B is universal there is nothing to do:
            #   A universal     -> (A and B) = universal. Defer.
            #   A not universal -> (A and B) = A.
            if not B.universal:
                if A.universal:
                    # B is a subset of A. (A and B) = B. Copy B to A.
                    A.Members = B.Members[:]
                    A.universal = False
                else:
                    A.Members = [x for x in A.Members if x in B.Members]
        finally:
            A.release()
            B.release()

    def visit_RETURN(self, setID, operand):
        """Make sure the set to be returned has its members populated."""
        self.realize_universal(setID)

    def realize_universal(self, setID):
        """If set `setID` is universal, fill Members with every Unit's identity."""
        A = self.sets[setID]
        if A.universal:
            A.acquire()
            try:
                A.universal = False
                cls = self.arena.class_by_name(A.Type)
                mem = A.Members
                for unit in self.sandbox.recall(cls):
                    mem.append(unit.identity())
            finally:
                A.release()

    def visit_TRANSFORM(self, setID, operand):
        """visit_TRANSFORM(setID, operand=farClass name). Multiple hops OK.

        Replaces the members of set `setID` with the identities of the
        Units of class `operand` associated with them, following the
        shortest association path hop by hop and updating the set's
        Type at each hop.

        Raises KeyError if the arena knows no association path between
        the two classes.
        """
        A = self.sets[setID]
        start = self.arena.class_by_name(A.Type)
        end = self.arena.class_by_name(operand)
        nodes = self.arena.associations.shortest_path(start, end)
        if nodes is None:
            raise KeyError("No association found between '%s' and '%s'"
                           % (start, end))

        # Skip the first node, which should be A.Type. Slice rather
        # than pop so the list returned by shortest_path is not altered.
        hops = nodes[1:]
        A.acquire()
        try:
            for eachType in hops:
                # Replace A's members with all associated far Units.
                ua = start._associations[eachType.__name__]
                cls = self.arena.class_by_name(A.Type)
                if A.universal:
                    newset = self._far_identities(
                        ua, self.sandbox.recall(cls))
                    A.universal = False
                else:
                    newset = self._far_identities(
                        ua, self._existing_units(cls, A.Members))
                A.Members = newset
                start = eachType
                A.Type = eachType.__name__
        finally:
            A.release()

    def visit_UNION(self, setID, operand):
        """Add to set `setID` every member of set `operand`."""
        A = self.sets[setID]
        B = self.sets[int(operand)]

        A.acquire()
        B.acquire()
        try:
            if B.universal:
                # (A or B) = Universal set. Make A universal.
                A.universal = True
                A.Members = []
            elif not A.universal:
                # A universal A already contains everything in B.
                amem = A.Members
                for ident in B.Members:
                    if ident not in amem:
                        amem.append(ident)
        finally:
            A.release()
            B.release()
617
618
def register_classes(arena):
    """Register this module's Unit classes with the given arena."""
    for unit_class in (UnitCollection, UnitEngine, UnitEngineRule):
        arena.register(unit_class)
623
Note: See TracBrowser for help on using the browser.