Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

root/branches/crazycache/dejavu/storage/multischema.py

Revision 561 (checked in by fumanchu, 1 year ago)

DB sequencing improvements. Columns now obtain a default sequence_name as early as possible. Requires latest Geniusql.

  • Property svn:eol-style set to native
Line 
1 """A database Storage Manager which mixes multiple schemas.
2
3 This SM depends on each Dejavu class in the model possessing
4 an additional '_dbschema_name' attribute.
5 """
6
7 import threading
8
9 import geniusql
10 import dejavu
11 from dejavu import storage, logflags, xray
12 from dejavu.storage import db
13 from dejavu.errors import conflict
14
15
16 # --------------------------- Storage Manager --------------------------- #
17
18
19 class MultiSchemaStorageManagerDB(db.StorageManagerDB):
20     """StoreManager base class to save and retrieve Units using a DB."""
21    
22     def __init__(self, allOptions={}):
23         storage.StorageManager.__init__(self, allOptions)
24         self.reserve_lock = threading.Lock()
25        
26         # Config Overrides
27         def get_option(name):
28             item = allOptions.get(name)
29             if isinstance(item, basestring):
30                 item = xray.classes(item)
31             return item
32        
33         dbclass = get_option('Database Class')
34         if dbclass:
35             self.databaseclass = dbclass
36        
37         allOptions = dict([(str(k), v) for k, v in allOptions.iteritems()])
38        
39         self.db = self.databaseclass(**allOptions)
40        
41         self.schemas = {}
42         schema_names = allOptions.get('Schemas', '')
43         if isinstance(schema_names, basestring):
44             schema_names = schema_names.split(', ')
45         for name in schema_names:
46             name = name.strip()
47             if name:
48                 self.schemas[name] = self.db.schema(name)
49        
50         self._table_map = {}
51        
52         def logger(msg):
53             if self.logflags & logflags.SQL:
54                 self.log(logflags.SQL.message(msg))
55         self.db.log = logger
56    
57     def _seq_UnitSequencerInteger(self, unit):
58         """Reserve a unit (using the table's autoincrement fields)."""
59         # Grab the new ID. This is threadsafe because reserve has a mutex.
60         newids = self._table_map[unit.__class__].insert(**unit._properties)
61         for k, v in newids.iteritems():
62             setattr(unit, k, v)
63    
64     def _manual_reserve(self, unit):
65         """Use when the DB cannot automatically generate an identifier.
66         The identifiers will be supplied by UnitSequencer.assign().
67         """
68         t = self._table_map[unit.__class__]
69         if not unit.sequencer.valid_id(unit.identity()):
70             # Examine all existing IDs and grant the "next" one.
71             data = list(self.db.select((t, unit.identifiers)))
72             unit.sequencer.assign(unit, data)
73         t.insert(**unit._properties)
74    
75     def save(self, unit, forceSave=False):
76         """Update storage from unit's data (if unit.dirty())."""
77         if self.logflags & logflags.SAVE:
78             self.log(logflags.SAVE.message(unit, forceSave))
79        
80         if forceSave or unit.dirty():
81             self._table_map[unit.__class__].save(**unit._properties)
82             unit.cleanse()
83    
84     def destroy(self, unit):
85         """Delete the unit."""
86         if self.logflags & logflags.DESTROY:
87             self.log(logflags.DESTROY.message(unit))
88         self._table_map[unit.__class__].delete(**unit._properties)
89    
90    
91     #                                Views                                #
92    
93     def tablejoin(self, join):
94         """Return a geniusql Join tree for the given UnitJoin."""
95         t1, t2 = join.class1, join.class2
96        
97         if isinstance(t1, dejavu.UnitJoin):
98             wt1 = self.tablejoin(t1)
99         else:
100             wt1 = self._table_map[t1]
101        
102         if isinstance(t2, dejavu.UnitJoin):
103             wt2 = self.tablejoin(t2)
104         else:
105             wt2 = self._table_map[t2]
106        
107         uj = geniusql.Join(wt1, wt2, join.leftbiased)
108         # if the original UnitJoin had a custom association path,
109         # copy it to the new Join instance
110         uj.path = join.path
111         return uj
112    
113     def _geniusql_query(self, query):
114         """Return a Geniusql Query object for the given Dejavu Query."""
115         rel = query.relation
116         if isinstance(rel, dejavu.UnitJoin):
117             rel = self.tablejoin(rel)
118         else:
119             rel = self._table_map[rel]
120         return geniusql.Query(rel, query.attributes, query.restriction)
121    
122     def insert_into(self, name, query, distinct=False):
123         """INSERT matching data INTO a new class and return the class."""
124         if not isinstance(query, dejavu.Query):
125             query = dejavu.Query(*query)
126        
127         self.db.insert_into(name, self._geniusql_query(query),
128                             distinct=distinct)
129         if isinstance(query.relation, dejavu.UnitJoin):
130             for cls in query.relation:
131                 schema = self._table_map[cls].schema
132                 break
133         else:
134             schema = self._table_map[query.relation].schema
135         return Modeler(schema).make_class(name)
136    
137     def make_class(self, name):
138         """Return a (new) Unit class for the given storage name."""
139         # TODO:
140         raise NotImplementedError
141    
142    
143     #                               Schemas                               #
144    
145     def create_database(self, conflicts='error'):
146         """Create internal structures for the entire database.
147         
148         conflicts: see errors.conflict.
149         """
150         if self.logflags & logflags.DDL:
151             self.log(logflags.DDL.message("create database"))
152         try:
153             self.db.create()
154             for s in self.schemas.itervalues():
155                 s.create()
156         except geniusql.errors.MappingError, x:
157             conflict(conflicts, str(x))
158    
159     def drop_database(self, conflicts='error'):
160         """Destroy internal structures for the entire database.
161         
162         conflicts: see errors.conflict.
163         """
164         if self.logflags & logflags.DDL:
165             self.log(logflags.DDL.message("drop database"))
166        
167         try:
168             self.db.drop()
169         except geniusql.errors.MappingError, x:
170             conflict(conflicts, str(x))
171    
172     def _make_table(self, cls):
173         """Create and return a Table or View object for the given class."""
174         s = self.schemas[cls._dbschema_name]
175         if hasattr(cls, "view_statement"):
176             gs = self._geniusql_statement(cls.view_statement)
177             self._table_map[cls] = t = s.view(cls.__name__, gs)
178             fields = []
179             for key in cls.properties:
180                 t[key] = self._make_column(cls, key)
181         else:
182             self._table_map[cls] = t = s.table(cls.__name__)
183             indices = cls.indices()
184             fields = []
185             for key in cls.properties:
186                 t[key] = col = self._make_column(cls, key)
187                 if key in indices:
188                     t.add_index(key)
189                 if col.autoincrement and col.sequence_name is None:
190                     # Not every database needs/uses sequence_name
191                     col.sequence_name = s.sequence_name(t.name, key)
192        
193         # Copy associations to table.references.
194         for k, v in cls._associations.iteritems():
195             t.references[k] = (v.nearKey, v.farClass.__name__, v.farKey)
196        
197         return t
198    
199     def create_storage(self, cls, conflicts='error'):
200         """Create internal structures for the given class.
201         
202         conflicts: see errors.conflict.
203         """
204         if self.logflags & logflags.DDL:
205             self.log(logflags.DDL.message("create storage %s" % cls))
206         try:
207             # Attach to a schema, which should call CREATE TABLE.
208             s = self.schemas[cls._dbschema_name]
209             s[cls.__name__] = self._make_table(cls)
210         except geniusql.errors.MappingError, x:
211             conflict(conflicts, str(x))
212    
213     def _make_column(self, cls, key):
214         s = self._table_map[cls].schema
215         prop = getattr(cls, key)
216         col = s.column(prop.type, default=prop.default, hints=prop.hints)
217         if key in cls.identifiers:
218             col.key = True
219             if isinstance(cls.sequencer, dejavu.UnitSequencerInteger):
220                 col.autoincrement = True
221                 col.initial = cls.sequencer.initial
222         return col
223    
224     def has_storage(self, cls):
225         """If storage structures exist for the given class, return True."""
226         return cls.__name__ in self.schemas[cls._dbschema_name]
227    
228     def drop_storage(self, cls, conflicts='error'):
229         """Destroy internal structures for the given class.
230         
231         conflicts: see errors.conflict.
232         """
233         if self.logflags & logflags.DDL:
234             self.log(logflags.DDL.message("drop storage %s" % cls))
235         try:
236             s = self._table_map[cls].schema
237             del s[cls.__name__]
238         except geniusql.errors.MappingError, x:
239             conflict(conflicts, str(x))
240    
241     def rename_storage(self, oldname, newname, conflicts='error'):
242         """Rename internal structures for the given class.
243         
244         conflicts: see errors.conflict.
245         """
246         # TODO:
247         return NotImplementedError
248    
249     def add_property(self, cls, name, conflicts='error'):
250         """Add internal structures for the given property.
251         
252         conflicts: see errors.conflict.
253         """
254         if self.logflags & logflags.DDL:
255             self.log(logflags.DDL.message("add property %s %s" %
256                                           (cls, name)))
257         if not self.has_property(cls, name):
258             try:
259                 s = self._table_map[cls].schema
260                 t = s[cls.__name__]
261                 t[name] = col = self._make_column(cls, name)
262                 if col.autoincrement and col.sequence_name is None:
263                     # Not every database needs/uses sequence_name
264                     col.sequence_name = s.sequence_name(t.name, name)
265             except geniusql.errors.MappingError, x:
266                 conflict(conflicts, str(x))
267    
268     def has_property(self, cls, name):
269         """If storage structures exist for the given property, return True."""
270         return name in self._table_map[cls][cls.__name__]
271    
272     def drop_property(self, cls, name, conflicts='error'):
273         """Destroy internal structures for the given property.
274         
275         conflicts: see errors.conflict.
276         """
277         if self.logflags & logflags.DDL:
278             self.log(logflags.DDL.message("drop property %s %s" %
279                                           (cls, name)))
280         if self.has_property(cls, name):
281             try:
282                 del self._table_map[cls][name]
283             except geniusql.errors.MappingError, x:
284                 conflict(conflicts, str(x))
285    
286     def rename_property(self, cls, oldname, newname, conflicts='error'):
287         """Rename internal structures for the given property.
288         
289         conflicts: see errors.conflict.
290         """
291         if self.logflags & logflags.DDL:
292             self.log(logflags.DDL.message(
293                 "rename property %s from %s to %s" %
294                 (cls, oldname, newname)))
295        
296         try:
297             t = self._table_map[cls]
298             s = t.schema
299            
300             # Sometimes, a Dejavu Schema will change a code model first, and
301             # then change the database afterward. So it's possible that the
302             # column we're trying to rename hasn't been loaded, because the
303             # model layer no longer references it. So if table[oldname]
304             # raises a KeyError, try to find a column that matches oldkey.
305             tempcol = None
306             try:
307                 t[oldname]
308             except KeyError:
309                 c = [x for x in s._get_columns(t.name)
310                      if x.name == s.column_name(t.name, oldname)]
311                 if not c:
312                     raise KeyError("Rename failed. Old column %r not found in %r."
313                                    % (oldname, t.name))
314                 oldcol = c[0]
315                 # Use the superclass call to avoid DROP COLUMN/ADD COLUMN.
316                 dict.__setitem__(t, oldname, oldcol)
317            
318             t.rename(oldname, newname)
319         except geniusql.errors.MappingError, x:
320             conflict(conflicts, str(x))
321    
322     def add_index(self, cls, name, conflicts='error'):
323         """Add an index on the given property.
324         
325         conflicts: see errors.conflict.
326         """
327         try:
328             self._table_map[cls].add_index(name)
329         except geniusql.errors.MappingError, x:
330             conflict(conflicts, str(x))
331    
332     def has_index(self, cls, name):
333         """If an index exists on the given property, return True."""
334         return name in self._table_map[cls].indices
335    
336     def drop_index(self, cls, name, conflicts='error'):
337         """Destroy any index on the given property.
338         
339         conflicts: see errors.conflict.
340         """
341         try:
342             del self._table_map[cls].indices[name]
343         except geniusql.errors.MappingError, x:
344             conflict(conflicts, str(x))
345    
346     auto_discover = True
347    
348     def map(self, classes, conflicts='error'):
349         """Map classes to internal storage.
350         
351         If self.auto_discover is True (the default), then Table/Column/Index
352         objects will be formed by inspecting the underlying database using
353         self.sync().
354         
355         If auto_discover is False, then mock Table/Column/Index objects
356         will be used instead; this provides a performance improvement
357         in scenarios where the model maps perfectly to the database
358         and changes to the database are not expected outside the model.
359         
360         conflicts: see errors.conflict.
361         """
362         # Map tables before views, because views depend on them
363         tables = [cls for cls in classes if not hasattr(cls, 'view_statement')]
364         views = [cls for cls in classes if hasattr(cls, 'view_statement')]
365         classes = tables + views
366        
367         if self.auto_discover:
368             self.sync(classes, conflicts)
369         else:
370             for cls in classes:
371                 s = self.schemas[cls._dbschema_name]
372                 if cls.__name__ in s:
373                     # If our consumer-side key is already present, skip this cls.
374                     # This allows callers to auto-sync class by class
375                     # without making a new Table object each time.
376                     continue
377                
378                 t = self._make_table(cls)
379                
380                 # Use the superclass call to avoid DROP/CREATE TABLE
381                 dict.__setitem__(s, cls.__name__, t)
382    
383     def sync(self, classes, conflicts='error'):
384         """Map classes to existing Table objects (found via discovery).
385         
386         conflicts: see errors.conflict.
387         """
388         for cls in classes:
389             clsname = cls.__name__
390            
391             s = self.schemas[cls._dbschema_name]
392             if clsname in s:
393                 # If our consumer-side key is already present, skip this cls.
394                 # This allows callers to auto-sync class by class
395                 # without calling the expensive discover() func each time.
396                 continue
397            
398             self._table_map[cls] = self._find_table(s, cls, conflicts)
399
Note: See TracBrowser for help on using the browser.