Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/engines.py

Revision 51 (checked in by fumanchu, 8 years ago)

1. Changed arena.roster to ._registered_classes. Dropped containers.Prism and .Index
2. First attempt at arena.migrate (migrate class data from one store to another).
3. Fixed weakref bug in db pooling.
4. db.SM.reserve() now writes all data, not just ID.
5. Various store bugs (adapters, mostly).

Line 
1 """
2 Notice in particular that UnitCollection, UnitEngineRule, and UnitEngine
3 are all _temporary_ Units. Even when you memorize them, they won't be
4 persistent unless you set each instance's Expiration to None. If you
5 use UnitEngine.immortalize(), it will make all of its rules immortal
6 (no Expiration) as well.
7 """
8
9 import threading
10 import datetime
11 try:
12     import cPickle as pickle
13 except ImportError:
14     import pickle
15 import dejavu
16 from dejavu import logic, associate
17 import sets
18
19
class UnitCollection(dejavu.Unit):
    """A collection of Unit IDs that all refer to Units of a single type.

    Type: name of the Unit class of every Unit referenced here.

    Collections mainly serve as indexes over Units. Unit Engines apply
    Expressions and other rules to a whole Collection at once; they both
    consume and produce Unit Collections. Readers and writers should go
    through the methods below, which serialize access with a re-entrant
    lock so that multi-threaded use does not corrupt the member list.

    Because UnitCollection is itself a Unit, Sandboxes can manage it.
    Only the IDs of the contained Units need to be stored; the Units
    themselves are looked up again on demand. Storage Managers may need
    special handling for this kind of data -- deployers should verify
    that theirs can cope.
    """

    Members = dejavu.UnitProperty(list)
    EngineID = dejavu.UnitProperty(int, index=True)
    Type = dejavu.UnitProperty()
    Expiration = dejavu.UnitProperty(datetime.datetime)
    Timestamp = dejavu.UnitProperty(datetime.datetime)

    def __init__(self, **kwargs):
        """Create an empty collection, then apply kwargs as attributes."""
        dejavu.Unit.__init__(self)
        self.Members = []
        self._mutex = threading.RLock()

        for name, value in kwargs.iteritems():
            setattr(self, name, value)

    def __getstate__(self):
        """Pickle only the property data; the lock and sandbox are dropped."""
        return (self._properties, self._initial_property_hash)

    def __setstate__(self, state):
        """Restore property data and build a fresh lock; no sandbox yet."""
        self.sandbox = None
        self._mutex = threading.RLock()
        self._properties, self._initial_property_hash = state

    def acquire(self):
        """Block until this collection's re-entrant lock is held."""
        self._mutex.acquire(True)

    def release(self):
        """Release one level of this collection's lock."""
        self._mutex.release()

    def __len__(self):
        """Number of IDs currently in the collection."""
        return len(self.Members)

    def add(self, ID):
        """Append ID to the collection unless it is already present."""
        self.acquire()
        try:
            members = self.Members
            if ID not in members:
                members.append(ID)
        finally:
            self.release()

    def unit_class(self):
        """Resolve self.Type to a class through the sandbox's arena."""
        arena = self.sandbox.arena
        return arena.class_by_name(self.Type)

    def ids(self):
        """Return a copy of the member IDs, taken while holding the lock."""
        self.acquire()
        try:
            snapshot = self.Members[:]
        finally:
            self.release()
        return snapshot

    def units(self, quota=None):
        """Return the Units for the member IDs, in member order.

        At most `quota` member IDs are examined when quota is truthy.
        IDs for which the sandbox yields no Unit are silently skipped.
        """
        unit_cls = self.unit_class()
        found = []
        self.acquire()
        try:
            for position, member_id in enumerate(self.Members):
                if quota and position >= quota:
                    break
                candidate = self.sandbox.unit(unit_cls, ID=member_id)
                if candidate:
                    found.append(candidate)
        finally:
            self.release()
        return found

    def xdict(self, attr):
        """Return a dictionary of {Unit.attr: [Unit, Unit, ...]}."""
        grouped = {}
        self.acquire()
        try:
            for member in self.units():
                bucket = grouped.setdefault(getattr(member, attr), [])
                bucket.append(member)
        finally:
            self.release()
        return grouped

    def __copy__(self):
        """Copy the Unit and give the copy its own list of member IDs."""
        duplicate = dejavu.Unit.__copy__(self)
        duplicate.Members = self.Members[:]
        return duplicate

    def on_recall(self):
        """Forget expired collections; otherwise extend life by 15 minutes.

        Collections whose Expiration is None are left untouched.
        Raises dejavu.UnrecallableError when the collection has expired.
        """
        if self.Expiration is None:
            return
        if self.Expiration <= datetime.datetime.now():
            self.forget()
            raise dejavu.UnrecallableError
        self.decay(minutes=15)

    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        current = datetime.datetime.now()
        self.Expiration = current + datetime.timedelta(**kw)
