Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

root/tags/1.4.0/engines.py

Revision 165 (checked in by fumanchu, 3 years ago)

Now distributing recur module with dejavu.

  • Property svn:eol-style set to native
Line 
1 """
2 Notice in particular that UnitCollection, UnitEngineRule, and UnitEngine
3 are all _temporary_ Units. Even when you memorize them, they won't be
4 persistent unless you set each instance's Expiration to None. If you
5 use UnitEngine.immortalize(), it will make all of its rules immortal
6 (no Expiration) as well.
7 """
8
9 import threading
10 import datetime
11 try:
12     import cPickle as pickle
13 except ImportError:
14     import pickle
15 import recur
16
17 import dejavu
18 from dejavu import logic
19
20
class TemporaryUnit(dejavu.Unit):
    """A Unit carrying an optional Expiration timestamp.

    An Expiration of None means the Unit is never expired by on_recall.
    """

    Expiration = dejavu.UnitProperty(datetime.datetime)

    def on_recall(self):
        """Forget this Unit if it has expired; otherwise extend its life.

        Units whose Expiration is None are left untouched. An expired Unit
        is forgotten and dejavu.UnrecallableError is raised; a live Unit
        has its Expiration pushed 15 minutes into the future.
        """
        if self.Expiration is None:
            return
        if self.Expiration <= datetime.datetime.now():
            self.forget()
            raise dejavu.UnrecallableError
        self.decay(minutes=15)

    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        current = datetime.datetime.now()
        self.Expiration = current + datetime.timedelta(**kw)
36
37
class TemporarySweeper(recur.Worker):
    """A worker to sweep out expired TemporaryUnit's."""

    def work(self):
        """Run one sweep: recall every expired TemporaryUnit, then flush.

        A fresh sandbox is obtained from self.arena for each sweep.
        """
        now = datetime.datetime.now()
        # NOTE(review): the lambda is kept verbatim (including `!= None`);
        # presumably logic.Expression inspects it, so do not "tidy" it
        # without confirming against dejavu.logic.
        expired = logic.Expression(lambda x: x.Expiration != None
                                   and x.Expiration <= now)
        sandbox = self.arena.new_sandbox()

        for cls in self.arena._registered_classes:
            if not issubclass(cls, TemporaryUnit):
                continue
            # Recalling triggers TemporaryUnit.on_recall, which should
            # forget each expired unit.
            sandbox.recall(cls, expired)
        sandbox.flush_all()
