Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

root/trunk/dejavu/engines.py

Revision 543 (checked in by fumanchu, 1 year ago)

Changed to the unit(cls, **kwargs) signature throughout. Also changed sandbox.recall to include order, limit, and offset args; removed **kwargs from recall, but the 'expr' arg may now be a dict for all x/multi/recall methods throughout. Removed inheritance code. Added a 'sum' method to StorageManager.

  • Property svn:eol-style set to native
Line 
1 """
2 Notice in particular that UnitCollection, UnitEngineRule, and UnitEngine
3 are all _temporary_ Units. Even when you memorize them, they won't be
4 persistent unless you set each instance's Expiration to None. If you
5 use UnitEngine.immortalize(), it will make all of its rules immortal
6 (no Expiration) as well.
7 """
8
9 import threading
10 import datetime
11 try:
12     import cPickle as pickle
13 except ImportError:
14     import pickle
15
16 import dejavu
17 from dejavu import errors, recur
18 from geniusql import logic
19
20
class TemporaryUnit(dejavu.Unit):
    """A Unit that carries an Expiration timestamp.

    An Expiration of None means the unit never expires. Otherwise, the
    unit is forgotten the first time it is recalled at or after its
    Expiration.
    """

    Expiration = dejavu.UnitProperty(datetime.datetime)

    def on_recall(self):
        """Expire this unit, or push its Expiration out, when recalled.

        - Expiration is None: nothing happens.
        - Expiration is still in the future: it is reset to 15 minutes
          from now.
        - Otherwise: the unit is forgotten and errors.UnrecallableError
          is raised.
        """
        expires = self.Expiration
        if expires is None:
            return
        if expires > datetime.datetime.now():
            # Still alive: every recall extends its life by 15 minutes.
            self.decay(minutes=15)
            return
        self.forget()
        raise errors.UnrecallableError

    def decay(self, **kw):
        """decay(**kw) -> Set Expiration to now() + timedelta(**kw)."""
        now = datetime.datetime.now()
        self.Expiration = now + datetime.timedelta(**kw)
36
37
class TemporarySweeper(recur.Worker):
    """A worker to sweep out expired TemporaryUnit's."""

    def work(self):
        """Start a cycle of scheduled work.

        For every TemporaryUnit subclass registered in self.store, this
        recalls the units whose Expiration has passed into one new
        sandbox, then flushes that sandbox.
        """
        cutoff = datetime.datetime.now()
        # Kept as `!= None`: this lambda is passed to recall as a query
        # expression rather than being called directly here.
        expired = lambda unit: unit.Expiration != None and unit.Expiration <= cutoff
        sandbox = self.store.new_sandbox()

        for candidate in self.store.classes:
            if not issubclass(candidate, TemporaryUnit):
                continue
            # Recalling triggers TemporaryUnit.on_recall, which forgets
            # units whose Expiration has passed.
            sandbox.recall(candidate, expired)
        sandbox.flush_all()
