Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/arenas.py

Revision 110 (checked in by fumanchu, 8 years ago)

multirecall changes (BACKWARD INCOMPATIBLE):

  1. The signature has changed: multirecall now takes two arguments, a 'classes' tuple or list and an 'expr'. The expr will receive multiple positional arguments (*args), one for each class in 'classes'. [Hopefully, this will allow OUTER JOINs in the near future!]
  2. The behavior has changed: previously, each class in the pairs was related to the first class; now, each class is related (INNER JOINed) to the class immediately before it in the 'classes' argument.
  3. db.SQLDecompiler has been changed to handle multiple tables (*args) for a single Expression.
  • Property svn:eol-style set to native
Line 
1
2 import ConfigParser
3
4 from containers import Graph as _Graph
5 import logic as _logic
6 import errors as _errors
7
8
# Logging flags (see Arena.logflags). Each flag occupies its own bit, so
# flags can be combined with '|' and tested against a mask with '&'.
LOGSQL = 1 << 2
LOGCONN = 1 << 3

LOGMEMORIZE = 1 << 7
LOGRECALL = 1 << 8
LOGVIEW = 1 << 9
LOGREPRESS = 1 << 10
LOGFORGET = 1 << 11
# All of the flags logged by Sandbox operations, combined.
LOGSANDBOX = LOGMEMORIZE | LOGRECALL | LOGVIEW | LOGREPRESS | LOGFORGET
19
20
class Arena(object):
    """Arena(). A namespace/workspace for a Dejavu application.
    
    An Arena holds the named StorageManagers ("stores"), the registry of
    Unit classes together with the store that handles each one, and an
    undirected graph of the associations between registered classes.
    """
    
    def __init__(self):
        # Store used for any class not listed in some store's 'classnames'.
        self.defaultStore = None
        # Store name -> StorageManager instance.
        self.stores = {}
        # Unit class -> StorageManager; None until storage() resolves it.
        self._registered_classes = {}
        self.associations = _Graph(directed=False)
        self.engine_functions = {}
        # Bitmask of LOG* flags; see log().
        self.logflags = 0
    
    def log(self, message, flag):
        """Default logger (writes to stdout). Feel free to replace.
        
        The message is written only when 'flag' shares a bit with
        self.logflags.
        """
        if flag & self.logflags:
            # A parenthesized single argument prints identically under the
            # Python 2 print statement and the Python 3 print function.
            print(message)
    
    def load(self, configFileName):
        """Load StorageManagers from the config file 'configFileName'.
        
        Each config section describes one store (the section name is the
        store name). Its 'Class' option is passed to add_store as the store
        class name, and the whole option dict is passed as the options.
        Stores are added in ascending 'Load Order' (default 0), with ties
        broken by section name.
        """
        parser = ConfigParser.ConfigParser()
        # Make names case-sensitive by overriding optionxform.
        parser.optionxform = unicode
        parser.read(configFileName)
        
        stores = []
        for section in parser.sections():
            opts = dict(parser.items(section))
            stores.append((int(opts.get("Load Order", "0")), section, opts))
        # Section names are unique, so the sort never has to compare dicts.
        stores.sort()
        
        for order, name, options in stores:
            self.add_store(name, options[u'Class'], options)
    
    def add_store(self, name, store, options=None):
        """add_store(name, store, options=None). Register a StorageManager.
        
        The 'store' argument may be the name of a Storage Manager class;
        if so, it must be importable (that is, it must have the full dotted
        package name).
        
        A store whose 'classnames' is empty becomes the default store.
        Returns the registered store.
        """
        
        if isinstance(store, basestring):
            import xray
            store = xray.classes(store)(name, self, options or {})
        
        self.stores[name] = store
        if not store.classnames:
            # This store has no "classnames" list, which signals that it
            # handles all classes which are not handled by other stores.
            self.defaultStore = store
        return store
    
    def remove_store(self, name):
        """remove_store(name). Unregister the store 'name', if present.
        
        Classes bound to the removed store are unbound, so the next
        storage() call resolves them again. If the removed store was the
        default store, the arena is left without a default store, so it can
        no longer be handed out as a fallback.
        """
        if name not in self.stores:
            return
        store = self.stores.pop(name)
        
        # Disassociate all registered classes with this store.
        for cls, bound in list(self._registered_classes.items()):
            if bound is store:
                self._registered_classes[cls] = None
        
        if self.defaultStore is store:
            self.defaultStore = None
    
    def shutdown(self):
        """Shutdown the arena.
        
        Calls shutdown() on every store in ascending 'shutdownOrder'.
        Stores with equal shutdownOrder are shut down in order of store
        name, so store objects themselves are never compared.
        """
        ordered = sorted(self.stores.items(),
                         key=lambda item: (item[1].shutdownOrder, item[0]))
        for name, store in ordered:
            store.shutdown()
    
    def new_sandbox(self):
        """Return a new Sandbox bound to this arena."""
        return Sandbox(self)
    
    ###########################################
    ##        Unit Class Registration        ##
    ###########################################
    
    def register(self, cls):
        """register(cls) -> Assert that Units of class 'cls' will be handled."""
        # We must allow modules to register classes before any stores have
        # been added, but not overwrite a store which has already been found.
        if cls not in self._registered_classes:
            self._registered_classes[cls] = None
        
        # Register any association(s) in an undirected graph.
        for ua in cls._associations.values():
            self.associations.connect(cls, ua.farClass)
    
    def register_all(self, globals):
        """Register every dejavu.Unit subclass among the values of 'globals'."""
        import dejavu
        for obj in globals.values():
            if isinstance(obj, type) and issubclass(obj, dejavu.Unit):
                self.register(obj)
    
    def class_by_name(self, classname):
        """Return the registered class whose __name__ is 'classname'.
        
        Raises KeyError if no registered class has that name.
        """
        for cls in self._registered_classes:
            if cls.__name__ == classname:
                return cls
        raise KeyError("No registered class found for '%s'." % classname)
    
    def storage(self, cls):
        """Return the StorageManager which handles Units of the given class.
        
        A store whose 'classnames' contains cls.__name__ wins; otherwise
        the default store is used. The result is remembered for 'cls'.
        Raises KeyError if no store matches and there is no default store.
        """
        found = self._registered_classes.get(cls)
        # Compare against None: a store object may legitimately be falsy.
        if found is not None:
            return found
        
        # Search all stores for the class name.
        clsname = cls.__name__
        for store in self.stores.values():
            if clsname in store.classnames:
                found = store
                break
        else:
            found = self.defaultStore
        
        if found is None:
            raise KeyError("No store found for '%s' and no "
                           "default store." % clsname)
        
        self._registered_classes[cls] = found
        return found
    
    def create_storage(self, cls):
        """create_storage(cls). Create storage space for cls."""
        self.storage(cls).create_storage(cls)
    
    def migrate_class(self, cls, new_store):
        """migrate_class(cls, new_store). Copy all units of cls to new_store."""
        new_store.create_storage(cls)
        for unit in self.new_sandbox().xrecall(cls):
            new_store.reserve(unit)
            new_store.save(unit, True)
    
    def migrate(self, new_store, old_store=None, copy_only=False):
        """migrate(new_store, old_store=None, copy_only=False).
        
        Copy all units of every registered class handled by 'old_store'
        (or of every registered class, if old_store is None) to new_store.
        Unless 'copy_only' is true, each migrated class is then bound to
        new_store.
        """
        for cls in list(self._registered_classes):
            store = self.storage(cls)
            if old_store is None or old_store is store:
                self.migrate_class(cls, new_store)
                if not copy_only:
                    self._registered_classes[cls] = new_store