134
135
# Names of the rule operations. The trailing comment on each entry says
# what the rule's Operand holds for that operation.
operations = [
    'COPY',           # Operand: SetID of mixin
    'CREATE',         # Operand: new type (= class.__name__)
    'DIFFERENCE',     # Operand: SetID of mixin
    'FILTER',         # Operand: logic.Expression
    'FUNCTION',       # Operand: key into arena.engine_functions dict
    'INTERSECTION',   # Operand: SetID of mixin
    'RETURN',         # Operand: (none)
    'TRANSFORM',      # Operand: new type (= class.__name__)
    'UNION',          # Operand: SetID of mixin
    ]
147
148
class RuleProperty(dejavu.UnitProperty):
    """UnitProperty whose post hook refreshes the owning engine's final class.

    Used for the rule fields (Operation, SetID, Operand, Sequence) that
    UnitEngine.update_final_class depends on.
    """

    def post(self, unit, value):
        """Ask the rule's UnitEngine (if reachable) to update its final class.

        unit: the rule whose property is involved.
        value: passed by the property machinery; not used here.
        NOTE(review): presumably dejavu calls this after a property value
        is set -- confirm against dejavu.UnitProperty.
        """
        # A rule that has not been memorized yet (or was just unpickled)
        # has no sandbox, so there is no engine to look up.
        sandbox = getattr(unit, 'sandbox', None)
        if sandbox is None:
            return
        eng = sandbox.unit(UnitEngine, ID=unit.EngineID)
        if eng:
            eng.update_final_class()
154
class UnitEngineRule(dejavu.Unit):
    """A Rule for Unit Engines.

    Each rule names an Operation, the Set it modifies (SetID) and an
    Operand whose meaning depends on the Operation. For FILTER rules the
    Operand is stored as a pickled logic.Expression.
    """

    Operation = RuleProperty(str)
    SetID = RuleProperty(int)
    Operand = RuleProperty(str, False, hints = {u'bytes': 0})
    Sequence = RuleProperty(int)
    EngineID = dejavu.UnitProperty(int, index=True)
    Expiration = dejavu.UnitProperty(datetime.datetime)

    def __init__(self, **kwargs):
        """kw: Operation, SetID, Operand=(Type | logic.Expression | otherSet)

        FILTER:
            If the Operation is 'FILTER', the Operand shall be a
            logic.Expression, and the snapshot will consist of the IDs of
            Units which match the Expression. A non-string Operand is
            pickled before being stored.

        Everything else:
            transforms: the snapshot will consist of IDs of all units
                which are associated with the current snapshot.
            union, difference, and intersection: these all take a setID.

        So, a typical Engine might have a set of rules which look like:
            --Operation--   --Set-- --Operand--
            CREATE          1       Invoice         # Full set
            FILTER          1       (Expression)    # modifies Set 1
            CREATE          2       Inventory       # Full set
            FILTER          2       (Expression)    # modifies Set 2
            FILTER          2       (Expression)    # modifies Set 2
            TRANSFORM       2       Invoice         # modifies Set 2
            DIFFERENCE      1       2               # Set1 -= Set2
            RETURN          1                       # This is optional.

        The last RETURN statement is optional. If omitted, the last Set
        touched will be returned.

        For all operations, the Set ID indicates which Set will be
            modified by the operation. Using the above example, you can
            see that for the DIFFERENCE operation, the Set which is modified
            is Set 1.
        """
        dejavu.Unit.__init__(self)

        # Only pickle an Operand that was actually supplied; a FILTER rule
        # built without one (Operand assigned later) must not raise KeyError.
        if kwargs.get('Operation', '') == 'FILTER' and 'Operand' in kwargs:
            if not isinstance(kwargs['Operand'], (str, unicode)):
                kwargs['Operand'] = pickle.dumps(kwargs['Operand'])

        for k, v in kwargs.iteritems():
            setattr(self, k, v)

    def __repr__(self):
        """Readable form showing the Operation, SetID and (unpickled) Operand."""
        op = self.Operand
        # An unset Operand cannot be unpickled; show it as-is so repr()
        # never fails on a half-built rule.
        if self.Operation == 'FILTER' and op is not None:
            op = pickle.loads(op)
        return ("dejavu.engines.UnitEngineRule(%s, %s, %s)"
                % (self.Operation, self.SetID, repr(op)))

    def expr(self):
        """expr() -> If a FILTER rule, return the Expression, else None.

        Also returns None for a FILTER rule whose Operand is not set.
        """
        if self.Operation != 'FILTER':
            return None
        op = self.Operand
        if op is None:
            return None
        # NOTE: the Operand is unpickled; it must come from a trusted store.
        return pickle.loads(op)

    def on_recall(self):
        """Forget expired rules; otherwise extend life by 15 minutes.

        Rules whose Expiration is None are left untouched.
        Raises dejavu.UnrecallableError when the rule has expired.
        """
        if self.Expiration is not None:
            if self.Expiration <= datetime.datetime.now():
                self.forget()
                raise dejavu.UnrecallableError
            else:
                self.decay(minutes=15)

    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw)