53
54
class UnitCollection(TemporaryUnit):
    """A Set of Unit identifiers.

    Type: name of the Unit class of all Units referenced by this
    collection (resolved with store.class_by_name).

    The Unit Collection is primarily for use as an index for Units.
    Unit Engines use Expressions and other rules to transform a Collection
    as a whole; those classes consume and produce Unit Collections.
    Access to Members goes through an instance lock (acquire/release) to
    avoid errors common with multi-process/multi-threaded access.

    UnitCollection is a subclass of Unit, so that it can be managed by
    Sandboxes. However, due to the structure of the data contained in a
    UnitCollection, it is recommended that Storage Managers use different
    techniques to store and retrieve Unit Collections. Only the IDs of the
    contained Units need to be stored, since the Units themselves are
    recalled as needed. Not every Storage Manager is going to be able to
    handle this kind of dynamic storage; deployers: examine your Storage
    Managers and make sure they can!
    """

    Members = dejavu.UnitProperty(list)
    EngineID = dejavu.UnitProperty(int, index=True)
    Type = dejavu.UnitProperty()
    Timestamp = dejavu.UnitProperty(datetime.datetime)

    def __init__(self, Members=None, **kwargs):
        # Reentrant, so that xdict() can hold the lock while units() takes it.
        self._mutex = threading.RLock()
        if Members is None:
            # A fresh list per instance; never share a default list.
            Members = []
        TemporaryUnit.__init__(self, Members=Members, **kwargs)

    def __setstate__(self, state):
        # Unpickling does not run __init__, so rebuild the lock here.
        self._mutex = threading.RLock()
        TemporaryUnit.__setstate__(self, state)

    def acquire(self):
        """Block until this collection's lock is held."""
        self._mutex.acquire(True)

    def release(self):
        """Release this collection's lock."""
        self._mutex.release()

    def __len__(self):
        return len(self.Members)

    def add(self, ID):
        """Append ID to Members unless it is already present."""
        self.acquire()
        try:
            members = self.Members
            if ID in members:
                return
            members.append(ID)
        finally:
            self.release()

    def unit_class(self):
        """Return the Unit class named by self.Type."""
        return self.sandbox.store.class_by_name(self.Type)

    def ids(self):
        """Return a snapshot copy of Members, taken under the lock."""
        self.acquire()
        try:
            snapshot = self.Members[:]
        finally:
            self.release()
        return snapshot

    def units(self, quota=None):
        """Return the referenced Units, in Members order.

        quota: if truthy, only the first `quota` identifiers are looked
        up. Identifiers whose lookup returns a false value are skipped, so
        the result may be shorter than quota.
        """
        klass = self.unit_class()
        found = []
        self.acquire()
        try:
            for position, ident in enumerate(self.Members):
                if quota and position >= quota:
                    break
                # Each member is a sequence of identifier values, in
                # klass.identifiers order.
                criteria = dict(zip(klass.identifiers, ident))
                match = self.sandbox.unit(klass, **criteria)
                if match:
                    found.append(match)
        finally:
            self.release()
        return found

    def xdict(self, attr):
        """Return a dictionary of {Unit.attr: [Unit, Unit, ...]}."""
        grouped = {}
        self.acquire()
        try:
            for member in self.units():
                value = getattr(member, attr)
                bucket = grouped.get(value)
                if bucket is None:
                    bucket = grouped[value] = []
                bucket.append(member)
        finally:
            self.release()
        return grouped

    def __copy__(self):
        duplicate = TemporaryUnit.__copy__(self)
        # Give the copy its own list so add() on one doesn't affect the other.
        duplicate.Members = self.Members[:]
        return duplicate
152
153
# Rule operations, with the Operand each one expects:
#   COPY          SetID of mixin
#   CREATE        New type (= class.__name__)
#   DIFFERENCE    SetID of mixin
#   FILTER        logic.Expression
#   FUNCTION      key into store.engine_functions dict
#   INTERSECTION  SetID of mixin
#   RETURN        (no operand)
#   TRANSFORM     New type (= class.__name__)
#   UNION         SetID of mixin
operations = [
    'COPY', 'CREATE', 'DIFFERENCE', 'FILTER', 'FUNCTION',
    'INTERSECTION', 'RETURN', 'TRANSFORM', 'UNION',
    ]
165
166
class RuleProperty(dejavu.TriggerProperty):
    """Rule property whose changes refresh the owning engine.

    Whenever the property is set, the rule's UnitEngine (if any)
    recomputes its FinalClassName.
    """

    def on_set(self, unit, oldvalue):
        engine = unit.UnitEngine()
        if not engine:
            return
        engine.update_final_class()
172
class EngIDProperty(dejavu.TriggerProperty):
    """Trigger property for a rule's EngineID.

    Moving a rule from one engine to another changes the rule list of
    both engines, so both get their FinalClassName recomputed.
    """

    def on_set(self, unit, oldvalue):
        # A rule that previously had no engine (oldvalue None) has no old
        # engine to refresh; skip the needless lookup with ID=None.
        if oldvalue is not None:
            previous = unit.sandbox.unit(UnitEngine, ID=oldvalue)
            if previous:
                previous.update_final_class()

        current = unit.UnitEngine()
        if current:
            current.update_final_class()
