Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/arenas.py

Revision 201 (checked in by fumanchu, 7 years ago)

Separated connection factories (pools, etc.) out of the StorageManager class. SMs now have a "_del_conn" method instead of a "close_connection_method" attribute. Also, the LOGCONN logflag has been removed.

  • Property svn:eol-style set to native
Line 
1
2 import ConfigParser
3
4 from containers import Graph
5 import logic
6 import errors
7 import xray
8
# Names exported by "from arenas import *": the two public classes
# plus every logging flag defined below.
__all__ = [
    'Arena',
    'Sandbox',
    'LOGFORGET',
    'LOGMEMORIZE',
    'LOGRECALL',
    'LOGREPRESS',
    'LOGSANDBOX',
    'LOGSQL',
    'LOGVIEW',
]
13
14
# Logging flags (see Arena.logflags). Each flag is a single bit, so
# flags may be OR'ed together; Arena.log() only emits a message when
# the flag passed with it is enabled in Arena.logflags.
LOGSQL = 1 << 2

# One flag per Sandbox operation.
LOGMEMORIZE = 1 << 7
LOGRECALL = 1 << 8
LOGVIEW = 1 << 9
LOGREPRESS = 1 << 10
LOGFORGET = 1 << 11

# Convenience mask which enables every Sandbox flag above.
LOGSANDBOX = (LOGMEMORIZE
              | LOGRECALL
              | LOGVIEW
              | LOGREPRESS
              | LOGFORGET)
