Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/branches/crazycache/dejavu/storage/partitions.py

Revision 548 (checked in by fumanchu, 4 years ago)

Pass unit call through VerticalPatitioner?.

  • Property svn:eol-style set to native
Line 
1 """A StorageManager for Dejavu which mediates multiple stores."""
2
3 try:
4     # Builtin in Python 2.4+
5     set
6 except NameError:
7     # Module in Python 2.3
8     from sets import Set as set
9
10 from geniusql import logic
11
12 import dejavu
13 from dejavu import errors, storage, logflags
14
15
16 class VerticalPartitioner(storage.StorageManager):
17     """A mediator for multiple vertically-partitioned stores."""
18    
19     __metaclass__ = dejavu._AttributeDocstrings
20    
21     stores = {}
22     stores__doc = "A map from store names to StorageManager instances."
23    
24     classmap = {}
25     classmap__doc = """
26     A map from Unit classes to lists of StorageManager instances.
27     
28     DDL methods will generally dispatch to all stores for each class.
29     DML methods will generally dispatch to classmap[unit.__class__][0];
30     those which involve multiple classes (e.g. multirecall), will try
31     to find a single store which handles all classes in the given
32     relation. To override this default search, you can add entries
33     to classmap of the form: {(clsA, clsB, clsC): [store1]},
34     which instructs the partitioner to use the given store for
35     any Join with the same order, such as (clsA << clsB) & clsC."""
36    
37     def __init__(self, allOptions={}):
38         storage.StorageManager.__init__(self, allOptions)
39         self.stores = {}
40         self.classmap = {}
41    
42     def migrate(self, classes, new_store, old_store=None, copy_only=False):
43         """Move all units of the given class(es) to new_store.
44         
45         copy_only: if False (the default), this copies the data to the new
46             store, deletes it from the old store, and updates self.classmap.
47             If True, the data is copied to the new store only.
48         """
49         if not isinstance(classes, (list, tuple)):
50             classes = list(classes)
51        
52         for cls in classes:
53             new_store.classes.add(cls)
54             if not new_store.has_storage(cls):
55                 new_store.create_storage(cls)
56            
57             if old_store is None:
58                 units = self.xrecall(cls)
59             else:
60                 units = old_store.xrecall(cls)
61            
62             for unit in units:
63                 new_store.reserve(unit)
64                 new_store.save(unit, forceSave=True)
65                 if not copy_only:
66                     self.destroy(unit)
67            
68             if not copy_only:
69                 classmap = self.classmap[cls]
70                 if old_store is None:
71                     for store in classmap:
72                         store.classes.remove(cls)
73                         classmap.remove(store)
74                 else:
75                     old_store.classes.remove(cls)
76                     if old_store in classmap:
77                         classmap.remove(old_store)
78                
79                 if new_store not in classmap:
80                     classmap.append(new_store)
81    
82     def migrate_all(self, new_store, old_store=None, copy_only=False):
83         """Copy all units (of old_store) to new_store."""
84         if old_store is None:
85             for store in self.classmap[cls]:
86                 self.migrate(store.classes, new_store, store, copy_only)
87         else:
88             self.migrate(store.classes, new_store, old_store, copy_only)
89    
90     def add_store(self, name, store):
91         """Register a StorageManager to be mediated.
92         
93         name: a key for the given store.
94         store: a StorageManager instance. The given store should have its
95             'classes' attribute already set (or you will have to populate
96             self.classmap manually).
97         """
98         self.stores[name] = store
99         for cls in store.classes:
100             self.classmap.setdefault(cls, []).insert(0, store)
101             self.register(cls)
102         return store
103    
104     def remove_store(self, name):
105         """Remove (unregister) the named store.
106         
107         All classes associated to the given store will be disassociated.
108         """
109         if name in self.stores:
110             store = self.stores[name]
111            
112             # Disassociate all registered classes with this store.
113             for cls, stores in self.classmap.items():
114                 if store in stores:
115                     stores.remove(store)
116                     if not stores:
117                         del self.classmap[cls]
118                         self.classes.remove(cls)
119            
120             del self.stores[name]
121    
122     def map(self, classes, conflicts='error'):
123         """Map classes to internal storage.
124         
125         conflicts: see errors.conflict.
126         """
127         storemap = {}
128         for cls in classes:
129             for store in self.classmap[cls]:
130                 bucket = storemap.setdefault(store, [])
131                 bucket.append(cls)
132         storemap = [(getattr(s, 'loadOrder', 5), s, c)
133                     for s, c in storemap.iteritems()]
134         storemap.sort()
135        
136         for order, store, classlist in storemap:
137             try:
138                 store.map(classlist, conflicts=conflicts)
139             except errors.MappingError, x:
140                 for key in self.stores:
141                     if self.stores[key] is store:
142                         break
143                 else:
144                     key = None
145                 x.args += (key, store.__class__)
146                 raise
147    
148     def map_all(self, conflicts='error'):
149         """Map all registered classes to internal storage structures.
150         
151         This method is idempotent, but that doesn't mean cheap. Try not
152         to call it very often (once at app startup is usually enough).
153         
154         conflicts: see errors.conflict.
155         """
156         storemap = {}
157         for cls, stores in self.classmap.iteritems():
158             for store in stores:
159                 bucket = storemap.setdefault(store, [])
160                 bucket.append(cls)
161         storemap = [(getattr(s, 'loadOrder', 5), s, c)
162                     for s, c in storemap.iteritems()]
163         storemap.sort()
164        
165         for order, store, classes in storemap:
166             try:
167                 store.map(classes, conflicts=conflicts)
168             except errors.MappingError, x:
169                 for key in self.stores:
170                     if self.stores[key] is store:
171                         break
172                 else:
173                     key = None
174                 x.args += (key, store.__class__)
175                 raise
176    
177     def shutdown(self, conflicts='error'):
178         """Shutdown self and all its stores.
179         
180         conflicts: see errors.conflict.
181         """
182         # Tell all stores to shut down.
183         stores = [(getattr(v, 'shutdownOrder', 5), v, k) for k, v in self.stores.iteritems()]
184         stores.sort()
185         for order, store, name in stores:
186             store.shutdown(conflicts=conflicts)
187    
188     def version(self):
189         """Return provider-specific version strings for each mediated store."""
190         output = []
191         for store in self.stores.itervalues():
192             if store.version:
193                 output.append(store.version())
194         return '\n\n'.join(output)
195    
196    
197     # --------------------- Unit Class Registration --------------------- #
198    
199     def create_database(self, conflicts='error'):
200         for s in self.stores.itervalues():
201             s.create_database(conflicts=conflicts)
202    
203     def drop_database(self, conflicts='error'):
204         for s in self.stores.itervalues():
205             s.drop_database(conflicts=conflicts)
206    
207     def create_storage(self, cls, conflicts='error'):
208         """Create storage space for cls."""
209         for store in self.classmap[cls]:
210             store.create_storage(cls, conflicts=conflicts)
211    
212     def has_storage(self, cls):
213         """If storage space for cls exists, return True (False otherwise)."""
214         for store in self.classmap[cls]:
215             if not store.has_storage(cls):
216                 return False
217         return True
218    
219     def drop_storage(self, cls, conflicts='error'):
220         """Remove storage space for cls."""
221         for store in self.classmap[cls]:
222             store.drop_storage(cls, conflicts=conflicts)
223    
224     def add_property(self, cls, name, conflicts='error'):
225         """Add storage space for the named property of the given cls."""
226         for store in self.classmap[cls]:
227             store.add_property(cls, name, conflicts=conflicts)
228    
229     def has_property(self, cls, name):
230         """If storage structures exist for the given property, return True."""
231         for store in self.classmap[cls]:
232             if not store.has_property(cls, name):
233                 return False
234         return True
235    
236     def drop_property(self, cls, name, conflicts='error'):
237         """Drop storage space for the named property of the given cls."""
238         for store in self.classmap[cls]:
239             store.drop_property(cls, name, conflicts=conflicts)
240    
241     def rename_property(self, cls, oldname, newname, conflicts='error'):
242         """Rename storage space for the property of the given cls."""
243         for store in self.classmap[cls]:
244             store.rename_property(cls, oldname, newname, conflicts=conflicts)
245    
246     def add_index(self, cls, name, conflicts='error'):
247         """Add an index to the given property.
248         
249         conflicts: see errors.conflict.
250         """
251         for store in self.classmap[cls]:
252             store.add_index(cls, name, conflicts=conflicts)
253    
254     def has_index(self, cls, name):
255         """If an index exists for the given property, return True."""
256         for store in self.classmap[cls]:
257             if not store.has_index(cls, name):
258                 return False
259         return True
260    
261     def drop_index(self, cls, name, conflicts='error'):
262         """Destroy any index on the given property.
263         
264         conflicts: see errors.conflict.
265         """
266         for store in self.classmap[cls]:
267             store.drop_index(cls, name, conflicts=conflicts)
268    
269    
270     # ------------------------------- DML ------------------------------- #
271    
272     def reserve(self, unit):
273         """Reserve storage space for the Unit."""
274         self.classmap[unit.__class__][0].reserve(unit)
275    
276     def save(self, unit, forceSave=False):
277         """Store the unit's property values."""
278         self.classmap[unit.__class__][0].save(unit, forceSave)
279    
280     def destroy(self, unit):
281         """Delete the unit."""
282         self.classmap[unit.__class__][0].destroy(unit)
283    
284     def unit(self, cls, **kwargs):
285         return self.classmap[cls][0].unit(cls, **kwargs)
286    
287     def xrecall(self, classes, expr=None, order=None, limit=None, offset=None):
288         """Yield a sequence of Unit instances which satisfy the expression."""
289         if isinstance(classes, dejavu.UnitJoin):
290             for unitrow in self._xmultirecall(classes, expr, order=order,
291                                               limit=limit, offset=offset):
292                 yield unitrow
293         else:
294             store = self.classmap[classes][0]
295             for unit in store.xrecall(classes, expr, order, limit, offset):
296                 yield unit
297    
298     def _xmultirecall(self, classes, expr=None,
299                       order=None, limit=None, offset=None):
300         """Yield lists of units of the given classes which match expr.
301         
302         This does not yet handle multiple classes in disparate stores.
303         """
304         return self._single_store(classes)._xmultirecall(
305             classes, expr, order=order, limit=limit, offset=offset)
306    
307     def _single_store(self, relation):
308         """Return the store for the given relation (or raise ValueError)."""
309         if hasattr(relation, "class1"):
310             # This is a UnitJoin.
311             try:
312                 # First, see if there's an explicit entry in self.classmap
313                 # for tuple([cls for cls in relation]).
314                 return self.classmap[tuple(relation)][0]
315             except (KeyError, IndexError):
316                 # Otherwise, return the first store that handles all
317                 # classes in relation.
318                 stores = None
319                 for cls in relation:
320                     if stores is None:
321                         stores = set(self.classmap[cls])
322                     else:
323                         stores &= set(self.classmap[cls])
324                
325                 for store in stores or []:
326                     return store
327                
328                 raise ValueError("This operation does not support multiple"
329                                  " classes in disparate stores.")
330         else:
331             return self.classmap[relation][0]
332    
333     def xview(self, query, order=None, limit=None, offset=None, distinct=False):
334         """Yield tuples of attribute values for the given query.
335         
336         Each yielded value will be a list of values, in the same order as
337         the Query.attributes. This facilitates unpacking in iterative
338         consumer code like:
339         
340         for id, name in store.view(Query(Invoice, ['ID', 'Name'], f)):
341             print id, ": ", name
342         
343         This is generally much faster than recall, and should be preferred
344         for performance-sensitive code.
345         """
346         if not isinstance(query, dejavu.Query):
347             query = dejavu.Query(*query)
348        
349         if self.logflags & logflags.VIEW:
350             self.log(logflags.VIEW.message(query, distinct))
351        
352         store = self._single_store(query.relation)
353         for row in store.xview(query, order=order, limit=limit,
354                                offset=offset, distinct=distinct):
355             yield row
356    
357     def insert_into(self, name, query, distinct=False):
358         """INSERT matching data INTO a new class and return the class."""
359         if not isinstance(query, dejavu.Query):
360             query = dejavu.Query(*query)
361        
362         store = self._single_store(query.relation)
363         return store.insert_into(name, query, distinct)
364    
365     #                        Transaction Management                        #
366    
367     def start(self, isolation=None):
368         """Start a transaction."""
369         for store in self.stores.itervalues():
370             # By default, stores do not support transactions,
371             # in which case 'start' will be None.
372             if store.start:
373                 store.start(isolation)
374    
375     def commit(self):
376         """Commit the current transaction.
377         
378         If errors occur during this process, they are not trapped here.
379         You must either call rollback yourself (or fix the problem and
380         try to commit again).
381         """
382         for store in self.stores.itervalues():
383             if store.commit:
384                 store.commit()
385    
386     def rollback(self):
387         """Roll back the current transaction."""
388         for store in self.stores.itervalues():
389             if store.rollback:
390                 store.rollback()
391
392
Note: See TracBrowser for help on using the browser.