182
class UnitEngineRule(TemporaryUnit):
    """A Rule for Unit Engines.

    Each rule applies one Operation to the set numbered SetID, using an
    Operation-specific Operand. EngineID links the rule to its UnitEngine,
    which orders its rules by Sequence (see UnitEngine.rules).
    """

    Operation = RuleProperty(str)
    SetID = RuleProperty(int)
    Operand = RuleProperty(str, False, hints = {u'bytes': 0})
    Sequence = RuleProperty(int)
    EngineID = EngIDProperty(int, index=True)

    def __init__(self, Operation=None, SetID=None, Operand=None, **kwargs):
        """kw: Operation, SetID, Operand=(Type | logic.Expression | otherSet)

        TRANSFORM:
            If the Operation is 'TRANSFORM', the Operand shall be the name of
            a Unit type. The snapshot will consist of the identifiers of all
            units of that Type which are associated with the current snapshot.

        FILTER:
            If the Operation is 'FILTER', the Operand shall be a
            logic.Expression (or anything logic.Expression accepts, such as
            a function), and the snapshot will consist of the identifiers
            of Units which match the Expression. The Expression is stored
            pickled; a string Operand is assumed to be already pickled.

        So, a typical Engine might have a set of rules which look like:
            --Operation--   --Set-- --Operand--
            CREATE          1       Invoice         # Full set
            FILTER          1       (Expression)    # modifies Set 1
            CREATE          2       Inventory       # Full set
            FILTER          2       (Expression)    # modifies Set 2
            FILTER          2       (Expression)    # modifies Set 2
            TRANSFORM       2       Invoice         # modifies Set 2
            DIFFERENCE      1       2               # Set1 -= Set2
            RETURN          1                       # This is optional.

        The last RETURN statement is optional. If omitted, the last Set
        touched will be returned.

        For all operations, the Set ID indicates which Set will be
        modified by the operation. In the example above, the Set modified
        by the DIFFERENCE operation is Set 1.
        """
        if Operation == 'FILTER' and not isinstance(Operand, basestring):
            if not isinstance(Operand, logic.Expression):
                # op can be a function
                Operand = logic.Expression(Operand)
            Operand = pickle.dumps(Operand)
        TemporaryUnit.__init__(self, Operation=Operation, SetID=SetID,
                               Operand=Operand, **kwargs)

    def __repr__(self):
        if self.Operation == 'FILTER':
            shown = pickle.loads(self.Operand)
        else:
            shown = self.Operand
        return ("dejavu.engines.UnitEngineRule(%s, %s, %s)"
                % (self.Operation, self.SetID, repr(shown)))

    def _refresh_engine(self):
        """Ask this rule's UnitEngine (if any) to recompute FinalClassName."""
        engine = self.UnitEngine()
        if engine:
            engine.update_final_class()

    def on_memorize(self):
        self._refresh_engine()

    def on_forget(self):
        self._refresh_engine()

    def expr(self):
        """expr() -> If a FILTER rule, return the Expression, else None."""
        if self.Operation != 'FILTER':
            return None
        # NOTE(review): pickle.loads runs arbitrary code embedded in the
        # stored Operand. This is only safe if the storage backend cannot
        # be written by untrusted parties.
        return pickle.loads(self.Operand)
