Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/arenas.py

Revision 309 (checked in by fumanchu, 7 years ago)

Further (final?) fix for #4 (transaction support). Implicit *sandbox* flush/rollback now available in Python 2.5+ using "with Sandbox(arena) as box:"; each sandbox is its own context manager.

  • Property svn:eol-style set to native
Line 
1
2 import ConfigParser
3 import threading
4 from types import ClassType
5
6 from dejavu.containers import Graph
7 from dejavu import logic, errors, xray
8
# Public names exported by a star-import of this module.
__all__ = ['Arena', 'Sandbox', 'logflags']
11
12
class Enum(object):
    """A bare attribute namespace; each attribute holds one named constant."""


# Logging flags (see Arena.logflags). Each flag occupies its own bit, so
# several flags can be OR-ed together into a single mask.
logflags = Enum()
logflags.ERROR = 1 << 0
logflags.IO = 1 << 1
logflags.SQL = 1 << 2

# Flags for the individual Sandbox operations.
logflags.MEMORIZE = 1 << 7
logflags.RECALL = 1 << 8
logflags.VIEW = 1 << 9
logflags.REPRESS = 1 << 10
logflags.FORGET = 1 << 11
# Convenience mask: every Sandbox-operation flag at once.
logflags.SANDBOX = (logflags.MEMORIZE | logflags.RECALL | logflags.VIEW |
                    logflags.REPRESS | logflags.FORGET)
