Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

root/branches/crazycache/dejavu/storage/db.py

Revision 562 (checked in by fumanchu, 1 year ago)

Using the new index_name method from Geniusql.

  • Property svn:eol-style set to native
Line 
1 """Base classes and tools for writing database Storage Managers.
2
3 DATA TYPES
4 ==========
5 Database Storage Manager modules are mostly adapters to support round-trip
6 data coercion:
7
8 Unit type -> [SQL repr ->] DB -> incoming Python value -> Unit type
9
10 Since Dejavu relies on external database servers for its persistence,
11 Python datatypes must be converted to column types in the DB. When writing
12 a StorageManager, you should make sure that your type conversions can handle
13 at least the following limitations: If possible, implement the type with no
14 limits. Also, follow UnitProperty.hints['bytes'] where possible. A value
15 of zero for hints['bytes'] implies no limit. If no value is given, try to
16 assume no limit, although you may choose whatever default size you wish
17 (255 is common for strings).
18
19 ENCODING ISSUES
20 ===============
21 All SQL sent to the database must be strings, not unicode. You can set the
22 encoding of the Adapters (I may add a more centralized encoding context in
23 the future). We must use encoded strings so that we can mix encodings
24 within the same string; for example, we might have a DB which understands
25 utf8, but a pickle value which will be encoded in raw-unicode-escape inline
26 with that. All values, therefore, must be coerced before we try to join
27 them into an SQL statement string.
28
29 """
30
31 import threading
32 import warnings
33
34
35 import geniusql
36 from geniusql import logic, logicfuncs
37
38 import dejavu
39 from dejavu import storage, logflags, xray
40 from dejavu.errors import StorageWarning, MappingError, conflict
41
42
43 # --------------------------- Storage Manager --------------------------- #
44
45
46 class StorageManagerDB(storage.StorageManager):
47     """StoreManager base class to save and retrieve Units using a DB."""
48    
49     databaseclass = geniusql.Database
50    
51     def __init__(self, allOptions={}):
52         storage.StorageManager.__init__(self, allOptions)
53         self.reserve_lock = threading.Lock()
54        
55         # Config Overrides
56         def get_option(name):
57             item = allOptions.get(name)
58             if isinstance(item, basestring):
59                 item = xray.classes(item)
60             return item
61        
62         dbclass = get_option('Database Class')
63         if dbclass:
64             self.databaseclass = dbclass
65        
66         allOptions = dict([(str(k), v) for k, v in allOptions.iteritems()])
67        
68         self.db = self.databaseclass(**allOptions)
69         self.schema = self.db.schema()
70        
71         if 'Prefix' in allOptions:
72             self.schema.prefix = allOptions['Prefix']
73        
74         def logger(msg):
75             if self.logflags & logflags.SQL:
76                 self.log(logflags.SQL.message(msg))
77         self.db.log = logger
78    
79     def version(self):
80         return self.db.version()
81    
82     def shutdown(self, conflicts='error'):
83         """Shut down all connections to internal storage.
84         
85         conflicts: see errors.conflict.
86         """
87         self.db.connections.shutdown()
88    
89     def xrecall(self, classes, expr=None, order=None, limit=None, offset=None):
90         """Yield a sequence of Unit instances which satisfy the expression."""
91         if isinstance(classes, dejavu.UnitJoin):
92             for unitrow in self._xmultirecall(classes, expr, order=order,
93                                               limit=limit, offset=offset):
94                 yield unitrow
95             return
96         else:
97             cls = classes
98        
99         if self.logflags & logflags.RECALL:
100             self.log(logflags.RECALL.message(cls, expr))
101        
102         clsname = cls.__name__
103        
104         # Put the identifier properties first, in case other fields
105         # depend upon them.
106         idnames = list(cls.identifiers)
107         attrs = idnames + [x for x in cls.properties if x not in idnames]
108         coercers = [getattr(cls, key).coerce for key in attrs]
109        
110         data = self.select((cls, attrs, expr), order=order,
111                            limit=limit, offset=offset)
112         for row in data:
113             unit = cls.__new__(cls)
114             unit._zombie = True
115             unit.__init__()
116            
117             for key, value, propcoerce in zip(attrs, row, coercers):
118                 try:
119                     if propcoerce:
120                         value = propcoerce(unit, value)
121                     unit._properties[key] = value
122                 except UnicodeDecodeError, x:
123                     x.reason += " [%r: %r]" % (key, value)
124                     raise
125                 except Exception, x:
126                     x.args += (key, value)
127                     raise
128            
129             # If our SQL is imperfect, don't yield it to the
130             # caller unless it passes expr(unit).
131             if expr and data.statement.imperfect:
132                 if not expr(unit):
133                     continue
134            
135             unit.cleanse()
136             yield unit
137    
138     def reserve(self, unit):
139         """Reserve a persistent slot for unit."""
140         self.reserve_lock.acquire()
141         try:
142             # First, see if our db subclass has a handler that
143             # uses the DB to generate the appropriate identifier(s).
144             seqclass = unit.sequencer.__class__.__name__
145             seq_handler = getattr(self, "_seq_%s" % seqclass, None)
146             if seq_handler:
147                 seq_handler(unit)
148             else:
149                 self._manual_reserve(unit)
150             unit.cleanse()
151         finally:
152             self.reserve_lock.release()
153        
154         # Usually we log ASAP, but here we log after
155         # the unit has had a chance to get an auto ID.
156         if self.logflags & logflags.RESERVE:
157             self.log(logflags.RESERVE.message(unit))
158    
159     def _seq_UnitSequencerInteger(self, unit):
160         """Reserve a unit (using the table's autoincrement fields)."""
161         cls = unit.__class__
162        
163         # Grab the new ID. This is threadsafe because reserve has a mutex.
164         newids = self.schema[cls.__name__].insert(**unit._properties)
165         for k, v in newids.iteritems():
166             setattr(unit, k, v)
167    
168     def _manual_reserve(self, unit):
169         """Use when the DB cannot automatically generate an identifier.
170         The identifiers will be supplied by UnitSequencer.assign().
171         """
172         cls = unit.__class__
173         t = self.schema[cls.__name__]
174         if not unit.sequencer.valid_id(unit.identity()):
175             # Examine all existing IDs and grant the "next" one.
176             data = list(self.db.select((t, cls.identifiers)))
177             cls.sequencer.assign(unit, data)
178         t.insert(**unit._properties)
179    
180     def save(self, unit, forceSave=False):
181         """Update storage from unit's data (if unit.dirty())."""
182         if self.logflags & logflags.SAVE:
183             self.log(logflags.SAVE.message(unit, forceSave))
184        
185         if forceSave or unit.dirty():
186             self.schema[unit.__class__.__name__].save(**unit._properties)
187             unit.cleanse()
188    
189     def destroy(self, unit):
190         """Delete the unit."""
191         if self.logflags & logflags.DESTROY:
192             self.log(logflags.DESTROY.message(unit))
193        
194         table = self.schema[unit.__class__.__name__]
195         table.delete(**unit._properties)
196    
197    
198     #                                Views                                #
199    
200     def tablejoin(self, join):
201         """Return a geniusql Join tree for the given UnitJoin."""
202         t1, t2 = join.class1, join.class2
203        
204         if isinstance(t1, dejavu.UnitJoin):
205             wt1 = self.tablejoin(t1)
206         else:
207             wt1 = self.schema[t1.__name__]
208        
209         if isinstance(t2, dejavu.UnitJoin):
210             wt2 = self.tablejoin(t2)
211         else:
212             wt2 = self.schema[t2.__name__]
213        
214         uj = geniusql.Join(wt1, wt2, join.leftbiased)
215         # if the original UnitJoin had a custom association path,
216         # copy it to the new Join instance
217         uj.path = join.path
218         return uj
219    
220     def _geniusql_query(self, query):
221         """Return a Geniusql Query object for the given Dejavu Query."""
222         rel = query.relation
223         if isinstance(rel, dejavu.UnitJoin):
224             rel = self.tablejoin(rel)
225         elif rel is None:
226             # This is a Geniusql-ism: send the schema when we have no FROM.
227             rel = self.schema
228         else:
229             rel = self.schema[rel.__name__]
230         return geniusql.Query(rel, query.attributes, query.restriction)
231    
232     def _geniusql_statement(self, statement):
233         """Return a Geniusql Statement object for the given Dejavu Statement."""
234         return geniusql.Statement(
235             self._geniusql_query(statement.query),
236             order=statement.order, limit=statement.limit,
237             offset=statement.offset, distinct=statement.distinct)
238    
239     def select(self, query, order=None, limit=None, offset=None, distinct=None):
240         """Return a geniusql Dataset for the given Query object."""
241         if not isinstance(query, dejavu.Query):
242             query = dejavu.Query(*query)
243        
244         return self.db.select(self._geniusql_query(query),
245                               order=order, limit=limit, offset=offset,
246                               distinct=distinct, strict=False)
247    
248     def insert_into(self, name, query, distinct=False):
249         """INSERT matching data INTO a new class and return the class."""
250         if not isinstance(query, dejavu.Query):
251             query = dejavu.Query(*query)
252        
253         self.db.insert_into(name, self._geniusql_query(query),
254                             distinct=distinct)
255         return Modeler(self.schema).make_class(name)
256    
257     def make_class(self, name):
258         """Return a (new) Unit class for the given storage name."""
259         return Modeler(self.schema).make_class(name)
260    
261     def xview(self, query, order=None, limit=None, offset=None, distinct=False):
262         """Yield value tuples for the given query."""
263         if not isinstance(query, dejavu.Query):
264             query = dejavu.Query(*query)
265        
266         if self.logflags & logflags.VIEW:
267             self.log(logflags.VIEW.message(query, distinct))
268        
269         data = self.select(query, order=order, limit=limit, offset=offset,
270                            distinct=distinct)
271         if data.statement.imperfect:
272             # ^%$#@! There's no way to handle imperfect queries without
273             # creating all involved Units, which defeats the performance
274             # benefits of view.
275             clsname = self.__class__.__name__
276             warnings.warn("The requested query cannot produce perfect SQL "
277                           "with a %s datasource. It may take an absurdly "
278                           "long time to run, since each unit must be fully-"
279                           "formed. %s" % (clsname, query), StorageWarning)
280             for row in storage.StorageManager.xview(self, query, order=order,
281                                                     limit=limit, offset=offset,
282                                                     distinct=distinct):
283                 yield row
284         else:
285             # Use tuples for hashability
286             for row in data:
287                 yield tuple(row)
288    
289     def count(self, cls, expr=None):
290         """Number of Units of the given cls which match the given expr."""
291         if cls.identifiers:
292             uniq = cls.identifiers
293         else:
294             uniq = cls._properties.keys()
295         # TODO: handle multiple args to count()
296         counter = lambda x: [logicfuncs.count(getattr(x, uniq[0]))]
297        
298         query = dejavu.Query(cls, counter, expr)
299        
300         if self.logflags & logflags.VIEW:
301             self.log(logflags.VIEW.message(query, False))
302        
303         data = self.select(query)
304         if data.statement.imperfect:
305             # ^%$#@! There's no way to handle imperfect queries without
306             # creating all involved Units, which defeats the performance
307             # benefits of view.
308             clsname = self.__class__.__name__
309             warnings.warn("The requested query cannot produce perfect SQL "
310                           "with a %s datasource. It may take an absurdly "
311                           "long time to run, since each unit must be fully-"
312                           "formed. %s" % (clsname, query), StorageWarning)
313             return storage.StorageManager.count(self, cls, expr)
314         else:
315             return data.scalar()
316    
317     def _xmultirecall(self, classes, expr=None, order=None, limit=None, offset=None):
318         """Yield Unit instance sets which satisfy the expression."""
319         if self.logflags & logflags.RECALL:
320             self.log(logflags.RECALL.message(classes, expr))
321        
322         # Gather attribute list.
323         allattrs = []
324         props = []
325         for cls in classes:
326             attrs = []
327             for key in cls.properties:
328                 attrs.append(key)
329                 props.append((cls, key, getattr(cls, key).coerce))
330             allattrs.append(attrs)
331        
332         data = self.select((classes, allattrs, expr), order=order,
333                            limit=limit, offset=offset)
334         for row in data:
335             # TODO: This is broken; won't work if same cls appears twice.
336             units = {}
337             for i, (cls, key, propcoerce) in enumerate(props):
338                 if cls in units:
339                     unit = units[cls]
340                 else:
341                     unit = cls.__new__(cls)
342                     unit._zombie = True
343                     unit.__init__()
344                     units[cls] = unit
345                
346                 value = row[i]
347                 try:
348                     if propcoerce:
349                         value = propcoerce(unit, value)
350                     unit._properties[key] = value
351                 except Exception, x:
352                     x.args += (cls, key)
353                     raise
354            
355             unitset = []
356             for cls in classes:
357                 unit = units[cls]
358                 unit.cleanse()
359                 unitset.append(unit)
360            
361             # If our SQL is imperfect, don't yield units to the
362             # caller unless they pass expr(unit).
363             acceptable = True
364             if expr and data.statement.imperfect:
365                 acceptable = expr(*unitset)
366             if acceptable:
367                 yield unitset
368    
369     #                               Schemas                               #
370    
371     def create_database(self, conflicts='error'):
372         """Create internal structures for the entire database.
373         
374         conflicts: see errors.conflict.
375         """
376         if self.logflags & logflags.DDL:
377             self.log(logflags.DDL.message("create database"))
378        
379         try:
380             self.db.create()
381             self.schema.create()
382         except geniusql.errors.MappingError, x:
383             conflict(conflicts, str(x))
384    
385     def drop_database(self, conflicts='error'):
386         """Destroy internal structures for the entire database.
387         
388         conflicts: see errors.conflict.
389         """
390         if self.logflags & logflags.DDL:
391             self.log(logflags.DDL.message("drop database"))
392        
393         try:
394             self.schema.drop()
395         except geniusql.errors.MappingError, x:
396             conflict(conflicts, str(x))
397        
398         try:
399             self.db.drop()
400         except geniusql.errors.MappingError, x:
401             conflict(conflicts, str(x))
402    
403     def _make_table(self, cls):
404         """Create and return a Table or View object for the given class."""
405         if hasattr(cls, "view_statement"):
406             gs = self._geniusql_statement(cls.view_statement)
407             t = self.schema.view(cls.__name__, gs)
408             fields = []
409             for key in cls.properties:
410                 t[key] = self._make_column(cls, key)
411         else:
412             t = self.schema.table(cls.__name__)
413             indices = cls.indices()
414             fields = []
415             for key in cls.properties:
416                 t[key] = col = self._make_column(cls, key)
417                 if key in indices:
418                     t.add_index(key)
419                 if col.autoincrement and col.sequence_name is None:
420                     # Not every database needs/uses sequence_name
421                     col.sequence_name = self.schema.sequence_name(t.name, key)
422        
423         # Copy associations to table.references.
424         for k, v in cls._associations.iteritems():
425             t.references[k] = (v.nearKey, v.farClass.__name__, v.farKey)
426        
427         return t
428    
429     def create_storage(self, cls, conflicts='error'):
430         """Create internal structures for the given class.
431         
432         conflicts: see errors.conflict.
433         """
434         if self.logflags & logflags.DDL:
435             self.log(logflags.DDL.message("create storage %s" % cls))
436        
437         try:
438             # Attach to self.schema, which should call CREATE TABLE.
439             self.schema[cls.__name__] = self._make_table(cls)
440         except geniusql.errors.MappingError,