54
55
class UnitCollection(TemporaryUnit):
    """A Set of Unit identifiers.

    Type: Unit Type of all Units referenced by this collection.

    A Unit Collection mainly serves as an index for Units. Unit Engines
    apply Expressions and other rules to transform a Collection as a
    whole; they both consume and produce Unit Collections. Reads and
    writes of the member list go through a re-entrant mutex to avoid
    errors common with multi-process/multi-threaded access.

    UnitCollection is a Unit subclass so that Sandboxes can manage it.
    Because of the shape of its data, Storage Managers are advised to
    store and retrieve Unit Collections differently from ordinary Units:
    only the ID's of the contained Units need to be stored, since the
    Units themselves are recalled on demand. Not every Storage Manager
    can handle this kind of dynamic storage; deployers-- examine your
    Storage Managers and make sure they can!
    """

    Members = dejavu.UnitProperty(list)
    EngineID = dejavu.UnitProperty(int, index=True)
    Type = dejavu.UnitProperty()
    Timestamp = dejavu.UnitProperty(datetime.datetime)

    def __init__(self, **kwargs):
        """Create an empty collection, then set each keyword as an attribute."""
        dejavu.Unit.__init__(self)
        self.Members = []
        self._mutex = threading.RLock()

        for name, value in kwargs.iteritems():
            setattr(self, name, value)

    def __getstate__(self):
        """Return the picklable state; the mutex and sandbox are left out."""
        return (self._properties, self._initial_property_hash)

    def __setstate__(self, state):
        """Restore pickled state, detached from any sandbox, with a new mutex."""
        self.sandbox = None
        self._mutex = threading.RLock()
        self._properties, self._initial_property_hash = state

    def acquire(self):
        """Block until this collection's re-entrant mutex is held."""
        self._mutex.acquire(True)

    def release(self):
        """Release one level of this collection's mutex."""
        self._mutex.release()

    def __len__(self):
        """Return the number of member identifiers."""
        return len(self.Members)

    def add(self, ID):
        """Append ID to Members unless it is already present."""
        self.acquire()
        try:
            if ID in self.Members:
                return
            self.Members.append(ID)
        finally:
            self.release()

    def unit_class(self):
        """Return the class which the arena registers under self.Type."""
        return self.sandbox.arena.class_by_name(self.Type)

    def ids(self):
        """Return a copy of Members, taken while holding the mutex."""
        self.acquire()
        try:
            snapshot = self.Members[:]
        finally:
            self.release()
        return snapshot

    def units(self, quota=None):
        """Return the Units named by Members, looked up via the sandbox.

        If quota is truthy, only members at positions below quota are
        examined. Members for which the sandbox returns a false value
        are left out of the result.
        """
        cls = self.unit_class()
        idnames = [prop.key for prop in cls.identifiers]

        found = []
        self.acquire()
        try:
            for position, member_id in enumerate(self.Members):
                if quota and position >= quota:
                    break
                lookup = dict(zip(idnames, member_id))
                unit = self.sandbox.unit(cls, **lookup)
                if unit:
                    found.append(unit)
        finally:
            self.release()
        return found

    def xdict(self, attr):
        """Return a dictionary of {Unit.attr: [Unit, Unit, ...]}."""
        grouped = {}
        self.acquire()
        try:
            for unit in self.units():
                key = getattr(unit, attr)
                if key in grouped:
                    grouped[key].append(unit)
                else:
                    grouped[key] = [unit]
        finally:
            self.release()
        return grouped

    def __copy__(self):
        """Copy this Unit, giving the copy its own Members list."""
        duplicate = dejavu.Unit.__copy__(self)
        duplicate.Members = self.Members[:]
        return duplicate
159
160
# Names of the operations a UnitEngineRule may carry, with the meaning of
# the rule's Operand for each one:
#
#   COPY           SetID of mixin
#   CREATE         New type (= class.__name__)
#   DIFFERENCE     SetID of mixin
#   FILTER         logic.Expression
#   FUNCTION       key into arena.engine_functions dict
#   INTERSECTION   SetID of mixin
#   RETURN         (no operand)
#   TRANSFORM      New type (= class.__name__)
#   UNION          SetID of mixin
operations = [
    'COPY',
    'CREATE',
    'DIFFERENCE',
    'FILTER',
    'FUNCTION',
    'INTERSECTION',
    'RETURN',
    'TRANSFORM',
    'UNION',
    ]
172
173
class RuleProperty(dejavu.TriggerProperty):
    """Rule property whose changes refresh the owning engine's final class."""

    def on_set(self, unit, oldvalue):
        """Ask the rule's engine (if any) to recompute its final class."""
        engine = unit.UnitEngine()
        if not engine:
            return
        engine.update_final_class()
179
class EngIDProperty(dejavu.TriggerProperty):
    """EngineID property: refreshes both the old and the new engine."""

    def on_set(self, unit, oldvalue):
        """Recompute the final class of the previous engine, then the new one.

        The previous engine is looked up by oldvalue through the unit's
        sandbox; the new one through unit.UnitEngine(). Either may be
        missing, in which case it is skipped.
        """
        previous = unit.sandbox.unit(UnitEngine, ID=oldvalue)
        if previous:
            previous.update_final_class()

        current = unit.UnitEngine()
        if current:
            current.update_final_class()