24
25
class Arena(object):
    """Arena(). A namespace/workspace for a Dejavu application.

    An Arena keeps track of the StorageManagers ("stores") which have
    been added to it, of the Unit classes which have been registered
    with it, and of the associations between those classes. Sandboxes
    for working with Units are obtained from new_sandbox().
    """

    def __init__(self):
        # The store which handles every class that no other store claims.
        self.defaultStore = None
        # Map of store name -> StorageManager instance.
        self.stores = {}
        # Map of Unit class -> its StorageManager. The value is None
        # until storage() has resolved a store for the class.
        self._registered_classes = {}
        self.associations = Graph(directed=False)
        self.engine_functions = {}
        # Bitmask of LOG* flags which log() will emit.
        self.logflags = 0

    def log(self, message, flag):
        """Default logger (writes to stdout). Feel free to replace.

        The message is written only if 'flag' is enabled in self.logflags.
        """
        if flag & self.logflags:
            print(message)

    def load(self, configFileName):
        """Load StorageManagers as described in the given config file.

        Each section of the file describes one store: the section name
        is used as the store name, the 'Class' option is passed to
        add_store as the store class, and the whole option dict is
        passed as the store options. Stores are added in order of their
        integer 'Load Order' option (default 0), then by section name.

        Raises KeyError if a section has no 'Class' option.
        """
        parser = ConfigParser.ConfigParser()
        # Make names case-sensitive by overriding optionxform.
        parser.optionxform = unicode
        parser.read(configFileName)

        stores = []
        for section in parser.sections():
            opts = dict(parser.items(section))
            stores.append((int(opts.get("Load Order", "0")), section, opts))
        stores.sort()

        for order, name, options in stores:
            self.add_store(name, options[u'Class'], options)

    def add_store(self, name, store, options=None):
        """add_store(name, store, options=None). Register a StorageManager.

        The 'store' argument may be the name of a Storage Manager class;
        if so, it must be importable (that is, it must have the full dotted
        package name). In that case the class is instantiated with
        (name, self, options or {}).

        Returns the StorageManager instance which was registered.
        """
        if isinstance(store, basestring):
            store = xray.classes(store)(name, self, options or {})

        self.stores[name] = store
        if not store.classnames:
            # This store has no "classnames" list, which signals that it
            # handles all classes which are not handled by other stores.
            self.defaultStore = store
        return store

    def remove_store(self, name):
        """remove_store(name). Unregister the named StorageManager.

        Every registered class which had been resolved to that store is
        reset to "unresolved", and the default store is cleared if it
        was the one removed. Unknown names are silently ignored.
        """
        if name not in self.stores:
            return
        store = self.stores[name]

        # Disassociate all registered classes with this store.
        for c in list(self._registered_classes):
            if self._registered_classes[c] is store:
                self._registered_classes[c] = None

        # Don't let storage() keep falling back to a store which is
        # no longer part of this arena.
        if self.defaultStore is store:
            self.defaultStore = None

        del self.stores[name]

    def shutdown(self):
        """Shutdown the arena by shutting down every store.

        Stores are shut down in order of their 'shutdownOrder' attribute.
        """
        # Sort on (order, name, store): store names are unique, so ties
        # in shutdownOrder are broken by name and the store objects
        # themselves never have to be compared.
        stores = [(store.shutdownOrder, name, store)
                  for name, store in self.stores.items()]
        stores.sort()
        for order, name, store in stores:
            store.shutdown()

    def new_sandbox(self):
        """Return a new Sandbox bound to this arena."""
        return Sandbox(self)

    ###########################################
    ##        Unit Class Registration        ##
    ###########################################

    def register(self, cls):
        """register(cls) -> Assert that Units of class 'cls' will be handled."""
        # We must allow modules to register classes before any stores have
        # been added, but not overwrite a store which has already been found.
        if cls not in self._registered_classes:
            self._registered_classes[cls] = None

        # Register any association(s) in an undirected graph.
        # Associations whose "register" attribute is false are skipped.
        for ua in cls._associations.values():
            if getattr(ua, "register", True):
                self.associations.connect(cls, ua.farClass)

    def register_all(self, globals):
        """register_all(globals). Register every Unit subclass in a dict.

        'globals' is a mapping (typically a module's globals()); each of
        its values which is a subclass of dejavu.Unit is registered.
        """
        import dejavu
        for obj in globals.values():
            if isinstance(obj, type) and issubclass(obj, dejavu.Unit):
                self.register(obj)

    def class_by_name(self, classname):
        """Return the registered class whose __name__ is 'classname'.

        Raises KeyError if no registered class has that name.
        """
        for cls in self._registered_classes:
            if cls.__name__ == classname:
                return cls
        raise KeyError("No registered class found for '%s'." % classname)

    def storage(self, cls):
        """Return the StorageManager which handles Units of the given class.

        The result is remembered, so later calls for the same class
        skip the search. Raises KeyError if no store lists the class
        name and there is no default store.
        """
        found = self._registered_classes.get(cls)
        # Compare with None rather than testing truth, so that a store
        # object which happens to evaluate false is still honored.
        if found is not None:
            return found

        # Search all stores for the class name.
        clsname = cls.__name__
        for store in self.stores.values():
            if clsname in store.classnames:
                found = store
                break
        if found is None:
            found = self.defaultStore
        if found is None:
            raise KeyError("No store found for '%s' and no "
                           "default store." % clsname)

        self._registered_classes[cls] = found
        return found

    def create_storage(self, cls):
        """create_storage(cls). Create storage space for cls."""
        self.storage(cls).create_storage(cls)

    def has_storage(self, cls):
        """Return the result of asking cls's store whether it has storage for cls."""
        return self.storage(cls).has_storage(cls)

    def drop_storage(self, cls):
        """drop_storage(cls). Ask cls's store to drop its storage for cls."""
        self.storage(cls).drop_storage(cls)

    def add_property(self, cls, name):
        """add_property(cls, name). Delegate to the store which handles cls."""
        self.storage(cls).add_property(cls, name)

    def drop_property(self, cls, name):
        """drop_property(cls, name). Delegate to the store which handles cls."""
        self.storage(cls).drop_property(cls, name)

    def rename_property(self, cls, oldname, newname):
        """rename_property(cls, oldname, newname). Delegate to cls's store."""
        self.storage(cls).rename_property(cls, oldname, newname)

    def migrate_class(self, cls, new_store):
        """migrate_class(cls, new_store). Copy all units of cls to new_store."""
        new_store.create_storage(cls)
        # Recall through a fresh sandbox, then hand each unit
        # to the new store.
        for unit in self.new_sandbox().xrecall(cls):
            new_store.reserve(unit)
            new_store.save(unit, True)

    def migrate(self, new_store, old_store=None, copy_only=False):
        """migrate(new_store, old_store=None). Copy all units (of old_store) to new_store.

        If old_store is None, every registered class is migrated;
        otherwise only the classes currently handled by old_store.
        Unless copy_only is true, each migrated class is handled by
        new_store from then on.
        """
        for cls in list(self._registered_classes):
            store = self.storage(cls)
            if old_store is None or old_store is store:
                self.migrate_class(cls, new_store)
                if not copy_only:
                    self._registered_classes[cls] = new_store
