Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/dejavu/storage/partitions.py

Revision 482 (checked in by fumanchu, 6 years ago)

Spec-compliance updates.

  • Property svn:eol-style set to native
Line 
1 """A StorageManager for Dejavu which mediates multiple stores."""
2
3 from geniusql import logic
4
5 import dejavu
6 from dejavu import errors, storage, logflags
7
8
9 class VerticalPartitioner(storage.StorageManager):
10     """A mediator for multiple vertically-partitioned stores.
11     
12     Options:
13         <Name> Store: Use the name to look up another store and attach it
14             to self.stores.
15         <Name> Units: Use the name to look up another store and direct
16             the given Unit classes to that named store.
17     """
18    
19     # A map from store names to StorageManager instances
20     stores = {}
21    
22     # A map from Unit classes to StorageManager instances
23     classmap = {}
24    
25     # A map from Unit class names to StorageManager instances
26     classnames = {}
27    
28     def __init__(self, allOptions={}):
29         storage.StorageManager.__init__(self, allOptions)
30         self.stores = {}
31         self.classmap = {}
32         self.classnames = {}
33         self.default_store = None
34        
35         opts = allOptions.items()
36         opts.sort()
37         for k, v in opts:
38             if k.endswith(" Store"):
39                 self.stores[k.split(" ", 1)[0]] = v
40             elif k.endswith(" Units"):
41                 sm = self.stores[k.split(" ", 1)[0]]
42                 units = [x.strip() for x in v.split(",") if x.strip()]
43                 if units:
44                     for x in units:
45                         self.classnames[x] = sm
46                 else:
47                     self.default_store = sm
48    
49     def migrate_class(self, cls, new_store):
50         """Copy all units of cls to new_store."""
51         new_store.create_storage(cls)
52         for unit in self.new_sandbox().xrecall(cls):
53             new_store.reserve(unit)
54             new_store.save(unit, forceSave=True)
55    
56     def migrate(self, new_store, old_store=None, copy_only=False):
57         """Copy all units (of old_store) to new_store."""
58         for cls in self.classes:
59             store = self.storage(cls)
60             if old_store is None or old_store is store:
61                 self.migrate_class(cls, new_store)
62                 if not copy_only:
63                     self.classmap[cls] = new_store
64    
65     def add_store(self, name, store, units=None):
66         """Register a StorageManager to be mediated.
67         
68         name: a key for the given store.
69         store: a StorageManager class, an instance of that class,
70             or the full importable dotted-package name of the class.
71         units: a list of strings, naming the classes this store
72             will handle. If None, the store will handle all classes
73             which are not handled by any other store.
74         """
75         self.stores[name] = store = storage.resolve(store, options)
76         if units:
77             for x in units:
78                 self.classnames[x] = store
79         else:
80             self.default_store = store
81         return store
82    
83     def remove_store(self, name):
84         """Remove (unregister) the named store.
85         
86         All classes associated to the given store will be disassociated,
87         and will then be free to associate with another store (usually
88         the default store).
89         """
90         if name in self.stores:
91             store = self.stores[name]
92            
93             # Disassociate all registered classes with this store.
94             for c in self.classes:
95                 if self.classmap[c] is store:
96                     self.classmap[c] = None
97            
98             if store is self.default_store:
99                 self.default_store = None
100            
101             del self.stores[name]
102    
103     def map_all(self, conflict_mode='error'):
104         """Map all registered classes to internal storage structures.
105         
106         Although classes are mapped automatically the first time they
107         are accessed (see self.storage), in production it is often more
108         useful to map all classes at application startup. Call this
109         method to do so (but register all classes first).
110         
111         This method is idempotent, but that doesn't mean cheap. Try not
112         to call it very often (once at app startup is usually enough).
113         
114         conflict_mode: This argument determines what happens when there are
115         discrepancies between the Dejavu model and the actual database.
116             
117             If 'error' (the default), MappingError is raised for the
118             first issue and the sync process is aborted.
119             
120             If 'warn', then a warning is raised (instead of an error)
121             for each issue, and the sync process is not aborted. This
122             allows you to see all errors at once, without having to stop
123             and fix each one and then execute the process again.
124             
125             If 'repair', then each issue will be resolved by changing
126             the database to match the model.
127         """
128         storemap = {}
129        
130         # Don't use iteritems because self.storage may mutate the dict.
131         for cls, store in self.classmap.items():
132             if store is None:
133                 # Find storage directly to skip the map() call in self.storage.
134                 store = self.classnames.get(cls.__name__, self.default_store)
135                 self.classmap[cls] = store
136             bucket = storemap.setdefault(store, [])
137             bucket.append(cls)
138        
139         storemap = [(s.loadOrder, s, c) for s, c in storemap.iteritems()]
140         storemap.sort()
141        
142         for order, store, classes in storemap:
143             store.map(classes, conflict_mode)
144    
145     def shutdown(self):
146         """Shutdown self and all its stores."""
147         # Tell all stores to shut down.
148         stores = [(v.shutdownOrder, v, k) for k, v in self.stores.iteritems()]
149         stores.sort()
150         for order, store, name in stores:
151             store.shutdown()
152    
153     def version(self):
154         """Return provider-specific version strings for each mediated store."""
155         output = []
156         for store in self.stores.itervalues():
157             if store.version:
158                 output.append(store.version())
159         return '\n\n'.join(output)
160    
161    
162     # --------------------- Unit Class Registration --------------------- #
163    
164     def create_database(self):
165         for s in self.stores.itervalues():
166             s.create_database()
167    
168     def drop_database(self):
169         for s in self.stores.itervalues():
170             s.drop_database()
171    
172     def register(self, cls):
173         """Assert that Units of class 'cls' will be handled."""
174         storage.StorageManager.register(self, cls)
175        
176         # We must allow modules to register classes before any stores have
177         # been added, but not overwrite a store which has already been found.
178         if cls not in self.classmap:
179             self.classmap[cls] = None
180    
181     def storage(self, cls, automap=True):
182         """Return the StorageManager which handles Units of the given class.
183         
184         The results of this call will be cached for performance.
185         
186         If automap is True (the default), this method will call
187         store.map(cls) if the class is not already cached.
188         """
189         store = self.classmap.get(cls)
190         if store:
191             return store
192        
193         store = self.classnames.get(cls.__name__, self.default_store)
194         self.classmap[cls] = store
195         if automap:
196             store.map([cls])
197         return store
198    
199     def create_storage(self, cls):
200         """Create storage space for cls."""
201         store = self.storage(cls, automap=False)
202         store.create_storage(cls)
203    
204     def has_storage(self, cls):
205         """If storage space for cls exists, return True (False otherwise)."""
206         try:
207             store = self.storage(cls)
208         except errors.MappingError:
209             return False
210         return store.has_storage(cls)
211    
212     def drop_storage(self, cls):
213         """Remove storage space for cls."""
214         self.storage(cls).drop_storage(cls)
215    
216     def add_property(self, cls, name):
217         """Add storage space for the named property of the given cls."""
218         self.storage(cls).add_property(cls, name)
219    
220     def drop_property(self, cls, name):
221         """Drop storage space for the named property of the given cls."""
222         self.storage(cls).drop_property(cls, name)
223    
224     def rename_property(self, cls, oldname, newname):
225         """Rename storage space for the property of the given cls."""
226         self.storage(cls).rename_property(cls, oldname, newname)
227    
228    
229     # ------------------------------- DML ------------------------------- #
230    
231     def reserve(self, unit):
232         """Reserve storage space for the Unit."""
233         self.storage(unit.__class__).reserve(unit)
234    
235     def save(self, unit, forceSave=False):
236         """Store the unit's property values."""
237         self.storage(unit.__class__).save(unit, forceSave)
238    
239     def destroy(self, unit):
240         """Delete the unit."""
241         self.storage(unit.__class__).destroy(unit)
242    
243     def xrecall(self, cls, expr=None, order=None, limit=None, offset=None):
244         """Yield a sequence of Unit instances which satisfy the expression."""
245         if isinstance(cls, dejavu.UnitJoin):
246             for unitrow in self.multirecall(cls, expr):
247                 yield unitrow
248         else:
249             for unit in self.storage(cls).xrecall(cls, expr, order,
250                                                   limit, offset):
251                 yield unit
252    
253     def xmultirecall(self, classes, expr=None, order=None, limit=None, offset=None):
254         """Yield lists of units of the given classes which match expr.
255         
256         This does not yet handle multiple classes in disparate stores.
257         """
258         if expr is None:
259             expr = logic.Expression(lambda *args: True)
260        
261         return self._single_store(classes).xmultirecall(classes, expr, order,
262                                                         limit, offset)
263    
264     def _single_store(self, relation):
265         """Return the store for the given relation (or raise ValueError)."""
266         if hasattr(relation, "class1"):
267             # This is a UnitJoin
268             stores = [self.storage(cls) for cls in relation]
269             store = stores[0]
270             for s in stores:
271                 if s is not store:
272                     raise ValueError(u"This operation does not support multiple"
273                                      u" classes in disparate stores.")
274         else:
275             store = self.storage(relation)
276         return store
277    
278     def xview(self, query, distinct=False):
279         """Yield tuples of attribute values for the given query.
280         
281         Each yielded value will be a list of values, in the same order as
282         the Query.attributes. This facilitates unpacking in iterative
283         consumer code like:
284         
285         for id, name in store.view(Query(Invoice, ['ID', 'Name'], f)):
286             print id, ": ", name
287         
288         This is generally much faster than recall, and should be preferred
289         for performance-sensitive code.
290         """
291         if not isinstance(query, dejavu.Query):
292             query = dejavu.Query(*query)
293        
294         if self.logflags & logflags.VIEW:
295             self.log(logflags.VIEW.message(query, distinct))
296        
297         store = self._single_store(query.relation)
298         for row in store.xview(query, distinct=distinct):
299             yield row
300    
301     def insert_into(self, name, query, distinct=False):
302         """INSERT matching data INTO a new class and return the class."""
303         if not isinstance(query, dejavu.Query):
304             query = dejavu.Query(*query)
305        
306         store = self._single_store(query.relation)
307         return store.insert_into(name, query, distinct)
308    
309     #                        Transaction Management                        #
310    
311     def start(self, isolation=None):
312         """Start a transaction."""
313         for store in self.stores.itervalues():
314             # By default, stores do not support transactions,
315             # in which case 'start' will be None.
316             if store.start:
317                 store.start(isolation)
318    
319     def commit(self):
320         """Commit the current transaction.
321         
322         If errors occur during this process, they are not trapped here.
323         You must either call rollback yourself (or fix the problem and
324         try to commit again).
325         """
326         for store in self.stores.itervalues():
327             if store.commit:
328                 store.commit()
329    
330     def rollback(self):
331         """Roll back the current transaction."""
332         for store in self.stores.itervalues():
333             if store.rollback:
334                 store.rollback()
335
336
Note: See TracBrowser for help on using the browser.