189
class UnitEngineRule(TemporaryUnit):
    """A Rule for Unit Engines.

    For FILTER rules the Operand is stored as a pickled logic.Expression;
    use expr() to get the Expression back.
    """

    Operation = RuleProperty(str)
    SetID = RuleProperty(int)
    Operand = RuleProperty(str, False, hints = {u'bytes': 0})
    Sequence = RuleProperty(int)
    EngineID = EngIDProperty(int, index=True)

    def __init__(self, **kwargs):
        """kw: Operation, SetID, Operand=(Type | logic.Expression | otherSet)

        TRANSFORM:
            If the Operation is 'TRANSFORM', the Operand shall be the name of
            a Unit type. The snapshot will consist of the identifiers of all
            units of that Type which are associated with the current snapshot.

        FILTER:
            If the Operation is 'FILTER', the Operand shall be a
            logic.Expression, and the snapshot will consist of the
            identifiers of Units which match the Expression. An Operand
            which is not already a string is pickled before being stored;
            a FILTER rule created without any Operand stores a pickled None.

        So, a typical Engine might have a set of rules which look like:
            --Operation--   --Set-- --Operand--
            CREATE          1       Invoice         # Full set
            FILTER          1       (Expression)    # modifies Set 1
            CREATE          2       Inventory       # Full set
            FILTER          2       (Expression)    # modifies Set 2
            FILTER          2       (Expression)    # modifies Set 2
            TRANSFORM       2       Invoice         # modifies Set 2
            DIFFERENCE      1       2               # Set1 -= Set2
            RETURN          1                       # This is optional.

        The last RETURN statement is optional. If omitted, the last Set
        touched will be returned.

        For all operations, the Set ID indicates which Set will be
            modified by the operation. Using the above example, you can
            see that for the DIFFERENCE operation, the Set which is modified
            is Set 1.
        """
        dejavu.Unit.__init__(self)

        if kwargs.get('Operation', '') == 'FILTER':
            # Use .get() for the value too: a missing Operand keyword is
            # treated exactly like an explicit Operand=None instead of
            # raising KeyError.
            operand = kwargs.get('Operand')
            if not isinstance(operand, (str, unicode)):
                kwargs['Operand'] = pickle.dumps(operand)

        for k, v in kwargs.iteritems():
            setattr(self, k, v)

    def __repr__(self):
        """Return a debugging representation; FILTER operands are unpickled."""
        op = self.Operand
        # Guard against an unset Operand so repr() never raises from
        # pickle.loads(None).
        if self.Operation == 'FILTER' and op is not None:
            op = pickle.loads(op)
        return ("dejavu.engines.UnitEngineRule(%s, %s, %s)"
                % (self.Operation, self.SetID, repr(op)))

    def on_memorize(self):
        """Refresh the owning engine's final class when this rule is memorized."""
        eng = self.UnitEngine()
        if eng:
            eng.update_final_class()

    def on_forget(self):
        """Refresh the owning engine's final class when this rule is forgotten."""
        eng = self.UnitEngine()
        if eng:
            eng.update_final_class()

    def expr(self):
        """expr() -> If a FILTER rule, return the Expression, else None."""
        if self.Operation == 'FILTER':
            # NOTE(review): this unpickles whatever is stored in Operand.
            # pickle is unsafe on untrusted data; confirm that rule storage
            # cannot be written by untrusted parties.
            return pickle.loads(self.Operand)
        return None
