Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/engines.py

Revision 5 (checked in by fumanchu, 9 years ago)

Fixed visit_FUNCTION rule to pass a sandbox.

Line 
1 """
2 Notice in particular that UnitCollection, UnitEngineRule, and UnitEngine
3 are all _temporary_ Units. Even when you memorize them, they won't be
4 persistent unless you mark each instance as no longer temporary. If you
5 use UnitEngine.permanent(), it will make all of its rules permanent
6 (not temporary) as well.
7 """
8
9 import threading
10 import datetime
11 try:
12     import cPickle as pickle
13 except ImportError:
14     import pickle
15 import dejavu
16 import logic
17 import sets
18 import xray
19
20
21 class UnitCollection(dejavu.Unit):
22     """A Set of Unit IDs.
23     
24     Type: Unit Type of all Units referenced by this collection.
25     
26     The Unit Collection is primarily for use as an index for Units.
27     Unit Engines use Expressions and other rules to transform a Collection
28     as a whole. These classes consume and produce Unit Collections.
29     The Unit Collection provides special methods for iteration, whether
30     reading or writing, to avoid errors common with multi-process/
31     multi-threaded access.
32     
33     UnitCollection is a subclass of Unit, so that it can be managed by
34     Sandboxes. However, due to the structure of the data contained in a
35     UnitCollection, it is recommended that Storage Managers use different
36     techniques to store and retrieve Unit Collections. They do not need
37     more than the ID's of their contained Units stored, since they will
38     recall such Units as needed. Not every Storage Manager is going to be
39     able to handle this kind of dynamic storage; deployers-- examine your
40     Storage Managers and make sure they can!
41     """
42    
43     _IDs = None
44    
45     def __init__(self, **kwargs):
46         dejavu.Unit.__init__(self)
47         self._IDs = sets.Set()
48         self._mutex = threading.RLock()
49        
50         for k, v in kwargs.iteritems():
51             setattr(self, k, v)
52    
53     def acquire(self):
54         self._mutex.acquire(True)
55    
56     def release(self):
57         self._mutex.release()
58    
59     def __len__(self):
60         return len(self._IDs)
61    
62     def add(self, ID):
63         self.acquire()
64         try:
65             self._IDs.add(ID)
66         finally:
67             self.release()
68    
69     def unit_class(self):
70         return self.sandbox.arena.class_by_name(self.Type)
71    
72     def ids(self):
73         self.acquire()
74         for eachID in self._IDs:
75             yield eachID
76         self.release()
77    
78     def units(self, quota=None):
79         cls = self.unit_class()
80         self.acquire()
81         for i, eachID in enumerate(self._IDs):
82             if quota and i >= quota:
83                 break
84             unit = self.sandbox.unit(cls, ID=eachID)
85             if unit:
86                 yield unit
87         self.release()
88    
89     def xdict(self, attr):
90         """Return a dictionary of {Unit.attr: [Unit, Unit, ...]}."""
91         product = {}
92         self.acquire()
93         try:
94             for unit in self.units():
95                 key = getattr(unit, attr)
96                 product.setdefault(key, []).append(unit)
97         finally:
98             self.release()
99         return product
100    
101     def __copy__(self):
102         newUnit = dejavu.Unit.__copy__(self)
103         newUnit._IDs = self._IDs.copy()
104         return newUnit
105
106 UnitCollection.set_property(u'EngineID', int, index=True)
107 UnitCollection.set_properties({u'Type': unicode,
108                                u'Timestamp': datetime.datetime,
109                                })
110
111
112 operations = [              #       OPERAND
113     'COPY',                 # SetID of mixin
114     'CREATE',               # New type (= class.__name__)
115     'DIFFERENCE',           # SetID of mixin
116     'FILTER',               # logic.Expression
117     'FUNCTION',             # key into arena.engine_functions dict
118     'INTERSECTION',         # SetID of mixin
119     'RETURN',               #
120     'TRANSFORM',            # New type (= class.__name__)
121     'UNION',                # SetID of mixin
122     ]
123
124
125 class UnitEngineRule(dejavu.Unit):
126     """A Rule for Unit Engines."""
127    
128     def __init__(self, **kwargs):
129         """kw: Operation, SetID, Operand=(Type | logic.Expression | otherSet)
130         
131         Expressions:
132             If the Operation is a logic.Expression, then the snapshot
133             will consist of the IDs of units which match the Expression.
134         
135         Everything else:
136             transforms: the snapshot will consist of IDs of all units
137                 which are associated with the current snapshot.
138             union, difference, and intersection: these all take a setID.
139         
140         So, a typical Engine might have a set of rules which look like:
141             --Operation--   --Set-- --Operand--
142             CREATE          1       Invoice         # Full set
143             FILTER          1       (Expression)    # modifies Set 1
144             CREATE          2       Inventory       # Full set
145             FILTER          2       (Expression)    # modifies Set 2
146             FILTER          2       (Expression)    # modifies Set 2
147             TRANSFORM       2       Invoice         # modifies Set 2
148             DIFFERENCE      1       2               # Set1 -= Set2
149             RETURN          1                       # This is optional.
150         
151         The last RETURN statement is optional. If omitted, the last Set
152         touched will be returned.
153         
154         For all operations, the Set ID indicates which Set will be
155             modified by the operation. Using the above example, you can
156             see that for the DIFFERENCE operation, the Set which is modified
157             is Set 1.
158         """
159         dejavu.Unit.__init__(self)
160        
161         if kwargs.get('Operation', '') == 'FILTER':
162             if not isinstance(kwargs.get('Operand'), (str, unicode)):
163                 kwargs['Operand'] = pickle.dumps(kwargs['Operand'])
164        
165         for k, v in kwargs.iteritems():
166             setattr(self, k, v)
167    
168     def __repr__(self):
169         op = self.Operand
170         if self.Operation == 'FILTER':
171             op = pickle.loads(op)
172         return ("dejavu.engines.UnitEngineRule(%s, %s, %s)"
173                 % (self.Operation, self.SetID, repr(op)))
174    
175     def expr(self):
176         if self.Operation == 'FILTER':
177             op = self.Operand
178             return pickle.loads(op)
179         return None
180
181 class RuleProperty(dejavu.UnitProperty):
182     def post(self, unit, value):
183         eng = unit.sandbox.unit(UnitEngine, ID=unit.EngineID)
184         if eng:
185             eng.update_final_class()
186 UnitEngineRule.set_property(u'Operation', str, descriptor=RuleProperty)
187 UnitEngineRule.set_property(u'SetID', int, descriptor=RuleProperty)
188 UnitEngineRule.set_property(u'Operand', str, descriptor=RuleProperty)
189 UnitEngineRule.Operand.hints = {u'Size': 0}
190 UnitEngineRule.set_property(u'Sequence', int, descriptor=RuleProperty)
191 UnitEngineRule.set_property(u'EngineID', int, index=True)
192
193
194 class UnitEngine(dejavu.Unit):
195     """A factory for Unit Collections."""
196    
197     def __init__(self, **kwargs):
198         dejavu.Unit.__init__(self)
199         self.Created = datetime.datetime.today()
200         self.Owner = u''
201        
202         for k, v in kwargs.iteritems():
203             setattr(self, k, v)
204    
205     def on_forget(self):
206         # Rules and Snapshots shouldn't persist past
207         # the life of their Engines. Forget them.
208         for rule in self.rules():
209             rule.forget()
210         for snap in self.snapshots():
211             snap.forget()
212    
213     def update_final_class(self):
214         results = {}
215         last_set = 1
216         for rule in self.rules():
217             last_set = rule.SetID
218             operation = rule.Operation
219             if operation in ('CREATE', 'TRANSFORM'):
220                 results[last_set] = rule.Operand
221             if operation == 'RETURN':
222                 break
223         if last_set in results:
224             self.FinalClassName = results[last_set]
225    
226     def final_class(self):
227         return self.sandbox.arena.class_by_name(self.FinalClassName)
228    
229     def rules(self):
230         """An ordered list of Rules for this Engine."""
231         f = logic.filter(EngineID=self.ID)
232         allrules = [x for x in self.sandbox.recall(UnitEngineRule, f)]
233         allrules.sort(dejavu.sort(u'Sequence'))
234         return allrules
235    
236     def add_rule(self, Operation, SetID=None, Operand=None):
237         allrules = self.rules()
238         if isinstance(Operation, UnitEngineRule):
239             newRule = Operation
240         else:
241             if SetID is None:
242                 try:
243                     SetID = allrules[-1].SetID
244                 except IndexError:
245                     SetID = 1
246             newRule = UnitEngineRule(Operation=Operation, SetID=SetID,
247                                      Operand=Operand)
248        
249         try:
250             nextSeq = allrules[-1].Sequence + 1
251         except IndexError:
252             nextSeq = 0
253         newRule.Sequence = nextSeq
254        
255         newRule.EngineID = self.ID
256         newRule.temporary = self.temporary
257         self.sandbox.memorize(newRule)
258         self.update_final_class()
259    
260     def snapshots(self):
261         """Unit Collections obtained by executing the rules sometime in the past."""
262         f = logic.filter(EngineID=self.ID)
263         allSnap = [x for x in self.sandbox.recall(UnitCollection, f)]
264         allSnap.sort(dejavu.sort(u'Timestamp'))
265         return allSnap
266    
267     def take_snapshot(self, args={}):
268         """Execute the rules and return a Unit Collection (or None)."""
269         allrules = self.rules()
270         snap = RuleProcessor(self.sandbox).process(allrules, args)
271         if snap is not None:
272             snap.EngineID = self.ID
273             snap.Timestamp = datetime.datetime.now()
274             snap.temporary = True
275             self.sandbox.memorize(snap)
276         return snap
277    
278     def last_snapshot(self, args={}):
279         allSnaps = self.snapshots()
280         if len(allSnaps) == 0:
281             aSnap = self.take_snapshot(args)
282         else:
283             aSnap = allSnaps[-1]
284         return aSnap
285    
286     def permanent(self):
287         self.temporary = False
288         for rule in self.rules():
289             rule.temporary = False
290    
291     def __copy__(self):
292         newUnit = dejavu.Unit.__copy__(self)
293         newUnit.Name = "Copy of %s" % newUnit.Name
294         newUnit.Created = datetime.datetime.now()
295         return newUnit
296    
297     def clone(self, user, temporary=True):
298         """Copy self and all Rules of self. Memorize automatically."""
299         newUnit = self.__copy__()
300         newUnit.Owner = user
301         newUnit.temporary = temporary
302         self.sandbox.memorize(newUnit)
303         for rule in self.rules():
304             newRule = rule.__copy__()
305             newRule.EngineID = newUnit.ID
306             newRule.temporary = temporary
307             self.sandbox.memorize(newRule)
308         return newUnit
309    
310     def permit(self, user, write_access=True):
311         if write_access:
312             return self.Owner in (u'Public', user)
313         else:
314             return self.Owner in ('System', 'Public', user)
315
316 UnitEngine.set_properties({u'Owner': unicode,
317                            u'Name': unicode,
318                            u'Created': datetime.datetime,
319                            u'FinalClassName': unicode,
320                            })
321
322
323 class RuleProcessor(object):
324     """Processor for the Rules of a Unit Engine."""
325    
326     def __init__(self, sandbox):
327         self.sandbox = sandbox
328         self.arena = sandbox.arena
329    
330     def process(self, rules, args):
331         """Execute the rules and return a Unit Collection (or None)."""
332         self.sets = {}
333         self.args = args
334         final = None
335         for rule in rules:
336             operation = rule.Operation
337             func = getattr(self, 'visit_' + operation)
338             final = rule.SetID
339             func(final, rule.Operand)
340         if final is None:
341             return None
342         else:
343             return self.sets[final]
344    
345     def visit_COPY(self, setID, operand):
346         """Copy the set whose ID = operand into another set, whose ID = setID."""
347         A = self.sets[setID]
348         operand = int(operand)
349         if operand in self.sets:
350             # Overwrite the existing set.
351             B = self.sets[operand]
352         else:
353             # Create a new set.
354             B = UnitCollection(Type=A.Type)
355             self.sets[operand] = B
356         B.empty = A.empty
357         A.acquire()
358         B.acquire()
359         try:
360             B._IDs = A._IDs.copy()
361         finally:
362             A.release()
363             B.release()
364    
365     def visit_CREATE(self, setID, operand):
366         """Create an empty set. The next instruction is responsible to fill it."""
367         newset = UnitCollection(Type=operand)
368         newset.empty = True
369         self.sets[setID] = newset
370    
371     def realize_empty(self, setID):
372         """realize_empty(setID). Populate the specified set only if empty."""
373         A = self.sets[setID]
374         if hasattr(A, 'empty') and A.empty:
375             A.empty = False
376             A.acquire()
377             try:
378                 for unit in self.sandbox.recall(self.arena.class_by_name(A.Type)):
379                     A._IDs.add(unit.ID)
380             finally:
381                 A.release()
382    
383     def visit_DIFFERENCE(self, setID, operand):
384         self.realize_empty(setID)
385         A = self.sets[setID]
386         B = self.sets[int(operand)]
387         A.acquire()
388         B.acquire()
389         try:
390             A._IDs = A._IDs.difference(B._IDs)
391         finally:
392             A.release()
393             B.release()
394    
395     def visit_FILTER(self, setID, operand):
396         expr = pickle.loads(operand)
397         expr.bind_args(**self.args)
398         A = self.sets[setID]
399         if hasattr(A, 'empty') and A.empty:
400             A.empty = False
401             A.acquire()
402             try:
403                 cls = self.arena.class_by_name(A.Type)
404                 for unit in self.sandbox.recall(cls, expr):
405                     A._IDs.add(unit.ID)
406             finally:
407                 A.release()
408         else:
409             A.acquire()
410             try:
411                 cls = self.arena.class_by_name(A.Type)
412                 newset = sets.Set()
413                 for id in A._IDs:
414                     unit = self.sandbox.unit(cls, ID=id)
415                     if unit and expr.evaluate(unit):
416                         newset.add(id)
417                 A._IDs = newset
418             finally:
419                 A.release()
420    
421     def visit_FUNCTION(self, setID, operand):
422         func = self.arena.engine_functions[operand]
423        
424         A = self.sets[setID]
425         A.acquire()
426         try:
427             func(self.sandbox, A)
428         finally:
429             A.release()
430    
431     def visit_INTERSECTION(self, setID, operand):
432         self.realize_empty(setID)
433         A = self.sets[setID]
434         B = self.sets[int(operand)]
435         A.acquire()
436         B.acquire()
437         try:
438             A._IDs = A._IDs.intersection(B._IDs)
439         finally:
440             A.release()
441             B.release()
442    
443     def visit_RETURN(self, setID, operand):
444         self.realize_empty(setID)
445    
446     def visit_TRANSFORM(self, setID, operand):
447         """operand=far class name. Multiple hops are supported."""
448         self.realize_empty(setID)
449         A = self.sets[setID]
450         start = self.arena.class_by_name(A.Type)
451         end = self.arena.class_by_name(operand)
452         nodes = self.arena.associations.shortest_path(start, end)
453         if nodes is None:
454             raise KeyError("No association found between '%s' and '%s'"
455                            % (start, end))
456        
457         # Skip the first node, which should be A.Type
458         nodes.pop(0)
459         A.acquire()
460         try:
461             for eachType in nodes:
462                 # Add all associated Units to the collection A.
463                 oppfunc = getattr(start, eachType.__name__)
464                 cls = self.arena.class_by_name(A.Type)
465                 newset = sets.Set()
466                 for id in A._IDs:
467                     unit = self.sandbox.unit(cls, ID=id)
468                     if unit:
469                         for farUnit in oppfunc(unit):
470                             newset.add(farUnit.ID)
471                 A._IDs = newset
472                 start = eachType
473                 A.Type = eachType.__name__
474         finally:
475             A.release()
476    
477     def visit_UNION(self, setID, operand):
478         self.realize_empty(setID)
479         A = self.sets[setID]
480         B = self.sets[int(operand)]
481         A.acquire()
482         B.acquire()
483         try:
484             A._IDs = A._IDs.union(B._IDs)
485         finally:
486             A.release()
487             B.release()
488
489
Note: See TracBrowser for help on using the browser.