256
257
class UnitEngine(TemporaryUnit):
    """A factory for Unit Collections.

    An engine owns a list of UnitEngineRule units and the UnitCollection
    snapshots produced by running them; both are linked to the engine
    through their EngineID.
    """

    Owner = dejavu.UnitProperty()
    Name = dejavu.UnitProperty()
    Created = dejavu.UnitProperty(datetime.datetime)
    FinalClassName = dejavu.UnitProperty()

    def __init__(self, Created=None, Owner=u'', **kwargs):
        if Created is None:
            # now() matches the timestamps used in __copy__/take_snapshot.
            Created = datetime.datetime.now()
        TemporaryUnit.__init__(self, Created=Created, Owner=Owner, **kwargs)

    def on_forget(self):
        # Rules and Snapshots shouldn't persist past
        # the life of their Engines. Forget them.
        for rule in self.rules():
            rule.forget()
        for snap in self.snapshots():
            snap.forget()

    def update_final_class(self):
        """Set FinalClassName to the Type of the set the rules will return.

        This follows RuleProcessor.process. The returned set is the one
        named by the last rule's SetID, or by a RETURN rule's SetID, which
        ends the scan. How each operation affects set types:

        - CREATE and TRANSFORM give set SetID the type named by Operand.
        - COPY copies set SetID into set int(Operand). If the destination
          set is not yet known, it takes the source set's type, matching
          RuleProcessor.visit_COPY.

        If the final set's type cannot be determined, FinalClassName is
        left unchanged.
        """
        types = {}
        last_set = 1
        for rule in self.rules():
            last_set = rule.SetID
            operation = rule.Operation
            if operation in ('CREATE', 'TRANSFORM'):
                types[last_set] = rule.Operand
            elif operation == 'COPY':
                try:
                    dest = int(rule.Operand)
                except (TypeError, ValueError):
                    # Incomplete or malformed rule (e.g. mid-edit); the
                    # processor would reject it too, so don't guess a type.
                    dest = None
                # NOTE(review): sets created by operations this method does
                # not track (e.g. FUNCTION) are treated as unknown here.
                if dest is not None and dest not in types and last_set in types:
                    types[dest] = types[last_set]
            elif operation == 'RETURN':
                break
        if last_set in types:
            self.FinalClassName = types[last_set]

    def final_class(self):
        """Return the Unit class named by FinalClassName."""
        return self.sandbox.store.class_by_name(self.FinalClassName)

    def rules(self):
        """An ordered list of Rules for this Engine, sorted by Sequence."""
        allrules = list(self.UnitEngineRule())
        allrules.sort(dejavu.sort(u'Sequence'))
        return allrules

    def add_rule(self, Operation, SetID=None, Operand=None):
        """Append a rule to this engine, memorize it, and return it.

        Operation may be an existing UnitEngineRule, in which case SetID
        and Operand are ignored. Otherwise it is an operation name and a
        new rule is built. In that case SetID defaults to the last rule's
        SetID, or to 1 if the engine has no rules yet.

        The rule is sequenced after the current last rule (Sequence 0 if
        there is none) and inherits this engine's Expiration.
        """
        allrules = self.rules()
        if isinstance(Operation, UnitEngineRule):
            newRule = Operation
        else:
            if SetID is None:
                if allrules:
                    SetID = allrules[-1].SetID
                else:
                    SetID = 1
            newRule = UnitEngineRule(Operation=Operation, SetID=SetID,
                                     Operand=Operand)

        if allrules:
            newRule.Sequence = allrules[-1].Sequence + 1
        else:
            newRule.Sequence = 0

        newRule.EngineID = self.ID
        newRule.Expiration = self.Expiration
        self.sandbox.memorize(newRule)
        return newRule

    def snapshots(self):
        """Unit Collections obtained by executing the rules sometime in the past.

        Sorted by Timestamp. last_snapshot() treats the final element as
        the newest.
        """
        allSnap = self.sandbox.recall(UnitCollection, dict(EngineID=self.ID))
        allSnap.sort(dejavu.sort(u'Timestamp'))
        return allSnap

    def take_snapshot(self, args=None):
        """Execute the rules and return a Unit Collection (or None).

        args: optional dict whose items RuleProcessor binds into FILTER
        expressions (via Expression.bind_args). Defaults to an empty dict.

        A non-None result is linked to this engine, timestamped, given a
        15-minute Expiration, and memorized.
        """
        if args is None:
            # A fresh dict per call rather than one shared mutable default.
            args = {}
        snap = RuleProcessor(self.sandbox).process(self.rules(), args)
        if snap is not None:
            snap.EngineID = self.ID
            snap.Timestamp = datetime.datetime.now()
            snap.decay(minutes=15)
            self.sandbox.memorize(snap)
        return snap

    def last_snapshot(self, args=None):
        """Return the newest stored snapshot, taking a new one if none exist.

        args is only used when a new snapshot must be taken. An existing
        snapshot is returned as-is, even if it was taken with other args.
        """
        allSnaps = self.snapshots()
        if allSnaps:
            return allSnaps[-1]
        return self.take_snapshot(args)

    def immortalize(self):
        """Clear the Expiration of this engine and all of its rules.

        Existing snapshots keep their Expiration.
        """
        self.Expiration = None
        for rule in self.rules():
            rule.Expiration = None

    def __copy__(self):
        newUnit = TemporaryUnit.__copy__(self)
        newUnit.Name = "Copy of %s" % newUnit.Name
        newUnit.Created = datetime.datetime.now()
        return newUnit

    def clone(self, user, temporary=True):
        """Copy self and all Rules of self. Memorize automatically.

        The copy is owned by `user`. It expires in 15 minutes if
        `temporary` is true; otherwise it has no Expiration. The copied
        rules share the new engine's Expiration.
        """
        newUnit = self.__copy__()
        newUnit.Owner = user
        if temporary:
            newUnit.decay(minutes=15)
        else:
            newUnit.Expiration = None
        # Memorize first so newUnit.ID is available for the rule copies.
        self.sandbox.memorize(newUnit)
        for rule in self.rules():
            newRule = rule.__copy__()
            newRule.EngineID = newUnit.ID
            newRule.Expiration = newUnit.Expiration
            self.sandbox.memorize(newRule)
        return newUnit

    def permit(self, user, write_access=True):
        """Return True if `user` may access this engine.

        Write access requires Owner to be u'Public' or `user`. Read access
        additionally allows engines owned by u'System'.
        """
        if write_access:
            return self.Owner in (u'Public', user)
        return self.Owner in (u'System', u'Public', user)
