Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

root/trunk/dejavu/sandboxes.py

Revision 552 (checked in by fumanchu, 1 year ago)

Buglets.

  • Property svn:eol-style set to native
Line 
1 try:
2     set
3 except NameError:
4     from sets import Set as set
5
6 import dejavu
7 from dejavu import errors
8 from geniusql import codewalk, logic
9
10
# Raw bytecode of the simplest single-attribute test: load the first local,
# fetch one attribute, load one constant, compare them with comparison
# operator #2 (presumably '==' -- confirm against the interpreter's cmp_op
# table) and return the result. Every opcode that takes an argument is
# followed by its two operand bytes, low byte first.
# Sandbox.xrecall compares an expression's co_code against this string.
_simple_attr_compare = (
    chr(codewalk.opmap['LOAD_FAST']) + '\x00\x00' +
    chr(codewalk.opmap['LOAD_ATTR']) + '\x01\x00' +
    chr(codewalk.opmap['LOAD_CONST']) + '\x01\x00' +
    chr(codewalk.opmap['COMPARE_OP']) + '\x02\x00' +
    chr(codewalk.opmap['RETURN_VALUE'])
    )
18
19
20 class Sandbox(object):
21     """Data sandbox for Dejavu.
22     
23     Each consumer (that is, each UI process or thread) maintains a Sandbox
24     for managing Units. Sandboxes populate themselves with Units on a lazy
25     basis, allowing UI code to request data as it's needed. However, once
26     obtained, such Units are persisted (usually for the lifetime of the
27     thread); this important detail means that multiple requests for the
28     same Units result in multiple references to the same objects, rather
29     than multiple objects. Sandboxes are basically what Fowler calls
30     Identity Maps.
31     
32     The *REALLY* important thing to understand if you're customizing this
33     is that Sandboxes won't survive sharing across threads--DON'T TRY IT.
34     If you need to share unit data across requests, use or make an SM which
35     persists the data, and chain it with another, more normal SM.
36     
37     _cache() and _caches are private for a reason--don't access
38     them from interface code--tell the Sandbox to do it for you.
39     
40     Starting with Python 2.5, each Sandbox instance is its own context
41     manager, so you can have boxes automatically flush themselves
42     when you're done, and automatically rollback on error. Example:
43     
44     # __future__ only needed for Python 2.5, not 2.6+
45     from __future__ import with_statement
46     
47     with storage.new_sandbox() as box:
48         WAP = box.unit(Zoo, Name='Wild Animal Park')
49         WAP.Opens = now
50     """
51    
52     def __init__(self, store):
53         self.store = store
54         self._caches = {}
55    
56     def __getattr__(self, key):
57         # Support "magic recaller" methods on self.
58         for cls in self.store.classes:
59             name = cls.__name__
60             if name == key:
61                 if cls.identifiers:
62                     uniq = cls.identifiers
63                 else:
64                     uniq = cls._properties.keys()
65                 def recaller(*args, **kwargs):
66                     # Allow identifiers to be supplied as args or kwargs
67                     # (since the common case will be a single identifier).
68                     for arg, key in zip(args, uniq):
69                         kwargs[str(key)] = arg
70                     return self.unit(cls, **kwargs)
71                 recaller.__doc__ = "A single %s Unit, else None." % name
72                 return recaller
73         raise AttributeError("Sandbox object has no attribute '%s'" % key)
74    
75     def memorize(self, *units):
76         """Persist the given unit(s) in storage."""
77         for unit in units:
78             cls = unit.__class__
79             unit.sandbox = self
80            
81             # Ask the store to accept the unit, assigning it primary key values
82             # if necessary. The store should also call unit.cleanse() if it
83             # saves the whole unit state on this call.
84             self.store.reserve(unit)
85            
86             # Insert the unit into the cache.
87             if cls.identifiers:
88                 uid = unit.identity()
89             else:
90                 # Use id(unit) instead of unit.ID
91                 uid = id(unit)
92             self._cache(cls)[uid] = unit
93            
94             # Do this at the end of the func, since most on_memorize
95             # will want to have an identity when called.
96             if hasattr(unit, "on_memorize"):
97                 unit.on_memorize()
98    
99     def forget(self, *units):
100         """Destroy the given units, both in the cache and storage."""
101         for unit in units:
102             cls = unit.__class__
103            
104             if cls.identifiers:
105                 uid = unit.identity()
106             else:
107                 uid = id(unit)
108             del self._cache(cls)[uid]
109            
110             self.store.destroy(unit)
111            
112             # This must be done after the destroy() call, so that a
113             # related unit can poll all instances of this class.
114             if hasattr(unit, "on_forget"):
115                 unit.on_forget()
116            
117             unit.sandbox = None
118    
119     def xrecall(self, classes, expr=None, order=None, limit=None, offset=None):
120         """Iterator over units of the given class(es) which match expr.
121         
122         If the 'classes' arg is a UnitJoin, each yielded value will
123         be a list of Units, in the same order as the classes arg.
124         This facilitates unpacking in iterative consumer code like:
125         
126         for invoice, price in sandbox.xrecall(Invoice & Price, f):
127             deal_with(invoice)
128             deal_with(price)
129         
130         Recalling multiple classes is currently not well-isolated.
131         If an expr argument is supplied, then the store may not return rows
132         which our cache would, and those won't be included in the resultset.
133         If you're using xrecall with joins, you should be safe if:
134         
135             * You pass no expr, or
136             * You're using this sandbox as read-only, or
137             * You call flush_all() after mutating Units but before recalling
138                 multiple classes.
139         """
140         if isinstance(classes, dejavu.UnitJoin):
141             for unitrow in self._xmultirecall(classes, expr, order=order,
142                                               limit=limit, offset=offset):
143                 yield unitrow
144             return
145        
146         cls = classes
147         if not isinstance(expr, logic.Expression):
148             expr = logic.Expression(expr)
149        
150         cache = self._cache(cls)
151        
152         # Special-case the scenario where one Unit is expected
153         # and called by ID. We should be able to save a database hit.
154         if expr:
155             fc = expr.func.func_code
156             if (fc.co_code == _simple_attr_compare and
157                     cls.identifiers == (fc.co_names[-1], )):
158                 ID = fc.co_consts[-1]
159                 unit = cache.get((ID,))
160                 if unit is not None:
161                     # Do NOT call on_recall here. That should be called
162                     # only at the Sandbox-SM boundary.
163                     yield unit
164                     return
165        
166         if limit:
167             offset = offset or 0
168             limit = offset + limit
169         elif limit == 0:
170             return
171        
172         keys = []
173         if order:
174             # If an order is supplied, there's no point in running the
175             # query against our cache (because we'd have to interleave
176             # the results with those from storage anyway). We'll still
177             # prefer sandboxed units over those retrieved from storage
178             # (see below), but we'll pass for now.
179             pass
180         elif offset:
181             raise TypeError("Order argument expected when offset is provided.")
182         elif cls.identifiers:
183             # Query the cache. We have to use a static copy of the
184             # keys, to ensure that our cache doesn't change size
185             # during iteration (due to overlapping xrecalls).
186             keys = cache.keys()
187             for id in keys:
188                 unit = cache.get(id)
189                 if unit and ((expr is None) or expr.evaluate(unit)):
190                     # Do NOT call on_recall here. That should be called
191                     # only at the Sandbox-SM boundary.
192                     yield unit
193                     if limit:
194                         limit -= 1
195                         if limit == 0:
196                             return
197        
198         # Query storage.
199         if not cls.identifiers:
200             # Classes with no identifiers cannot be compared to our cache
201             for unit in self.store.xrecall(cls, expr, order=order,
202                                            limit=limit, offset=offset):
203                 unit.sandbox = self
204                 if hasattr(unit, 'on_recall'):
205                     try:
206                         unit.on_recall()
207                     except errors.UnrecallableError:
208                         continue
209                 yield unit
210         else:
211             for unit in self.store.xrecall(cls, expr, order=order,
212                                            limit=limit, offset=offset):
213                 id = unit.identity()
214                 # Don't offer up a unit that was already checked in our cache
215                 # (whether it matched the expr() or not--we assume the cache
216                 # has the freshest data).
217                 if id in keys:
218                     # We've already yielded this unit.
219                     pass
220                 else:
221                     # Very important that we check for an existing unit in
222                     # the sandbox cache, as its state may have changed in
223                     # memory but not in storage (even between our cache
224                     # yields and this yield).
225                     # Make sure the cache lookup and get happens atomically.
226                     existing = cache.get(id)
227                     if existing:
228                         yield existing
229                     else:
230                         unit.sandbox = self
231                         cache[id] = unit
232                         if hasattr(unit, 'on_recall'):
233                             try:
234                                 unit.on_recall()
235                             except errors.UnrecallableError:
236                                 continue
237                         yield unit
238    
239     def recall(self, classes, expr=None, order=None, limit=None, offset=None):
240         """List of units of the given class(es) which match expr.
241         
242         If the 'classes' arg is a UnitJoin, each yielded value will
243         be a list of Units, in the same order as the classes arg.
244         This facilitates unpacking in iterative consumer code like:
245         
246         for invoice, price in sandbox.recall(Invoice & Price, f):
247             deal_with(invoice)
248             deal_with(price)
249         
250         Recalling multiple classes is currently not well-isolated.
251         If an expr argument is supplied, then the store may not return rows
252         which our cache would, and those won't be included in the resultset.
253         If you're using recall with joins, you should be safe if:
254         
255             * You pass no expr, or
256             * You're using this sandbox as read-only, or
257             * You call flush_all() after mutating Units but before recalling
258                 multiple classes.
259         """
260         return [x for x in self.xrecall(classes, expr, order=order,
261                                         limit=limit, offset=offset)]
262    
263     def unit(self, cls, **kwargs):
264         """A single Unit which matches the given kwargs, else None.
265         
266         The first Unit matching the kwargs is returned; if no Units match,
267         None is returned.
268         """
269         cache = self._cache(cls)
270        
271         # Special-case the scenario where one Unit is expected
272         # and called by ID. We should be able to save a database hit.
273         if set(kwargs.keys()) == set(cls.identifiers):
274             ident = tuple([kwargs[k] for k in cls.identifiers])
275             if ident:
276                 u = cache.get(ident)
277                 if u is None:
278                     u = self.store.unit(cls, **kwargs)
279                     if u is not None:
280                         u.sandbox = self
281                         cache[ident] = u
282                         if hasattr(u, 'on_recall'):
283                             try:
284                                 u.on_recall()
285                             except errors.UnrecallableError:
286                                 return None
287                 return u
288        
289         # Query the cache. We have to use a static copy of the
290         # keys, to ensure that our cache doesn't change size
291         # during iteration (due to overlapping xrecalls).
292         keys = cache.keys()
293         for id in keys:
294             u = cache.get(id)
295             if u:
296                 for k, v in kwargs.iteritems():
297                     if getattr(u, k) != v:
298                         break
299                 else:
300                     # Do NOT call on_recall here. That should be called
301                     # only at the Sandbox-SM boundary.
302                     return u
303        
304         # Query Storage.
305         u = self.store.unit(cls, **kwargs)
306         if u is not None:
307             u.sandbox = self
308            
309             # Classes with no identifiers cannot be compared to our cache
310             if cls.identifiers:
311                 # Very important that we check for existing unit, as its
312                 # state may have changed in memory but not in storage
313                 # (even between our cache lookups and this lookup).
314                 # Make sure the cache lookup and get happens atomically.
315                 id = u.identity()
316                 existing = cache.get(id)
317                 if existing:
318                     return existing
319                 cache[id] = u
320            
321             if hasattr(u, 'on_recall'):
322                 try:
323                     u.on_recall()
324                 except errors.UnrecallableError:
325                     return None
326         return u
327    
328     def _xmultirecall(self, classes, expr=None,
329                       order=None, limit=None, offset=None):
330         """Recall units of each cls if they together match the expr.
331         
332         The 'classes' arg must be a UnitJoin, and each yielded value
333         will be a list of Units, in the same order as the classes arg.
334         This facilitates unpacking in iterative consumer code like:
335         
336         for invoice, price in sandbox.recall(Invoice & Price, f):
337             deal_with(invoice)
338             deal_with(price)
339         
340         Recalling multiple classes is currently not well-isolated.
341         If an expr argument is supplied, then the store may not return rows
342         which our cache would, and those won't be included in the resultset.
343         If you're using recall with joins, you should be safe if:
344         
345             * You pass no expr, or
346             * You're using this sandbox as read-only, or
347             * You call flush_all() after mutating Units but before recalling
348                 multiple classes.
349         """
350         if not isinstance(expr, logic.Expression):
351             expr = logic.Expression(expr)
352        
353         # This is broken. If a filter expr is supplied, then the store may
354         # not return rows which our cache would, and those won't be included
355         # in the resultset. If you're using xmulti with no expr's, or
356         # in read-only scripts, it should be OK for now. But if you mutate
357         # Units and then call _xmultirecall, expect inconsistent results.
358         for unitset in self.store._xmultirecall(classes, expr, order=order,
359                                                 limit=limit, offset=offset):
360             confirmed = True
361             for index in xrange(len(unitset)):
362                 unit = unitset[index]
363                 id = unit.identity()
364                 if not unit.sequencer.valid_id(id):
365                     # This is a 'dummy unit' from an outer join.
366                     continue
367                 cache = self._cache(unit.__class__)
368                 if id in cache:
369                     # Keep the unit which is in our cache!
370                     unitset[index] = cache[id]
371                 else:
372                     cache[id] = unit
373                     unit.sandbox = self
374                     if hasattr(unit, 'on_recall'):
375                         try:
376                             unit.on_recall()
377                         except errors.UnrecallableError:
378                             confirmed = False
379                             break
380             if confirmed:
381                 yield unitset
382    
383     def xview(self, query, distinct=False):
384         """Yield tuples of attrs for the given Query.
385         
386         Each yielded value will be a list of values, in the same order as
387         the Query attributes. This facilitates unpacking in iterative
388         consumer code like:
389         
390         for id, name in sandbox.xview(Query(Invoice, ['ID', 'Name'], f)):
391             print id, ": ", name
392         
393         This is generally much faster than recall, and should be preferred
394         for performance-sensitive code.
395         """
396         if not isinstance(query, dejavu.Query):
397             query = dejavu.Query(*query)
398        
399         expr = query.restriction
400         attrs = query.attributes
401         if isinstance(query.relation, dejavu.UnitJoin):
402             classes = query.relation
403         else:
404             classes = [query.relation]
405        
406         # Add the identity attribute(s) if not present. This is necessary
407         # to avoid duplicating objects which are already in our cache.
408         if isinstance(query.attributes, logic.Expression):
409             # TODO: add support for this
410             choke
411         elif isinstance(query.relation, dejavu.UnitJoin):
412             # TODO: add support for this
413             choke
414         else:
415             cls = query.relation
416             fields = list(query.attributes)
417             indices = []
418             added_fields = 0
419             for key in cls.identifiers:
420                 if key not in fields:
421                     added_fields += 1
422                     fields.append(key)
423                 indices.append(fields.index(key))
424        
425         seen = {}
426        
427         for cls in classes:
428             cache = self._cache(cls)
429             for unit in cache.itervalues():
430                 if expr is None or expr(unit):
431                     datarow = tuple([getattr(unit, attr) for attr in attrs])
432                     if distinct:
433                         if datarow not in seen:
434                             yield datarow
435                             seen[datarow] = None
436                     else:
437                         yield datarow
438        
439         for datarow in self.store.xview((query.relation, fields, expr)):
440             id = tuple([datarow[x] for x in indices])
441             if id not in cache:
442                 if added_fields:
443                     # Remove the added identifier columns from the row.
444                     datarow = datarow[:-added_fields]
445                
446                 if distinct:
447                     if datarow not in seen:
448                         yield datarow
449                         seen[datarow] = None
450                 else:
451                     yield datarow
452    
453     def view(self, query, distinct=False):
454         """Return tuples of attrs for the given Query."""
455         return [x for x in self.xview(query, distinct=distinct)]
456    
457     def sum(self, cls, attr, expr=None):
458         """Sum of all non-None values for the given cls.attr."""
459         expr = logic.Expression(lambda x: getattr(x, attr) != None) + expr
460         return sum([row[0] for row in
461                     self.view(dejavu.Query(cls, (attr,), expr))])
462    
463     def count(self, cls, expr=None):
464         """Number of Units of the given cls which match the given expr."""
465         if cls.identifiers:
466             uniq = cls.identifiers
467         else:
468             uniq = cls._properties.keys()
469         return len(self.view((cls, uniq, expr), distinct=True))
470    
471     def range(self, cls, attr, expr=None):
472         """Distinct, non-None attr values (ordered and continuous, if possible).
473         
474         If the given attribute is a known discrete, ordered type
475         (like int, long, datetime.date), this returns the closed interval:
476             
477             [min(attr), ..., max(attr)]
478         
479         That is, all possible values will be output between min and max,
480         even if they do not appear in the dataset.
481         
482         If the given attribute is not reasonably discrete (e.g., str,
483         unicode, or float) then all distinct, non-None values are returned
484         (sorted, if possible).
485         """
486         query = dejavu.Query(cls, [attr], expr)
487         existing = [x[0] for x in self.view(query, distinct=True)
488                     if x is not None]
489         if not existing:
490             return []
491        
492         attr_type = getattr(cls, attr).type
493         if issubclass(attr_type, (int, long)):
494             return range(min(existing), max(existing) + 1)
495         else:
496             try:
497                 import datetime
498             except ImportError:
499                 pass
500   &nb