Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/arenas.py

Revision 367 (checked in by fumanchu, 6 years ago)

Added a "managers" registry to dejavu.storage. This allows short names to be used in config instead of full class names.

  • Property svn:eol-style set to native
Line 
1
2 import ConfigParser
3 import threading
4 from types import ClassType
5
6 from dejavu.containers import Graph
7 from dejavu import logic, errors, storage, xray
8
# Names exported by "from <this module> import *".
__all__ = ['Arena', 'Sandbox', 'logflags']
11
12
class Enum(object):
    """Bare attribute container.

    Instances start out empty; callers attach named values to them
    as plain attributes after creation.
    """
15
# Logging flags. Each flag is a single bit, so flags can be OR'ed
# together and stored in Arena.logflags; Sandbox methods test that
# value with "&" before logging.
logflags = Enum()

# General-purpose flags.
logflags.ERROR = 1 << 0
logflags.IO = 1 << 1
logflags.SQL = 1 << 2

# One flag per Sandbox operation.
logflags.MEMORIZE = 1 << 7
logflags.RECALL = 1 << 8
logflags.VIEW = 1 << 9
logflags.REPRESS = 1 << 10
logflags.FORGET = 1 << 11

# Convenience mask: every Sandbox operation flag at once.
logflags.SANDBOX = (
    logflags.MEMORIZE
    | logflags.RECALL
    | logflags.VIEW
    | logflags.REPRESS
    | logflags.FORGET
)
29
30
class Arena(object):
    """A namespace/workspace for a Dejavu application.

    An Arena keeps track of the StorageManagers ("stores") that have been
    added to it, the Unit classes that have been registered with it, and
    which store handles which class.
    """

    def __init__(self):
        # Held while storage() searches the stores for a class.
        self._sync_lock = threading.Lock()
        # Store name -> StorageManager instance.
        self.stores = {}
        # Unit class -> StorageManager, or None until a store is found.
        self._registered_classes = {}
        # Undirected graph connecting classes that are associated.
        self.associations = Graph(directed=False)
        self.engine_functions = {}
        # Bitmask of logflags values; Sandbox checks it before logging.
        self.logflags = logflags.ERROR + logflags.IO

    def log(self, message):
        """Default logger (writes to stdout). Feel free to replace."""
        if isinstance(message, unicode):
            message = message.encode('utf8')
        print(message)

    def load(self, configFileName):
        """Load StorageManagers from a ConfigParser-style file.

        Each section describes one store: the section name becomes the
        store name, the 'Class' option names the StorageManager, and the
        whole option dict is handed to the store. Stores are added in
        ascending 'Load Order' (default 0), then by section name.

        Raises KeyError if a section has no 'Class' option.
        """
        parser = ConfigParser.ConfigParser()
        # Make names case-sensitive by overriding optionxform.
        parser.optionxform = unicode
        parser.read(configFileName)

        pending = []
        for section in parser.sections():
            opts = dict(parser.items(section))
            load_order = int(opts.get("Load Order", "0"))
            pending.append((load_order, section, opts))
        pending.sort()

        for load_order, name, options in pending:
            self.add_store(name, options[u'Class'], options)

    def add_store(self, name, store, options=None):
        """Register a StorageManager under 'name' and return it.

        The 'store' argument may be a StorageManager class, an instance of
        that class, a short name registered in dejavu.storage.managers,
        or the full importable dotted-package name of the class.
        Classes are instantiated as store(self, options or {}).
        """
        # Translate a registered short name first. The registry entry may
        # itself be a string, which is then resolved like any dotted name.
        if isinstance(store, basestring) and store in storage.managers:
            store = storage.managers[store]

        if isinstance(store, basestring):
            store = xray.classes(store)(self, options or {})
        elif isinstance(store, (type, ClassType)):
            store = store(self, options or {})

        self.stores[name] = store
        return store

    def remove_store(self, name):
        """Forget the store called 'name' (a no-op if it is unknown).

        Classes handled by that store stay registered but lose their
        store, so the next storage() call searches for one again.
        """
        if name not in self.stores:
            return
        store = self.stores[name]

        # Disassociate all registered classes with this store.
        for cls, owner in list(self._registered_classes.items()):
            if owner is store:
                self._registered_classes[cls] = None

        del self.stores[name]

    def shutdown(self):
        """Shutdown the arena by shutting down every store.

        Stores are shut down in ascending shutdownOrder; ties are broken
        by store name so the order never depends on comparing the store
        objects themselves.
        """
        ordered = sorted(self.stores.items(),
                         key=lambda item: (item[1].shutdownOrder, item[0]))
        for name, store in ordered:
            store.shutdown()

    def new_sandbox(self):
        """Return a new sandbox object in this Arena."""
        return Sandbox(self)


    # --------------------- Unit Class Registration --------------------- #

    def register(self, cls):
        """Assert that Units of class 'cls' will be handled."""
        # We must allow modules to register classes before any stores have
        # been added, but not overwrite a store which has already been found.
        if cls in self._registered_classes:
            return
        self._registered_classes[cls] = None

        # Register any association(s) in an undirected graph.
        for ua in cls._associations.itervalues():
            if getattr(ua, "register", True):
                self.associations.connect(cls, ua.farClass)

    def register_all(self, globals):
        """Register every dejavu.Unit subclass found in a namespace dict.

        'globals' is a mapping such as a module's globals(). Returns a
        list of the classes that were registered (each listed once).
        """
        import dejavu
        seen = {}
        for obj in globals.itervalues():
            if isinstance(obj, type) and issubclass(obj, dejavu.Unit):
                self.register(obj)
                seen[obj] = None
        return seen.keys()

    def class_by_name(self, classname):
        """Return the registered class whose __name__ is 'classname'.

        Raises KeyError if no registered class has that name.
        """
        for cls in self._registered_classes:
            if cls.__name__ == classname:
                return cls
        raise KeyError("No registered class found for '%s'." % classname)

    def storage(self, cls):
        """Return the StorageManager which handles Units of the given class.

        The first lookup for a class searches self.stores: a store whose
        'classnames' contains the class name wins; otherwise a store with
        an empty 'classnames' is used as the default. The chosen store is
        asked to sync([cls]) and is remembered for later calls.

        Raises KeyError if no store can handle the class.
        """
        # Compare against None rather than by truthiness, so that a store
        # object which happens to evaluate false is still honoured.
        store = self._registered_classes.get(cls)
        if store is not None:
            return store

        clsname = cls.__name__
        self._sync_lock.acquire()
        try:
            # Another thread may have completed the search while we were
            # waiting for the lock; don't sync the class a second time.
            store = self._registered_classes.get(cls)
            if store is not None:
                return store

            # Search all stores for the class name.
            default_store = None
            chosen = None
            for candidate in self.stores.values():
                if not candidate.classnames:
                    # This store has no "classnames" list, which signals
                    # that it handles all classes which are not handled
                    # by other stores.
                    default_store = candidate
                elif clsname in candidate.classnames:
                    chosen = candidate
                    break

            # Name not found in any store's classnames. Try a default store.
            if chosen is None:
                chosen = default_store

            if chosen is not None:
                chosen.sync([cls])
                self._registered_classes[cls] = chosen
                return chosen
        finally:
            self._sync_lock.release()

        raise KeyError("No store found for '%s'." % clsname)

    def create_storage(self, cls):
        """Create storage space for cls."""
        self.storage(cls).create_storage(cls)

    def has_storage(self, cls):
        """Return whatever the store for cls reports from has_storage(cls)."""
        return self.storage(cls).has_storage(cls)

    def drop_storage(self, cls):
        """Ask the store for cls to drop its storage for cls."""
        self.storage(cls).drop_storage(cls)

    def add_property(self, cls, name):
        """Ask the store for cls to add the property 'name'."""
        self.storage(cls).add_property(cls, name)

    def drop_property(self, cls, name):
        """Ask the store for cls to drop the property 'name'."""
        self.storage(cls).drop_property(cls, name)

    def rename_property(self, cls, oldname, newname):
        """Ask the store for cls to rename property 'oldname' to 'newname'."""
        self.storage(cls).rename_property(cls, oldname, newname)

    def migrate_class(self, cls, new_store):
        """Copy all units of cls to new_store."""
        new_store.create_storage(cls)
        for unit in self.new_sandbox().xrecall(cls):
            new_store.reserve(unit)
            new_store.save(unit, True)

    def migrate(self, new_store, old_store=None, copy_only=False):
        """Copy all units (of old_store) to new_store.

        If old_store is None, the units of every registered class are
        copied. Unless copy_only is true, each migrated class is handled
        by new_store afterwards.
        """
        # Work from a snapshot: recalling units runs code outside this
        # class, and the registry must not change size under iteration.
        for cls in list(self._registered_classes):
            store = self.storage(cls)
            if old_store is None or old_store is store:
                self.migrate_class(cls, new_store)
                if not copy_only:
                    self._registered_classes[cls] = new_store