263
264
class UnitEngine(TemporaryUnit):
    """A factory for Unit Collections.

    An engine owns an ordered list of UnitEngineRule units (matched on
    EngineID) and the UnitCollection snapshots produced by running them.
    """

    Owner = dejavu.UnitProperty()
    Name = dejavu.UnitProperty()
    Created = dejavu.UnitProperty(datetime.datetime)
    FinalClassName = dejavu.UnitProperty()

    def __init__(self, **kwargs):
        """Stamp Created, default Owner to u'', then apply keyword attributes."""
        dejavu.Unit.__init__(self)
        self.Created = datetime.datetime.today()
        self.Owner = u''

        for k, v in kwargs.iteritems():
            setattr(self, k, v)

    def on_forget(self):
        """Forget this engine's rules and snapshots along with the engine."""
        # Rules and Snapshots shouldn't persist past
        # the life of their Engines. Forget them.
        for rule in self.rules():
            rule.forget()
        for snap in self.snapshots():
            snap.forget()

    def update_final_class(self):
        """Recompute FinalClassName from the current rules.

        Walks the rules in order, remembering for each set the Operand of
        its latest CREATE or TRANSFORM rule, and stops at the first RETURN.
        If the last set touched has such a type, it becomes FinalClassName;
        otherwise FinalClassName is left unchanged.
        """
        results = {}
        last_set = 1
        for rule in self.rules():
            last_set = rule.SetID
            operation = rule.Operation
            if operation in ('CREATE', 'TRANSFORM'):
                results[last_set] = rule.Operand
            if operation == 'RETURN':
                break
        if last_set in results:
            self.FinalClassName = results[last_set]

    def final_class(self):
        """Return the class the arena registers under FinalClassName."""
        return self.sandbox.arena.class_by_name(self.FinalClassName)

    def rules(self):
        """An ordered list of Rules for this Engine."""
        allrules = list(self.UnitEngineRule())
        allrules.sort(dejavu.sort(u'Sequence'))
        return allrules

    def add_rule(self, Operation, SetID=None, Operand=None):
        """Append a rule to this engine and memorize it.

        Operation may be an operation name or an existing UnitEngineRule;
        in the latter case SetID and Operand are ignored. When SetID is
        None, the SetID of the current last rule is used (1 if there are
        no rules). The rule gets the next Sequence number, this engine's
        ID and this engine's Expiration.
        """
        allrules = self.rules()
        if isinstance(Operation, UnitEngineRule):
            newRule = Operation
        else:
            if SetID is None:
                try:
                    SetID = allrules[-1].SetID
                except IndexError:
                    SetID = 1
            newRule = UnitEngineRule(Operation=Operation, SetID=SetID,
                                     Operand=Operand)

        try:
            nextSeq = allrules[-1].Sequence + 1
        except IndexError:
            nextSeq = 0
        newRule.Sequence = nextSeq

        newRule.EngineID = self.ID
        newRule.Expiration = self.Expiration
        self.sandbox.memorize(newRule)

    def snapshots(self):
        """Unit Collections obtained by executing the rules sometime in the past."""
        f = logic.filter(EngineID=self.ID)
        allSnap = self.sandbox.recall(UnitCollection, f)
        allSnap.sort(dejavu.sort(u'Timestamp'))
        return allSnap

    def take_snapshot(self, args=None):
        """Execute the rules and return a Unit Collection (or None).

        args: optional dict handed to the RuleProcessor; defaults to an
        empty dict. A fresh dict is created per call so that no mutable
        default is shared between calls.

        A non-None result is tagged with this engine's ID and the current
        time, given a 15 minute lifetime, and memorized.
        """
        if args is None:
            args = {}
        allrules = self.rules()
        snap = RuleProcessor(self.sandbox).process(allrules, args)
        if snap is not None:
            snap.EngineID = self.ID
            snap.Timestamp = datetime.datetime.now()
            snap.decay(minutes=15)
            self.sandbox.memorize(snap)
        return snap

    def last_snapshot(self, args=None):
        """Return the newest existing snapshot, taking one if none exist.

        args is only used when a new snapshot has to be taken.
        """
        if args is None:
            args = {}
        allSnaps = self.snapshots()
        if not allSnaps:
            return self.take_snapshot(args)
        return allSnaps[-1]

    def immortalize(self):
        """Clear Expiration on this engine and on each of its rules."""
        self.Expiration = None
        for rule in self.rules():
            rule.Expiration = None

    def __copy__(self):
        """Copy this Unit, renaming the copy and resetting its Created time."""
        newUnit = dejavu.Unit.__copy__(self)
        newUnit.Name = "Copy of %s" % newUnit.Name
        newUnit.Created = datetime.datetime.now()
        return newUnit

    def clone(self, user, temporary=True):
        """Copy self and all Rules of self. Memorize automatically.

        The copy is owned by user. If temporary, it expires in 15 minutes;
        otherwise it has no Expiration. Copied rules share the copy's
        Expiration. Returns the new engine.
        """
        newUnit = self.__copy__()
        newUnit.Owner = user
        if temporary:
            newUnit.decay(minutes=15)
        else:
            newUnit.Expiration = None
        self.sandbox.memorize(newUnit)
        for rule in self.rules():
            newRule = rule.__copy__()
            newRule.EngineID = newUnit.ID
            newRule.Expiration = newUnit.Expiration
            self.sandbox.memorize(newRule)
        return newUnit

    def permit(self, user, write_access=True):
        """Return True if user may access this engine.

        Write access requires the Owner to be u'Public' or user; read
        access additionally allows engines owned by 'System'.
        """
        if write_access:
            return self.Owner in (u'Public', user)
        return self.Owner in ('System', 'Public', user)