231
232
class UnitEngine(dejavu.Unit):
    """A factory for Unit Collections.

    An engine owns an ordered list of UnitEngineRules and the snapshots
    (UnitCollections) produced by executing those rules. Engines, their
    rules and their snapshots are temporary unless Expiration is None.
    """

    Owner = dejavu.UnitProperty()
    Name = dejavu.UnitProperty()
    Created = dejavu.UnitProperty(datetime.datetime)
    FinalClassName = dejavu.UnitProperty()
    Expiration = dejavu.UnitProperty(datetime.datetime)

    def __init__(self, **kwargs):
        """Stamp Created, default Owner to u'', then apply kwargs as attributes."""
        dejavu.Unit.__init__(self)
        self.Created = datetime.datetime.today()
        self.Owner = u''

        for k, v in kwargs.iteritems():
            setattr(self, k, v)

    def on_forget(self):
        """Forget this engine's rules and snapshots along with it."""
        # Rules and Snapshots shouldn't persist past
        # the life of their Engines. Forget them.
        for rule in self.rules():
            rule.forget()
        for snap in self.snapshots():
            snap.forget()

    def on_recall(self):
        """Forget expired engines; otherwise extend life by 15 minutes.

        Engines whose Expiration is None are left untouched.
        Raises dejavu.UnrecallableError when the engine has expired.
        """
        if self.Expiration is not None:
            if self.Expiration <= datetime.datetime.now():
                self.forget()
                raise dejavu.UnrecallableError
            else:
                self.decay(minutes=15)

    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw)

    def update_final_class(self):
        """Recompute FinalClassName from the rules.

        Tracks, per Set, the type named by its latest CREATE or TRANSFORM
        rule, stopping at the first RETURN. FinalClassName becomes the
        type of the last Set touched, if that Set has a known type.
        """
        results = {}
        last_set = 1
        for rule in self.rules():
            last_set = rule.SetID
            operation = rule.Operation
            if operation in ('CREATE', 'TRANSFORM'):
                results[last_set] = rule.Operand
            if operation == 'RETURN':
                break
        if last_set in results:
            self.FinalClassName = results[last_set]

    def final_class(self):
        """Resolve FinalClassName to a class through the sandbox's arena."""
        return self.sandbox.arena.class_by_name(self.FinalClassName)

    def rules(self):
        """An ordered list of Rules for this Engine."""
        allrules = list(self.UnitEngineRule())
        allrules.sort(dejavu.sort(u'Sequence'))
        return allrules

    def add_rule(self, Operation, SetID=None, Operand=None):
        """Append a rule to this engine and memorize it.

        Operation: an operation name, or a ready-made UnitEngineRule (in
            which case SetID and Operand are ignored).
        SetID: Set the rule modifies; defaults to the SetID of the current
            last rule, or 1 when the engine has no rules.
        Operand: operand for the new rule.

        The rule gets the next Sequence number and inherits this engine's
        ID and Expiration.
        """
        allrules = self.rules()
        if isinstance(Operation, UnitEngineRule):
            newRule = Operation
        else:
            if SetID is None:
                try:
                    SetID = allrules[-1].SetID
                except IndexError:
                    SetID = 1
            newRule = UnitEngineRule(Operation=Operation, SetID=SetID,
                                     Operand=Operand)

        try:
            nextSeq = allrules[-1].Sequence + 1
        except IndexError:
            nextSeq = 0
        newRule.Sequence = nextSeq

        newRule.EngineID = self.ID
        newRule.Expiration = self.Expiration
        self.sandbox.memorize(newRule)
        self.update_final_class()

    def snapshots(self):
        """Unit Collections obtained by executing the rules sometime in the past."""
        f = logic.filter(EngineID=self.ID)
        allSnap = list(self.sandbox.recall(UnitCollection, f))
        allSnap.sort(dejavu.sort(u'Timestamp'))
        return allSnap

    def take_snapshot(self, args=None):
        """Execute the rules and return a Unit Collection (or None).

        args: optional dict of arguments bound into FILTER expressions;
            defaults to an empty dict.

        A resulting collection is tied to this engine, timestamped, given
        a 15-minute lifetime and memorized in the sandbox.
        """
        # A fresh dict per call; a shared `{}` default would leak state
        # between calls if anything ever mutated it.
        if args is None:
            args = {}
        allrules = self.rules()
        snap = RuleProcessor(self.sandbox).process(allrules, args)
        if snap is not None:
            snap.EngineID = self.ID
            snap.Timestamp = datetime.datetime.now()
            snap.decay(minutes=15)
            self.sandbox.memorize(snap)
        return snap

    def last_snapshot(self, args=None):
        """Return the newest snapshot, taking one first if none exist.

        args: forwarded to take_snapshot when a new snapshot is needed.
        """
        allSnaps = self.snapshots()
        if not allSnaps:
            return self.take_snapshot(args)
        return allSnaps[-1]

    def immortalize(self):
        """Clear Expiration on this engine and on all of its rules."""
        self.Expiration = None
        for rule in self.rules():
            rule.Expiration = None

    def __copy__(self):
        """Copy the engine, renaming it "Copy of ..." with a new Created time."""
        newUnit = dejavu.Unit.__copy__(self)
        newUnit.Name = "Copy of %s" % newUnit.Name
        newUnit.Created = datetime.datetime.now()
        return newUnit

    def clone(self, user, temporary=True):
        """Copy self and all Rules of self. Memorize automatically.

        user: becomes the Owner of the copy.
        temporary: when true the copy expires in 15 minutes; otherwise it
            has no Expiration. Copied rules share the copy's Expiration.
        """
        newUnit = self.__copy__()
        newUnit.Owner = user
        if temporary:
            newUnit.decay(minutes=15)
        else:
            newUnit.Expiration = None
        self.sandbox.memorize(newUnit)
        for rule in self.rules():
            newRule = rule.__copy__()
            newRule.EngineID = newUnit.ID
            newRule.Expiration = newUnit.Expiration
            self.sandbox.memorize(newRule)
        return newUnit

    def permit(self, user, write_access=True):
        """Return whether `user` may access this engine.

        Write access requires the Owner to be u'Public' or the user;
        read access additionally allows engines owned by 'System'.
        """
        if write_access:
            return self.Owner in (u'Public', user)
        else:
            return self.Owner in ('System', 'Public', user)
