Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

root/tags/1.4.0/arenas.py

Revision 166 (checked in by fumanchu, 3 years ago)

Now distributing xray module with dejavu.

  • Property svn:eol-style set to native
Line 
1
2 import ConfigParser
3
4 from containers import Graph
5 import logic
6 import errors
7 import xray
8
# Public names exported by a star-import of this module.
__all__ = [
    'Arena',
    'Sandbox',
    'LOGCONN',
    'LOGFORGET',
    'LOGMEMORIZE',
    'LOGRECALL',
    'LOGREPRESS',
    'LOGSANDBOX',
    'LOGSQL',
    'LOGVIEW',
]
13
14
# Logging flags. Each one is a single bit; OR them together and assign the
# result to Arena.logflags to choose which messages Arena.log() emits.
LOGSQL = 1 << 2
LOGCONN = 1 << 3

LOGMEMORIZE = 1 << 7
LOGRECALL = 1 << 8
LOGVIEW = 1 << 9
LOGREPRESS = 1 << 10
LOGFORGET = 1 << 11

# Union of the five flags directly above.
LOGSANDBOX = (LOGMEMORIZE
              | LOGRECALL
              | LOGVIEW
              | LOGREPRESS
              | LOGFORGET)
25
26
class Arena(object):
    """Arena(). A namespace/workspace for a Dejavu application.
    
    An Arena keeps track of the StorageManagers ("stores") an application
    uses, of the Unit classes which have been registered, and of which
    store handles each class. Sandboxes are created via new_sandbox().
    """
    
    def __init__(self):
        # Store used for any class which no other store claims by name.
        self.defaultStore = None
        # Maps store name -> StorageManager instance.
        self.stores = {}
        # Maps Unit class -> its StorageManager (None until resolved
        # by storage()).
        self._registered_classes = {}
        # Undirected graph connecting associated Unit classes.
        self.associations = Graph(directed=False)
        self.engine_functions = {}
        # Bitmask of LOG* flags; see log().
        self.logflags = 0
    
    def log(self, message, flag):
        """Default logger (writes to stdout). Feel free to replace.
        
        The message is printed only if 'flag' shares at least one bit
        with self.logflags.
        """
        if flag & self.logflags:
            # The parenthesized form prints exactly like "print message"
            # under the Python 2 print statement.
            print(message)
    
    def load(self, configFileName):
        """Load StorageManagers from the given config file.
        
        Each section describes one store: the section name becomes the
        store name, the 'Class' option names the StorageManager class,
        and the section's whole option dict is passed to add_store().
        Stores are added in ascending 'Load Order' (default 0); equal
        orders are resolved by section name.
        
        Raises KeyError if a section has no 'Class' option.
        """
        parser = ConfigParser.ConfigParser()
        # Make names case-sensitive by overriding optionxform.
        parser.optionxform = unicode
        parser.read(configFileName)
        
        stores = []
        for section in parser.sections():
            opts = dict(parser.items(section))
            stores.append((int(opts.get("Load Order", "0")), section, opts))
        stores.sort()
        
        for order, name, options in stores:
            self.add_store(name, options[u'Class'], options)
    
    def add_store(self, name, store, options=None):
        """add_store(name, store, options=None). Register a StorageManager.
        
        The 'store' argument may be the name of a Storage Manager class;
        if so, it must be importable (that is, it must have the full dotted
        package name). In that case the class is instantiated with
        (name, self, options or {}).
        
        Returns the registered store instance.
        """
        
        if isinstance(store, basestring):
            store = xray.classes(store)(name, self, options or {})
        
        self.stores[name] = store
        if not store.classnames:
            # This store has no "classnames" list, which signals that it
            # handles all classes which are not handled by other stores.
            self.defaultStore = store
        return store
    
    def remove_store(self, name):
        """remove_store(name). Unregister the named StorageManager.
        
        Registered classes which were bound to that store become unbound
        again (they are re-resolved on the next storage() call), and the
        store is dropped as default store if it was one. Does nothing if
        no store of that name is registered.
        """
        if name in self.stores:
            store = self.stores[name]
            
            # Disassociate all registered classes with this store.
            for c in self._registered_classes.keys():
                if self._registered_classes[c] is store:
                    self._registered_classes[c] = None
            
            # Otherwise storage() would keep falling back to a store
            # which is no longer registered.
            if self.defaultStore is store:
                self.defaultStore = None
            
            del self.stores[name]
    
    def shutdown(self):
        """Shutdown the arena.
        
        Calls shutdown() on every store, in ascending order of each
        store's shutdownOrder attribute; equal orders are resolved by
        store name.
        """
        # Tell all stores to shut down. The name precedes the store in
        # each tuple so that sorting never has to compare store objects.
        stores = [(v.shutdownOrder, k, v) for k, v in self.stores.items()]
        stores.sort()
        for order, name, store in stores:
            store.shutdown()
    
    def new_sandbox(self):
        """Return a new Sandbox bound to this arena."""
        return Sandbox(self)
    
    ###########################################
    ##        Unit Class Registration        ##
    ###########################################
    
    def register(self, cls):
        """register(cls) -> Assert that Units of class 'cls' will be handled."""
        # We must allow modules to register classes before any stores have
        # been added, but not overwrite a store which has already been found.
        if cls not in self._registered_classes:
            self._registered_classes[cls] = None
        
        # Register any association(s) in an undirected graph.
        # An association may opt out by having a false "register" attribute.
        for ua in cls._associations.itervalues():
            if getattr(ua, "register", True):
                self.associations.connect(cls, ua.farClass)
    
    def register_all(self, globals):
        """Register every dejavu.Unit subclass found among globals' values.
        
        'globals' is a mapping (for example, a module's globals() dict).
        """
        import dejavu
        for obj in globals.itervalues():
            if isinstance(obj, type) and issubclass(obj, dejavu.Unit):
                self.register(obj)
    
    def class_by_name(self, classname):
        """Return the registered class whose __name__ equals classname.
        
        Raises KeyError if no registered class has that name.
        """
        for cls in self._registered_classes:
            if cls.__name__ == classname:
                return cls
        raise KeyError("No registered class found for '%s'." % classname)
    
    def storage(self, cls):
        """Return the StorageManager which handles Units of the given class.
        
        The answer is remembered per class. If it is not yet known, the
        first store which lists the class name in its classnames is used,
        else the default store.
        
        Raises KeyError if no store claims the class and there is no
        default store.
        """
        found = self._registered_classes.get(cls)
        
        if found:
            return found
        
        # Search all stores for the class name.
        clsname = cls.__name__
        for store in self.stores.values():
            if clsname in store.classnames:
                found = store
                break
        found = found or self.defaultStore
        if found is None:
            raise KeyError("No store found for '%s' and no "
                           "default store." % clsname)
        
        self._registered_classes[cls] = found
        return found
    
    def create_storage(self, cls):
        """create_storage(cls). Create storage space for cls."""
        self.storage(cls).create_storage(cls)
    
    def has_storage(self, cls):
        """Return the result of has_storage(cls) from the store for cls."""
        return self.storage(cls).has_storage(cls)
    
    def drop_storage(self, cls):
        """drop_storage(cls). Ask the store for cls to drop its storage."""
        self.storage(cls).drop_storage(cls)
    
    def add_property(self, cls, name):
        """Ask the store for cls to add the named property."""
        self.storage(cls).add_property(cls, name)
    
    def drop_property(self, cls, name):
        """Ask the store for cls to drop the named property."""
        self.storage(cls).drop_property(cls, name)
    
    def rename_property(self, cls, oldname, newname):
        """Ask the store for cls to rename property oldname to newname."""
        self.storage(cls).rename_property(cls, oldname, newname)
    
    def migrate_class(self, cls, new_store):
        """migrate_class(cls, new_store). Copy all units of cls to new_store.
        
        Storage for cls is created in new_store first. The units are
        recalled through a fresh Sandbox, then reserved and saved in
        new_store one by one.
        """
        new_store.create_storage(cls)
        for unit in self.new_sandbox().xrecall(cls):
            new_store.reserve(unit)
            # NOTE(review): the True flag presumably forces a full save;
            # confirm against the StorageManager.save signature.
            new_store.save(unit, True)
    
    def migrate(self, new_store, old_store=None, copy_only=False):
        """migrate(new_store, old_store=None, copy_only=False).
        
        Copy all units (of old_store) to new_store. If old_store is None,
        the units of every registered class are copied, whichever store
        currently handles them.
        
        Unless copy_only is true, each migrated class is rebound to
        new_store, so later storage() calls return new_store for it.
        """
        for cls in self._registered_classes:
            store = self.storage(cls)
            if old_store is None or old_store is store:
                self.migrate_class(cls, new_store)
                if not copy_only:
                    self._registered_classes[cls] = new_store