161
162
163 ###########################################################################
164 ##                                                                       ##
165 ##                              Sandboxes                                ##
166 ##                                                                       ##
167 ###########################################################################
168
169
class Sandbox(object):
    """Sandbox(arena). Data sandbox for Dejavu arenas.
    
    Each consumer (that is, each UI process) maintains a Sandbox for
    managing Units. Sandboxes populate themselves with Units on a lazy
    basis, allowing UI code to request data as it's needed. However, once
    obtained, such Units are persisted (usually for the lifetime of the
    thread); this important detail means that multiple requests for the
    same Units result in multiple references to the same objects, rather
    than multiple objects. Sandboxes are basically what Fowler calls
    Identity Maps.
    
    The *REALLY* important thing to understand if you're customizing this
    is that Sandboxes won't survive sharing across threads--DON'T TRY IT.
    If you need to share unit data across requests, use or make an SM which
    persists the data, and chain it with another, more normal SM.
    
    _cache() and _caches are private for a reason--don't access
    them from interface code--tell the Sandbox to do it for you.
    """
    
    def __init__(self, arena):
        self.arena = arena
        # Identity maps: Unit class -> {unit.ID: unit}.
        self._caches = {}
    
    def memorize(self, unit):
        """memorize(unit). Persist unit in storage.
        
        Binds the unit to this sandbox, reserves it in its class's store
        (which may assign unit.ID), and caches it. on_memorize(), if
        defined, is called last so that it sees the assigned ID.
        """
        cls = unit.__class__
        unit.sandbox = self
        
        # Ask the store to accept the unit, assigning it an ID if
        # necessary. The store should also call unit.cleanse()
        # if it saves the whole unit state on this call.
        self.arena.storage(cls).reserve(unit)
        
        # Insert the unit into the cache.
        self._cache(cls)[unit.ID] = unit
        self.arena.log("MEMORIZE %s: %s" % (cls.__name__, unit.ID), LOGMEMORIZE)
        
        # Do this at the end of the func, since most on_memorize
        # will want to have an ID when called.
        if hasattr(unit, "on_memorize"):
            unit.on_memorize()
    
    def forget(self, unit):
        """Destroy unit, both in the cache and storage.
        
        A unit which is not currently cached (for example, one that was
        repressed) is still destroyed in storage.
        """
        cls = unit.__class__
        
        self.arena.log("FORGET %s: %s" % (cls.__name__, unit.ID), LOGFORGET)
        self.arena.storage(cls).destroy(unit)
        
        self._cache(cls).pop(unit.ID, None)
        
        if hasattr(unit, "on_forget"):
            unit.on_forget()
        
        unit.sandbox = None
    
    def xrecall(self, cls, expr=None):
        """Iterator over units of cls which match expr.
        
        Units already in the cache are yielded in place of the copies the
        store returns, so callers always share one object per ID. A newly
        loaded unit whose on_recall() raises UnrecallableError is dropped
        from the cache and not yielded.
        """
        
        self.arena.log("RECALL %s: %s" % (cls.__name__, expr), LOGRECALL)
        
        cache = self._cache(cls)
        
        # Special-case the scenario where one Unit is expected and called
        # by ID. We should be able to save a database hit. The bytecode
        # compared here is CPython 2's compiled form of an expression like
        # "lambda x: x.ID == <constant>".
        if expr:
            fc = expr.func.func_code
            if (fc.co_code == '|\x00\x00i\x01\x00d\x01\x00j\x02\x00S' and
                fc.co_names[-1] == 'ID'):
                ID = fc.co_consts[-1]
                unit = cache.get(ID)
                if unit is not None:
                    # Do NOT call on_recall here. That should be called
                    # only at the Sandbox-SM boundary.
                    yield unit
                    # Plain 'return' ends a generator; raising StopIteration
                    # here becomes a RuntimeError under PEP 479.
                    return
        
        # Query Cache and Storage.
        for unit in self.arena.storage(cls).recall(cls, expr):
            ID = unit.ID
            # Very important that we check for existing unit, as its
            # state may have changed in memory but not in storage.
            # Compare to None: a Unit may legitimately be falsy.
            existing = cache.get(ID)
            if existing is not None:
                yield existing
                continue
            
            unit.sandbox = self
            cache[ID] = unit
            if hasattr(unit, 'on_recall'):
                try:
                    unit.on_recall()
                except _errors.UnrecallableError:
                    # Evict it; otherwise a later recall would yield it
                    # from the cache without running on_recall again.
                    cache.pop(ID, None)
                    unit.sandbox = None
                    continue
            yield unit
    
    def recall(self, cls, expr=None):
        """List of units of class 'cls' which match expr."""
        return list(self.xrecall(cls, expr))
    
    def multirecall(self, classes, expr=None):
        """multirecall(classes, expr) -> [[unit, ...], [unit, ...], ...]
        Recall units of each cls if they together match the expr.
        
        Units of each additional cls pair will be recalled; however, only
        those Units with associations to Units in the previous set will be
        returned. For you database guys, it's a set of inner joins, each of
        which is between each class and its direct antecedent in the list.
        
        Each yielded value will be a list of Units, in the same order as
        the classes arg. This facilitates unpacking in iterative consumer
        code like:
        
        for invoice, price in sandbox.multirecall([Invoice, Price], f):
            deal_with(invoice)
            deal_with(price)
        
        Raises ValueError if the classes are not all handled by one store.
        """
        
        self.arena.log("RECALL %s %s" %
                       (", ".join([c.__name__ for c in classes]), expr),
                       LOGRECALL)
        
        store = self.arena.storage(classes[0])
        for c in classes[1:]:
            if self.arena.storage(c) is not store:
                raise ValueError(u"multirecall() does not support multiple"
                                 u" classes in disparate stores.")
        
        # This is broken. If a filter expr is supplied, then the store may
        # not return rows which our cache would, and those won't be included
        # in the resultset. If you're using multirecall with no expr's, or
        # in read-only scripts, it should be OK for now. But if you mutate
        # Units and then call multirecall, expect inconsistent results.
        for unitset in store.multirecall(classes, expr):
            # The store may yield tuples; copy into a list so cached units
            # can be substituted in place (and a list is what we promise).
            unitset = list(unitset)
            confirmed = True
            for index, unit in enumerate(unitset):
                ID = unit.ID
                cache = self._cache(unit.__class__)
                if ID in cache:
                    # Keep the unit which is in our cache!
                    unitset[index] = cache[ID]
                    continue
                
                cache[ID] = unit
                unit.sandbox = self
                if hasattr(unit, 'on_recall'):
                    try:
                        unit.on_recall()
                    except _errors.UnrecallableError:
                        # Evict it, as xrecall does, and drop the whole row.
                        cache.pop(ID, None)
                        unit.sandbox = None
                        confirmed = False
                        break
            if confirmed:
                yield unitset
    
    def unit(self, cls, **kwargs):
        """unit(cls, **kwargs) -> A single matching Unit, else None.
        
        **kwargs will be combined into an Expression via logic.filter.
            The first Unit matching that expression is returned; if no
            Units match, None is returned.
        
        If you need a single Unit which matches a more complex
            expression, use recall()[0] or xrecall().next().
        """
        expr = None
        if kwargs:
            expr = _logic.filter(**kwargs)
        for found in self.xrecall(cls, expr):
            return found
        return None
    
    def view(self, cls, attrs, expr=None):
        """view(cls, attrs, expr=None) -> Iterator of all Property tuples.
        
        First yields a tuple of the 'attrs' values for each cached unit
        matching expr, then the rows the store returns for units that are
        not cached (so cached, possibly modified, state wins).
        """
        self.arena.log("VIEW %s [%s]: %s" % (cls.__name__, attrs, expr), LOGVIEW)
        
        cache = self._cache(cls)
        
        for unit in cache.values():
            if expr is None or expr(unit):
                yield tuple([getattr(unit, attr) for attr in attrs])
        
        # Add the ID attribute if not present. This is necessary to
        # avoid duplicating objects which are already in our cache.
        fields = list(attrs)
        id_added = "ID" not in fields
        if id_added:
            fields.append("ID")
        index_of_id = fields.index("ID")
        
        for row in self.arena.storage(cls).view(cls, fields, expr):
            if row[index_of_id] not in cache:
                if id_added:
                    # Remove the trailing ID column we appended above.
                    row = row[:-1]
                yield row
    
    def distinct(self, cls, attrs, expr=None):
        """distinct(cls, attrs, expr=None) -> List of distinct Property tuples.
        
        If only one attribute is specified, a list of values will be returned.
        If more than one attribute is specified, a zipped list will be returned.
        The result is sorted.
        
        Notice that you can also use this function as a count() function
        (in fact it's the only way to do it) by using attrs = ['ID'].
        """
        self.arena.log("DISTINCT %s [%s]: %s" % (cls.__name__, attrs, expr), LOGVIEW)
        
        # Dict used as a set of row tuples.
        seen = {}
        for unit in self._cache(cls).values():
            if expr is None or expr(unit):
                seen[tuple([getattr(unit, attr) for attr in attrs])] = None
        
        for row in self.arena.storage(cls).distinct(cls, attrs, expr):
            # Normalize to a tuple so list rows are hashable and compare
            # equal to the tuples built from cached units.
            seen[tuple(row)] = None
        
        rows = sorted(seen)
        if len(attrs) == 1:
            rows = [row[0] for row in rows]
        return rows
    
    def count(self, cls, expr=None):
        """count(cls, expr=None) -> Number of Units of class 'cls' matching expr."""
        return len(self.distinct(cls, ['ID'], expr))
    
    ####################################
    ##        Cache Management        ##
    ####################################
    
    def _cache(self, cls):
        """cache(cls). Return the cache for the specified class.
        
        This base class creates a new cache for each cls per request.
        """
        if cls not in self._caches:
            self._caches[cls] = {}
        return self._caches[cls]
    
    def purge(self, cls):
        """purge(cls). Drop all cached Units of class 'cls'. Do not save.
        
        A no-op if nothing of class 'cls' has been cached.
        """
        self._caches.pop(cls, None)
    
    def repress(self, unit):
        """repress(unit). Remove unit from cache (but don't destroy).
        
        on_repress(), if defined, runs first; the unit is then saved to
        its store and dropped from the cache.
        """
        cls = unit.__class__
        self.arena.log("REPRESS %s: %s" % (cls.__name__, unit.ID), LOGREPRESS)
        
        if hasattr(unit, "on_repress"):
            unit.on_repress()
        
        # Save after on_repress in case on_repress modified the unit.
        self.arena.storage(cls).save(unit)
        
        self._cache(cls).pop(unit.ID, None)
    
    def flush_all(self):
        """flush_all(). Repress all units."""
        
        # Snapshot the class list: on_repress may cache new classes.
        for cls in list(self._caches):
            # Call all on_repress methods first! There are truly horrible
            # interdependency chains in most on_repress methods, and
            # it's best to resolve them all at once BEFORE flushing
            # any units from the cache.
            # Note we copy the values, since the cache may change size
            # during iteration.
            for unit in list(self._cache(cls).values()):
                if hasattr(unit, "on_repress"):
                    unit.on_repress()
        
        for cls in list(self._caches):
            cache = self._cache(cls)
            store = self.arena.storage(cls)
            while cache:
                unitid, unit = cache.popitem()
                self.arena.log("REPRESS %s: %s" % (cls.__name__, unitid), LOGREPRESS)
                store.save(unit)
454
Note: See TracBrowser for help on using the browser.