Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/engines.py

Revision 26 (checked in by fumanchu, 9 years ago)

1. Use error trapping instead of asserts.
2. Bug in engines--can't .copy() a list, use [:]

Line 
1 """
2 Notice in particular that UnitCollection, UnitEngineRule, and UnitEngine
3 are all _temporary_ Units. Even when you memorize them, they won't be
4 persistent unless you set each instance's Expiration to None. If you
5 use UnitEngine.immortalize(), it will make all of its rules immortal
6 (no Expiration) as well.
7 """
8
9 import threading
10 import datetime
11 try:
12     import cPickle as pickle
13 except ImportError:
14     import pickle
15 import dejavu
16 from dejavu import logic
17 import xray
18 import sets
19
20
21 class UnitCollection(dejavu.Unit):
22     """A Set of Unit IDs.
23     
24     Type: Unit Type of all Units referenced by this collection.
25     
26     The Unit Collection is primarily for use as an index for Units.
27     Unit Engines use Expressions and other rules to transform a Collection
28     as a whole. These classes consume and produce Unit Collections.
29     The Unit Collection provides special methods for iteration, whether
30     reading or writing, to avoid errors common with multi-process/
31     multi-threaded access.
32     
33     UnitCollection is a subclass of Unit, so that it can be managed by
34     Sandboxes. However, due to the structure of the data contained in a
35     UnitCollection, it is recommended that Storage Managers use different
36     techniques to store and retrieve Unit Collections. They do not need
37     more than the ID's of their contained Units stored, since they will
38     recall such Units as needed. Not every Storage Manager is going to be
39     able to handle this kind of dynamic storage; deployers-- examine your
40     Storage Managers and make sure they can!
41     """
42    
43     Members = dejavu.UnitProperty(u'Members', list)
44     EngineID = dejavu.UnitProperty(u'EngineID', int, index=True)
45     Type = dejavu.UnitProperty(u'Type')
46     Expiration = dejavu.UnitProperty(u'Expiration', datetime.datetime)
47     Timestamp = dejavu.UnitProperty(u'Timestamp', datetime.datetime)
48    
49     def __init__(self, **kwargs):
50         dejavu.Unit.__init__(self)
51         self.Members = []
52         self._mutex = threading.RLock()
53        
54         for k, v in kwargs.iteritems():
55             setattr(self, k, v)
56    
57     def __getstate__(self):
58         return (self._properties, self.dirty)
59    
60     def __setstate__(self, state):
61         self.sandbox = None
62         self._mutex = threading.RLock()
63         self._properties, self.dirty = state
64    
65     def acquire(self):
66         self._mutex.acquire(True)
67    
68     def release(self):
69         self._mutex.release()
70    
71     def __len__(self):
72         return len(self.Members)
73    
74     def add(self, ID):
75         self.acquire()
76         try:
77             if ID not in self.Members:
78                 self.Members.append(ID)
79         finally:
80             self.release()
81    
82     def unit_class(self):
83         return self.sandbox.arena.class_by_name(self.Type)
84    
85     def ids(self):
86         self.acquire()
87         try:
88             return self.Members[:]
89         finally:
90             self.release()
91    
92     def units(self, quota=None):
93         cls = self.unit_class()
94         output = []
95         self.acquire()
96         try:
97             for i, eachID in enumerate(self.Members):
98                 if quota and i >= quota:
99                     break
100                 unit = self.sandbox.unit(cls, ID=eachID)
101                 if unit:
102                     output.append(unit)
103         finally:
104             self.release()
105         return output
106    
107     def xdict(self, attr):
108         """Return a dictionary of {Unit.attr: [Unit, Unit, ...]}."""
109         product = {}
110         self.acquire()
111         try:
112             for unit in self.units():
113                 key = getattr(unit, attr)
114                 product.setdefault(key, []).append(unit)
115         finally:
116             self.release()
117         return product
118    
119     def __copy__(self):
120         newUnit = dejavu.Unit.__copy__(self)
121         newUnit.Members = self.Members[:]
122         return newUnit
123    
124     def on_recall(self):
125         if self.Expiration is not None:
126             if self.Expiration <= datetime.datetime.now():
127                 self.forget()
128                 raise dejavu.UnrecallableError
129             else:
130                 self.decay(minutes=15)
131    
132     def decay(self, **kw):
133         """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
134         self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw)
135
136
137 operations = [              #       OPERAND
138     'COPY',                 # SetID of mixin
139     'CREATE',               # New type (= class.__name__)
140     'DIFFERENCE',           # SetID of mixin
141     'FILTER',               # logic.Expression
142     'FUNCTION',             # key into arena.engine_functions dict
143     'INTERSECTION',         # SetID of mixin
144     'RETURN',               #
145     'TRANSFORM',            # New type (= class.__name__)
146     'UNION',                # SetID of mixin
147     ]
148
149
150 class RuleProperty(dejavu.UnitProperty):
151     def post(self, unit, value):
152         eng = unit.sandbox.unit(UnitEngine, ID=unit.EngineID)
153         if eng:
154             eng.update_final_class()
155
156 class UnitEngineRule(dejavu.Unit):
157     """A Rule for Unit Engines."""
158    
159     Operation = RuleProperty(u'Operation', str)
160     SetID = RuleProperty(u'SetID', int)
161     Operand = RuleProperty(u'Operand', str, False, hints = {u'Size': 0})
162     Sequence = RuleProperty(u'Sequence', int)
163     EngineID = dejavu.UnitProperty(u'EngineID', int, index=True)
164     Expiration = dejavu.UnitProperty(u'Expiration', datetime.datetime)
165    
166     def __init__(self, **kwargs):
167         """kw: Operation, SetID, Operand=(Type | logic.Expression | otherSet)
168         
169         FILTER:
170             If the Operation is 'FILTER', the Operand shall be a
171             logic.Expression, and the snapshot will consist of the IDs of
172             Units which match the Expression.
173         
174         Everything else:
175             transforms: the snapshot will consist of IDs of all units
176                 which are associated with the current snapshot.
177             union, difference, and intersection: these all take a setID.
178         
179         So, a typical Engine might have a set of rules which look like:
180             --Operation--   --Set-- --Operand--
181             CREATE          1       Invoice         # Full set
182             FILTER          1       (Expression)    # modifies Set 1
183             CREATE          2       Inventory       # Full set
184             FILTER          2       (Expression)    # modifies Set 2
185             FILTER          2       (Expression)    # modifies Set 2
186             TRANSFORM       2       Invoice         # modifies Set 2
187             DIFFERENCE      1       2               # Set1 -= Set2
188             RETURN          1                       # This is optional.
189         
190         The last RETURN statement is optional. If omitted, the last Set
191         touched will be returned.
192         
193         For all operations, the Set ID indicates which Set will be
194             modified by the operation. Using the above example, you can
195             see that for the DIFFERENCE operation, the Set which is modified
196             is Set 1.
197         """
198         dejavu.Unit.__init__(self)
199        
200         if kwargs.get('Operation', '') == 'FILTER':
201             if not isinstance(kwargs.get('Operand'), (str, unicode)):
202                 kwargs['Operand'] = pickle.dumps(kwargs['Operand'])
203        
204         for k, v in kwargs.iteritems():
205             setattr(self, k, v)
206    
207     def __repr__(self):
208         op = self.Operand
209         if self.Operation == 'FILTER':
210             op = pickle.loads(op)
211         return ("dejavu.engines.UnitEngineRule(%s, %s, %s)"
212                 % (self.Operation, self.SetID, repr(op)))
213    
214     def expr(self):
215         """expr() -> If a FILTER rule, return the Expression, else None."""
216         if self.Operation == 'FILTER':
217             op = self.Operand
218             return pickle.loads(op)
219         return None
220    
221     def on_recall(self):
222         if self.Expiration is not None:
223             if self.Expiration <= datetime.datetime.now():
224                 self.forget()
225                 raise dejavu.UnrecallableError
226             else:
227                 self.decay(minutes=15)
228    
229     def decay(self, **kw):
230         """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
231         self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw)
232
233
234 class UnitEngine(dejavu.Unit):
235     """A factory for Unit Collections."""
236    
237     Owner = dejavu.UnitProperty(u'Owner')
238     Name = dejavu.UnitProperty(u'Name')
239     Created = dejavu.UnitProperty(u'Created', datetime.datetime)
240     FinalClassName = dejavu.UnitProperty(u'FinalClassName')
241     Expiration = dejavu.UnitProperty('Expiration', datetime.datetime)
242    
243     def __init__(self, **kwargs):
244         dejavu.Unit.__init__(self)
245         self.Created = datetime.datetime.today()
246         self.Owner = u''
247        
248         for k, v in kwargs.iteritems():
249             setattr(self, k, v)
250    
251     def on_forget(self):
252         # Rules and Snapshots shouldn't persist past
253         # the life of their Engines. Forget them.
254         for rule in self.rules():
255             rule.forget()
256         for snap in self.snapshots():
257             snap.forget()
258    
259     def on_recall(self):
260         if self.Expiration is not None:
261             if self.Expiration <= datetime.datetime.now():
262                 msg = ("Forgetting Engine %s. Exp = %s. Now = %s" %
263                        (self.ID, self.Expiration, datetime.datetime.now()))
264                 self.sandbox.arena.application.logger.info(msg)
265                 self.forget()
266                 raise dejavu.UnrecallableError
267             else:
268                 self.decay(minutes=15)
269    
270     def decay(self, **kw):
271         """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
272         self.Expiration = datetime.datetime.now() + datetime.timedelta(**kw)
273    
274     def update_final_class(self):
275         results = {}
276         last_set = 1
277         for rule in self.rules():
278             last_set = rule.SetID
279             operation = rule.Operation
280             if operation in ('CREATE', 'TRANSFORM'):
281                 results[last_set] = rule.Operand
282             if operation == 'RETURN':
283                 break
284         if last_set in results:
285             self.FinalClassName = results[last_set]
286    
287     def final_class(self):
288         return self.sandbox.arena.class_by_name(self.FinalClassName)
289    
290     def rules(self):
291         """An ordered list of Rules for this Engine."""
292         f = logic.filter(EngineID=self.ID)
293         allrules = [x for x in self.sandbox.recall(UnitEngineRule, f)]
294         allrules.sort(dejavu.sort(u'Sequence'))
295         return allrules
296    
297     def add_rule(self, Operation, SetID=None, Operand=None):
298         allrules = self.rules()
299         if isinstance(Operation, UnitEngineRule):
300             newRule = Operation
301         else:
302             if SetID is None:
303                 try:
304                     SetID = allrules[-1].SetID
305                 except IndexError:
306                     SetID = 1
307             newRule = UnitEngineRule(Operation=Operation, SetID=SetID,
308                                      Operand=Operand)
309        
310         try:
311             nextSeq = allrules[-1].Sequence + 1
312         except IndexError:
313             nextSeq = 0
314         newRule.Sequence = nextSeq
315        
316         newRule.EngineID = self.ID
317         newRule.Expiration = self.Expiration
318         self.sandbox.memorize(newRule)
319         self.update_final_class()
320    
321     def snapshots(self):
322         """Unit Collections obtained by executing the rules sometime in the past."""
323         f = logic.filter(EngineID=self.ID)
324         allSnap = [x for x in self.sandbox.recall(UnitCollection, f)]
325         allSnap.sort(dejavu.sort(u'Timestamp'))
326         return allSnap
327    
328     def take_snapshot(self, args={}):
329         """Execute the rules and return a Unit Collection (or None)."""
330         allrules = self.rules()
331         snap = RuleProcessor(self.sandbox).process(allrules, args)
332         if snap is not None:
333             snap.EngineID = self.ID
334             now = datetime.datetime.now()
335             snap.Timestamp = now
336             snap.decay(minutes=15)
337             self.sandbox.memorize(snap)
338         return snap
339    
340     def last_snapshot(self, args={}):
341         allSnaps = self.snapshots()
342         if len(allSnaps) == 0:
343             aSnap = self.take_snapshot(args)
344         else:
345             aSnap = allSnaps[-1]
346         return aSnap
347    
348     def immortalize(self):
349         self.Expiration = None
350         for rule in self.rules():
351             rule.Expiration = None
352    
353     def __copy__(self):
354         newUnit = dejavu.Unit.__copy__(self)
355         newUnit.Name = "Copy of %s" % newUnit.Name
356         newUnit.Created = datetime.datetime.now()
357         return newUnit
358    
359     def clone(self, user, temporary=True):
360         """Copy self and all Rules of self. Memorize automatically."""
361         newUnit = self.__copy__()
362         newUnit.Owner = user
363         if temporary:
364             newUnit.decay(minutes=15)
365         else:
366             newUnit.Expiration = None
367         self.sandbox.memorize(newUnit)
368         for rule in self.rules():
369             newRule = rule.__copy__()
370             newRule.EngineID = newUnit.ID
371             newRule.Expiration = newUnit.Expiration
372             self.sandbox.memorize(newRule)
373         return newUnit
374    
375     def permit(self, user, write_access=True):
376         if write_access:
377             return self.Owner in (u'Public', user)
378         else:
379             return self.Owner in ('System', 'Public', user)
380
381
382 class RuleProcessor(object):
383     """Processor for the Rules of a Unit Engine."""
384    
385     def __init__(self, sandbox):
386         self.sandbox = sandbox
387         self.arena = sandbox.arena
388    
389     def process(self, rules, args):
390         """Execute the rules and return a Unit Collection (or None)."""
391         self.sets = {}
392         self.args = args
393         final = None
394         for rule in rules:
395             operation = rule.Operation
396             func = getattr(self, 'visit_' + operation)
397             final = rule.SetID
398             func(final, rule.Operand)
399         if final is None:
400             return None
401         else:
402             return self.sets[final]
403    
404     def visit_COPY(self, setID, operand):
405         """Copy the set whose ID = operand into another set, whose ID = setID."""
406         A = self.sets[setID]
407         operand = int(operand)
408         if operand in self.sets:
409             # Overwrite the existing set.
410             B = self.sets[operand]
411         else:
412             # Create a new set.
413             B = UnitCollection(Type=A.Type)
414             self.sets[operand] = B
415         B.empty = A.empty
416         A.acquire()
417         B.acquire()
418         try:
419             B.Members = A.Members[:]
420         finally:
421             A.release()
422             B.release()
423    
424     def visit_CREATE(self, setID, operand):
425         """Create an empty set. The next instruction is responsible to fill it."""
426         newset = UnitCollection(Type=operand)
427         newset.empty = True
428         self.sets[setID] = newset
429    
430     def realize_empty(self, setID):
431         """realize_empty(setID). Populate the specified set only if empty."""
432         A = self.sets[setID]
433         if hasattr(A, 'empty') and A.empty:
434             A.empty = False
435             A.acquire()
436             try:
437                 mem = A.Members
438                 cls = self.arena.class_by_name(A.Type)
439                 for unit in self.sandbox.recall(cls):
440                     id = unit.ID
441                     if id not in mem:
442                         mem.append(id)
443             finally:
444                 A.release()
445    
446     def visit_DIFFERENCE(self, setID, operand):
447         self.realize_empty(setID)
448         A = self.sets[setID]
449         B = self.sets[int(operand)]
450         A.acquire()
451         B.acquire()
452         try:
453             A.Members = [x for x in A.Members if x not in B.Members]
454         finally:
455             A.release()
456             B.release()
457    
458     def visit_FILTER(self, setID, operand):
459         expr = pickle.loads(operand)
460         expr.bind_args(**self.args)
461         A = self.sets[setID]
462         if hasattr(A, 'empty') and A.empty:
463             A.empty = False
464             A.acquire()
465             try:
466                 cls = self.arena.class_by_name(A.Type)
467                 mem = A.Members
468                 for unit in self.sandbox.recall(cls, expr):
469                     id = unit.ID
470                     if id not in mem:
471                         mem.append(unit.ID)
472             finally:
473                 A.release()
474         else:
475             A.acquire()
476             try:
477                 cls = self.arena.class_by_name(A.Type)
478                 newset = []
479                 for id in A.Members:
480                     unit = self.sandbox.unit(cls, ID=id)
481                     if unit and expr.evaluate(unit):
482                         newset.append(id)
483                 A.Members = newset
484             finally:
485                 A.release()
486    
487     def visit_FUNCTION(self, setID, operand):
488         func = self.arena.engine_functions[operand]
489        
490         A = self.sets[setID]
491         A.acquire()
492         try:
493             func(self.sandbox, A)
494         finally:
495             A.release()
496    
497     def visit_INTERSECTION(self, setID, operand):
498         self.realize_empty(setID)
499         A = self.sets[setID]
500         B = self.sets[int(operand)]
501         A.acquire()
502         B.acquire()
503         try:
504             A.Members = [x for x in A.Members if x in B.Members]
505         finally:
506             A.release()
507             B.release()
508    
509     def visit_RETURN(self, setID, operand):
510         self.realize_empty(setID)
511    
512     def visit_TRANSFORM(self, setID, operand):
513         """operand=far class name. Multiple hops are supported."""
514         self.realize_empty(setID)
515         A = self.sets[setID]
516         start = self.arena.class_by_name(A.Type)
517         end = self.arena.class_by_name(operand)
518         nodes = self.arena.associations.shortest_path(start, end)
519         if nodes is None:
520             raise KeyError("No association found between '%s' and '%s'"
521                            % (start, end))
522        
523         # Skip the first node, which should be A.Type
524         nodes.pop(0)
525         A.acquire()
526         try:
527             for eachType in nodes:
528                 # Add all associated Units to the collection A.
529                 oppfunc = getattr(start, eachType.__name__)
530                 cls = self.arena.class_by_name(A.Type)
531                 newset = []
532                 for id in A.Members:
533                     unit = self.sandbox.unit(cls, ID=id)
534                     if unit:
535                         for farUnit in oppfunc(unit):
536                             farid = farUnit.ID
537                             if farid not in newset:
538                                 newset.append(farid)
539                 A.Members = newset
540                 start = eachType
541                 A.Type = eachType.__name__
542         finally:
543             A.release()
544    
545     def visit_UNION(self, setID, operand):
546         self.realize_empty(setID)
547         A = self.sets[setID]
548         B = self.sets[int(operand)]
549         A.acquire()
550         B.acquire()
551         try:
552             amem = A.Members
553             for id in B.Members:
554                 if id not in amem:
555                     amem.append(id)
556         finally:
557             A.release()
558             B.release()
559
Note: See TracBrowser for help on using the browser.