Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/arenas.py

Revision 356 (checked in by fumanchu, 7 years ago)

Mostly doc tweaks.

  • Property svn:eol-style set to native
Line 
1
2 import ConfigParser
3 import threading
4 from types import ClassType
5
6 from dejavu.containers import Graph
7 from dejavu import logic, errors, xray
8
9 __all__ = ['Arena', 'Sandbox', 'logflags',
10            ]
11
12
class Enum(object):
    """Empty attribute holder; constants are attached to instances by hand."""


# Logging flags: single-bit values meant to be OR-ed together and assigned
# to Arena.logflags.
logflags = Enum()
logflags.ERROR = 1 << 0
logflags.IO = 1 << 1
logflags.SQL = 1 << 2

# One bit per Sandbox operation.
logflags.MEMORIZE = 1 << 7
logflags.RECALL = 1 << 8
logflags.VIEW = 1 << 9
logflags.REPRESS = 1 << 10
logflags.FORGET = 1 << 11

# Convenience mask which enables every Sandbox operation flag at once.
logflags.SANDBOX = (logflags.FORGET | logflags.REPRESS | logflags.VIEW |
                    logflags.RECALL | logflags.MEMORIZE)
29
30
class Arena(object):
    """A namespace/workspace for a Dejavu application.

    An Arena holds the named StorageManagers ("stores") of an application,
    the Unit classes registered with it, an undirected graph of the
    associations between those classes, and the logging flags which its
    Sandboxes consult before calling log().
    """

    def __init__(self):
        # Guards the store lookup and sync performed in storage().
        self._sync_lock = threading.Lock()
        # Store name -> StorageManager instance.
        self.stores = {}
        # Unit class -> the store which handles it (None until resolved).
        self._registered_classes = {}
        self.associations = Graph(directed=False)
        self.engine_functions = {}
        # Bitmask built from the module-level logflags values.
        self.logflags = logflags.ERROR + logflags.IO

    def log(self, message):
        """Default logger (writes to stdout). Feel free to replace."""
        if isinstance(message, unicode):
            # Encode explicitly rather than relying on stdout's encoding.
            message = message.encode('utf8')
        print(message)

    def load(self, configFileName):
        """Load StorageManagers from the given config file.

        Each section describes one store: the section name becomes the
        store name, the 'Class' option is passed to add_store as the store,
        and the whole section is passed along as the options dict. Stores
        are added in ascending 'Load Order' (an int option, default 0),
        then by section name.

        Raises KeyError if a section has no 'Class' option.
        """
        parser = ConfigParser.ConfigParser()
        # Make names case-sensitive by overriding optionxform.
        parser.optionxform = unicode
        parser.read(configFileName)

        stores = []
        for section in parser.sections():
            opts = dict(parser.items(section))
            stores.append((int(opts.get("Load Order", "0")), section, opts))
        stores.sort()

        for order, name, options in stores:
            self.add_store(name, options[u'Class'], options)

    def add_store(self, name, store, options=None):
        """Register a StorageManager under the given name and return it.

        The 'store' argument may be a StorageManager class, an instance of
        that class, or the full importable dotted-package name of the class.
        Classes (and class names) are instantiated with this arena and the
        options dict (an empty dict if options is None); any other object
        is stored as-is.
        """
        if isinstance(store, basestring):
            store = xray.classes(store)(self, options or {})
        elif isinstance(store, (type, ClassType)):
            store = store(self, options or {})

        self.stores[name] = store
        return store

    def remove_store(self, name):
        """Remove the named store, if present.

        Registered classes which were handled by that store stay registered
        but are reset to "unresolved", so the next storage() call looks for
        a store again.
        """
        if name not in self.stores:
            return

        store = self.stores[name]

        # Disassociate all registered classes with this store.
        for c in list(self._registered_classes):
            if self._registered_classes[c] is store:
                self._registered_classes[c] = None

        del self.stores[name]

    def shutdown(self):
        """Shutdown the arena by shutting down every store.

        Stores are shut down in ascending shutdownOrder; stores with the
        same shutdownOrder are shut down in order of their names.
        """
        # Sort on (order, name) so that ties never fall through to comparing
        # the store objects themselves.
        stores = [(v.shutdownOrder, k, v) for k, v in self.stores.items()]
        stores.sort()
        for order, name, store in stores:
            store.shutdown()

    def new_sandbox(self):
        """Return a new sandbox object in this Arena."""
        return Sandbox(self)


    # --------------------- Unit Class Registration --------------------- #

    def register(self, cls):
        """Assert that Units of class 'cls' will be handled.

        Registering an already-registered class is a no-op.
        """
        # We must allow modules to register classes before any stores have
        # been added, but not overwrite a store which has already been found.
        if cls not in self._registered_classes:
            self._registered_classes[cls] = None

            # Register any association(s) in an undirected graph.
            for ua in cls._associations.values():
                # Associations may opt out by setting register = False.
                if getattr(ua, "register", True):
                    self.associations.connect(cls, ua.farClass)

    def register_all(self, globals):
        """Register each dejavu.Unit subclass found in a namespace dict.

        globals: a dict (typically a module's globals()) whose values are
            searched for Unit subclasses.

        Returns a list of the Unit classes which were found.
        """
        # Imported here rather than at module level.
        import dejavu
        seen = {}
        for obj in globals.values():
            if isinstance(obj, type) and issubclass(obj, dejavu.Unit):
                self.register(obj)
                seen[obj] = None
        return seen.keys()

    def class_by_name(self, classname):
        """Return the registered class whose __name__ equals classname.

        Raises KeyError if no registered class has that name.
        """
        for cls in self._registered_classes:
            if cls.__name__ == classname:
                return cls
        raise KeyError("No registered class found for '%s'." % classname)

    def storage(self, cls):
        """Return the StorageManager which handles Units of the given class.

        The first lookup for a class searches self.stores: a store whose
        'classnames' contains the class name wins; otherwise a store with
        an empty 'classnames' acts as the default. The chosen store is
        asked to sync([cls]) and is remembered for later calls.

        Raises KeyError if no store handles the class.
        """
        store = self._registered_classes.get(cls)
        if store is not None:
            return store

        clsname = cls.__name__
        self._sync_lock.acquire()
        try:
            # Another thread may have resolved (and synced) this class while
            # we were waiting for the lock; don't sync it a second time.
            store = self._registered_classes.get(cls)
            if store is not None:
                return store

            # Search all stores for the class name.
            default_store = None
            for store in self.stores.values():
                if not store.classnames:
                    # This store has no "classnames" list, which signals that it
                    # handles all classes which are not handled by other stores.
                    default_store = store
                elif clsname in store.classnames:
                    store.sync([cls])
                    self._registered_classes[cls] = store
                    return store

            # Name not found in any store's classnames. Try a default store.
            if default_store is not None:
                default_store.sync([cls])
                self._registered_classes[cls] = default_store
                return default_store
        finally:
            self._sync_lock.release()

        raise KeyError("No store found for '%s'." % clsname)

    def create_storage(self, cls):
        """Create storage space for cls."""
        self.storage(cls).create_storage(cls)

    def has_storage(self, cls):
        """Return the result of asking cls's store has_storage(cls)."""
        return self.storage(cls).has_storage(cls)

    def drop_storage(self, cls):
        """Ask cls's store to drop its storage for cls."""
        self.storage(cls).drop_storage(cls)

    def add_property(self, cls, name):
        """Ask cls's store to add storage for the named property."""
        self.storage(cls).add_property(cls, name)

    def drop_property(self, cls, name):
        """Ask cls's store to drop storage for the named property."""
        self.storage(cls).drop_property(cls, name)

    def rename_property(self, cls, oldname, newname):
        """Ask cls's store to rename a property from oldname to newname."""
        self.storage(cls).rename_property(cls, oldname, newname)

    def migrate_class(self, cls, new_store):
        """Copy all units of cls to new_store.

        Storage for cls is created in new_store first; units are then read
        through a fresh sandbox and reserved and saved in new_store.
        """
        new_store.create_storage(cls)
        for unit in self.new_sandbox().xrecall(cls):
            new_store.reserve(unit)
            new_store.save(unit, True)

    def migrate(self, new_store, old_store=None, copy_only=False):
        """Copy all units (of old_store) to new_store.

        new_store: the store which receives the units.
        old_store: if given, only classes handled by this store are
            migrated; otherwise every registered class is.
        copy_only: if False (the default), each migrated class is handled
            by new_store from then on.
        """
        # Iterate over a snapshot: recalling units may register more classes.
        for cls in list(self._registered_classes):
            store = self.storage(cls)
            if old_store is None or old_store is store:
                self.migrate_class(cls, new_store)
                if not copy_only:
                    self._registered_classes[cls] = new_store