375
# Associate engines with their rules and with their collections:
# UnitEngine.ID pairs with the EngineID property on each of these classes.
associate(UnitEngine, 'ID',
          UnitEngineRule, 'EngineID')
associate(UnitEngine, 'ID',
          UnitCollection, 'EngineID')
378
379
class RuleProcessor(object):
    """Processor for the Rules of a Unit Engine.

    Each rule is dispatched to the method named 'visit_' + rule.Operation,
    which modifies the working sets held in self.sets (a dict keyed by
    Set ID). Sets carry a non-persistent `universal` flag meaning "every
    Unit of this Type"; population of such a set is deferred until an
    operation needs the actual IDs.
    """

    def __init__(self, sandbox):
        """sandbox: source of Units; its arena resolves class names."""
        self.sandbox = sandbox
        self.arena = sandbox.arena

    def process(self, rules, args):
        """Execute the rules and return a Unit Collection (or None).

        rules: ordered iterable of rules (Operation, SetID, Operand).
        args: dict of arguments bound into FILTER expressions (None is
            treated as an empty dict).

        Processing stops at the first RETURN rule. The set touched by the
        last executed rule is returned, fully populated; None is returned
        when there are no rules. An unknown Operation raises
        AttributeError.
        """
        self.sets = {}
        self.args = {} if args is None else args
        final = None
        for rule in rules:
            operation = rule.Operation
            func = getattr(self, 'visit_' + operation)
            final = rule.SetID
            func(final, rule.Operand)
            if operation == 'RETURN':
                # RETURN ends the program, matching how
                # UnitEngine.update_final_class reads the rules.
                break
        if final is None:
            return None
        # RETURN is optional: populate a still-deferred set so the result
        # is the same with or without it (the `universal` flag itself is
        # not part of the collection's persistent state).
        self.realize_universal(final)
        return self.sets[final]

    def visit_COPY(self, setID, operand):
        """Copy the members of set `setID` into the set whose ID = operand.

        The target set is created (with the source's Type) when it does
        not exist yet; otherwise its members are overwritten. The target
        also takes the source's `universal` flag.
        NOTE(review): this direction (setID -> operand) is what the code
        does; confirm it is the intended one.
        """
        A = self.sets[setID]
        setID2 = int(operand)
        if setID2 in self.sets:
            # Overwrite the existing set.
            B = self.sets[setID2]
        else:
            # Create a new set.
            B = UnitCollection(Type=A.Type)
            self.sets[setID2] = B
        B.universal = A.universal
        A.acquire()
        B.acquire()
        try:
            B.Members = A.Members[:]
        finally:
            A.release()
            B.release()

    def visit_CREATE(self, setID, operand):
        """Create a universal set. Actual population may be deferred."""
        newset = UnitCollection(Type=operand)
        newset.universal = True
        self.sets[setID] = newset

    def visit_DIFFERENCE(self, setID, operand):
        """Remove from set `setID` every ID present in set `operand`."""
        A = self.sets[setID]

        setID2 = int(operand)
        # B is always populated first, so it is never universal below.
        self.realize_universal(setID2)
        B = self.sets[setID2]

        A.acquire()
        B.acquire()
        try:
            excluded = B.Members
            if A.universal:
                # A stands for every Unit of its Type: recall them all
                # and keep the ones B does not contain.
                cls = self.arena.class_by_name(A.Type)
                mem = A.Members
                for unit in self.sandbox.recall(cls):
                    unit_id = unit.ID
                    if unit_id not in excluded:
                        mem.append(unit_id)
            else:
                A.Members = [x for x in A.Members if x not in excluded]
            A.universal = False
        finally:
            A.release()
            B.release()

    def visit_FILTER(self, setID, operand):
        """Keep only the Units of set `setID` matching the pickled Expression."""
        # NOTE: unpickling executes arbitrary code for hostile input; the
        # operand must come from a trusted store.
        expr = pickle.loads(operand)
        expr.bind_args(**self.args)

        A = self.sets[setID]
        A.acquire()
        try:
            cls = self.arena.class_by_name(A.Type)
            mem = A.Members
            if A.universal:
                # Let the sandbox apply the expression over all Units.
                A.universal = False
                for unit in self.sandbox.recall(cls, expr):
                    unit_id = unit.ID
                    if unit_id not in mem:
                        mem.append(unit_id)
            else:
                # Evaluate the expression against each current member.
                newset = []
                for unit_id in mem:
                    unit = self.sandbox.unit(cls, ID=unit_id)
                    if unit and expr.evaluate(unit):
                        newset.append(unit_id)
                A.Members = newset
        finally:
            A.release()

    def visit_FUNCTION(self, setID, operand):
        """Call arena.engine_functions[operand](sandbox, set) under the set's lock."""
        func = self.arena.engine_functions[operand]

        A = self.sets[setID]
        A.acquire()
        try:
            # Notice we do not populate universals before passing to func.
            func(self.sandbox, A)
        finally:
            A.release()

    def visit_INTERSECTION(self, setID, operand):
        """Reduce set `setID` to the IDs it shares with set `operand`."""
        A = self.sets[setID]

        setID2 = int(operand)
        B = self.sets[setID2]

        A.acquire()
        B.acquire()
        try:
            if B.universal:
                # If A is universal, (A and B) = universal. Defer.
                # If A is not universal, (A and B) = A. Pass.
                pass
            elif A.universal:
                # B is a subset of A. (A and B) = B. Copy B to A.
                A.Members = B.Members[:]
                A.universal = False
            else:
                A.Members = [x for x in A.Members if x in B.Members]
        finally:
            A.release()
            B.release()

    def visit_RETURN(self, setID, operand):
        """Populate set `setID` if its population is still deferred."""
        self.realize_universal(setID)

    def realize_universal(self, setID):
        """If set `setID` is universal, fill it with the IDs of all its Units."""
        A = self.sets[setID]
        A.acquire()
        try:
            # Checked under the lock so the flag and Members change together.
            if A.universal:
                A.universal = False
                cls = self.arena.class_by_name(A.Type)
                mem = A.Members
                for unit in self.sandbox.recall(cls):
                    mem.append(unit.ID)
        finally:
            A.release()

    def visit_TRANSFORM(self, setID, operand):
        """operand=far class name. Multiple hops are supported.

        Replaces the members of set `setID` with the IDs of the Units
        associated with them, one hop at a time along the shortest
        association path, and updates the set's Type accordingly.
        Raises KeyError when no association path exists.
        """
        A = self.sets[setID]
        start = self.arena.class_by_name(A.Type)
        end = self.arena.class_by_name(operand)
        nodes = self.arena.associations.shortest_path(start, end)
        if nodes is None:
            raise KeyError("No association found between '%s' and '%s'"
                           % (start, end))

        # Skip the first node, which should be A.Type. Slice rather than
        # pop so the list handed back by shortest_path is left intact.
        hops = nodes[1:]
        A.acquire()
        try:
            for eachType in hops:
                # Add all associated Units to the collection A.
                oppfunc = getattr(start, eachType.__name__)
                cls = self.arena.class_by_name(A.Type)
                newset = []
                if A.universal:
                    for unit in self.sandbox.recall(cls):
                        for farUnit in oppfunc(unit):
                            farid = farUnit.ID
                            if farid not in newset:
                                newset.append(farid)
                    A.universal = False
                else:
                    for unit_id in A.Members:
                        unit = self.sandbox.unit(cls, ID=unit_id)
                        if unit:
                            for farUnit in oppfunc(unit):
                                farid = farUnit.ID
                                if farid not in newset:
                                    newset.append(farid)
                A.Members = newset
                start = eachType
                A.Type = eachType.__name__
        finally:
            A.release()

    def visit_UNION(self, setID, operand):
        """Add to set `setID` every ID of set `operand` it lacks."""
        A = self.sets[setID]
        setID2 = int(operand)
        B = self.sets[setID2]

        A.acquire()
        B.acquire()
        try:
            if B.universal:
                # (A or B) = Universal set. Make A universal.
                A.universal = True
                A.Members = []
            elif not A.universal:
                amem = A.Members
                for unit_id in B.Members:
                    if unit_id not in amem:
                        amem.append(unit_id)
            # else: A is already universal; the union changes nothing.
        finally:
            A.release()
            B.release()
597
598
def register_classes(arena):
    """Register this module's three Unit classes with the given arena."""
    for unit_class in (UnitCollection, UnitEngine, UnitEngineRule):
        arena.register(unit_class)
603
Note: See TracBrowser for help on using the browser.