182
183
184 ###########################################################################
185 ##                                                                       ##
186 ##                              Sandboxes                                ##
187 ##                                                                       ##
188 ###########################################################################
189
190
191 class Sandbox(object):
192     """Sandbox(arena). Data sandbox for Dejavu arenas.
193     
194     Each consumer (that is, each UI process) maintains a Sandbox for
195     managing Units. Sandboxes populate themselves with Units on a lazy
196     basis, allowing UI code to request data as it's needed. However, once
197     obtained, such Units are persisted (usually for the lifetime of the
198     thread); this important detail means that multiple requests for the
199     same Units result in multiple references to the same objects, rather
200     than multiple objects. Sandboxes are basically what Fowler calls
201     Identity Maps.
202     
203     The *REALLY* important thing to understand if you're customizing this
204     is that Sandboxes won't survive sharing across threads--DON'T TRY IT.
205     If you need to share unit data across requests, use or make an SM which
206     persists the data, and chain it with another, more normal SM.
207     
208     _cache(), _caches, and _stores are private for a reason--don't access
209     them from interface code--tell the Sandbox to do it for you.
210     """
211    
212     def __init__(self, arena):
213         self.arena = arena
214         self._caches = {}
215         self._magify()
216    
217     def _magify(self):
218         """Add magic recaller methods to self."""
219         # I haven't decided yet if this technique is too slow; it might be
220         # better to memoize a base class in the arena, and just set the
221         # magic methods on it once, rather than anew for each instance.
222         for cls in self.arena._registered_classes.iterkeys():
223             name = cls.__name__
224             if not hasattr(self, name):
225                 def make_recaller(cls):
226                     def recaller(*args, **kwargs):
227                         # Allow identifiers to be supplied as args or kwargs
228                         # (since the common case will be a single identifier).
229                         for arg, id in zip(args, cls.identifiers):
230                             kwargs[str(id.key)] = arg
231                         expr = logic.filter(**kwargs)
232                         try:
233                             return self.xrecall(cls, expr).next()
234                         except StopIteration:
235                             return None
236                     recaller.__doc__ = "A single %s Unit, else None." % name
237                     return recaller
238                 setattr(self, name, make_recaller(cls))
239    
240     def memorize(self, unit):
241         """memorize(unit). Persist unit in storage."""
242         cls = unit.__class__
243         unit.sandbox = self
244        
245         # Ask the store to accept the unit, assigning it primary key values
246         # if necessary. The store should also call unit.cleanse() if it
247         # saves the whole unit state on this call.
248         self.arena.storage(cls).reserve(unit)
249        
250         # Insert the unit into the cache.
251         id = unit.identity()
252         self._cache(cls)[id] = unit
253         self.arena.log("MEMORIZE %s: %s" % (cls.__name__, id), LOGMEMORIZE)
254        
255         # Do this at the end of the func, since most on_memorize
256         # will want to have an identity when called.
257         if hasattr(unit, "on_memorize"):
258             unit.on_memorize()
259    
260     def forget(self, unit):
261         """Destroy unit, both in the cache and storage."""
262         cls = unit.__class__
263        
264         id = unit.identity()
265         self.arena.log("FORGET %s: %s" % (cls.__name__, id), LOGFORGET)
266         self.arena.storage(cls).destroy(unit)
267        
268         del self._cache(cls)[id]
269        
270         # This must be done after the destroy() call, so that a
271         # related unit can poll all instances of this class.
272         if hasattr(unit, "on_forget"):
273             unit.on_forget()
274        
275         unit.sandbox = None
276    
277     def xrecall(self, classes, expr=None):
278         """Iterator over units of cls which match expr."""
279         if classes.__class__.__name__ == "UnitJoin":
280             for unitrow in self.xmulti(classes, expr):
281                 yield unitrow
282             return
283        
284         cls = classes
285         self.arena.log("RECALL %s: %s" % (cls.__name__, expr), LOGRECALL)
286        
287         # Collect all registered subclasses of cls.
288         # Note that cls is a subclass of itself.
289         classes = [c for c in self.arena._registered_classes.iterkeys()
290                    if issubclass(c, cls)]
291         if not classes:
292             # Even the requested class is not registered.
293             raise errors.UnrecallableError("The '%s' class is not registered."
294                                            % cls.__name__)
295        
296         for cls in classes:
297             finished = False
298             cache = self._cache(cls)
299            
300             # Special-case the scenario where one Unit is expected
301             # and called by ID. We should be able to save a database hit.
302             if expr:
303                 fc = expr.func.func_code
304                 if (fc.co_code == '|\x00\x00i\x01\x00d\x01\x00j\x02\x00S'
305                     and fc.co_names[-1] == 'ID'):
306                     ID = fc.co_consts[-1]
307                     unit = cache.get((ID,))
308                     if unit is not None:
309                         # Do NOT call on_recall here. That should be called
310                         # only at the Sandbox-SM boundary.
311                         yield unit
312                         finished = True
313            
314             if not finished:
315                 # Query the cache. We have to use a static copy of the
316                 # keys, to ensure that our cache doesn't change size
317                 # during iteration (due to overlapping xrecalls).
318                 keys = cache.keys()
319                 for id in keys:
320                     unit = cache.get(id)
321                     if unit and (expr is None) or expr.evaluate(unit):
322                         # Do NOT call on_recall here. That should be called
323                         # only at the Sandbox-SM boundary.
324                         yield unit
325                
326                 # Query Storage.
327                 for unit in self.arena.storage(cls).recall(cls, expr):
328                     id = unit.identity()
329                     # Don't offer up a unit that was already checked in our cache
330                     # (whether it matched the expr() or not--we assume the cache
331                     # has the freshest data).
332                     # A list is 2% to 4% faster than a dict, by the way (in Py 2.3).
333                     if id not in keys:
334                         # Very important that we check for existing unit, as its
335                         # state may have changed in memory but not in storage.
336                         # Make sure the cache lookup and get happens atomically.
337                         existing = cache.get(id)
338                         if existing:
339                             yield existing
340                         else:
341                             unit.sandbox = self
342                             confirmed = True
343                             cache[id] = unit
344                             if hasattr(unit, 'on_recall'):
345                                 try:
346                                     unit.on_recall()
347                                 except errors.UnrecallableError:
348                                     confirmed = False
349                             if confirmed:
350                                 yield unit
351    
352     def recall(self, classes, expr=None):
353         """List of units of class 'cls' which match expr."""
354         return [x for x in self.xrecall(classes, expr)]
355    
356     def xmulti(self, classes, expr=None):
357         """xmulti(classes, expr) -> [[unit, ...], [unit, ...], ...]
358         Recall units of each cls if they together match the expr.
359         
360         Each yielded value will be a list of Units, in the same order as
361         the classes arg. This facilitates unpacking in iterative consumer
362         code like:
363         
364         for invoice, price in sandbox.xmulti(Invoice & Price, f):
365             deal_with(invoice)
366             deal_with(price)
367         """
368        
369         self.arena.log("RECALL %s %s" %
370                        (", ".join([c.__name__ for c in classes]), expr),
371                        LOGRECALL)
372        
373         stores = [self.arena.storage(cls) for cls in classes]
374         firststore = stores[0]
375         for s in stores:
376             if s is not firststore:
377                 raise ValueError(u"xmulti() does not support multiple"
378                                  u" classes in disparate stores.")
379        
380         # This is broken. If a filter expr is supplied, then the store may
381         # not return rows which our cache would, and those won't be included
382         # in the resultset. If you're using xmulti with no expr's, or
383         # in read-only scripts, it should be OK for now. But if you mutate
384         # Units and then call multirecall, expect inconsistent results.
385         for unitset in firststore.multirecall(classes, expr):
386             confirmed = True
387             for index in xrange(len(unitset)):
388                 unit = unitset[index]
389                 id = unit.identity()
390                 cache = self._cache(unit.__class__)
391                 if id in cache:
392                     # Keep the unit which is in our cache!
393                     unitset[index] = cache[id]
394                 else:
395                     cache[id] = unit
396                     unit.sandbox = self
397                     if hasattr(unit, 'on_recall'):
398                         try:
399                             unit.on_recall()
400                         except errors.UnrecallableError:
401                             confirmed = False
402                             break
403             if confirmed:
404                 yield unitset
405    
406     def unit(self, cls, **kwargs):
407         """unit(cls, **kwargs) -> A single matching Unit, else None.
408         
409         **kwargs will be combined into an Expression via logic.filter.
410             The first Unit matching that expression is returned; if no
411             Units match, None is returned.
412         
413         If you need a single Unit which matches a more complex
414             expression, use recall()[0] or xrecall().next().
415         """
416         expr = None
417         if kwargs:
418             expr = logic.filter(**kwargs)
419         try:
420             return self.xrecall(cls, expr).next()
421         except StopIteration:
422             return None
423    
424     def view(self, cls, attrs, expr=None):
425         """view(cls, attrs, expr=None) -> Iterator of all Property tuples."""
426         self.arena.log("VIEW %s [%s]: %s" % (cls.__name__, attrs, expr), LOGVIEW)
427        
428         cache = self._cache(cls)
429        
430         for unit in cache.itervalues():
431             if expr is None or expr(unit):
432                 yield tuple([getattr(unit, attr) for attr in attrs])
433        
434         # Add the identity attribute(s) if not present. This is necessary
435         # to avoid duplicating objects which are already in our cache.
436         fields = list(attrs)
437         indices = []
438         added_fields = 0
439         for prop in cls.identifiers:
440             if prop.key not in fields:
441                 added_fields += 1
442                 fields.append(prop.key)
443             indices.append(fields.index(prop.key))
444        
445         for row in self.arena.storage(cls).view(cls, fields, expr):
446             id = tuple([row[x] for x in indices])
447             if id not in cache:
448                 if added_fields:
449                     # Remove the added identifier columns from the row.
450                     row = row[:len(row) - added_fields]
451                 yield row
452    
453     def distinct(self, cls, attrs, expr=None):
454         """distinct(cls, attrs, expr=None) -> List of distinct Property tuples.
455         
456         If only one attribute is specified, a list of values will be returned.
457         If more than one attribute is specified, a zipped list will be returned.
458         
459         Notice that you can also use this function as a count() function
460         (in fact it's the only way to do it) by using attrs = ['ID'].
461         """
462         self.arena.log("DISTINCT %s [%s]: %s" % (cls.__name__, attrs, expr), LOGVIEW)
463        
464         seen = {}
465         cache = self._cache(cls)
466         for unit in cache.itervalues():
467             if expr is None or expr(unit):
468                 row = tuple([getattr(unit, attr) for attr in attrs])
469                 if row not in seen:
470                     seen[row] = None
471        
472         for row in self.arena.storage(cls).distinct(cls, attrs, expr):
473             if row not in seen:
474                 seen[row] = None
475        
476         seen = seen.keys()
477