393
# Relate each UnitEngine (by its ID) to the UnitEngineRule and
# UnitCollection units whose EngineID matches it.
# NOTE(review): code in this module calls engine.UnitEngineRule() and
# rule.UnitEngine(); presumably those accessors are installed by these
# one_to_many() calls -- confirm against dejavu.Unit.
UnitEngine.one_to_many('ID', UnitEngineRule, 'EngineID')
UnitEngine.one_to_many('ID', UnitCollection, 'EngineID')
396
397
398 class RuleProcessor(object):
399     """Processor for the Rules of a Unit Engine."""
400    
401     def __init__(self, sandbox):
402         self.sandbox = sandbox
403         self.arena = sandbox.arena
404    
405     def process(self, rules, args):
406         """Execute the rules and return a Unit Collection (or None)."""
407         self.sets = {}
408         self.args = args
409         final = None
410         for rule in rules:
411             operation = rule.Operation
412             func = getattr(self, 'visit_' + operation)
413             final = rule.SetID
414             func(final, rule.Operand)
415         if final is None:
416             return None
417         else:
418             return self.sets[final]
419    
420     def visit_COPY(self, setID, operand):
421         """Copy the set whose ID = operand into another set, whose ID = setID."""
422         A = self.sets[setID]
423         setID2 = int(operand)
424         if setID2 in self.sets:
425             # Overwrite the existing set.
426             B = self.sets[setID2]
427         else:
428             # Create a new set.
429             B = UnitCollection(Type=A.Type)
430             self.sets[setID2] = B
431         B.universal = A.universal
432         A.acquire()
433         B.acquire()
434         try:
435             B.Members = A.Members[:]
436         finally:
437             A.release()
438             B.release()
439    
440     def visit_CREATE(self, setID, operand):
441         """Create a universal set. Actual population may be deferred."""
442         newset = UnitCollection(Type=operand)
443         newset.universal = True
444         self.sets[setID] = newset
445    
446     def visit_DIFFERENCE(self, setID, operand):
447         A = self.sets[setID]
448        
449         setID2 = int(operand)
450         self.realize_universal(setID2)
451         B = self.sets[setID2]
452        
453         A.acquire()
454         B.acquire()
455         try:
456             if B.universal:
457                 # B should be every Unit, which means the difference
458                 # will always be an empty set.
459                 A.Members = []
460             else:
461                 # B is a subset of A.
462                 if A.universal:
463                     cls = self.arena.class_by_name(A.Type)
464                     mem = A.Members
465                     for unit in self.sandbox.recall(cls):
466                         id = unit.identity()
467                         if id not in B.Members:
468                             mem.append(id)
469                 else:
470                     A.Members = [x for x in A.Members if x not in B.Members]
471             A.universal = False
472         finally:
473             A.release()
474             B.release()
475    
476     def visit_FILTER(self, setID, operand):
477         expr = pickle.loads(operand)
478         expr.bind_args(**self.args)
479        
480         A = self.sets[setID]
481         A.acquire()
482         try:
483             cls = self.arena.class_by_name(A.Type)
484             mem = A.Members
485             if A.universal:
486                 A.universal = False
487                 for unit in self.sandbox.recall(cls, expr):
488                     id = unit.identity()
489                     if id not in mem:
490                         mem.append(id)