Changeset 51
- Timestamp:
- 01/12/05 07:48:53
- Files:
-
- trunk/__init__.py (modified) (4 diffs)
- trunk/containers.py (modified) (2 diffs)
- trunk/doc/intro.html (modified) (2 diffs)
- trunk/doc/modeling.html (modified) (2 diffs)
- trunk/engines.py (modified) (2 diffs)
- trunk/storage/__init__.py (modified) (1 diff)
- trunk/storage/db.py (modified) (14 diffs)
- trunk/storage/sockets.py (modified) (3 diffs)
- trunk/storage/storeado.py (modified) (11 diffs)
- trunk/storage/storemysql.py (modified) (5 diffs)
- trunk/storage/storepypgsql.py (modified) (2 diffs)
- trunk/storage/zoo_fixture.py (modified) (10 diffs)
- trunk/test_containers.py (modified) (1 diff)
- trunk/test_dejavu.py (modified) (1 diff)
- trunk/zoo.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/__init__.py
r50 r51 500 500 self.defaultStore = None 501 501 self.stores = {} 502 self. roster = Prism('name', 'cls', 'store')502 self._registered_classes = {} 503 503 self.associations = Graph() 504 504 self.engine_functions = {} … … 517 517 stores.sort() 518 518 519 import xray520 519 for order, name, options in stores: 521 storage_mgr_class = xray.classes(options[u'Class']) 522 store = storage_mgr_class(name, self, options) 523 524 unitClasses = [] 525 for x in options.get('Units', '').split(","): 526 clsname = x.strip() 527 if clsname: 528 unitClasses.append(clsname) 529 530 self.add_store(name, store, unitClasses) 531 532 def add_store(self, name, store, unitClasses=[]): 533 """Register a StorageManager.""" 520 self.add_store(name, options[u'Class'], options) 521 522 def add_store(self, name, store, options=None): 523 """add_store(name, store, options=None). Register a StorageManager. 524 525 The 'store' argument may be the name of a Storage Manager class; 526 if so, it must be importable (that is, it must have the full dotted 527 package name). 528 """ 529 530 if isinstance(store, basestring): 531 import xray 532 store = xray.classes(store)(name, self, options or {}) 533 534 534 self.stores[name] = store 535 536 # Fill Roster, a Prism of class-associated data. 537 if unitClasses: 538 for clsname in unitClasses: 539 if clsname: 540 self.roster.add(name=clsname, cls=None, store=store) 541 else: 535 if not store.classnames: 542 536 self.defaultStore = store 543 537 … … 559 553 def register(self, cls): 560 554 """register(cls) -> Assert that Units of class 'cls' will be handled.""" 561 try: 562 row = self.roster.row_number(name=cls.__name__) 563 except ValueError: 564 self.roster.add(name=cls.__name__, cls=cls, store=None) 565 else: 566 # We left cls == None in _load(). Set it now. 567 self.roster.facets['cls'][row] = cls 555 # We must allow modules to register classes before any stores have 556 # been added, but not overwrite a store which has already been found. 557 if cls not in self._registered_classes: 558 self._registered_classes[cls] = None 568 559 569 560 # Register any association(s) in an undirected graph. … … 577 568 578 569 def class_by_name(self, classname): 579 return self.roster.cls(name=classname) 570 for cls in self._registered_classes: 571 if cls.__name__ == classname: 572 return cls 573 raise KeyError("No registered class found for '%s'." % classname) 580 574 581 575 def storage(self, cls): 582 s = self.roster.store(cls=cls) 583 if s is None: 584 s = self.defaultStore 585 if s is None: 586 raise DejavuError("No storage defined for class '%s'" % 587 cls.__name__) 588 return s 576 found = self._registered_classes.get(cls) 577 578 if found: 579 return found 580 581 # Search all stores for the class name. 582 clsname = cls.__name__ 583 for store in self.stores.itervalues(): 584 if clsname in store.classnames: 585 found = store 586 break 587 found = found or self.defaultStore 588 if found is None: 589 raise KeyError("No store found for '%s' and no " 590 "default store." % clsname) 591 592 self._registered_classes[cls] = found 593 return found 589 594 590 595 def create_storage(self, cls): 596 """create_storage(cls). Create storage space for cls.""" 591 597 self.storage(cls).create_storage(cls) 598 599 def migrate_class(self, cls, new_store): 600 """migrate_class(cls, new_store). Copy all units of cls to new store.""" 601 new_store.create_storage(cls) 602 box = self.new_sandbox() 603 for unit in box.recall(cls): 604 new_store.reserve(unit) 605 self._registered_classes[cls] = new_store 606 box.flush(cls) 607 608 def migrate(self, new_store, old_store=None): 609 """migrate(new_store, old_store=None). Copy all units (of old_store) to new store.""" 610 for cls in self._registered_classes: 611 store = self.storage(cls) 612 if old_store is None or old_store is store: 613 self.migrate_class(cls, new_store) 592 614 593 615 trunk/containers.py
r50 r51 9 9 and map. 10 10 11 Find herein the containers: warehouse , Graph, Index, and Prism.11 Find herein the containers: warehouse and Graph. 12 12 13 13 This work, including the source code, documentation … … 171 171 return shortest 172 172 173 174 missing=object()175 176 class Index(dict):177 """Index(*indices) -> A container of objects, indexed by attribute.178 179 Sometimes, we want to store objects in an associative array,180 but need lookup tools which are faster than simply iterating181 through each object in the array. For example, given a set of182 1,000,000 Artwork objects, it might be too expensive to find183 all works by Picasso by examining each Artwork, accessing its184 'Artist' attribute, and comparing that to 'Picasso'.185 186 One solution to this issue is to build multiple asociative arrays,187 where each keyset represents an index of attributes (such as 'Artist').188 Databases are the most common implementers of this technique.189 Perhaps a more complex diagram will help:190 191 'Name' 'Artist' 'Rate' Attributes192 / \ | / \193 Guer Frenzy Picasso 125.99 300 "Handles"194 | | | | |195 | | / \ | |196 obj1 obj2 obj1 obj2 obj1 obj2197 ^ ^ ^ ^ ^ ^198 | | | | | |199 | +----------|----+-----------|-------+- obj2200 | | |201 +------------------+----------------+- obj1202 203 Notice that each of the two objects is referenced exactly three204 times; that is, once for each attribute which we have declared205 to be an indexed attribute. Also notice that these are references,206 not copies.207 208 Our implementation of such an Index looks like this:209 {'Name': {'Guernica': [obj1], 'Frenzy': [obj2]},210 'Artist': {'Picasso': [obj1, obj2]},211 'Rate': {300.00: [obj2], 125.99: [obj1]},212 }213 214 A note about naming: nested dictionaries are somewhat difficult to keep215 in your head, because one level's "key" is another level's "value".216 In the Index class, the names are:217 Index =218 {attribute: {handle: bucket,219 handle: bucket,220 ...,221 },222 attribute: {handle: bucket,223 handle: bucket,224 ...,225 },226 }227 ...where each 'bucket' is a list of objects. In the example above,228 'Artist' is an attribute, 'Picasso' is a handle (one of the existing229 values of obj.Artist), and [obj1, obj2] is a bucket.230 231 store(obj): add obj to each index based on attributes of obj.232 233 retrieve(attr, cmpfunc, *args): match objects based on indexed attributes.234 attr = the indexed attribute to look up.235 cmpfunc = a callable to which we will pass each instance attribute,236 expecting a true or false result. For example, to retrieve all237 objects with a 'Rate' attribute greater than 200, call:238 retrieve('Rate', lambda x: x > 200)239 --OR--240 retrieve('Rate', operator.gt, 200)241 *args: any extra arguments to be passed to cmpfunc.242 """243 244 def __init__(self, *indices):245 for attr in indices:246 self[attr] = {}247 248 def store(self, obj):249 """Add obj to self, one reference per index."""250 for attr, index in self.iteritems():251 bucket = index.setdefault(getattr(obj, attr), [])252 if obj not in bucket:253 bucket.append(obj)254 255 def reindex(self, obj, attr, oldhandle=missing):256 """The attr of obj has changed. Move it to the correct index."""257 index = self[attr]258 if oldhandle is missing:259 for handle, bucket in index.iteritems():260 if obj in bucket:261 bucket.remove(obj)262 if len(bucket) == 0:263 del index[handle]264 break265 else:266 self._deindex(obj, attr, oldhandle)267 bucket = index.setdefault(getattr(obj, attr), [])268 if obj not in bucket:269 bucket.append(obj)270 271 def _deindex(self, obj, attr, handle):272 index = self[attr]273 try:274 bucket = index[handle]275 except KeyError:276 pass277 else:278 try:279 bucket.remove(obj)280 except ValueError:281 pass282 if len(bucket) == 0:283 del index[handle]284 285 def remove(self, obj):286 """Delete obj from each index."""287 for attr in self.iterkeys():288 handle = getattr(obj, attr)289 self._deindex(obj, attr, handle)290 291 def handles(self, attr):292 """Generator for all handles for the attribute."""293 # Just let KeyErrors propagate outward.294 for handle in self[attr].iterkeys():295 yield handle296 297 def bucket(self, attr, handle):298 """Obtain a list of all objects in a given bucket."""299 # Just let KeyErrors propagate outward.300 return self[attr][handle]301 302 def exists(self, attr, handle):303 """Return True if the handle exists in attr, False otherwise."""304 return self[attr].has_key(handle)305 306 def retrieve(self, attr, cmpfunc=None, *args):307 """Obtain stored objects based on the value of an attribute.308 309 If cmpfunc is None, return all objects. Otherwise, we include310 each bucket's objects if cmpfunc(handle, *args) is True.311 """312 product = []313 if cmpfunc is None:314 for bucket in self[attr].itervalues():315 product.extend(bucket)316 else:317 for handle, bucket in self[attr].iteritems():318 if cmpfunc(handle, *args):319 product.extend(bucket)320 return product321 322 323 class Prism(object):324 """Prism(*indices) -> A colection which simplifies attribute retrieval.325 326 For many homogeneous collections, it is common to index the contained327 objects by an attribute of those objects. For example, if all of your328 objects have an 'id' attribute, you might collect them in a dictionary329 where each key is the id, and each value is the corresponding object.330 331 Some collections, however, benefit from lookups in multiple directions,332 which we might call 'facets' of the contained objects. The Prism class333 provides the ability to specify multiple indices/facets (all indices334 must be unique).335 336 In addition, multi-facet lookups are most commonly used to retrieve,337 not the contained object, but one of the other facets of that object.338 The Prism class provides this behavior via 'function refraction'; if339 you call Prism.far_facet_name(facet_name=value), this is equivalent to340 the dict-style call: indexes[facet_name].get(value).far_facet_name.341 For example, if you have the 'id' attribute of an object, and want to342 retrieve the 'color' attribute, call Prism.color(id=392).343 344 However, rather than store objects, the Prism stores values; you can345 think of it as a table instead of a set of objects. The issue with this346 is that, if one of your facets isn't a unique index, you'll get back347 the first match, not all matches. But sometimes that's what you want.348 349 Finally, there aren't any convenience methods yet for modifying entries;350 you can add and retrieve pretty well, but not mutate or remove without351 mucking about with internals. row_number() should help a bit.352 """353 354 def __init__(self, *facets):355 self.facets = {}356 for name in facets:357 self.facets[name] = []358 self._add_refractor(name)359 360 def _add_refractor(self, name):361 """Add an instancemethod to self which looks up far facets."""362 def refractor(self, **facet):363 """Return the '%s' value associated with the specified facet.""" % name364 if len(facet) != 1:365 raise TypeError(u'%s() takes exactly one keyword argument '366 u'(%s given).' % (name, len(facet)))367 return self.facets[name][self.row_number(**facet)]368 setattr(self, name, new.instancemethod(refractor, self, Prism))369 370 def add(self, **facets):371 """add(**facets) -> add a row to the Prism."""372 for name, row in self.facets.iteritems():373 row.append(facets.get(name))374 375 def row_number(self, **facet):376 """row_number(**facet) -> row number, given one facet's key+value."""377 k, v = facet.popitem()378 f = self.facets[k]379 try:380 return f.index(v)381 except ValueError:382 raise ValueError("'%s' is not a known %s" % (v, k))383 384 def row(self, **facet):385 """row(**facet) -> all facets of a row given one facet's key+value."""386 number = self.row_number(**facet)387 return dict([(k, v[number]) for k, v in self.facets.iteritems()])388 trunk/doc/intro.html
r50 r51 70 70 zookeeper.py 71 71 <pre>import dejavu 72 from dejavu.storage.storeado import StorageManagerADO_MSAccess as SM73 74 72 75 73 class Zoo(dejavu.Unit): … … 91 89 conf = {u'Connect': 92 90 r"PROVIDER=MICROSOFT.JET.OLEDB.4.0;DATA SOURCE=C:\zookeeper.mdb;"} 93 arena.add_store( 'main', SM("mySM", arena, conf))91 arena.add_store("main", "dejavu.storage.storeado.StorageManagerADO_MSAccess", conf)) 94 92 arena.register_all(globals())</pre> 95 93 trunk/doc/modeling.html
r50 r51 511 511 <h4>Loading Stores</h4> 512 512 <p>You <b>may</b> manually set up Storage Managers by calling 513 <tt>Arena().add_store(name, store, unitClasses)</tt>. But, you513 <tt>Arena().add_store(name, store, options)</tt>. But, you 514 514 probably shouldn't. Instead, allow your deployers to decide for 515 515 themselves which storage solution(s) to use. You can do this by calling … … 521 521 522 522 <h4>Registering Unit Classes</h4> 523 <p>The <tt>Arena</tt> object maintains a registry of Unit classes called a 524 <tt>roster</tt>. A roster is like a three-way map between Unit classes, 525 their names, and their assigned StorageManagers. You shouldn't manipulate 526 this structure on your own; instead, use the <tt>register</tt> or 527 <tt>register_all</tt> methods to register each Unit class.</p> 523 <p>The <tt>Arena</tt> object maintains a registry of Unit classes 524 (<tt>._registered_classes</tt>). You shouldn't manipulate this structure 525 on your own; instead, use the <tt>register</tt> or <tt>register_all</tt> 526 methods to register each Unit class.</p> 528 527 529 528 <p>The <tt>Arena</tt> object also manages the associations between Unit trunk/engines.py
r50 r51 259 259 if self.Expiration is not None: 260 260 if self.Expiration <= datetime.datetime.now(): 261 msg = ("Forgetting Engine %s. Exp = %s. Now = %s" %262 (self.ID, self.Expiration, datetime.datetime.now()))263 self.sandbox.arena.application.logger.info(msg)264 261 self.forget() 265 262 raise dejavu.UnrecallableError … … 462 459 expr = pickle.loads(operand) 463 460 expr.bind_args(**self.args) 461 464 462 A = self.sets[setID] 465 463 A.acquire() trunk/storage/__init__.py
r50 r51 21 21 self.name = name 22 22 self.arena = arena 23 self.classnames = [x.strip() for x 24 in allOptions.get('Units', '').split(",") 25 if x.strip()] 23 26 self.shutdownOrder = int(allOptions.get('Shutdown Order', '0')) 24 27 trunk/storage/db.py
r50 r51 16 16 """ 17 17 18 import time 18 19 import weakref 19 20 import Queue … … 84 85 # actual precision depends on platform. PostgreSQL 85 86 # DOUBLE is 8 bytes (15 decimal-digit precision). 86 return u"DOUBLE "87 return u"DOUBLE PRECISION" 87 88 88 89 def coerce_str(self, cls, key): … … 169 170 ## elif bytes == 3: 170 171 ## return "MEDIUMINT" 171 el if bytes == 4:172 else: 172 173 return u"INTEGER" 173 174 174 175 175 … … 238 238 coerce_decimal = tostr 239 239 coerce_decimal_Decimal = tostr 240 241 def pickle(self, value): 242 return self.coerce_str(pickle.dumps(value)) 243 244 coerce_dict = pickle 245 240 246 coerce_fixedpoint_FixedPoint = tostr 241 247 coerce_float = tostr 242 248 coerce_int = tostr 243 249 244 def coerce_list(self, value): 245 return self.coerce_str(pickle.dumps(value)) 250 coerce_list = pickle 246 251 247 252 coerce_long = tostr … … 252 257 return "'" + value + "'" 253 258 254 coerce_tuple = coerce_list259 coerce_tuple = pickle 255 260 256 261 coerce_unicode = coerce_str … … 423 428 # The entire expression could not be evaluated. 424 429 result = self.adapter.bool_true 430 if result == self.adapter.coerce_bool(True): 431 result = self.adapter.bool_true 432 if result == self.adapter.coerce_bool(False): 433 result = self.adapter.bool_false 425 434 return result 426 435 … … 429 438 comp = self.stack.pop() 430 439 trueval = self.adapter.bool_true 440 falseval = self.adapter.bool_false 431 441 while terms: 432 442 term, operation = terms.pop() … … 435 445 if comp is cannot_represent: 436 446 comp = trueval 447 448 # Blurg. SQL Server is *so* picky. 449 if term == self.adapter.coerce_bool(True): 450 term = trueval 451 if term == self.adapter.coerce_bool(False): 452 term = falseval 453 if comp == self.adapter.coerce_bool(True): 454 comp = trueval 455 if comp == self.adapter.coerce_bool(False): 456 comp = falseval 457 437 458 comp = "(%s) %s (%s)" % (term, operation.upper(), comp) 438 459 self.stack.append(comp) … … 647 668 toAdapter = AdapterToSQL() 648 669 fromAdapter = AdapterFromDB() 670 debug_connections = False 649 671 650 672 def __init__(self, name, arena, allOptions={}): … … 659 681 else: 660 682 self.pool = None 683 self.retry = 5 684 self.threaded = True 661 685 662 686 self.prefix = allOptions.get(u'Prefix', u"djv") … … 707 731 708 732 def connection(self): 709 if self.pool is not None:733 if not self.threaded: 710 734 try: 711 return self.pool.get_nowait() 712 except Queue.Empty: 713 pass 714 715 conn = self._get_conn() 716 w = ConnectionWrapper(conn) 717 self.refs[weakref.ref(w, self.release)] = conn 718 return w 735 return self.refs['conn'] 736 except KeyError: 737 self.refs['conn'] = conn = self._get_conn() 738 return conn 739 740 retry = 0 741 while True: 742 if self.pool is not None: 743 try: 744 conn = self.pool.get_nowait() 745 # Okay, this is freaky. If we wrap here, all goes well. 746 # If we wrap on Queue.put(), mysql crashes after 1700 747 # or so inserts (when migrating Access tables to MySQL). 748 # Go figure. 749 w = ConnectionWrapper(conn) 750 self.refs[weakref.ref(w, self.release)] = w.conn 751 if self.debug_connections: 752 print "-->get %s" % self.__class__.__name__ 753 return w 754 except Queue.Empty: 755 pass 756 757 try: 758 conn = self._get_conn() 759 w = ConnectionWrapper(conn) 760 self.refs[weakref.ref(w, self.release)] = w.conn 761 if self.debug_connections: 762 print "create %s" % self.__class__.__name__ 763 return w 764 except OutOfConnectionsError: 765 retry += 1 766 if retry < self.retry: 767 time.sleep(retry * 1) 768 conn = None 769 continue 770 raise OutOfConnectionsError() 719 771 720 772 def release(self, ref): 721 773 conn = self.refs[ref] 722 774 del self.refs[ref] 775 723 776 if self.pool is not None: 724 777 try: 725 self.pool.put_nowait(ConnectionWrapper(conn)) 778 self.pool.put_nowait(conn) 779 if self.debug_connections: 780 print "<--put %s" % self.__class__.__name__ 781 return 726 782 except Queue.Full: 727 getattr(conn, self.close_connection_method)() 783 pass 784 785 getattr(conn, self.close_connection_method)() 786 if self.debug_connections: 787 print "___close___ %s" % self.__class__.__name__ 728 788 729 789 def shutdown(self): … … 783 843 def execute(self, query, conn=None): 784 844 """execute(query, conn=None) -> result set.""" 785 try:786 if conn is None:787 conn = self.connection()788 return conn.query(query.encode('utf8'))789 except Exception, x:790 x.args += (query,)791 # Dereference the connection so that release() is called back.792 conn = None793 raise x845 ## try: 846 if conn is None: 847 conn = self.connection() 848 return conn.query(query.encode('utf8')) 849 ## except Exception, x: 850 ## x.args += (query,) 851 ## # Dereference the connection so that release() is called back. 852 ## conn = None 853 ## raise x 794 854 795 855 def fetch(self, query, conn=None): … … 850 910 The ID should be supplied by UnitSequencers via reserve(). 851 911 """ 912 cls = unit.__class__ 852 913 tablename = self.tablename(unit) 853 id = self.identifier("ID") 854 coerce = self.fromAdapter.coerce 855 expectedType = unit.__class__.property_type("ID") 914 i = self.identifier 856 915 self.reserve_lock.acquire() 857 916 try: 858 917 if unit.ID is None: 859 data, cols = self.fetch(u'SELECT %s FROM %s;' % (id, tablename)) 918 # Examine all existing IDs and grant the "next" one. 919 data, cols = self.fetch(u'SELECT %s FROM %s;' % (i('ID'), tablename)) 920 coerce = self.fromAdapter.coerce 860 921 coltype = cols[0][1] 922 expectedType = cls.property_type("ID") 861 923 data = [coerce(row[0], coltype, expectedType) for row in data] 862 924 unit.ID = unit.sequencer.next(data) 925 del data 926 del cols 927 928 fields = [] 929 values = [] 930 for key in cls.properties(): 931 subtype = self.expanded_columns.get((cls.__name__, key)) 932 if subtype: 933 self.save_expanded(unit, key, subtype) 934 else: 935 val = self.toAdapter.coerce(getattr(unit, key)) 936 fields.append(i(key)) 937 values.append(val) 863 938 864 939 self.execute('INSERT INTO %s (%s) VALUES (%s);' % 865 (tablename, id, self.toAdapter.coerce(unit.ID))) 940 (tablename, u", ".join(fields), 941 u", ".join(values))) 942 unit.cleanse() 866 943 finally: 867 944 self.reserve_lock.release() … … 1058 1135 if acceptable: 1059 1136 yield unitset 1137 1138 1139 class OutOfConnectionsError(dejavu.DejavuError): 1140 """Exception raised when a database store has run out of connections.""" 1141 pass 1142 trunk/storage/sockets.py
r50 r51 1 import fixedpoint2 1 try: 3 2 import cPickle as pickle 4 3 except ImportError: 5 4 import pickle 5 6 6 import socket 7 7 import select 8 9 10 def _define_fixedpoint_states(): 11 """Add methods to fixedpoint to support pickling.""" 12 import fixedpoint 13 14 if not hasattr(fixedpoint.FixedPoint, "__getstate__"): 15 def __getstate__(self): 16 return (self.n, self.p) 17 fixedpoint.FixedPoint.__getstate__ = __getstate__ 18 19 def __setstate__(self, v): 20 self.n, self.p = v 21 fixedpoint.FixedPoint.__setstate__ = __setstate__ 8 22 9 23 … … 12 26 if isinstance(unit, dict): 13 27 for key, value in unit.iteritems(): 14 if isinstance(value, fixedpoint.FixedPoint): 15 # Unpicklable 16 data[key] = (value.n, value.p) 17 else: 18 data[key] = value 28 if value.__class__.__name__ == "FixedPoint": 29 _define_fixedpoint_states() 30 data[key] = value 19 31 else: 20 32 # Assume it's a dejavu.Unit. 21 33 for key in unit.__class__.properties(): 22 34 value = getattr(unit, key) 23 if isinstance(value, fixedpoint.FixedPoint): 24 # Unpicklable 25 data[key] = (value.n, value.p) 26 else: 27 data[key] = value 35 if value.__class__.__name__ == "FixedPoint": 36 _define_fixedpoint_states() 37 data[key] = value 28 38 return pickle.dumps(data) 29 39 … … 33 43 cls = unit.__class__ 34 44 for key in cls.properties(): 35 value = attrdict[key]36 if cls.property_type(key) == fixedpoint.FixedPoint:37 if value is not None:38 n, p = value39 value = fixedpoint.FixedPoint(0, 2)40 value.n, value.p = n, p41 45 # Set the attribute directly to avoid __set__ overhead. 42 unit._properties[key] = value46 unit._properties[key] = attrdict[key] 43 47 44 48 def dechunk(data): trunk/storage/storeado.py
r50 r51 8 8 import pywintypes 9 9 import datetime 10 10 11 try: 11 12 import cPickle as pickle … … 17 18 except ImportError: 18 19 pass 20 21 try: 22 # Builtin in Python 2.5? 23 decimal 24 except NameError: 25 try: 26 # Module in Python 2.3, 2.4 27 import decimal 28 except ImportError: 29 pass 30 31 import warnings 19 32 20 33 import dejavu … … 330 343 # res.CursorLocation = adUseClient 331 344 try: 332 # 'conn' will be a ConnectionWrapper object, which .Open 333 # won't accept. Pass the unwrapped connection instead. 334 res.Open(query, conn.conn, adOpenForwardOnly, adLockReadOnly) 345 if self.threaded: 346 # 'conn' will be a ConnectionWrapper object, which .Open 347 # won't accept. Pass the unwrapped connection instead. 348 res.Open(query, conn.conn, adOpenForwardOnly, adLockReadOnly) 349 else: 350 res.Open(query, conn, adOpenForwardOnly, adLockReadOnly) 335 351 except pywintypes.com_error, x: 336 352 try: … … 339 355 pass 340 356 x.args += (query, ) 357 conn = None 341 358 raise x 342 359 … … 351 368 data = zip(*data) 352 369 res.Close() 370 conn = None 353 371 354 372 return data, columns … … 374 392 def coerce_bool(self, value): 375 393 if value: 376 return ' (1=1)'377 return ' (1=0)'394 return '1' 395 return '0' 378 396 379 397 … … 381 399 382 400 numeric_max_precision = 38 401 402 def coerce_bool(self, cls, key): return u"BIT" 383 403 384 404 def coerce_datetime_datetime(self, cls, key): … … 398 418 # Okay, what the @#$%& is wrong with Redmond??!?! We can't even 399 419 # compare TEXT or NTEXT fields??!? Fine. We'll deny such, and 400 # warn the deployer with less swearing .420 # warn the deployer with less swearing and exclamation points. 401 421 import warnings 402 422 warnings.warn("You have defined a string property without " … … 468 488 class FieldTypeAdapter_MSAccess(db.FieldTypeAdapter): 469 489 470 numeric_max_precision = 28 490 numeric_max_precision = 15 491 492 def coerce_bool(self, cls, key): return u"BIT" 493 494 def coerce_datetime_datetime(self, cls, key): return u"DATETIME" 495 def coerce_datetime_date(self, cls, key): return u"DATETIME" 496 def coerce_datetime_time(self, cls, key): return u"DATETIME" 497 498 def numeric_type(self, cls, key, precision, scale): 499 if precision > self.numeric_max_precision: 500 warnings.warn("Decimal precision %s > maximum %s for %s.%s, " 501 "using %s. Values may be stored incorrectly." 502 % (precision, self.numeric_max_precision, 503 cls.__name__, key, self.__class__.__name__)) 504 precision = self.numeric_max_precision 505 if scale > 4: 506 warnings.warn("Decimal scale %s > maximum 4 for %s.%s, " 507 "using %s. Values may be stored incorrectly." 508 % (scale, cls.__name__, key, 509 self.__class__.__name__)) 510 511 # MS Access doesn't let us control precision and scale directly. 512 # From http://support.microsoft.com/?kbid=104977 513 # ORACLE number Microsoft Access data type 514 # --------------------------------------------------- 515 # Scale = 0 and 516 # precision <= 4 Integer 517 # precision <= 9 Long Integer 518 # precision <= 15 Double 519 # Scale > 0 and <= 4 520 # precision <= 15 Double 521 # Scale > 4 and/or 522 # precision > 15 Text 523 if scale == 0: 524 if precision <= 4: 525 return "INTEGER" 526 elif precision <= 9: 527 return "LONG" 528 return "DOUBLE" 529 530 def coerce_decimal_Decimal(self, cls, key): 531 prop = getattr(cls, key) 532 precision = int(prop.hints.get('precision', '0')) 533 if precision == 0: 534 precision = decimal.getcontext().prec 535 # Assume most people use decimal for money; default scale = 2. 536 scale = int(prop.hints.get(u'scale', 2)) 537 return self.numeric_type(cls, key, precision, scale) 538 539 def coerce_fixedpoint_FixedPoint(self, cls, key): 540 prop = getattr(cls, key) 541 precision = int(prop.hints.get('precision', '0')) 542 if precision == 0: 543 precision = self.numeric_max_precision 544 # Assume most people use decimal for money; default scale = 2. 545 scale = int(prop.hints.get(u'scale', 2)) 546 return self.numeric_type(cls, key, precision, scale) 547 548 def coerce_int(self, cls, key): 549 prop = getattr(cls, key) 550 bytes = int(prop.hints.get(u'bytes', '4')) 551 if bytes == 1: 552 return "BIT" 553 else: 554 return u"INTEGER" 555 556 def coerce_long(self, cls, key): 557 prop = getattr(cls, key) 558 bytes = int(prop.hints.get(u'bytes', 0)) 559 return self.numeric_type(cls, key, precision, 0) 471 560 472 561 def coerce_str(self, cls, key): … … 478 567 return u"VARCHAR(%s)" % bytes 479 568 else: 480 # MEMO is 1 gigabytewhen set programatically (only 64K when set569 # MEMO is 1 GB max when set programatically (only 64K when set 481 570 # in Access UI). But then, 1 GB is the limit for the whole DB. 482 571 return u"MEMO" … … 517 606 self.cursorType = int(allOptions.get(u'CursorType', adOpenDynamic)) 518 607 self.lockType = int(allOptions.get(u'LockType', adLockOptimistic)) 608 # MS Access can't use a pool, because there doesn't seem 609 # to be a commit timeout. 519 610 self.pool = None 611 self.threaded = False 612 self.debug_connections = True 520 613 521 614 def create_database(self): trunk/storage/storemysql.py
r50 r51 26 26 27 27 class AdapterFromMySQL(db.AdapterFromDB): 28 29 def coerce_bool(self, value, coltype): 30 if isinstance(value, basestring): 31 # either '0' or '1' 32 value = (value == '1') 33 return bool(value) 28 34 29 35 def coerce_unicode(self, value, coltype): … … 141 147 conn = _mysql.connect(**self.connargs) 142 148 except _mysql.OperationalError, x: 143 if self.CreateIfMissing: 149 if x.args[0] == 1040: # Too many connections 150 raise db.OutOfConnectionsError 151 elif x.args[0] == 1049 and self.CreateIfMissing: 144 152 self.create_database() 145 153 conn = _mysql.connect(**self.connargs) … … 150 158 def _template_conn(self): 151 159 tmplconn = self.connargs.copy() 152 tmplconn['db'] = ' '160 tmplconn['db'] = 'mysql' 153 161 return _mysql.connect(**tmplconn) 154 162 … … 165 173 self.execute(sql, conn) 166 174 conn.close() 175 176 def create_storage(self, unitClass): 177 # MySQL won't allow indexes on a BLOB field without a specific length. 178 tablename = self.tablename(unitClass) 179 180 coerce = self.typeAdapter.coerce 181 fields = [] 182 for key in unitClass.properties(): 183 fields.append(u'%s %s' % (self.identifier(key), 184 coerce(unitClass, key))) 185 self.execute(u'CREATE TABLE %s (%s);' % (tablename, ", ".join(fields))) 186 187 for index in unitClass.indices(): 188 i = self.identifier(self.prefix, "i", unitClass.__name__, index) 189 190 dbtype = coerce(unitClass, index) 191 if dbtype.endswith('BLOB') or dbtype == 'TEXT': 192 self.execute(u'CREATE INDEX %s ON %s (%s(%s));' % 193 (i, tablename, self.identifier(index), 255)) 194 else: 195 self.execute(u'CREATE INDEX %s ON %s (%s);' % 196 (i, tablename, self.identifier(index))) 167 197 168 198 def fetch(self, query, conn=None): … … 180 210 res = conn.store_result() 181 211 return res.fetch_row(0, 0), res.describe() 182 212 213 def destroy(self, unit): 214 """destroy(unit). Delete the unit.""" 215 self.execute(u'DELETE FROM %s WHERE %s = %s;' % 216 (self.tablename(unit), self.identifier("ID"), 217 self.toAdapter.coerce(unit.ID))) 218 trunk/storage/storepypgsql.py
r50 r51 1 1 # Use libpq directly to avoid all of the DB-API overhead. 2 2 from pyPgSQL import libpq 3 import time4 3 import datetime 5 4 import dejavu … … 50 49 k, v = atom.split("=", 1) 51 50 setattr(self, k, v) 52 53 self.retry = 554 51 55 52 def _get_conn(self): 56 retry = 0 57 while True: 58 try: 59 return libpq.PQconnectdb(self.connstring) 60 except libpq.DatabaseError, x: 61 msg = x.args[0] 62 if msg.endswith('does not exist\n'): 63 if self.CreateIfMissing: 64 self.create_database() 65 return libpq.PQconnectdb(self.connstring) 66 elif msg.startswith('could not connect'): 67 retry += 1 68 if retry < self.retry: 69 time.sleep(retry * 0.1) 70 continue 71 x.args += ("Try increasing your Postgres server's max_connections",) 72 raise x 53 try: 54 return libpq.PQconnectdb(self.connstring) 55 except libpq.DatabaseError, x: 56 msg = x.args[0] 57 if msg.endswith('does not exist\n'): 58 if self.CreateIfMissing: 59 self.create_database() 60 return libpq.PQconnectdb(self.connstring) 61 elif msg.startswith('could not connect'): 62 raise db.OutOfConnectionsError 63 raise x 73 64 74 65 def _template_conn(self): trunk/storage/zoo_fixture.py
r50 r51 11 11 import dejavu 12 12 from dejavu import logic, zoo 13 14 try: 15 import fixedpoint 16 except ImportError: 17 fixedpoint = None 13 18 14 19 … … 46 51 box.memorize(Biodome) 47 52 53 seaworld = zoo.Zoo(Name = 'Sea World', 54 Admission = "60", 55 ) 56 box.memorize(seaworld) 57 48 58 # Animals 49 leopard = zoo.Animal(Name='Leopard', Legs=4 )59 leopard = zoo.Animal(Name='Leopard', Legs=4, Lifespan=73.5) 50 60 self.assertEqual(leopard.PreviousZoos, None) 51 61 box.memorize(leopard) … … 53 63 leopard.LastEscape = datetime.datetime(2004, 12, 21, 8, 15, 0) 54 64 55 box.memorize(zoo.Animal(Name='Slug', Legs=1)) 56 box.memorize(zoo.Animal(Name='Tiger', Legs=4)) 65 box.memorize(zoo.Animal(Name='Slug', Legs=1, Lifespan=.75)) 66 tiger = zoo.Animal(Name='Tiger', Legs=4) 67 box.memorize(tiger) 57 68 box.memorize(zoo.Animal(Name='Lion', Legs=4)) 58 69 box.memorize(zoo.Animal(Name='Bear', Legs=4)) 59 70 # Notice that ostrich.PreviousZoos is [], whereas leopard is None. 60 box.memorize(zoo.Animal(Name='Ostrich', Legs=2, PreviousZoos=[])) 71 box.memorize(zoo.Animal(Name='Ostrich', Legs=2, PreviousZoos=[], 72 Lifespan=103.2)) 61 73 box.memorize(zoo.Animal(Name='Centipede', Legs=100)) 62 74 … … 66 78 box.memorize(adelie) 67 79 80 seaworld.add(emp, adelie) 81 68 82 millipede = zoo.Animal(Name='Millipede', Legs=1000000) 69 83 millipede.PreviousZoos = [WAP.ID] 70 84 box.memorize(millipede) 71 85 72 SDZ.add( emp, adelie, millipede)86 SDZ.add(tiger, millipede) 73 87 74 88 # Exhibits 75 89 pe = zoo.Exhibit(Name = 'The Penguin Encounter', 76 ZooID = SDZ.ID,90 ZooID = seaworld.ID, 77 91 Animals = [emp.ID, adelie.ID], 92 PettingAllowed = True, 78 93 Acreage = "3.21", 79 94 ) 80 95 box.memorize(pe) 96 97 tr = zoo.Exhibit(Name = 'Tiger River', 98 ZooID = SDZ.ID, 99 Animals = [tiger.ID], 100 PettingAllowed = False, 101 Acreage = "4", 102 ) 103 box.memorize(tr) 81 104 82 105 box.flush_all() … … 110 133 self.assertEqual(float(Biodome.Admission), 11.75) 111 134 135 if fixedpoint: 136 seaworld = box.unit(zoo.Zoo, Admission = fixedpoint.FixedPoint(60)) 137 else: 138 seaworld = box.unit(zoo.Zoo, Admission = float(60)) 139 self.assertNotEqual(seaworld, None) 140 112 141 # Animals 113 142 leopard = box.unit(zoo.Animal, Name='Leopard') 114 143 self.assertEqual(leopard.Name, 'Leopard') 115 144 self.assertEqual(leopard.Legs, 4) 145 self.assertEqual(leopard.Lifespan, 73.5) 116 146 self.assertEqual(leopard.ZooID, WAP.ID) 117 147 self.assertEqual(leopard.PreviousZoos, None) … … 135 165 # Exhibits 136 166 exes = [x for x in box.recall(zoo.Exhibit)] 137 self.assertEqual(len(exes), 1) 138 pe = exes[0] 139 self.assertEqual(pe.Name, "The Penguin Encounter") 140 self.assertEqual(pe.ZooID, SDZ.ID) 167 self.assertEqual(len(exes), 2) 168 if exes[0].Name == 'The Penguin Encounter': 169 pe = exes[0] 170 tr = exes[1] 171 else: 172 pe = exes[1] 173 tr = exes[0] 174 self.assertEqual(pe.ZooID, seaworld.ID) 141 175 self.assertEqual(len(pe.Animals), 2) 142 176 self.assertEqual(float(pe.Acreage), 3.21) 177 self.assertEqual(pe.PettingAllowed, True) 178 179 self.assertEqual(tr.ZooID, SDZ.ID) 180 self.assertEqual(len(tr.Animals), 1) 181 self.assertEqual(float(tr.Acreage), 4) 182 self.assertEqual(tr.PettingAllowed, False) 143 183 144 184 box.flush_all() … … 155 195 zoos = [x for x in box.recall(zoo.Zoo)] 156 196 self.assertEqual(zoos[0].dirty(), False) 157 self.assertEqual(len(zoos), 3)197 self.assertEqual(len(zoos), 4) 158 198 self.assertEqual(matches(lambda x: True), 10) 159 199 self.assertEqual(matches(lambda x: x.Legs == 4), 4) … … 161 201 self.assertEqual(matches(lambda x: x.Legs >= 2 and x.Legs < 20), 7) 162 202 self.assertEqual(matches(lambda x: x.Legs > 10), 2) 203 self.assertEqual(matches(lambda x: x.Lifespan > 70), 2) 163 204 self.assertEqual(matches(lambda x: x.Name.startswith('L')), 2) 164 205 self.assertEqual(matches(lambda x: x.Name.endswith('pede')), 2) … … 212 253 (zoo.Animal, None))] 213 254 SDZ = box.unit(zoo.Zoo, Name='San Diego Zoo') 214 self.assertEqual(len(zooed_animals), 3)255 self.assertEqual(len(zooed_animals), 2) 215 256 aid = 0 216 257 for z, a in zooed_animals: … … 218 259 self.assertNotEqual(id(a), aid) 219 260 aid = id(a) 220 ##221 ##def test_5_Multithreading(self):222 ##f = logic.Expression(lambda x: x.Legs == 4)223 ##def thread_recall():224 ### Notice we only do reads in this thread, not writes, since225 ### the order of thread execution can not be guaranteed.226 ##box = zoo.arena.new_sandbox()227 ##quadrupeds = [x for x in box.recall(zoo.Animal, f)]228 ##self.assertEqual(len(quadrupeds), 4)229 ##230 ##ts = []231 ### PostgreSQL, for example, has a default max_connections of 100.232 ##for x in range(99):233 ##t = threading.Thread(target=thread_recall)234 ##t.start()235 ##ts.append(t)236 ##for t in ts:237 ##t.join()261 262 def test_5_Multithreading(self): 263 f = logic.Expression(lambda x: x.Legs == 4) 264 def thread_recall(): 265 # Notice we only do reads in this thread, not writes, since 266 # the order of thread execution can not be guaranteed. 267 box = zoo.arena.new_sandbox() 268 quadrupeds = [x for x in box.recall(zoo.Animal, f)] 269 self.assertEqual(len(quadrupeds), 4) 270 271 ts = [] 272 # PostgreSQL, for example, has a default max_connections of 100. 273 for x in range(99): 274 t = threading.Thread(target=thread_recall) 275 t.start() 276 ts.append(t) 277 for t in ts: 278 t.join() 238 279 239 280 240 281 def setup_SM(SM_class, opts): 241 """main(SM_class, PreviousZoos) -> Set up the arena and SM for Zoo.""" 242 if isinstance(SM_class, basestring): 243 import xray 244 SM_class = xray.classes(SM_class) 245 testSM = SM_class("test", zoo.arena, opts) 246 zoo.arena.add_store('testSM', testSM) 282 """setup_SM(SM_class, opts). Set up storage for Zoo classes.""" 283 zoo.arena.add_store('testSM', SM_class, opts) 247 284 248 285 for cls in (zoo.Animal, zoo.Zoo, zoo.Exhibit): trunk/test_containers.py
r50 r51 160 160 161 161 162 class Artwork(object):163 def __init__(self, Name='', Artist='', Rate=''):164 self.Name = Name165 self.Artist = Artist166 self.Rate = Rate167 168 169 class IndexTests(unittest.TestCase):170 171 def test_Index(self):172 artworks = containers.Index('Name', 'Artist', 'Rate')173 for index in artworks:174 self.assertEqual(index in ['Name', 'Artist', 'Rate'], True)175 176 # Store a single object.177 guernica = Artwork('Guernica', 'Pablo Picasso', 185.00)178 artworks.store(guernica)179 self.assertEqual(artworks.bucket('Name', 'Guernica'), [guernica])180 181 # Add another that should collide on an indexed attribute.182 frenzy = Artwork('Frenzy', 'Pablo Picasso', 203.45)183 artworks.store(frenzy)184 self.assertEqual(artworks.bucket('Artist', 'Pablo Picasso'),185 [guernica, frenzy])186 187 # Add a bunch.188 magrittes = [Artwork('Empire of Light', 'Rene Magritte', 512.13),189 Artwork('The Rape', 'Rene Magritte', 121.99),190 Artwork('The Son of Man', 'Rene Magritte', 1024.33),191 Artwork('The Difficult Crossing', 'Rene Magritte', 0.32),192 ]193 for work in magrittes:194 artworks.store(work)195 196 # Manually find all artworks rated under 200197 cheapos = []198 for rate in artworks.handles('Rate'):199 if rate < 200:200 cheapos.extend(artworks.bucket('Rate', rate))201 expected = [magrittes[1], guernica, magrittes[3]]202 self.assertEqual(len(cheapos), len(expected))203 for work in expected:204 self.assertEqual(work in cheapos, True)205 206 # Retrieve() all artworks rated over 200.207 assets = artworks.retrieve('Rate', operator.gt, 200)208 expected = [magrittes[2], frenzy, magrittes[0]]209 self.assertEqual(len(assets), len(expected))210 for work in expected:211 self.assertEqual(work in assets, True)212 213 # Retrieve() all artworks. Use two different attributes214 # and compare the results.215 byrate = artworks.retrieve('Rate')216 byartist = artworks.retrieve('Artist')217 self.assertEqual(len(byrate), len(byartist))218 for work in byrate:219 self.assertEqual(work in byartist, True)220 221 # Delete a single artwork.222 artworks.remove(guernica)223 self.assertEqual(artworks.bucket('Artist', 'Pablo Picasso'), [frenzy])224 self.assertRaises(KeyError, artworks.bucket, 'Name', 'Guernica')225 226 # Change an indexed value of an artwork.227 guernica.Rate = 300.0228 artworks.reindex(guernica, 'Rate')229 self.assertEqual(artworks.retrieve('Rate', operator.eq, 300), [guernica])230 231 # Reindex with the known last value.232 guernica.Rate = 135.0233 artworks.reindex(guernica, 'Rate', 300.0)234 self.assertEqual(artworks.retrieve('Rate', operator.eq, 135), [guernica])235 self.assertEqual(artworks.retrieve('Rate', operator.eq, 300), [])236 self.assertRaises(KeyError, artworks.bucket, 'Rate', 300.0)237 238 239 class PrismTests(unittest.TestCase):240 241 def test_Prism(self):242 artworks = containers.Prism('Name', 'Artist', 'Rate')243 for index in artworks.facets.keys():244 self.assertEqual(index in ['Name', 'Artist', 'Rate'], True)245 246 # Store a single row.247 artworks.add(Name='Guernica', Artist='Pablo Picasso', Rate=185.00)248 self.assertEqual(artworks.Artist(Name='Guernica'), 'Pablo Picasso')249 250 # Add another that should collide on an indexed attribute.251 artworks.add(Name='Frenzy', Artist='Pablo Picasso', Rate=203.45)252 self.assertEqual(artworks.Name(Artist='Pablo Picasso'), 'Guernica')253 self.assertEqual(artworks.Rate(Artist='Pablo Picasso'), 185.00)254 255 # Add a row with missing facets.256 artworks.add(Name='Empire of Light', Artist='Rene Magritte')257 self.assertEqual(artworks.Rate(Artist='Rene Magritte'), None)258 # Fill in the missing value.259 i = artworks.row_number(Name='Empire of Light')260 artworks.facets['Rate'][i] = 512.13261 self.assertEqual(artworks.Rate(Artist='Rene Magritte'), 512.13)262 263 264 162 def run_tests(): 265 163 try: trunk/test_dejavu.py
r50 r51 4 4 import dejavu 5 5 from dejavu import zoo, storage 6 zoo.arena.add_store("default", storage.CachingProxy("default", zoo.arena))6 zoo.arena.add_store("default", "storage.CachingProxy") 7 7 8 8 trunk/zoo.py
r50 r51 38 38 PreviousZoos = UnitProperty(list) 39 39 LastEscape = EscapeProperty(datetime.datetime) 40 Lifespan = UnitProperty(float, hints={'bytes': 4}) 40 41 41 42 associate(Zoo, 'ID', Animal, 'ZooID') … … 47 48 ZooID = UnitProperty(int) 48 49 Animals = UnitProperty(list) 50 PettingAllowed = UnitProperty(bool) 49 51 if decimal: 50 52 Acreage = UnitProperty(decimal.Decimal)