181
182
183 ###########################################################################
184 ##                                                                       ##
185 ##                              Sandboxes                                ##
186 ##                                                                       ##
187 ###########################################################################
188
189
class Sandbox(object):
    """Sandbox(arena). Data sandbox for Dejavu arenas.

    Each consumer (that is, each UI process) maintains a Sandbox for
    managing Units. Sandboxes populate themselves with Units on a lazy
    basis, allowing UI code to request data as it's needed. However, once
    obtained, such Units are persisted (usually for the lifetime of the
    thread); this important detail means that multiple requests for the
    same Units result in multiple references to the same objects, rather
    than multiple objects. Sandboxes are basically what Fowler calls
    Identity Maps.

    The *REALLY* important thing to understand if you're customizing this
    is that Sandboxes won't survive sharing across threads--DON'T TRY IT.
    If you need to share unit data across requests, use or make an SM which
    persists the data, and chain it with another, more normal SM.

    _cache(), _caches, and _stores are private for a reason--don't access
    them from interface code--tell the Sandbox to do it for you.
    """

    def __init__(self, arena):
        self.arena = arena
        # Map of Unit class -> {identity: unit}; see _cache().
        self._caches = {}

    def __getattr__(self, key):
        """Support "magic recaller" methods on self.

        If 'key' is the name of a class registered with the arena,
        return a function which recalls a single Unit of that class
        (or None). Any other name raises AttributeError.
        """
        if key == 'arena':
            # We only get here if 'arena' has not been set (for example,
            # on an instance which never ran __init__). Looking it up
            # below would re-enter this method without end.
            raise AttributeError("Sandbox object has no attribute '%s'" % key)

        for cls in self.arena._registered_classes:
            name = cls.__name__
            if name == key:
                def recaller(*args, **kwargs):
                    # Allow identifiers to be supplied as args or kwargs
                    # (since the common case will be a single identifier).
                    for arg, ident in zip(args, cls.identifiers):
                        kwargs[str(ident)] = arg
                    expr = logic.filter(**kwargs)
                    # Hand back the first match, if there is one.
                    for unit in self.xrecall(cls, expr):
                        return unit
                    return None
                recaller.__doc__ = "A single %s Unit, else None." % name
                return recaller
        raise AttributeError("Sandbox object has no attribute '%s'" % key)

    def memorize(self, unit):
        """memorize(unit). Persist unit in storage.

        The unit is bound to this sandbox, reserved with its store,
        added to the cache, and finally has its on_memorize() method
        called (if it has one).
        """
        cls = unit.__class__
        unit.sandbox = self

        # Ask the store to accept the unit, assigning it primary key values
        # if necessary. The store should also call unit.cleanse() if it
        # saves the whole unit state on this call.
        self.arena.storage(cls).reserve(unit)

        # Insert the unit into the cache.
        unit_id = unit.identity()
        self._cache(cls)[unit_id] = unit
        self.arena.log("MEMORIZE %s: %s" % (cls.__name__, unit_id), LOGMEMORIZE)

        # Do this at the end of the func, since most on_memorize
        # will want to have an identity when called.
        if hasattr(unit, "on_memorize"):
            unit.on_memorize()

    def forget(self, unit):
        """Destroy unit, both in the cache and storage.

        Raises KeyError if the unit is not in this sandbox's cache
        (the unit has already been destroyed in storage at that point).
        """
        cls = unit.__class__

        unit_id = unit.identity()
        self.arena.log("FORGET %s: %s" % (cls.__name__, unit_id), LOGFORGET)
        self.arena.storage(cls).destroy(unit)

        del self._cache(cls)[unit_id]

        # This must be done after the destroy() call, so that a
        # related unit can poll all instances of this class.
        if hasattr(unit, "on_forget"):
            unit.on_forget()

        unit.sandbox = None

    def xrecall(self, classes, expr=None):
        """Iterator over units of cls which match expr.

        'classes' is normally a single Unit class; units of every
        registered subclass of it are yielded, cached units first and
        then units from storage which were not already in the cache.
        If 'classes' is a UnitJoin, this delegates to xmulti() and
        yields lists of units instead.

        Raises errors.UnrecallableError if neither the class nor any
        subclass of it is registered with the arena.
        """
        if classes.__class__.__name__ == "UnitJoin":
            for unitrow in self.xmulti(classes, expr):
                yield unitrow
            return

        cls = classes
        self.arena.log("RECALL %s: %s" % (cls.__name__, expr), LOGRECALL)

        # Collect all registered subclasses of cls.
        # Note that cls is a subclass of itself.
        subclasses = [c for c in self.arena._registered_classes
                      if issubclass(c, cls)]
        if not subclasses:
            # Even the requested class is not registered.
            raise errors.UnrecallableError("The '%s' class is not registered."
                                           % cls.__name__)

        for cls in subclasses:
            finished = False
            cache = self._cache(cls)

            # Special-case the scenario where one Unit is expected
            # and called by ID. We should be able to save a database hit.
            # NOTE(review): this matches the compiled bytecode of the
            # expression byte-for-byte, so it presumably only fires on the
            # interpreter version it was written for -- confirm.
            if expr:
                fc = expr.func.func_code
                if (fc.co_code == '|\x00\x00i\x01\x00d\x01\x00j\x02\x00S'
                    and fc.co_names[-1] == 'ID'):
                    ID = fc.co_consts[-1]
                    unit = cache.get((ID,))
                    if unit is not None:
                        # Do NOT call on_recall here. That should be called
                        # only at the Sandbox-SM boundary.
                        yield unit
                        finished = True

            if not finished:
                # Query the cache. We have to use a static copy of the
                # keys, to ensure that our cache doesn't change size
                # during iteration (due to overlapping xrecalls).
                keys = list(cache.keys())
                for unit_id in keys:
                    unit = cache.get(unit_id)
                    if unit and ((expr is None) or expr.evaluate(unit)):
                        # Do NOT call on_recall here. That should be called
                        # only at the Sandbox-SM boundary.
                        yield unit

                # Query Storage.
                for unit in self.arena.storage(cls).recall(cls, expr):
                    unit_id = unit.identity()
                    # Don't offer up a unit that was already checked in our cache
                    # (whether it matched the expr() or not--we assume the cache
                    # has the freshest data).
                    # A list is 2% to 4% faster than a dict, by the way (in Py 2.3).
                    if unit_id not in keys:
                        # Very important that we check for existing unit, as its
                        # state may have changed in memory but not in storage.
                        # Make sure the cache lookup and get happens atomically.
                        existing = cache.get(unit_id)
                        if existing:
                            yield existing
                        else:
                            unit.sandbox = self
                            confirmed = True
                            cache[unit_id] = unit
                            if hasattr(unit, 'on_recall'):
                                try:
                                    unit.on_recall()
                                except errors.UnrecallableError:
                                    # The unit vetoed its own recall.
                                    confirmed = False
                            if confirmed:
                                yield unit

    def recall(self, classes, expr=None):
        """List of units of class 'cls' which match expr.

        This is xrecall() with the results collected into a list.
        """
        return [x for x in self.xrecall(classes, expr)]

    def xmulti(self, classes, expr=None):
        """xmulti(classes, expr) -> [[unit, ...], [unit, ...], ...]
        Recall units of each cls if they together match the expr.

        Each yielded value will be a list of Units, in the same order as
        the classes arg. This facilitates unpacking in iterative consumer
        code like:

        for invoice, price in sandbox.xmulti(Invoice & Price, f):
            deal_with(invoice)
            deal_with(price)

        Raises ValueError if the classes are not all handled by the
        same store.
        """

        self.arena.log("RECALL %s %s" %
                       (", ".join([c.__name__ for c in classes]), expr),
                       LOGRECALL)

        stores = [self.arena.storage(cls) for cls in classes]
        firststore = stores[0]
        for s in stores:
            if s is not firststore:
                raise ValueError(u"xmulti() does not support multiple"
                                 u" classes in disparate stores.")

        # This is broken. If a filter expr is supplied, then the store may
        # not return rows which our cache would, and those won't be included
        # in the resultset. If you're using xmulti with no expr's, or
        # in read-only scripts, it should be OK for now. But if you mutate
        # Units and then call multirecall, expect inconsistent results.
        for unitset in firststore.multirecall(classes, expr):
            confirmed = True
            for index, unit in enumerate(unitset):
                unit_id = unit.identity()
                cache = self._cache(unit.__class__)
                if unit_id in cache:
                    # Keep the unit which is in our cache!
                    unitset[index] = cache[unit_id]
                else:
                    cache[unit_id] = unit
                    unit.sandbox = self
                    if hasattr(unit, 'on_recall'):
                        try:
                            unit.on_recall()
                        except errors.UnrecallableError:
                            # One vetoed unit rejects the whole row.
                            confirmed = False
                            break
            if confirmed:
                yield unitset

    def unit(self, cls, **kwargs):
        """unit(cls, **kwargs) -> A single matching Unit, else None.

        **kwargs will be combined into an Expression via logic.filter.
            The first Unit matching that expression is returned; if no
            Units match, None is returned.

        If you need a single Unit which matches a more complex
            expression, use recall()[0] or xrecall().next().
        """
        expr = None
        if kwargs:
            expr = logic.filter(**kwargs)
        # Hand back the first match, if there is one.
        for found in self.xrecall(cls, expr):
            return found
        return None

    def view(self, cls, attrs, expr=None):
        """view(cls, attrs, expr=None) -> Iterator of all Property tuples.

        Tuples for matching cached units are yielded first, followed by
        rows from storage for units which are not in the cache.
        """
        self.arena.log("VIEW %s [%s]: %s" % (cls.__name__, attrs, expr), LOGVIEW)

        cache = self._cache(cls)

        # Iterate over a static copy of the cached units: this is a
        # generator, so the cache may change size between yields
        # (due to overlapping recalls).
        for unit in list(cache.values()):
            if expr is None or expr(unit):
                yield tuple([getattr(unit, attr) for attr in attrs])

        # Add the identity attribute(s) if not present. This is necessary
        # to avoid duplicating objects which are already in our cache.
        fields = list(attrs)
        indices = []
        added_fields = 0
        for key in cls.identifiers:
            if key not in fields:
                added_fields += 1
                fields.append(key)
            indices.append(fields.index(key))

        for row in self.arena.storage(cls).view(cls, fields, expr):
            row_id = tuple([row[x] for x in indices])
            if row_id not in cache:
                if added_fields:
                    # Remove the added identifier columns from the row.
                    row = row[:len(row) - added_fields]
                yield row

    def distinct(self, cls, attrs, expr=None):
        """distinct(cls, attrs, expr=None) -> List of distinct Property tuples.

        If only one attribute is specified, a list of values will be returned.
        If more than one attribute is specified, a zipped list will be returned.
        The result is sorted.

        Notice that you can also use this function as a count() function
        (in fact it's the only way to do it) by using attrs = ['ID'].
        """
        self.arena.log("DISTINCT %s [%s]: %s" % (cls.__name__, attrs, expr), LOGVIEW)

        # The keys of 'seen' are the distinct rows found so far.
        seen = {}
        cache = self._cache(cls)
        for unit in list(cache.values()):
            if expr is None or expr(unit):
                row = tuple([getattr(unit, attr) for attr in attrs])
                if row not in seen:
                    seen[row] = None

        for row in self.arena.storage(cls).distinct(cls, attrs, expr):
            if row not in seen:
                seen[row] = None

        rows = list(seen.keys())
        rows.sort()
        if len(attrs) == 1:
            rows = [x[0] for x in rows]
        return rows

    def count(self, cls, expr=None):
        """count(cls, expr=None) -> Number of Units of class 'cls'.

        Only Units which match expr are counted; if expr is None,
        all Units of the class are counted. The count is the number of
        distinct identities reported by distinct().
        """
        return len(self.distinct(cls, cls.identifiers, expr))

    ####################################
    ##        Cache Management        ##
    ####################################

    def _cache(self, cls):
        """cache(cls). Return the cache for the specified class.

        This base class creates a new cache for each cls per request.
        """
        if cls not in self._caches:
            self._caches[cls] = {}
        return self._caches[cls]

    def purge(self, cls):
        """purge(cls). Drop all cached Units of class 'cls'. Do not save.

        Purging a class which has no cache yet is a no-op.
        """
        # Caches are created lazily by _cache(), so there may
        # be nothing to drop.
        self._caches.pop(cls, None)

    def repress(self, unit):
        """repress(unit). Remove unit from cache (but don't destroy).

        The unit's on_repress() method (if it has one) is called first;
        the unit is then saved to its store and dropped from the cache.
        Raises KeyError if the unit is not in this sandbox's cache.
        """
        cls = unit.__class__
        unit_id = unit.identity()
        self.arena.log("REPRESS %s: %s" % (cls.__name__, unit_id), LOGREPRESS)

        if hasattr(unit, "on_repress"):
            unit.on_repress()

        # Save after on_repress in case on_repress modified the unit.
        self.arena.storage(cls).save(unit)

        del self._cache(cls)[unit_id]

    def flush_all(self):
        """flush_all(). Repress all units.

        Every cached unit has on_repress() called (if it has one);
        then every cached unit is saved to its store and dropped
        from the cache.
        """

        for cls in list(self._caches.keys()):
            # Call all on_repress methods first! There are truly horrible
            # interdependency chains in most on_repress methods, and
            # it's best to resolve them all at once BEFORE flushing
            # any units from the cache.
            # Note we use a copy of the values, since the
            # cache may change size during iteration.
            for unit in list(self._cache(cls).values()):
                if hasattr(unit, "on_repress"):
                    unit.on_repress()

        for cls in list(self._caches.keys()):
            cache = self._cache(cls)
            store = self.arena.storage(cls)
            while cache:
                unitid, unit = cache.popitem()
                self.arena.log("REPRESS %s: %s" % (cls.__name__, unitid), LOGREPRESS)
                store.save(unit)
532
Note: See TracBrowser for help on using the browser.