382
# Relate UnitEngine.ID to the EngineID of its rules and snapshots.
# NOTE(review): these registrations presumably provide the relationship
# methods used above (self.UnitEngineRule(), unit.UnitEngine()); confirm
# against dejavu's one_to_many.
for _related in (UnitEngineRule, UnitCollection):
    UnitEngine.one_to_many('ID', _related, 'EngineID')
del _related
385
386
387 class RuleProcessor(object):
388     """Processor for the Rules of a Unit Engine."""
389    
390     def __init__(self, sandbox):
391         self.sandbox = sandbox
392         self.store = sandbox.store
393    
394     def process(self, rules, args):
395         """Execute the rules and return a Unit Collection (or None)."""
396         self.sets = {}
397         self.args = args
398         final = None
399         for rule in rules:
400             operation = rule.Operation
401             func = getattr(self, 'visit_' + operation)
402             final = rule.SetID
403             func(final, rule.Operand)
404         if final is None:
405             return None
406         else:
407             return self.sets[final]
408    
409     def visit_COPY(self, setID, operand):
410         """Copy the set whose ID = operand into another set, whose ID = setID."""
411         A = self.sets[setID]
412         setID2 = int(operand)
413         if setID2 in self.sets:
414             # Overwrite the existing set.
415             B = self.sets[setID2]
416         else:
417             # Create a new set.
418             B = UnitCollection(Type=A.Type)
419             self.sets[setID2] = B
420         B.universal = A.universal
421         A.acquire()
422         B.acquire()
423         try:
424             B.Members = A.Members[:]
425         finally:
426             A.release()
427             B.release()
428    
429     def visit_CREATE(self, setID, operand):
430         """Create a universal set. Actual population may be deferred."""
431         newset = UnitCollection(Type=operand)
432         newset.universal = True
433         self.sets[setID] = newset
434    
435     def visit_DIFFERENCE(self, setID, operand):
436         A = self.sets[setID]
437        
438         setID2 = int(operand)
439         self.realize_universal(setID2)
440         B = self.sets[setID2]
441        
442         A.acquire()
443         B.acquire()
444         try:
445             if B.universal:
446                 # B should be every Unit, which means the difference
447                 # will always be an empty set.
448                 A.Members = []
449             else:
450                 # B is a subset of A.
451                 if A.universal:
452                     cls = self.store.class_by_name(A.Type)
453                     mem = A.Members
454                     for unit in self.sandbox.recall(cls):
455                         id = unit.identity()
456                         if id not in B.Members:
457                             mem.append(id)
458                 else:
459                     A.Members = [x for x in A.Members if x not in B.Members]
460             A.universal = False
461         finally:
462             A.release()
463             B.release()
464    
465     def visit_FILTER(self, setID, operand):
466         expr = pickle.loads(operand)
467         expr.bind_args(**self.args)
468        
469         A = self.sets[setID]
470         A.acquire()
471         try:
472             cls = self.store.class_by_name(A.Type)
473             mem = A.Members
474             if A.universal:
475                 A.universal = False
476                 for unit in self.sandbox.recall(cls, expr):
477                     id = unit.identity()
478                     if id not in mem:
479                         mem.append(id)
480             else:
481                 newset = []
482                 for id in mem:
483                     filter = dict(zip(cls.identifiers, id))
484                     unit = self.sandbox.unit(cls, **filter