"""A StorageManager for Dejavu which mediates multiple stores."""

import sys

try:
    # Use the builtin set type when the interpreter provides one.
    set
except NameError:
    # Older interpreters: fall back to the 'sets' module.
    from sets import Set as set

from geniusql import logic

import dejavu
from dejavu import errors, storage, logflags


class VerticalPartitioner(storage.StorageManager):
    """A mediator for multiple vertically-partitioned stores.

    Each mediated store is registered under a name (see add_store).
    Every Unit class is mapped to an ordered list of the stores which
    handle it; the first store in that list is treated as the primary
    one for single-class reads and writes.
    """

    # NOTE(review): presumably this metaclass folds the "<name>__doc"
    # attributes below into the class documentation -- confirm in dejavu.
    __metaclass__ = dejavu._AttributeDocstrings

    stores = {}
    stores__doc = "A map from store names to StorageManager instances."

    classmap = {}
    classmap__doc = """
    A map from Unit classes to lists of StorageManager instances.

    DDL methods will generally dispatch to all stores for each class.
    DML methods will generally dispatch to classmap[unit.__class__][0];
    those which involve multiple classes (e.g. multirecall), will try
    to find a single store which handles all classes in the given
    relation. To override this default search, you can add entries
    to classmap of the form: {(clsA, clsB, clsC): [store1]},
    which instructs the partitioner to use the given store for
    any Join with the same order, such as (clsA << clsB) & clsC."""

    def __init__(self, allOptions=None):
        """Initialize the mediator with no stores and an empty classmap.

        allOptions: passed through to storage.StorageManager.__init__.
            If omitted, a fresh empty dict is used (so that no dict is
            shared between instances).
        """
        if allOptions is None:
            allOptions = {}
        storage.StorageManager.__init__(self, allOptions)
        self.stores = {}
        self.classmap = {}

    def migrate(self, classes, new_store, old_store=None, copy_only=False):
        """Move all units of the given class(es) to new_store.

        classes: a single Unit class, or an iterable of Unit classes.
        new_store: the StorageManager which receives the units.
        old_store: the StorageManager to read from. If None, units are
            read (and, unless copy_only, destroyed) through self, which
            dispatches to the first store in self.classmap[cls].
        copy_only: if False (the default), this copies the data to the new
            store, deletes it from the old store, and updates self.classmap.
            If True, the data is copied to the new store only.
        """
        # Always work on a private list: callers (e.g. migrate_all) may
        # hand us a store's own 'classes' set, which is mutated below.
        try:
            classes = list(classes)
        except TypeError:
            # Not iterable: a single class was given.
            classes = [classes]

        if old_store is None:
            source = self
        else:
            source = old_store

        for cls in classes:
            new_store.classes.add(cls)
            if not new_store.has_storage(cls):
                new_store.create_storage(cls)

            for unit in source.xrecall(cls):
                new_store.reserve(unit)
                new_store.save(unit, forceSave=True)
                if not copy_only:
                    # Destroy in the store the unit was read from (not
                    # necessarily the first store in the classmap).
                    source.destroy(unit)

            if not copy_only:
                handlers = self.classmap[cls]
                if old_store is None:
                    # Detach every previous store. Iterate over a copy
                    # and clear afterwards; removing items from a list
                    # while iterating it would skip stores.
                    for store in list(handlers):
                        if store is not new_store:
                            # discard, not remove: the classmap may have
                            # been populated by hand, without 'classes'.
                            store.classes.discard(cls)
                    del handlers[:]
                else:
                    old_store.classes.discard(cls)
                    if old_store in handlers:
                        handlers.remove(old_store)

                if new_store not in handlers:
                    handlers.append(new_store)

    def migrate_all(self, new_store, old_store=None, copy_only=False):
        """Copy all units (of old_store) to new_store.

        old_store: the StorageManager to read from. If None, every store
            registered in self.stores (other than new_store itself) is
            migrated in turn.
        copy_only: passed through to migrate for each store.
        """
        if old_store is None:
            # Migrating a store onto itself makes no sense; skip it.
            sources = [store for store in self.stores.values()
                       if store is not new_store]
        else:
            sources = [old_store]

        for store in sources:
            # Pass a snapshot; migrate alters store.classes as it goes.
            self.migrate(list(store.classes), new_store, store, copy_only)

    def add_store(self, name, store):
        """Register a StorageManager to be mediated.

        name: a key for the given store.
        store: a StorageManager instance. The given store should have its
            'classes' attribute already set (or you will have to populate
            self.classmap manually).

        The store is inserted at the front of the classmap list of each
        of its classes, and each class is registered with self.
        Returns the given store.
        """
        self.stores[name] = store
        for cls in store.classes:
            self.classmap.setdefault(cls, []).insert(0, store)
            self.register(cls)
        return store

    def remove_store(self, name):
        """Remove (unregister) the named store.

        All classes associated to the given store will be disassociated.
        Unknown names are ignored.
        """
        if name not in self.stores:
            return
        store = self.stores[name]

        # Iterate over a snapshot, since entries are deleted in the loop.
        for cls, handlers in list(self.classmap.items()):
            if store in handlers:
                handlers.remove(store)
                if not handlers:
                    # No store is left for this key; forget it entirely.
                    del self.classmap[cls]
                    # discard, not remove: classmap keys may be tuples
                    # of classes (join overrides) never present in
                    # self.classes.
                    self.classes.discard(cls)

        del self.stores[name]

    def _store_name(self, store):
        """Return the key under which store is registered (or None)."""
        for key in self.stores:
            if self.stores[key] is store:
                return key
        return None

    def _map_by_store(self, storemap, conflicts):
        """Call store.map(classlist) for each item in storemap.

        storemap: a dict of {store: [cls, ...]}.

        Stores are processed in ascending 'loadOrder' (default 5). If a
        store raises errors.MappingError, the store's registered name and
        its class are appended to the exception args before re-raising.
        """
        # The running index breaks ties, so that two stores with the same
        # loadOrder never have to be compared with each other.
        ordered = [(getattr(store, 'loadOrder', 5), index, store, classlist)
                   for index, (store, classlist)
                   in enumerate(storemap.items())]
        ordered.sort()

        for order, index, store, classlist in ordered:
            try:
                store.map(classlist, conflicts=conflicts)
            except errors.MappingError:
                # sys.exc_info() avoids version-specific 'except' syntax.
                x = sys.exc_info()[1]
                x.args += (self._store_name(store), store.__class__)
                raise

    def map(self, classes, conflicts='error'):
        """Map classes to internal storage.

        classes: an iterable of classes; each must be a key in
            self.classmap (KeyError otherwise).
        conflicts: see errors.conflict.
        """
        storemap = {}
        for cls in classes:
            for store in self.classmap[cls]:
                storemap.setdefault(store, []).append(cls)
        self._map_by_store(storemap, conflicts)

    def map_all(self, conflicts='error'):
        """Map all registered classes to internal storage structures.

        This method is idempotent, but that doesn't mean cheap. Try not
        to call it very often (once at app startup is usually enough).

        conflicts: see errors.conflict.
        """
        storemap = {}
        for cls, stores in self.classmap.items():
            for store in stores:
                storemap.setdefault(store, []).append(cls)
        self._map_by_store(storemap, conflicts)

    def shutdown(self, conflicts='error'):
        """Shut down all mediated stores.

        Stores are shut down in ascending 'shutdownOrder' (default 5).

        conflicts: see errors.conflict.
        """
        # The running index breaks ties between equal shutdownOrders.
        ordered = [(getattr(store, 'shutdownOrder', 5), index, store)
                   for index, store in enumerate(self.stores.values())]
        ordered.sort()
        for order, index, store in ordered:
            store.shutdown(conflicts=conflicts)

    def version(self):
        """Return provider-specific version strings for each mediated store."""
        output = []
        for store in self.stores.values():
            # A store may set its 'version' attribute to a false value
            # to opt out.
            if store.version:
                output.append(store.version())
        return '\n\n'.join(output)

    def create_database(self, conflicts='error'):
        """Call create_database on every mediated store.

        conflicts: see errors.conflict.
        """
        for s in self.stores.values():
            s.create_database(conflicts=conflicts)

    def drop_database(self, conflicts='error'):
        """Call drop_database on every mediated store.

        conflicts: see errors.conflict.
        """
        for s in self.stores.values():
            s.drop_database(conflicts=conflicts)

    def create_storage(self, cls, conflicts='error'):
        """Create storage space for cls (in every store mapped to it)."""
        for store in self.classmap[cls]:
            store.create_storage(cls, conflicts=conflicts)

    def has_storage(self, cls):
        """If storage space for cls exists, return True (False otherwise).

        True is returned only if every store mapped to cls has storage.
        """
        for store in self.classmap[cls]:
            if not store.has_storage(cls):
                return False
        return True

    def drop_storage(self, cls, conflicts='error'):
        """Remove storage space for cls (from every store mapped to it)."""
        for store in self.classmap[cls]:
            store.drop_storage(cls, conflicts=conflicts)

    def add_property(self, cls, name, conflicts='error'):
        """Add storage space for the named property of the given cls."""
        for store in self.classmap[cls]:
            store.add_property(cls, name, conflicts=conflicts)

    def has_property(self, cls, name):
        """If storage structures exist for the given property, return True.

        True is returned only if every store mapped to cls has them.
        """
        for store in self.classmap[cls]:
            if not store.has_property(cls, name):
                return False
        return True

    def drop_property(self, cls, name, conflicts='error'):
        """Drop storage space for the named property of the given cls."""
        for store in self.classmap[cls]:
            store.drop_property(cls, name, conflicts=conflicts)

    def rename_property(self, cls, oldname, newname, conflicts='error'):
        """Rename storage space for the property of the given cls."""
        for store in self.classmap[cls]:
            store.rename_property(cls, oldname, newname, conflicts=conflicts)

    def add_index(self, cls, name, conflicts='error'):
        """Add an index to the given property.

        conflicts: see errors.conflict.
        """
        for store in self.classmap[cls]:
            store.add_index(cls, name, conflicts=conflicts)

    def has_index(self, cls, name):
        """If an index exists for the given property, return True.

        True is returned only if every store mapped to cls has the index.
        """
        for store in self.classmap[cls]:
            if not store.has_index(cls, name):
                return False
        return True

    def drop_index(self, cls, name, conflicts='error'):
        """Destroy any index on the given property.

        conflicts: see errors.conflict.
        """
        for store in self.classmap[cls]:
            store.drop_index(cls, name, conflicts=conflicts)

    def reserve(self, unit):
        """Reserve storage space for the Unit (in its primary store)."""
        self.classmap[unit.__class__][0].reserve(unit)

    def save(self, unit, forceSave=False):
        """Store the unit's property values (in its primary store)."""
        self.classmap[unit.__class__][0].save(unit, forceSave)

    def destroy(self, unit):
        """Delete the unit (from its primary store)."""
        self.classmap[unit.__class__][0].destroy(unit)

    def unit(self, cls, **kwargs):
        """Return the result of unit(cls, **kwargs) on cls's primary store."""
        return self.classmap[cls][0].unit(cls, **kwargs)

    def xrecall(self, classes, expr=None, order=None, limit=None, offset=None):
        """Yield a sequence of Unit instances which satisfy the expression.

        classes: a single Unit class (served by its primary store), or
            a dejavu.UnitJoin (served by _xmultirecall, which yields
            rows of units).
        """
        if isinstance(classes, dejavu.UnitJoin):
            for unitrow in self._xmultirecall(classes, expr, order=order,
                                              limit=limit, offset=offset):
                yield unitrow
        else:
            store = self.classmap[classes][0]
            for unit in store.xrecall(classes, expr, order, limit, offset):
                yield unit

    def _xmultirecall(self, classes, expr=None,
                      order=None, limit=None, offset=None):
        """Yield lists of units of the given classes which match expr.

        This does not yet handle multiple classes in disparate stores.
        """
        return self._single_store(classes)._xmultirecall(
            classes, expr, order=order, limit=limit, offset=offset)

    def _single_store(self, relation):
        """Return the store for the given relation (or raise ValueError).

        relation: a single class, or a join (recognized by its 'class1'
            attribute) which iterates over its classes.
        """
        if not hasattr(relation, "class1"):
            # A plain class: use its primary store.
            return self.classmap[relation][0]

        try:
            # An explicit classmap entry keyed by the tuple of joined
            # classes overrides the search below.
            return self.classmap[tuple(relation)][0]
        except (KeyError, IndexError):
            # No override; find a store which handles every class in
            # the join. Candidates keep the classmap order of the first
            # class, so the choice is deterministic.
            candidates = None
            for cls in relation:
                handlers = self.classmap[cls]
                if candidates is None:
                    candidates = list(handlers)
                else:
                    candidates = [s for s in candidates if s in handlers]

            if candidates:
                return candidates[0]

            raise ValueError("This operation does not support multiple"
                             " classes in disparate stores.")

    def xview(self, query, order=None, limit=None, offset=None, distinct=False):
        """Yield tuples of attribute values for the given query.

        Each yielded value will be a list of values, in the same order as
        the Query.attributes. This facilitates unpacking in iterative
        consumer code like:

        for id, name in store.view(Query(Invoice, ['ID', 'Name'], f)):
            print id, ": ", name

        This is generally much faster than recall, and should be preferred
        for performance-sensitive code.

        query: a dejavu.Query, or a sequence of arguments from which one
            is constructed.
        """
        if not isinstance(query, dejavu.Query):
            query = dejavu.Query(*query)

        if self.logflags & logflags.VIEW:
            self.log(logflags.VIEW.message(query, distinct))

        store = self._single_store(query.relation)
        for row in store.xview(query, order=order, limit=limit,
                               offset=offset, distinct=distinct):
            yield row

    def insert_into(self, name, query, distinct=False):
        """INSERT matching data INTO a new class and return the class.

        query: a dejavu.Query, or a sequence of arguments from which one
            is constructed. The work is delegated to the single store
            which handles query.relation.
        """
        if not isinstance(query, dejavu.Query):
            query = dejavu.Query(*query)

        store = self._single_store(query.relation)
        return store.insert_into(name, query, distinct)

    def start(self, isolation=None):
        """Start a transaction (on every store which supports it)."""
        for store in self.stores.values():
            # A store may set 'start' to a false value to opt out.
            if store.start:
                store.start(isolation)

    def commit(self):
        """Commit the current transaction.

        If errors occur during this process, they are not trapped here.
        You must either call rollback yourself (or fix the problem and
        try to commit again).
        """
        for store in self.stores.values():
            if store.commit:
                store.commit()

    def rollback(self):
        """Roll back the current transaction."""
        for store in self.stores.values():
            if store.rollback:
                store.rollback()
|---|