29
30
class Arena(object):
    """A namespace/workspace for a Dejavu application.

    An Arena holds the registered StorageManagers ("stores"), the registered
    Unit classes together with the store resolved for each of them, and an
    undirected graph of the associations between registered classes.
    """

    def __init__(self):
        # Serializes the lookup-and-sync step in storage().
        self._sync_lock = threading.Lock()
        # Store name -> StorageManager instance.
        self.stores = {}
        # Unit class -> StorageManager (None until storage() resolves it).
        self._registered_classes = {}
        self.associations = Graph(directed=False)
        self.engine_functions = {}
        # Bitmask of logflags values; log() only emits matching messages.
        self.logflags = logflags.ERROR + logflags.IO

    def log(self, message, flag):
        """Default logger (writes to stdout). Feel free to replace.

        The message is printed only if 'flag' shares a bit with
        self.logflags. Unicode messages are encoded as UTF-8 first.
        """
        if flag & self.logflags:
            if isinstance(message, unicode):
                message = message.encode('utf8')
            print(message)

    def load(self, configFileName):
        """Load StorageManagers from an INI-style config file.

        Each section describes one store and is passed to add_store() with
        the section name as the store name. The 'Class' option names the
        store class; the optional 'Load Order' option (an int, default 0)
        sets the order in which stores are added. Ties are broken by section
        name.
        """
        parser = ConfigParser.ConfigParser()
        # Make names case-sensitive by overriding optionxform.
        parser.optionxform = unicode
        parser.read(configFileName)

        stores = []
        for section in parser.sections():
            opts = dict(parser.items(section))
            stores.append((int(opts.get("Load Order", "0")), section, opts))
        # Section names are unique, so the opts dicts are never compared.
        stores.sort()

        for order, name, options in stores:
            self.add_store(name, options[u'Class'], options)

    def add_store(self, name, store, options=None):
        """Register a StorageManager under 'name' and return it.

        The 'store' argument may be a StorageManager class, an instance of
        that class, or the full importable dotted-package name of the class.
        Classes (and dotted names) are instantiated as store(self, options).
        """
        if isinstance(store, basestring):
            store = xray.classes(store)(self, options or {})
        elif isinstance(store, (type, ClassType)):
            store = store(self, options or {})

        self.stores[name] = store
        return store

    def remove_store(self, name):
        """Unregister the named store; unknown names are ignored.

        Classes that were mapped to that store are reset to None, so the
        next storage() call for them searches the remaining stores again.
        """
        store = self.stores.pop(name, None)
        if store is None:
            return
        for cls, handler in list(self._registered_classes.items()):
            if handler is store:
                self._registered_classes[cls] = None

    def shutdown(self):
        """Shut down every store, in ascending order of shutdownOrder."""
        # Sort on (order, name): store names are unique, so two stores with
        # the same shutdownOrder never force a comparison of store objects.
        ordered = [(store.shutdownOrder, name, store)
                   for name, store in self.stores.items()]
        ordered.sort()
        for order, name, store in ordered:
            store.shutdown()

    def new_sandbox(self):
        """Return a new sandbox object in this Arena."""
        return Sandbox(self)


    # --------------------- Unit Class Registration --------------------- #

    def register(self, cls):
        """Assert that Units of class 'cls' will be handled.

        Registering an already-registered class is a no-op. Otherwise, each
        of cls._associations whose 'register' attribute is not false is
        added to self.associations.
        """
        # We must allow modules to register classes before any stores have
        # been added, but not overwrite a store which has already been found.
        if cls in self._registered_classes:
            return
        self._registered_classes[cls] = None

        # Register any association(s) in an undirected graph.
        for ua in cls._associations.values():
            if getattr(ua, "register", True):
                self.associations.connect(cls, ua.farClass)

    def register_all(self, globals):
        """Register every dejavu.Unit subclass among the values of 'globals'.

        'globals' is any mapping, typically a module's globals().
        """
        import dejavu
        for obj in globals.values():
            if isinstance(obj, type) and issubclass(obj, dejavu.Unit):
                self.register(obj)

    def class_by_name(self, classname):
        """Return the registered class named 'classname'; KeyError if none."""
        for cls in self._registered_classes:
            if cls.__name__ == classname:
                return cls
        raise KeyError("No registered class found for '%s'." % classname)

    def storage(self, cls):
        """Return the StorageManager which handles Units of the given class.

        On the first call for a class, every store is searched. A store
        whose 'classnames' contains cls.__name__ wins. Otherwise a store with
        an empty 'classnames' (a catch-all) is used. The chosen store is
        asked to sync([cls]) and is remembered for later calls.

        Raises KeyError if no store handles the class.
        """
        store = self._registered_classes.get(cls)
        if store is not None:
            return store

        self._sync_lock.acquire()
        try:
            # Another thread may have resolved cls while we waited for the
            # lock; don't sync it a second time.
            store = self._registered_classes.get(cls)
            if store is not None:
                return store

            # Search all stores for the class name.
            default_store = None
            clsname = cls.__name__
            for candidate in self.stores.values():
                if not candidate.classnames:
                    # This store has no "classnames" list, which signals that
                    # it handles all classes not handled by other stores.
                    default_store = candidate
                elif clsname in candidate.classnames:
                    candidate.sync([cls])
                    self._registered_classes[cls] = candidate
                    return candidate

            # Name not found in any store's classnames. Try a default store.
            if default_store is not None:
                default_store.sync([cls])
                self._registered_classes[cls] = default_store
                return default_store
        finally:
            self._sync_lock.release()

        raise KeyError("No store found for '%s'." % cls.__name__)

    def create_storage(self, cls):
        """Create storage space for cls."""
        self.storage(cls).create_storage(cls)

    def has_storage(self, cls):
        """Return what cls's store reports from has_storage(cls)."""
        return self.storage(cls).has_storage(cls)

    def drop_storage(self, cls):
        """Ask cls's store to drop the storage for cls."""
        self.storage(cls).drop_storage(cls)

    def add_property(self, cls, name):
        """Ask cls's store to add property 'name' to cls's storage."""
        self.storage(cls).add_property(cls, name)

    def drop_property(self, cls, name):
        """Ask cls's store to drop property 'name' from cls's storage."""
        self.storage(cls).drop_property(cls, name)

    def rename_property(self, cls, oldname, newname):
        """Ask cls's store to rename property 'oldname' to 'newname'."""
        self.storage(cls).rename_property(cls, oldname, newname)

    def migrate_class(self, cls, new_store):
        """Copy all units of cls to new_store.

        Only units whose class is exactly cls are copied. A sandbox recall
        of cls also yields units of its registered subclasses; those must be
        migrated by their own migrate_class() call, which first creates
        storage for them. Copying them here too would duplicate them, and
        would write them before their storage exists.
        """
        new_store.create_storage(cls)
        for unit in self.new_sandbox().xrecall(cls):
            if unit.__class__ is not cls:
                continue
            new_store.reserve(unit)
            new_store.save(unit, True)

    def migrate(self, new_store, old_store=None, copy_only=False):
        """Copy all units (of old_store, if given) to new_store.

        Every registered class whose store is old_store (or every registered
        class, if old_store is None) is copied with migrate_class(). Unless
        copy_only is True, each copied class is then re-mapped to new_store.
        """
        # Iterate over a snapshot, since the loop writes back into the map.
        for cls in list(self._registered_classes):
            store = self.storage(cls)
            if old_store is None or old_store is store:
                self.migrate_class(cls, new_store)
                if not copy_only:
                    self._registered_classes[cls] = new_store