202
203
204 ###########################################################################
205 ##                                                                       ##
206 ##                              Sandboxes                                ##
207 ##                                                                       ##
208 ###########################################################################
209
210
class Sandbox(object):
    """Data sandbox for Dejavu arenas.

    Each consumer (that is, each UI process or thread) maintains a Sandbox
    for managing Units. Sandboxes populate themselves with Units on a lazy
    basis, allowing UI code to request data as it's needed. However, once
    obtained, such Units are persisted (usually for the lifetime of the
    thread); this important detail means that multiple requests for the
    same Units result in multiple references to the same objects, rather
    than multiple objects. Sandboxes are basically what Fowler calls
    Identity Maps.

    The *REALLY* important thing to understand if you're customizing this
    is that Sandboxes won't survive sharing across threads--DON'T TRY IT.
    If you need to share unit data across requests, use or make an SM which
    persists the data, and chain it with another, more normal SM.

    _cache() and _caches are private for a reason--don't access
    them from interface code--tell the Sandbox to do it for you.

    Starting with Python 2.5, each Sandbox instance is its own context
    manager, so you can have boxes automatically flush themselves
    when you're done, and automatically rollback on error. Example:

    # __future__ only needed for Python 2.5, not 2.6+
    from __future__ import with_statement

    with arena.new_sandbox() as box:
        WAP = box.unit(Zoo, Name='Wild Animal Park')
        WAP.Opens = now
    """

    def __init__(self, arena):
        self.arena = arena
        # Unit class -> {unit identity: unit}; always go through _cache().
        self._caches = {}

    def __getattr__(self, key):
        """Support "magic recaller" methods named after registered classes.

        box.Zoo(5) returns the single Zoo Unit whose identifier(s) match
        the given args/kwargs, or None if there is no such Unit.
        """
        # __getattr__ only runs when normal lookup fails. If 'arena' itself
        # is missing (e.g. a Sandbox created without __init__), looking it
        # up below would re-enter this method forever.
        if key == 'arena':
            raise AttributeError("Sandbox object has no attribute '%s'" % key)

        for cls in self.arena._registered_classes:
            name = cls.__name__
            if name == key:
                def recaller(*args, **kwargs):
                    # Allow identifiers to be supplied as args or kwargs
                    # (since the common case will be a single identifier).
                    for arg, ident in zip(args, cls.identifiers):
                        kwargs[str(ident)] = arg
                    expr = logic.filter(**kwargs)
                    for unit in self.xrecall(cls, expr):
                        return unit
                    return None
                recaller.__doc__ = "A single %s Unit, else None." % name
                return recaller
        raise AttributeError("Sandbox object has no attribute '%s'" % key)

    def _build_expr(self, expr, kwargs):
        """Merge a caller-supplied expr with name=value filters.

        A non-Expression expr (e.g. a lambda) is wrapped in
        logic.Expression; kwargs, if any, become a logic.filter which is
        added to it. Returns the combined expression (possibly None).
        """
        if expr and not isinstance(expr, logic.Expression):
            expr = logic.Expression(expr)
        if kwargs:
            f = logic.filter(**kwargs)
            if expr:
                expr += f
            else:
                expr = f
        return expr

    def memorize(self, unit):
        """Persist unit in storage."""
        cls = unit.__class__
        unit.sandbox = self

        # Ask the store to accept the unit, assigning it primary key values
        # if necessary. The store should also call unit.cleanse() if it
        # saves the whole unit state on this call.
        self.arena.storage(cls).reserve(unit)

        # Insert the unit into the cache.
        unit_id = unit.identity()
        self._cache(cls)[unit_id] = unit
        if self.arena.logflags & logflags.MEMORIZE:
            self.arena.log("MEMORIZE %s: %s" % (cls.__name__, unit_id))

        # Do this at the end of the func, since most on_memorize
        # will want to have an identity when called.
        if hasattr(unit, "on_memorize"):
            unit.on_memorize()

    def forget(self, unit):
        """Destroy unit, both in the cache and storage."""
        cls = unit.__class__

        unit_id = unit.identity()
        if self.arena.logflags & logflags.FORGET:
            self.arena.log("FORGET %s: %s" % (cls.__name__, unit_id))
        self.arena.storage(cls).destroy(unit)

        del self._cache(cls)[unit_id]

        # This must be done after the destroy() call, so that a
        # related unit can poll all instances of this class.
        if hasattr(unit, "on_forget"):
            unit.on_forget()

        unit.sandbox = None

    def xrecall(self, classes, expr=None, inherit=False, **kwargs):
        """Iterator over units of cls which match expr.

        classes: a Unit class, or a UnitJoin (in which case this simply
            delegates to xmulti and yields its rows).
        expr: a lambda or logic.Expression; None matches every unit.
        If inherit is True, units of the given class and all registered
            subclasses of the given class will be recalled.
        **kwargs: additional expr filters in name=value format.

        Cached units are yielded first, then units from storage which
        were not already in the cache.
        """
        if classes.__class__.__name__ == "UnitJoin":
            for unitrow in self.xmulti(classes, expr, **kwargs):
                yield unitrow
            return

        cls = classes
        expr = self._build_expr(expr, kwargs)

        if self.arena.logflags & logflags.RECALL:
            self.arena.log("RECALL %s: %s" % (cls.__name__, expr))

        if inherit:
            # Collect all registered subclasses of cls.
            # Note that cls is a subclass of itself.
            targets = [c for c in self.arena._registered_classes
                       if issubclass(c, cls)]
        else:
            targets = [cls]
        if not targets:
            # Even the requested class is not registered.
            raise errors.UnrecallableError("The '%s' class is not registered."
                                           % cls.__name__)

        for target in targets:
            cache = self._cache(target)

            # Special-case the scenario where one Unit is expected
            # and called by ID. We should be able to save a database hit.
            # NOTE(review): this compares the expression's raw bytecode
            # against one fixed byte string, so it can only match on the
            # interpreter version(s) that emit exactly these bytes --
            # confirm which versions are intended.
            if expr:
                fc = expr.func.func_code
                if (fc.co_code == '|\x00\x00i\x01\x00d\x01\x00j\x02\x00S'
                    and fc.co_names[-1] == 'ID'):
                    ID = fc.co_consts[-1]
                    unit = cache.get((ID,))
                    if unit is not None:
                        # Do NOT call on_recall here. That should be called
                        # only at the Sandbox-SM boundary.
                        yield unit
                        return

            # Query the cache. We have to use a static copy of the
            # keys, to ensure that our cache doesn't change size
            # during iteration (due to overlapping xrecalls).
            keys = cache.keys()
            # Set form of the same snapshot, so the membership test in the
            # storage loop below is not a linear scan per stored unit.
            checked = set(keys)
            for unit_id in keys:
                unit = cache.get(unit_id)
                if unit and ((expr is None) or expr.evaluate(unit)):
                    # Do NOT call on_recall here. That should be called
                    # only at the Sandbox-SM boundary.
                    yield unit

            # Query Storage.
            for unit in self.arena.storage(target).recall(target, expr):
                unit_id = unit.identity()
                # Don't offer up a unit that was already checked in our cache
                # (whether it matched the expr() or not--we assume the cache
                # has the freshest data).
                if unit_id in checked:
                    continue

                # Very important that we check for existing unit, as its
                # state may have changed in memory but not in storage
                # (even between our cache yields and this yield).
                # Make sure the cache lookup and get happens atomically.
                existing = cache.get(unit_id)
                if existing:
                    yield existing
                    continue

                unit.sandbox = self
                confirmed = True
                cache[unit_id] = unit
                if hasattr(unit, 'on_recall'):
                    try:
                        unit.on_recall()
                    except errors.UnrecallableError:
                        # The unit vetoed its own recall; it stays in the
                        # cache but is not handed to the caller.
                        confirmed = False
                if confirmed:
                    yield unit

    def recall(self, classes, expr=None, inherit=False, **kwargs):
        """List of units of the given class which match expr.

        If inherit is True, units of the given class and all registered
            subclasses of the given class will be recalled.
        """
        return list(self.xrecall(classes, expr, inherit, **kwargs))

    def xmulti(self, classes, expr=None, **kwargs):
        """Recall units of each cls if they together match the expr.

        Each yielded value will be a list of Units, in the same order as
        the classes arg. This facilitates unpacking in iterative consumer
        code like:

        for invoice, price in sandbox.xmulti(Invoice & Price, f):
            deal_with(invoice)
            deal_with(price)

        Raises ValueError if the classes do not all live in one store.
        """
        expr = self._build_expr(expr, kwargs)

        if self.arena.logflags & logflags.RECALL:
            self.arena.log("RECALL %s %s" %
                           (", ".join([c.__name__ for c in classes]), expr))

        stores = [self.arena.storage(cls) for cls in classes]
        firststore = stores[0]
        for s in stores:
            if s is not firststore:
                raise ValueError(u"xmulti() does not support multiple"
                                 u" classes in disparate stores.")

        # This is broken. If a filter expr is supplied, then the store may
        # not return rows which our cache would, and those won't be included
        # in the resultset. If you're using xmulti with no expr's, or
        # in read-only scripts, it should be OK for now. But if you mutate
        # Units and then call multirecall, expect inconsistent results.
        for unitset in firststore.multirecall(classes, expr):
            confirmed = True
            for index, unit in enumerate(unitset):
                unit_id = unit.identity()
                cache = self._cache(unit.__class__)
                if unit_id in cache:
                    # Keep the unit which is in our cache!
                    unitset[index] = cache[unit_id]
                else:
                    cache[unit_id] = unit
                    unit.sandbox = self
                    if hasattr(unit, 'on_recall'):
                        try:
                            unit.on_recall()
                        except errors.UnrecallableError:
                            confirmed = False
                            break
            if confirmed:
                yield unitset

    def unit(self, cls, expr=None, inherit=False, **kwargs):
        """A single matching Unit, else None.

        **kwargs will be combined into an Expression via logic.filter.
            The first Unit matching that expression is returned; if no
            Units match, None is returned.
        """
        for found in self.xrecall(cls, expr, inherit, **kwargs):
            return found
        return None

    def view(self, cls, attrs, expr=None, **kwargs):
        """Yield tuples of attrs for the given cls which match the expr.

        cls: The Unit subclass for which to yield property tuples.
        attrs: a sequence of strings; each should be the name of
            a UnitProperty on the given cls.
        expr: a lambda or logic.Expression. If provided, data will only
            be yielded for units of the given cls which match the expr.
        **kwargs: additional expr filters in name=value format.

        Each yielded value will be a sequence of values, in the same order
        as the attrs arg. This facilitates unpacking in iterative consumer
        code like:

        for id, name in sandbox.view(Invoice, ['ID', 'Name'], f):
            print id, ": ", name

        This is generally much faster than recall, and should be preferred
        for performance-sensitive code.
        """
        expr = self._build_expr(expr, kwargs)

        if self.arena.logflags & logflags.VIEW:
            self.arena.log("VIEW %s [%s]: %s" % (cls.__name__, attrs, expr))

        cache = self._cache(cls)

        # Iterate a static copy of the cached units: this is a generator,
        # and the consumer may recall more units (growing the cache)
        # between two of our yields.
        for unit in cache.values():
            if expr is None or expr(unit):
                yield tuple([getattr(unit, attr) for attr in attrs])

        # Add the identity attribute(s) if not present. This is necessary
        # to avoid duplicating objects which are already in our cache.
        fields = list(attrs)
        indices = []
        added_fields = 0
        for key in cls.identifiers:
            if key not in fields:
                added_fields += 1
                fields.append(key)
            indices.append(fields.index(key))

        for row in self.arena.storage(cls).view(cls, fields, expr):
            row_id = tuple([row[x] for x in indices])
            if row_id not in cache:
                if added_fields:
                    # Remove the added identifier columns from the row.
                    row = row[:-added_fields]
                yield row

    def sum(self, cls, attr, expr=None, **kwargs):
        """Sum of all non-None values for the given cls.attr."""
        # Keep the "!= None" spelling: other code in this class inspects
        # the bytecode of expression functions, so the exact form of this
        # lambda may matter to the logic module.
        expr = logic.Expression(lambda x: getattr(x, attr) != None) + expr
        return sum([row[0] for row in self.view(cls, (attr,), expr, **kwargs)])

    def distinct(self, cls, attrs, expr=None, **kwargs):
        """List of distinct Property tuples.

        If only one attribute is specified, a list of values will be returned.
        If more than one attribute is specified, a zipped list will be returned.

        Notice that you can also use this function as a count() function
        by using attrs = ['ID'] (see also the count method).
        """
        expr = self._build_expr(expr, kwargs)

        if self.arena.logflags & logflags.VIEW:
            self.arena.log("DISTINCT %s [%s]: %s" % (cls.__name__, attrs, expr))

        seen = set()
        for unit in self._cache(cls).itervalues():
            if expr is None or expr(unit):
                seen.add(tuple([getattr(unit, attr) for attr in attrs]))

        for row in self.arena.storage(cls).distinct(cls, attrs, expr):
            seen.add(row)

        rows = sorted(seen)
        if len(attrs) == 1:
            rows = [x[0] for x in rows]
        return rows

    def count(self, cls, expr=None, **kwargs):
        """Number of Units of class 'cls' (which match expr, if given).

        expr and **kwargs are passed on to distinct(); with neither,
        every Unit of the class is counted.
        """
        return len(self.distinct(cls, cls.identifiers, expr, **kwargs))

    def range(self, cls, attr, expr=None, **kwargs):
        """Distinct, non-None attr values (ordered and continuous, if possible).

        If the given attribute is a known discrete, ordered type
        (like int, long, datetime.date), this returns the closed interval:
            [min(attr), ..., max(attr)]
        That is, all possible values will be output between min and max,
        even if they do not appear in the dataset. For dates the result
        is an iterator stepping one day at a time; for integers a list.

        For any other type, the existing values are returned as a list,
        sorted if they can be. An empty list is returned if there are no
        non-None values.
        """
        import datetime

        existing = [x for x in self.distinct(cls, [attr], expr, **kwargs)
                    if x is not None]
        if not existing:
            return []

        attr_type = getattr(cls, attr).type

        if issubclass(attr_type, datetime.date):
            start, end = min(existing), max(existing)
            # Dates can't be added to integers; count the days spanned
            # (inclusive of both ends) via the timedelta between them.
            days = (end - start).days + 1

            def date_gen():
                for offset in range(days):
                    yield start + datetime.timedelta(offset)
            return date_gen()

        # Inside a method body the bare name "range" is still the builtin.
        if issubclass(attr_type, (int, long)):
            return range(min(existing), max(existing) + 1)

        try:
            existing.sort()
        except TypeError:
            # Values of this type can't be ordered; return them as found.
            pass

        return existing

    #                           Cache Management                           #

    def _cache(self, cls):
        """Return the cache for the specified class.

        The cache (a dict) is created on first request for each class.
        """
        if cls not in self._caches:
            self._caches[cls] = {}
        return self._caches[cls]

    def purge(self, cls):
        """Drop all cached Units of class 'cls'. Do not save.

        Purging a class which has no cache is a no-op.
        """
        self._caches.pop(cls, None)

    def repress(self, unit):
        """Remove unit from cache (but don't destroy)."""
        cls = unit.__class__
        unit_id = unit.identity()
        if self.arena.logflags & logflags.REPRESS:
            self.arena.log("REPRESS %s: %s" % (cls.__name__, unit_id))

        if hasattr(unit, "on_repress"):
            unit.on_repress()

        # Save after on_repress in case on_repress modified the unit.
        self.arena.storage(cls).save(unit)

        del self._cache(cls)[unit_id]
        unit.sandbox = None

    def _end_transactions(self, stores, action):
        """Call store.db.<action>() on each store which offers it."""
        for store in stores:
            try:
                finish = getattr(store.db, action)
            except AttributeError:
                # This store has no db, or its db has no such method.
                continue
            finish()

    def flush_all(self):
        """Repress all units: save them, empty the caches, then commit."""

        for cls in self._caches.keys():
            # Call all on_repress methods first! There are truly horrible
            # interdependency chains in most on_repress methods, and
            # it's best to resolve them all at once BEFORE flushing
            # any units from the cache.
            # Note we use values instead of itervalues, since the
            # cache may change size during iteration.
            for unit in self._cache(cls).values():
                if hasattr(unit, "on_repress"):
                    unit.on_repress()

        seen_stores = {}
        for cls in self._caches.keys():
            cache = self._cache(cls)
            store = self.arena.storage(cls)
            seen_stores[store] = None
            while cache:
                unitid, unit = cache.popitem()
                if self.arena.logflags & logflags.REPRESS:
                    self.arena.log("REPRESS %s: %s" % (cls.__name__, unitid))
                store.save(unit)

        self._end_transactions(seen_stores, 'commit')

    def rollback(self):
        """Ignore all changes and purge our cache."""
        seen_stores = {}
        # purge() removes entries from _caches, so walk a snapshot.
        for cls in list(self._caches):
            self.purge(cls)
            seen_stores[self.arena.storage(cls)] = None

        self._end_transactions(seen_stores, 'rollback')

    #                          Context Management                          #

    def __enter__(self):
        """Enter a 'with' block; the Sandbox itself is the context value."""
        return self

    def __exit__ (self, type, value, tb):
        """Flush on a clean exit; roll back if the block raised."""
        if tb is None:
            self.flush_all()
        else:
            self.rollback()
705
Note: See TracBrowser for help on using the browser.