198
199
200 ###########################################################################
201 ##                                                                       ##
202 ##                              Sandboxes                                ##
203 ##                                                                       ##
204 ###########################################################################
205
206
class Sandbox(object):
    """Data sandbox for Dejavu arenas.

    Each consumer (that is, each UI process or thread) maintains a Sandbox
    for managing Units. Sandboxes populate themselves with Units on a lazy
    basis, allowing UI code to request data as it's needed. However, once
    obtained, such Units are persisted (usually for the lifetime of the
    thread); this important detail means that multiple requests for the
    same Units result in multiple references to the same objects, rather
    than multiple objects. Sandboxes are basically what Fowler calls
    Identity Maps.

    The *REALLY* important thing to understand if you're customizing this
    is that Sandboxes won't survive sharing across threads--DON'T TRY IT.
    If you need to share unit data across requests, use or make an SM which
    persists the data, and chain it with another, more normal SM.

    _cache(), _caches, and _stores are private for a reason--don't access
    them from interface code--tell the Sandbox to do it for you.

    Starting with Python 2.5, each Sandbox instance is its own context
    manager, so you can have boxes automatically flush themselves
    when you're done, and automatically rollback on error. Example:

    # __future__ only needed for Python 2.5, not 2.6+
    from __future__ import with_statement

    with arena.new_sandbox() as box:
        WAP = box.unit(Zoo, Name='Wild Animal Park')
        WAP.Opens = now
    """

    def __init__(self, arena):
        self.arena = arena
        # Unit class -> {unit.identity(): unit}
        self._caches = {}

    def __getattr__(self, key):
        """Support "magic recaller" methods on self.

        For each registered class, box.<ClassName>(*ids, **kwargs) returns
        a single matching Unit of that class, else None. Positional args
        are matched, in order, to the class's identifiers.

        Raises AttributeError for any other missing attribute.
        """
        # __getattr__ only runs when normal lookup fails. If 'arena' itself
        # is what's missing (e.g. __init__ has not run), the self.arena
        # lookup below would re-enter this method without end.
        if key == 'arena':
            raise AttributeError("Sandbox object has no attribute '%s'" % key)

        for cls in self.arena._registered_classes:
            name = cls.__name__
            if name == key:
                def recaller(*args, **kwargs):
                    # Allow identifiers to be supplied as args or kwargs
                    # (since the common case will be a single identifier).
                    for arg, ident in zip(args, cls.identifiers):
                        kwargs[str(ident)] = arg
                    expr = logic.filter(**kwargs)
                    for unit in self.xrecall(cls, expr):
                        return unit
                    return None
                recaller.__doc__ = "A single %s Unit, else None." % name
                return recaller
        raise AttributeError("Sandbox object has no attribute '%s'" % key)

    def _filter_expr(self, expr, kwargs):
        """Return expr (wrapped as a logic.Expression) combined with kwargs.

        expr: None, a logic.Expression, or an object accepted by
            logic.Expression (such as a lambda).
        kwargs: a dict of name=value filters; if non-empty, it is turned
            into an Expression via logic.filter and added to expr.
        """
        if expr and not isinstance(expr, logic.Expression):
            expr = logic.Expression(expr)
        if kwargs:
            f = logic.filter(**kwargs)
            if expr:
                expr += f
            else:
                expr = f
        return expr

    def memorize(self, unit):
        """Persist unit in storage and add it to this sandbox's cache."""
        cls = unit.__class__
        unit.sandbox = self

        # Ask the store to accept the unit, assigning it primary key values
        # if necessary. The store should also call unit.cleanse() if it
        # saves the whole unit state on this call.
        self.arena.storage(cls).reserve(unit)

        # Insert the unit into the cache.
        ident = unit.identity()
        self._cache(cls)[ident] = unit
        if self.arena.logflags & logflags.MEMORIZE:
            self.arena.log("MEMORIZE %s: %s" % (cls.__name__, ident))

        # Do this at the end of the func, since most on_memorize
        # will want to have an identity when called.
        if hasattr(unit, "on_memorize"):
            unit.on_memorize()

    def forget(self, unit):
        """Destroy unit, both in the cache and storage."""
        cls = unit.__class__

        ident = unit.identity()
        if self.arena.logflags & logflags.FORGET:
            self.arena.log("FORGET %s: %s" % (cls.__name__, ident))
        self.arena.storage(cls).destroy(unit)

        del self._cache(cls)[ident]

        # This must be done after the destroy() call, so that a
        # related unit can poll all instances of this class.
        if hasattr(unit, "on_forget"):
            unit.on_forget()

        unit.sandbox = None

    def xrecall(self, classes, expr=None, inherit=False, **kwargs):
        """Iterator over units of cls which match expr.

        classes: a Unit class, or a UnitJoin (in which case this delegates
            to xmulti and yields its unit rows).
        expr: a lambda or logic.Expression; None matches every unit.
        **kwargs: additional expr filters in name=value format.

        If inherit is True, units of the given class and all registered
            subclasses of the given class will be recalled.

        Cached units are yielded first, then units from storage which were
        not already in the cache.
        """
        if classes.__class__.__name__ == "UnitJoin":
            for unitrow in self.xmulti(classes, expr, **kwargs):
                yield unitrow
            return

        cls = classes
        expr = self._filter_expr(expr, kwargs)

        if self.arena.logflags & logflags.RECALL:
            self.arena.log("RECALL %s: %s" % (cls.__name__, expr))

        if inherit:
            # Collect all registered subclasses of cls.
            # Note that cls is a subclass of itself.
            classes = [c for c in self.arena._registered_classes
                       if issubclass(c, cls)]
        else:
            classes = [cls]
        if not classes:
            # Even the requested class is not registered.
            raise errors.UnrecallableError("The '%s' class is not registered."
                                           % cls.__name__)

        for cls in classes:
            cache = self._cache(cls)

            # Special-case the scenario where one Unit is expected
            # and called by ID. We should be able to save a database hit.
            # The literal below is compared against the raw bytecode of the
            # expression's function; it appears to be the compiled form of
            # "lambda x: x.ID == <const>" for the interpreter this was
            # written against. If the bytecode differs, we simply fall
            # through to the general path below.
            if expr:
                fc = expr.func.func_code
                if (fc.co_code == '|\x00\x00i\x01\x00d\x01\x00j\x02\x00S'
                    and fc.co_names[-1] == 'ID'):
                    ID = fc.co_consts[-1]
                    unit = cache.get((ID,))
                    if unit is not None:
                        # Do NOT call on_recall here. That should be called
                        # only at the Sandbox-SM boundary.
                        yield unit
                        return

            # Query the cache. We have to use a static copy of the
            # keys, to ensure that our cache doesn't change size
            # during iteration (due to overlapping xrecalls).
            keys = cache.keys()
            for ident in keys:
                unit = cache.get(ident)
                if unit and ((expr is None) or expr.evaluate(unit)):
                    # Do NOT call on_recall here. That should be called
                    # only at the Sandbox-SM boundary.
                    yield unit

            # Dict of the identities checked above, so the membership test
            # in the storage loop doesn't scan a list for every row.
            checked = dict.fromkeys(keys)

            # Query Storage.
            for unit in self.arena.storage(cls).recall(cls, expr):
                ident = unit.identity()
                # Don't offer up a unit that was already checked in our cache
                # (whether it matched the expr() or not--we assume the cache
                # has the freshest data).
                if ident not in checked:
                    # Very important that we check for existing unit, as its
                    # state may have changed in memory but not in storage
                    # (even between our cache yields and this yield).
                    # Make sure the cache lookup and get happens atomically.
                    existing = cache.get(ident)
                    if existing:
                        yield existing
                    else:
                        unit.sandbox = self
                        confirmed = True
                        cache[ident] = unit
                        if hasattr(unit, 'on_recall'):
                            try:
                                unit.on_recall()
                            except errors.UnrecallableError:
                                # NOTE(review): the unit is not yielded, but
                                # it stays in the cache -- confirm that is
                                # intended.
                                confirmed = False
                        if confirmed:
                            yield unit

    def recall(self, classes, expr=None, inherit=False, **kwargs):
        """List of units of the given class which match expr.

        Takes the same arguments as xrecall, but returns a list.

        If inherit is True, units of the given class and all registered
            subclasses of the given class will be recalled.
        """
        return list(self.xrecall(classes, expr, inherit, **kwargs))

    def xmulti(self, classes, expr=None, **kwargs):
        """Recall units of each cls if they together match the expr.

        Each yielded value will be a list of Units, in the same order as
        the classes arg. This facilitates unpacking in iterative consumer
        code like:

        for invoice, price in sandbox.xmulti(Invoice & Price, f):
            deal_with(invoice)
            deal_with(price)

        Raises ValueError if the classes are not all handled by one store.
        """
        expr = self._filter_expr(expr, kwargs)

        if self.arena.logflags & logflags.RECALL:
            self.arena.log("RECALL %s %s" %
                           (", ".join([c.__name__ for c in classes]), expr))

        stores = [self.arena.storage(cls) for cls in classes]
        firststore = stores[0]
        for s in stores:
            if s is not firststore:
                raise ValueError(u"xmulti() does not support multiple"
                                 u" classes in disparate stores.")

        # This is broken. If a filter expr is supplied, then the store may
        # not return rows which our cache would, and those won't be included
        # in the resultset. If you're using xmulti with no expr's, or
        # in read-only scripts, it should be OK for now. But if you mutate
        # Units and then call multirecall, expect inconsistent results.
        for unitset in firststore.multirecall(classes, expr):
            confirmed = True
            for index, unit in enumerate(unitset):
                ident = unit.identity()
                cache = self._cache(unit.__class__)
                if ident in cache:
                    # Keep the unit which is in our cache!
                    unitset[index] = cache[ident]
                else:
                    cache[ident] = unit
                    unit.sandbox = self
                    if hasattr(unit, 'on_recall'):
                        try:
                            unit.on_recall()
                        except errors.UnrecallableError:
                            # Drop the whole row if any unit in it refuses.
                            confirmed = False
                            break
            if confirmed:
                yield unitset

    def unit(self, cls, expr=None, inherit=False, **kwargs):
        """A single matching Unit, else None.

        **kwargs will be combined into an Expression via logic.filter.
            The first Unit matching that expression is returned; if no
            Units match, None is returned.
        """
        for unit in self.xrecall(cls, expr, inherit, **kwargs):
            return unit
        return None

    def view(self, cls, attrs, expr=None, **kwargs):
        """Yield tuples of attrs for the given cls which match the expr.

        cls: The Unit subclass for which to yield property tuples.
        attrs: a sequence of strings; each should be the name of
            a UnitProperty on the given cls.
        expr: a lambda or logic.Expression. If provided, data will only
            be yielded for units of the given cls which match the expr.
        **kwargs: additional expr filters in name=value format.

        Each yielded value will be a sequence of values, in the same order
        as the attrs arg. This facilitates unpacking in iterative consumer
        code like:

        for id, name in sandbox.view(Invoice, ['ID', 'Name'], f):
            print id, ": ", name

        This is generally much faster than recall, and should be preferred
        for performance-sensitive code.
        """
        expr = self._filter_expr(expr, kwargs)

        if self.arena.logflags & logflags.VIEW:
            self.arena.log("VIEW %s [%s]: %s" % (cls.__name__, attrs, expr))

        cache = self._cache(cls)

        # Iterate over a static copy of the cached units: this is a
        # generator, so the cache may change size between our yields.
        for unit in list(cache.values()):
            if expr is None or expr(unit):
                yield tuple([getattr(unit, attr) for attr in attrs])

        # Add the identity attribute(s) if not present. This is necessary
        # to avoid duplicating objects which are already in our cache.
        fields = list(attrs)
        indices = []
        added_fields = 0
        for key in cls.identifiers:
            if key not in fields:
                added_fields += 1
                fields.append(key)
            indices.append(fields.index(key))

        for row in self.arena.storage(cls).view(cls, fields, expr):
            ident = tuple([row[x] for x in indices])
            if ident not in cache:
                if added_fields:
                    # Remove the added identifier columns from the row.
                    row = row[:-added_fields]
                yield row

    def sum(self, cls, attr, expr=None, **kwargs):
        """Sum of all non-None values for the given cls.attr."""
        # The "!= None" comparison lives inside an Expression, so it is
        # left exactly as written.
        # NOTE(review): when expr is None this relies on Expression.__add__
        # accepting None -- confirm.
        expr = logic.Expression(lambda x: getattr(x, attr) != None) + expr
        return sum([row[0] for row in self.view(cls, (attr,), expr, **kwargs)])

    def distinct(self, cls, attrs, expr=None, **kwargs):
        """List of distinct Property tuples.

        If only one attribute is specified, a list of values will be returned.
        If more than one attribute is specified, a zipped list will be returned.
        The result is sorted.

        Notice that you can also use this function as a count() function
        (in fact it's the only way to do it) by using attrs = ['ID'].
        """
        expr = self._filter_expr(expr, kwargs)

        if self.arena.logflags & logflags.VIEW:
            self.arena.log("DISTINCT %s [%s]: %s" % (cls.__name__, attrs, expr))

        # Used as a set: the keys are the distinct rows.
        seen = {}
        for unit in self._cache(cls).values():
            if expr is None or expr(unit):
                seen[tuple([getattr(unit, attr) for attr in attrs])] = None

        for row in self.arena.storage(cls).distinct(cls, attrs, expr):
            seen[row] = None

        rows = list(seen)
        rows.sort()
        if len(attrs) == 1:
            rows = [x[0] for x in rows]
        return rows

    def count(self, cls, expr=None, **kwargs):
        """Number of Units of class 'cls' which match expr.

        expr and **kwargs are optional and are interpreted as in distinct;
        with neither, every unit of the class is counted.
        """
        return len(self.distinct(cls, cls.identifiers, expr, **kwargs))

    def range(self, cls, attr, expr=None, **kwargs):
        """Distinct, non-None attr values (ordered and continuous, if possible).

        If the given attribute is a known discrete, ordered type
        (like int, long, datetime.date), this returns the closed interval:
            [min(attr), ..., max(attr)]
        That is, all possible values will be output between min and max,
        even if they do not appear in the dataset. For date types the
        interval is returned as a generator which steps one day at a time.

        Returns an empty list if there are no non-None values.
        """
        existing = [x for x in self.distinct(cls, [attr], expr, **kwargs)
                    if x is not None]
        if not existing:
            return []

        attr_type = getattr(cls, attr).type
        if issubclass(attr_type, (int, long)):
            return range(min(existing), max(existing) + 1)
        else:
            try:
                import datetime
            except ImportError:
                pass
            else:
                if issubclass(attr_type, datetime.date):
                    def date_gen():
                        start, end = min(existing), max(existing)
                        # Dates can't be added to ints; count the days in
                        # the closed interval from their difference.
                        for d in range((end - start).days + 1):
                            yield start + datetime.timedelta(d)
                    return date_gen()

        try:
            existing.sort()
        except TypeError:
            # Unorderable values: return them as distinct() gave them.
            pass

        return existing

    #                           Cache Management                           #

    def _cache(self, cls):
        """Return the cache for the specified class.

        The cache is a dict, created empty on first request for a class.
        """
        if cls not in self._caches:
            self._caches[cls] = {}
        return self._caches[cls]

    def purge(self, cls):
        """Drop all cached Units of class 'cls'. Do not save.

        Purging a class which has no cache is a no-op.
        """
        self._caches.pop(cls, None)

    def repress(self, unit):
        """Remove unit from cache (but don't destroy)."""
        cls = unit.__class__
        ident = unit.identity()
        if self.arena.logflags & logflags.REPRESS:
            self.arena.log("REPRESS %s: %s" % (cls.__name__, ident))

        if hasattr(unit, "on_repress"):
            unit.on_repress()

        # Save after on_repress in case on_repress modified the unit.
        self.arena.storage(cls).save(unit)

        del self._cache(cls)[ident]
        unit.sandbox = None

    def flush_all(self):
        """Repress all units.

        Every cached unit's on_repress hook is called first; then each
        unit is removed from the cache and saved to its store. Finally,
        each store involved which has a db.commit is committed.
        """
        for cls in list(self._caches):
            # Call all on_repress methods first! There are truly horrible
            # interdependency chains in most on_repress methods, and
            # it's best to resolve them all at once BEFORE flushing
            # any units from the cache.
            # Note we iterate over a copy of the values, since the
            # cache may change size during iteration.
            for unit in list(self._cache(cls).values()):
                if hasattr(unit, "on_repress"):
                    unit.on_repress()

        seen_stores = {}
        for cls in list(self._caches):
            cache = self._cache(cls)
            store = self.arena.storage(cls)
            seen_stores[store] = None
            while cache:
                unitid, unit = cache.popitem()
                if self.arena.logflags & logflags.REPRESS:
                    self.arena.log("REPRESS %s: %s" % (cls.__name__, unitid))
                store.save(unit)

        for store in seen_stores:
            # Not every store has a db with a commit method.
            try:
                commit = store.db.commit
            except AttributeError:
                pass
            else:
                commit()

    def rollback(self):
        """Ignore all changes and purge our cache.

        Each store involved which has a db.rollback is rolled back.
        """
        seen_stores = {}
        for cls in list(self._caches):
            self.purge(cls)
            seen_stores[self.arena.storage(cls)] = None

        for store in seen_stores:
            # Not every store has a db with a rollback method.
            try:
                rollback = store.db.rollback
            except AttributeError:
                pass
            else:
                rollback()

    #                          Context Management                          #

    def __enter__(self):
        """Enter a 'with' block; the sandbox itself is the context value."""
        return self

    def __exit__ (self, type, value, tb):
        """Flush on a clean exit of the 'with' block; rollback on error.

        Returns None, so any exception continues to propagate.
        """
        if tb is None:
            self.flush_all()
        else:
            self.rollback()
701
Note: See TracBrowser for help on using the browser.