196
197
198 ###########################################################################
199 ##                                                                       ##
200 ##                              Sandboxes                                ##
201 ##                                                                       ##
202 ###########################################################################
203
204
class Sandbox(object):
    """Data sandbox for Dejavu arenas.

    Each consumer (that is, each UI process or thread) maintains a Sandbox
    for managing Units. Sandboxes populate themselves with Units on a lazy
    basis, allowing UI code to request data as it's needed. However, once
    obtained, such Units are kept in the sandbox's cache (usually for the
    lifetime of the thread); this important detail means that multiple
    requests for the same Units result in multiple references to the same
    objects, rather than multiple objects. Sandboxes are basically what
    Fowler calls Identity Maps.

    The *REALLY* important thing to understand if you're customizing this
    is that Sandboxes won't survive sharing across threads--DON'T TRY IT.
    If you need to share unit data across requests, use or make an SM which
    persists the data, and chain it with another, more normal SM.

    _cache() and _caches (like the other underscore-prefixed helpers) are
    private for a reason--don't access them from interface code--tell the
    Sandbox to do it for you.

    Starting with Python 2.5, each Sandbox instance is its own context
    manager, so you can have boxes automatically flush themselves when
    you're done, and automatically rollback on error (including an error
    raised by the flush itself). Example:

    # __future__ only needed for Python 2.5, not 2.6+
    from __future__ import with_statement

    with arena.new_sandbox() as box:
        WAP = box.unit(Zoo, Name='Wild Animal Park')
        WAP.Opens = now
    """

    def __init__(self, arena):
        self.arena = arena
        # Unit class -> {identity: unit}. Always access through _cache().
        self._caches = {}

    def __getattr__(self, key):
        """Support "magic recaller" attributes named after registered classes.

        box.Zoo(3) or box.Zoo(Name='x') returns the first matching Zoo unit,
        else None. Positional args are matched, in order, to
        cls.identifiers and combined with any keyword filters.
        """
        # __getattr__ only runs for attributes that normal lookup can't find.
        # If 'arena' itself is missing (e.g. on an instance created through
        # __new__ by copy or pickle), looking it up below would re-enter this
        # method forever, so fail fast instead.
        if key == 'arena':
            raise AttributeError("Sandbox object has no attribute '%s'" % key)

        for cls in self.arena._registered_classes:
            if cls.__name__ != key:
                continue

            def recaller(*args, **kwargs):
                # Allow identifiers to be supplied as args or kwargs
                # (since the common case will be a single identifier).
                for arg, ident in zip(args, cls.identifiers):
                    kwargs[str(ident)] = arg
                for match in self.xrecall(cls, logic.filter(**kwargs)):
                    return match
                return None
            recaller.__doc__ = "A single %s Unit, else None." % key
            return recaller
        raise AttributeError("Sandbox object has no attribute '%s'" % key)

    def _combined_expr(self, expr, kwargs):
        """Return expr coerced to logic.Expression and AND-ed with kwargs.

        A non-Expression expr is wrapped in logic.Expression. Any kwargs are
        turned into logic.filter(**kwargs) and added to expr with '+='.
        Returns None when neither expr nor kwargs was given.
        """
        if expr and not isinstance(expr, logic.Expression):
            expr = logic.Expression(expr)
        if kwargs:
            f = logic.filter(**kwargs)
            if expr:
                expr += f
            else:
                expr = f
        return expr

    def memorize(self, unit):
        """Persist unit in storage and add it to this sandbox's cache.

        The unit's store reserves it, assigning identifier values if
        necessary. Afterwards unit.on_memorize() is called, if the unit
        defines it.
        """
        cls = unit.__class__
        unit.sandbox = self

        # Ask the store to accept the unit, assigning it primary key values
        # if necessary. The store should also call unit.cleanse() if it
        # saves the whole unit state on this call.
        self.arena.storage(cls).reserve(unit)

        # Insert the unit into the cache.
        unit_id = unit.identity()
        self._cache(cls)[unit_id] = unit
        self.arena.log("MEMORIZE %s: %s" % (cls.__name__, unit_id),
                       logflags.MEMORIZE)

        # Do this at the end of the func, since most on_memorize
        # will want to have an identity when called.
        if hasattr(unit, "on_memorize"):
            unit.on_memorize()

    def forget(self, unit):
        """Destroy unit, both in the cache and storage.

        unit.on_forget() (if defined) runs after the storage destroy, and
        unit.sandbox is reset to None.
        """
        cls = unit.__class__

        unit_id = unit.identity()
        self.arena.log("FORGET %s: %s" % (cls.__name__, unit_id),
                       logflags.FORGET)
        self.arena.storage(cls).destroy(unit)

        # The unit may not be cached here (e.g. it was already repressed);
        # it has been destroyed in storage regardless, so finish the job
        # instead of raising KeyError half-way through.
        self._cache(cls).pop(unit_id, None)

        # This must be done after the destroy() call, so that a
        # related unit can poll all instances of this class.
        if hasattr(unit, "on_forget"):
            unit.on_forget()

        unit.sandbox = None

    def _cached_by_id(self, cache, expr):
        """Return the cached unit for an `x.ID == <constant>` expr, else None.

        This saves a storage hit when exactly one unit is requested by ID
        and it is already cached. It recognizes the CPython 2 bytecode
        LOAD_FAST 0; LOAD_ATTR 1; LOAD_CONST 1; COMPARE_OP (==);
        RETURN_VALUE, where the last name is 'ID'. Callables without
        func_code, or with any other bytecode, skip the shortcut.
        """
        code = getattr(expr.func, 'func_code', None)
        if code is None:
            return None
        if (code.co_code == '|\x00\x00i\x01\x00d\x01\x00j\x02\x00S'
                and code.co_names[-1] == 'ID'):
            return cache.get((code.co_consts[-1],))
        return None

    def _confirm_recall(self, unit, cache, unit_id):
        """Run unit.on_recall() (if defined) for a unit just loaded from storage.

        Returns True if the unit may be handed to the caller. If on_recall()
        raises errors.UnrecallableError, the unit is removed from 'cache'
        again and detached from this sandbox. Cache scans hand out units
        without calling on_recall(), so leaving it cached would leak a unit
        that refused to be recalled into later results.
        """
        if hasattr(unit, 'on_recall'):
            try:
                unit.on_recall()
            except errors.UnrecallableError:
                if cache.get(unit_id) is unit:
                    del cache[unit_id]
                unit.sandbox = None
                return False
        return True

    def xrecall(self, classes, expr=None, **kwargs):
        """Iterate over units of cls (and its subclasses) which match expr.

        'classes' is normally a single Unit class; units of all registered
        subclasses of it are included. If it is a UnitJoin, this delegates
        to xmulti() and yields lists of units instead.

        Cached units are offered first, since the cache is assumed to hold
        the freshest state. Storage is then queried for units that were not
        in the cache. Units newly loaded from storage are cached and get
        their on_recall() hook called. A unit whose on_recall() raises
        errors.UnrecallableError is skipped.

        Raises errors.UnrecallableError if no subclass of cls is registered.
        """
        if classes.__class__.__name__ == "UnitJoin":
            for unitrow in self.xmulti(classes, expr, **kwargs):
                yield unitrow
            return

        cls = classes
        expr = self._combined_expr(expr, kwargs)

        self.arena.log("RECALL %s: %s" % (cls.__name__, expr), logflags.RECALL)

        # Collect all registered subclasses of cls.
        # Note that cls is a subclass of itself.
        subclasses = [c for c in self.arena._registered_classes
                      if issubclass(c, cls)]
        if not subclasses:
            # Even the requested class is not registered.
            raise errors.UnrecallableError("The '%s' class is not registered."
                                           % cls.__name__)

        for subcls in subclasses:
            cache = self._cache(subcls)

            # Special-case the scenario where one Unit is expected
            # and called by ID. We should be able to save a database hit.
            if expr:
                unit = self._cached_by_id(cache, expr)
                if unit is not None:
                    # Do NOT call on_recall here. That should be called
                    # only at the Sandbox-SM boundary.
                    yield unit
                    continue

            # Query the cache. We have to use a static copy of the keys, to
            # ensure that our cache doesn't change size during iteration
            # (due to overlapping xrecalls).
            cached_ids = list(cache)
            for unit_id in cached_ids:
                unit = cache.get(unit_id)
                if unit is not None and (expr is None or expr.evaluate(unit)):
                    # Do NOT call on_recall here. That should be called
                    # only at the Sandbox-SM boundary.
                    yield unit

            # Query Storage. Don't offer up a unit that was already checked
            # in our cache (whether it matched the expr or not--we assume
            # the cache has the freshest data). A dict gives O(1) lookups.
            checked = dict.fromkeys(cached_ids)
            for unit in self.arena.storage(subcls).recall(subcls, expr):
                unit_id = unit.identity()
                if unit_id in checked:
                    continue
                # Very important that we check for an existing unit, as its
                # state may have changed in memory but not in storage
                # (e.g. it was cached by an overlapping xrecall).
                existing = cache.get(unit_id)
                if existing is not None:
                    yield existing
                    continue
                unit.sandbox = self
                cache[unit_id] = unit
                if self._confirm_recall(unit, cache, unit_id):
                    yield unit

    def recall(self, classes, expr=None, **kwargs):
        """List of units of class 'cls' which match expr (see xrecall)."""
        return list(self.xrecall(classes, expr, **kwargs))

    def xmulti(self, classes, expr=None, **kwargs):
        """Recall units of each cls if they together match the expr.

        Each yielded value will be a list of Units, in the same order as
        the classes arg. This facilitates unpacking in iterative consumer
        code like:

        for invoice, price in sandbox.xmulti(Invoice & Price, f):
            deal_with(invoice)
            deal_with(price)

        All classes must be handled by the same store; otherwise ValueError
        is raised. Rows containing a unit whose on_recall() raises
        errors.UnrecallableError are skipped.
        """
        expr = self._combined_expr(expr, kwargs)

        self.arena.log("RECALL %s %s" %
                       (", ".join([c.__name__ for c in classes]), expr),
                       logflags.RECALL)

        stores = [self.arena.storage(cls) for cls in classes]
        firststore = stores[0]
        for s in stores:
            if s is not firststore:
                raise ValueError(u"xmulti() does not support multiple"
                                 u" classes in disparate stores.")

        # This is broken. If a filter expr is supplied, then the store may
        # not return rows which our cache would, and those won't be included
        # in the resultset. If you're using xmulti with no expr's, or
        # in read-only scripts, it should be OK for now. But if you mutate
        # Units and then call multirecall, expect inconsistent results.
        for unitset in firststore.multirecall(classes, expr):
            confirmed = True
            for index, unit in enumerate(unitset):
                unit_id = unit.identity()
                cache = self._cache(unit.__class__)
                if unit_id in cache:
                    # Keep the unit which is in our cache!
                    unitset[index] = cache[unit_id]
                    continue
                cache[unit_id] = unit
                unit.sandbox = self
                if not self._confirm_recall(unit, cache, unit_id):
                    confirmed = False
                    break
            if confirmed:
                yield unitset

    def unit(self, cls, expr=None, **kwargs):
        """A single matching Unit, else None.

        expr and **kwargs are combined as in xrecall() (kwargs via
            logic.filter). The first Unit matching that expression is
            returned; if no Units match, None is returned.
        """
        for match in self.xrecall(cls, expr, **kwargs):
            return match
        return None

    def view(self, cls, attrs, expr=None, **kwargs):
        """Iterate over tuples of the values of 'attrs' for matching units.

        Matching cached units come first (as tuples). Then rows from the
        store's view() are yielded for units not present in the cache. The
        identifier columns are requested from the store when missing from
        'attrs' and stripped again before the row is yielded.
        """
        expr = self._combined_expr(expr, kwargs)

        self.arena.log("VIEW %s [%s]: %s" % (cls.__name__, attrs, expr),
                       logflags.VIEW)

        cache = self._cache(cls)

        # Iterate over a static copy: this is a generator, so the consumer
        # may recall units (and grow the cache) between our yields.
        for unit in list(cache.values()):
            if expr is None or expr(unit):
                yield tuple([getattr(unit, attr) for attr in attrs])

        # Add the identity attribute(s) if not present. This is necessary
        # to avoid duplicating objects which are already in our cache.
        fields = list(attrs)
        indices = []
        added_fields = 0
        for key in cls.identifiers:
            if key not in fields:
                added_fields += 1
                fields.append(key)
            indices.append(fields.index(key))

        for row in self.arena.storage(cls).view(cls, fields, expr):
            unit_id = tuple([row[x] for x in indices])
            if unit_id in cache:
                continue
            if added_fields:
                # Remove the added identifier columns from the row.
                row = row[:len(row) - added_fields]
            yield row

    def _sorted_rows(self, rows):
        """Return 'rows' (a list of tuples) sorted, with None before others.

        Values such as datetime.date refuse to be ordered against None
        (TypeError), which would break a plain sort of a nullable column.
        Each value is therefore keyed as (value is not None, value). If the
        values still cannot be ordered, the rows are returned unsorted.
        """
        # Rows come from dict keys, so they are all distinct and so are
        # their keys; the rows themselves are never compared.
        keyed = [([(value is not None, value) for value in row], row)
                 for row in rows]
        try:
            keyed.sort()
        except TypeError:
            return rows
        return [row for key, row in keyed]

    def distinct(self, cls, attrs, expr=None, **kwargs):
        """List of distinct Property tuples.

        If only one attribute is specified, a list of values will be returned.
        If more than one attribute is specified, a zipped list will be returned.
        Results are sorted (None first) when the values can be ordered.

        Notice that you can also use this function as a count() function
        by using attrs = ['ID'] (or see count()).
        """
        expr = self._combined_expr(expr, kwargs)

        self.arena.log("DISTINCT %s [%s]: %s" % (cls.__name__, attrs, expr),
                       logflags.VIEW)

        seen = {}
        cache = self._cache(cls)
        for unit in list(cache.values()):
            if expr is None or expr(unit):
                seen[tuple([getattr(unit, attr) for attr in attrs])] = None

        for row in self.arena.storage(cls).distinct(cls, attrs, expr):
            seen[row] = None

        rows = self._sorted_rows(list(seen))
        if len(attrs) == 1:
            rows = [row[0] for row in rows]
        return rows

    def count(self, cls, expr=None, **kwargs):
        """Number of Units of class 'cls' matching expr (and kwargs)."""
        return len(self.distinct(cls, cls.identifiers, expr, **kwargs))

    def range(self, cls, attr, expr=None, **kwargs):
        """Distinct, non-None attr values (ordered and continuous, if possible).

        If the given attribute is a known discrete, ordered type
        (like int, long, datetime.date), this returns the closed interval:
            [min(attr), ..., max(attr)]
        That is, all possible values will be output between min and max,
        even if they do not appear in the dataset. Ints/longs come back as a
        list, dates as a generator stepping one day at a time. Other types
        return the distinct non-None values, sorted if they are orderable.
        """
        import datetime

        existing = [value for value in self.distinct(cls, [attr], expr, **kwargs)
                    if value is not None]
        if not existing:
            return []

        attr_type = getattr(cls, attr).type
        if issubclass(attr_type, datetime.date):
            # date + int is not defined, so step with a one-day timedelta.
            first, last = min(existing), max(existing)
            one_day = datetime.timedelta(1)

            def date_gen():
                day = first
                while day <= last:
                    yield day
                    day += one_day
            return date_gen()

        if issubclass(attr_type, (int, long)):
            return range(min(existing), max(existing) + 1)

        try:
            existing.sort()
        except TypeError:
            pass

        return existing

    #                           Cache Management                           #

    def _cache(self, cls):
        """Return the cache for the specified class.

        This base class creates a new (empty) cache dict the first time a
        cls is requested in this sandbox.
        """
        return self._caches.setdefault(cls, {})

    def purge(self, cls):
        """Drop all cached Units of class 'cls'. Do not save.

        Purging a class that has no cache here is a no-op.
        """
        self._caches.pop(cls, None)

    def repress(self, unit):
        """Remove unit from cache (but don't destroy); save it first.

        unit.on_repress() (if defined) runs before the save, and
        unit.sandbox is reset to None.
        """
        cls = unit.__class__
        unit_id = unit.identity()
        self.arena.log("REPRESS %s: %s" % (cls.__name__, unit_id),
                       logflags.REPRESS)

        if hasattr(unit, "on_repress"):
            unit.on_repress()

        # Save after on_repress in case on_repress modified the unit.
        self.arena.storage(cls).save(unit)

        # Tolerate a unit that was not cached here; it has been saved anyway.
        self._cache(cls).pop(unit_id, None)
        unit.sandbox = None

    def _call_db_method(self, stores, name):
        """Call store.db.<name>() for each store whose db provides it.

        Stores without a 'db' attribute (or whose db lacks the method) are
        skipped.
        """
        for store in stores:
            try:
                method = getattr(store.db, name)
            except AttributeError:
                continue
            method()

    def flush_all(self):
        """Repress all units: save every cached unit, then commit each store."""
        for cls in list(self._caches):
            # Call all on_repress methods first! There are truly horrible
            # interdependency chains in most on_repress methods, and
            # it's best to resolve them all at once BEFORE flushing
            # any units from the cache. Iterate over a copy, since the
            # cache may change size during iteration.
            for unit in list(self._cache(cls).values()):
                if hasattr(unit, "on_repress"):
                    unit.on_repress()

        # Re-read the class list: on_repress may have cached new classes.
        seen_stores = {}
        for cls in list(self._caches):
            cache = self._cache(cls)
            store = self.arena.storage(cls)
            seen_stores[store] = None
            while cache:
                unit_id, unit = cache.popitem()
                self.arena.log("REPRESS %s: %s" % (cls.__name__, unit_id),
                               logflags.REPRESS)
                store.save(unit)

        self._call_db_method(seen_stores, 'commit')

    def rollback(self):
        """Ignore all changes and purge our cache.

        Every cached class is purged, and each store involved gets
        store.db.rollback() called, where available.
        """
        seen_stores = {}
        for cls in list(self._caches):
            self.purge(cls)
            seen_stores[self.arena.storage(cls)] = None

        self._call_db_method(seen_stores, 'rollback')

    #                          Context Management                          #

    def __enter__(self):
        return self

    def __exit__ (self, type, value, tb):
        """Flush on a clean exit; roll back on error.

        An exception raised by flush_all() itself also triggers rollback()
        before it propagates. Exceptions are never suppressed.
        """
        if tb is not None:
            self.rollback()
            return

        flushed = False
        try:
            self.flush_all()
            flushed = True
        finally:
            if not flushed:
                self.rollback()
660
Note: See TracBrowser for help on using the browser.