Changeset 8
- Timestamp:
- 02/13/07 01:48:48
- Files:
-
- trunk/geniusql/__init__.py (modified) (4 diffs)
- trunk/geniusql/providers/ado.py (modified) (5 diffs)
- trunk/geniusql/providers/firebird.py (modified) (2 diffs)
- trunk/geniusql/providers/mysql.py (modified) (2 diffs)
- trunk/geniusql/providers/psycopg.py (modified) (2 diffs)
- trunk/geniusql/providers/pypgsql.py (modified) (2 diffs)
- trunk/geniusql/providers/sqlite.py (modified) (2 diffs)
- trunk/geniusql/test/zoo_fixture.py (modified) (8 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/geniusql/__init__.py
r7 r8 18 18 19 19 import threading 20 21 from dejavu import logic 22 20 23 from geniusql import xray 21 22 24 from geniusql import errors, typerefs 23 25 … … 342 344 return i 343 345 344 def select_all(self, restriction=None): 346 347 # ---------------------------- OLTP/CRUD ---------------------------- # 348 349 def id_clause(self, **inputs): 350 """Return an SQL expression for the identifiers of the given table.""" 351 coerce = self.db.adaptertosql.coerce 352 pairs = [] 353 for key, col in self.iteritems(): 354 if col.key: 355 val = coerce(inputs[key], col.dbtype) 356 pairs.append("%s = %s" % (col.qname, val)) 357 return " AND ".join(pairs) 358 359 def insert(self, **inputs): 360 """Insert a row and return {idcolkey: newid}.""" 361 coerce_out = self.db.adaptertosql.coerce 362 coerce_in = self.db.adapterfromdb.coerce 363 364 fields = [] 365 idkeys = [] 366 values = [] 367 for key, col in self.iteritems(): 368 if col.autoincrement: 369 # Skip this field, since we're using a sequencer 370 idkeys.append(key) 371 continue 372 if key in inputs: 373 val = coerce_out(inputs[key], col.dbtype) 374 fields.append(col.qname) 375 values.append(val) 376 377 transconn = self.db.get_transaction() 378 379 fields = ", ".join(fields) 380 values = ", ".join(values) 381 self.db.execute('INSERT INTO %s (%s) VALUES (%s);' % 382 (self.qname, fields, values), transconn) 383 384 if idkeys: 385 newids = self._grab_new_ids(idkeys, transconn) 386 for key in newids.keys(): 387 col = self[key] 388 newids[key] = coerce_in(newids[key], col.dbtype, col.pytype) 389 return newids 390 else: 391 return {} 392 393 def _grab_new_ids(self, idkeys, conn): 394 # Override this to fetch and return new autoincrement values. 395 raise NotImplementedError 396 397 def save(self, **inputs): 398 """Update a row using the given inputs.""" 399 parms = [] 400 coerce = self.db.adaptertosql.coerce 401 for key, val in inputs.iteritems(): 402 col = self[key] 403 if col.autoincrement: 404 # Skip this field, since we're using a sequencer 405 pass 406 else: 407 val = coerce(val, col.dbtype) 408 parms.append('%s = %s' % (col.qname, val)) 409 410 if parms: 411 sql = ('UPDATE %s SET %s WHERE %s;' % 412 (self.qname, ", ".join(parms), self.id_clause(**inputs))) 413 self.db.execute(sql, self.db.get_transaction()) 414 415 def select_all(self, restriction=None, **kwargs): 345 416 """Yield data dicts matching the given restriction.""" 417 if restriction and not isinstance(restriction, logic.Expression): 418 restriction = logic.Expression(restriction) 419 if kwargs: 420 f = logic.filter(**kwargs) 421 if restriction: 422 restriction += f 423 else: 424 restriction = f 425 346 426 attrs = self.keys() 347 427 data = self.db.select(self, attrs, restriction) … … 354 434 yield row 355 435 356 def select_one(self, restriction=None ):436 def select_one(self, restriction=None, **kwargs): 357 437 """Return a single data dict matching the given restriction (or None).""" 358 438 try: 359 return self.select_all(restriction ).next()439 return self.select_all(restriction, **kwargs).next() 360 440 except StopIteration: 361 441 return None … … 937 1017 raise errors.TransactionLock("Unlock called inside transaction.") 938 1018 del self.transactions[key] 939 940 # OLTP/CRUD #941 942 def id_clause(self, tablekey, **inputs):943 """Return an SQL expression for the identifiers of the given table."""944 t = self[tablekey]945 coerce = self.adaptertosql.coerce946 pairs = []947 for key, col in t.iteritems():948 if col.key:949 val = coerce(inputs[key], col.dbtype)950 pairs.append("%s = %s" % (col.qname, val))951 return " AND ".join(pairs)952 953 def insert(self, tablekey, **inputs):954 """Insert a row and return {idcolkey: newid}."""955 t = self[tablekey]956 coerce_out = self.adaptertosql.coerce957 coerce_in = self.adapterfromdb.coerce958 959 fields = []960 idkeys = []961 values = []962 for key, col in t.iteritems():963 if col.autoincrement:964 # Skip this field, since we're using a sequencer965 idkeys.append(key)966 continue967 if key in inputs:968 val = coerce_out(inputs[key], col.dbtype)969 fields.append(col.qname)970 values.append(val)971 972 transconn = self.get_transaction()973 974 fields = ", ".join(fields)975 values = ", ".join(values)976 self.execute('INSERT INTO %s (%s) VALUES (%s);' %977 (t.qname, fields, values), transconn)978 979 if idkeys:980 newids = self._grab_new_ids(t, idkeys, transconn)981 for key in newids.keys():982 col = t[key]983 newids[key] = coerce_in(newids[key], col.dbtype, col.pytype)984 return newids985 else:986 return {}987 988 def _grab_new_ids(self, table, idkeys):989 # Override this to fetch and return new autoincrement values.990 raise NotImplementedError991 992 def save(self, tablekey, **inputs):993 """Update a row using the given inputs."""994 t = self[tablekey]995 996 parms = []997 coerce = self.adaptertosql.coerce998 for key, val in inputs.iteritems():999 col = t[key]1000 if col.autoincrement:1001 # Skip this field, since we're using a sequencer1002 pass1003 else:1004 val = coerce(val, col.dbtype)1005 parms.append('%s = %s' % (col.qname, val))1006 1007 if parms:1008 sql = ('UPDATE %s SET %s WHERE %s;' %1009 (t.qname, ", ".join(parms), self.id_clause(tablekey, **inputs)))1010 self.execute(sql, self.get_transaction())1011 1019 1012 1020 trunk/geniusql/providers/ado.py
r7 r8 842 842 self.db.execute("EXEC sp_rename '%s.%s', '%s', 'COLUMN'" % 843 843 (self.name, oldcol.name, newcol.name)) 844 845 def _grab_new_ids(self, idkeys, conn): 846 """Insert a row using the table's SERIAL field.""" 847 # For some reason, using SCOPE_IDENTITY or IDENTITY failed (returned 848 # None) when retrieving ID's just after a 99-thread-test ran. Moving 849 # the multithreading test fixed it. IDENT_CURRENT worked regardless. 850 data, _ = self.db.fetch("SELECT IDENT_CURRENT('%s');" % self.qname, 851 conn) 852 return {idkeys[0]: data[0][0]} 844 853 845 854 … … 920 929 return '%s %s%s' % (column.qname, dbtype, clause) 921 930 922 def _grab_new_ids(self, table, idkeys, conn):923 """Insert a row using the table's SERIAL field."""924 # For some reason, using SCOPE_IDENTITY or IDENTITY failed (returned925 # None) when retrieving ID's just after a 99-thread-test ran. Moving926 # the multithreading test fixed it. IDENT_CURRENT worked regardless.927 data, _ = self.fetch("SELECT IDENT_CURRENT('%s');" % table.qname, conn)928 return {idkeys[0]: data[0][0]}929 930 931 # Transactions # 931 932 … … 1045 1046 1046 1047 1048 class MSAccessTable(ADOTable): 1049 1050 def _grab_new_ids(self, idkeys, conn): 1051 data, _ = self.db.fetch("SELECT @@IDENTITY;", conn) 1052 return {idkeys[0]: data[0][0]} 1053 1054 1047 1055 class MSAccessDatabase(ADODatabase): 1048 1056 … … 1050 1058 adaptertosql = AdapterToADOSQL_MSAccess() 1051 1059 typeadapter = TypeAdapter_MSAccess() 1060 tableclass = MSAccessTable 1052 1061 1053 1062 def version(self): … … 1138 1147 return '%s %s' % (column.qname, dbtype) 1139 1148 1140 def _grab_new_ids(self, table, idkeys, conn):1141 data, _ = self.fetch("SELECT @@IDENTITY;", conn)1142 return {idkeys[0]: data[0][0]}1143 1144 1149 # Connecting # 1145 1150 trunk/geniusql/providers/firebird.py
r7 r8 255 255 self.db.execute("ALTER TABLE %s ALTER COLUMN %s TO %s;" % 256 256 (self.qname, oldcol.qname, newcol.qname)) 257 258 def insert(self, **inputs): 259 """Insert a row and return {idcolkey: newid}.""" 260 coerce_out = self.db.adaptertosql.coerce 261 coerce_in = self.db.adapterfromdb.coerce 262 263 newids = {} 264 fields = [] 265 values = [] 266 for key, col in self.iteritems(): 267 if col.autoincrement: 268 # This advances the generator and returns its new value. 269 data, _ = self.db.fetch("SELECT GEN_ID(%s, 1) FROM RDB$DATABASE;" 270 % col.sequence_name) 271 newid = coerce_in(data[0][0], col.dbtype, col.pytype) 272 newids[key] = newid 273 274 val = coerce_out(newid, col.dbtype) 275 fields.append(col.qname) 276 values.append(val) 277 elif key in inputs: 278 val = coerce_out(inputs[key], col.dbtype) 279 fields.append(col.qname) 280 values.append(val) 281 282 fields = ", ".join(fields) 283 values = ", ".join(values) 284 self.db.execute('INSERT INTO %s (%s) VALUES (%s);' 285 % (self.qname, fields, values), 286 self.db.get_transaction()) 287 288 return newids 257 289 258 290 … … 659 691 return ("KInterbasDB Version: %r\nServer Version: %r" 660 692 % (kinterbasdb.__version__, svcCon.getServerVersion())) 661 662 def insert(self, tablekey, **inputs): 663 """Insert a row and return {idcolkey: newid}.""" 664 t = self[tablekey] 665 coerce_out = self.adaptertosql.coerce 666 coerce_in = self.adapterfromdb.coerce 667 668 newids = {} 669 fields = [] 670 values = [] 671 for key, col in t.iteritems(): 672 if col.autoincrement: 673 # This advances the generator and returns its new value. 674 data, _ = self.fetch("SELECT GEN_ID(%s, 1) FROM RDB$DATABASE;" 675 % col.sequence_name) 676 newid = coerce_in(data[0][0], col.dbtype, col.pytype) 677 newids[key] = newid 678 679 val = coerce_out(newid, col.dbtype) 680 fields.append(col.qname) 681 values.append(val) 682 elif key in inputs: 683 val = coerce_out(inputs[key], col.dbtype) 684 fields.append(col.qname) 685 values.append(val) 686 687 transconn = self.get_transaction() 688 689 fields = ", ".join(fields) 690 values = ", ".join(values) 691 self.execute('INSERT INTO %s (%s) VALUES (%s);' % 692 (t.qname, fields, values), transconn) 693 694 return newids 695 696 697 fields = ", ".join(fields) 698 values = ", ".join(values) 699 self.db.execute('INSERT INTO %s (%s) VALUES (%s);' % 700 (t.qname, fields, values)) 701 702 693 694 trunk/geniusql/providers/mysql.py
r7 r8 159 159 (self.qname, oldcol.qname, newcol.qname, 160 160 oldcol.dbtype)) 161 162 def _grab_new_ids(self, idkeys, conn): 163 return {idkeys[0]: conn.insert_id()} 161 164 162 165 … … 476 479 return res.fetch_row(0, 0), res.describe() 477 480 478 def _grab_new_ids(self, table, idkeys, conn):479 return {idkeys[0]: conn.insert_id()}480 481 481 def create_database(self): 482 482 self.lock("Creating database. Transactions not allowed.") trunk/geniusql/providers/psycopg.py
r7 r8 139 139 140 140 indexsetclass = PsycoPgIndexSet 141 142 def _grab_new_ids(self, idkeys, conn): 143 newids = {} 144 for idkey in idkeys: 145 col = self[idkey] 146 seq = col.sequence_name 147 data, _ = self.db.fetch("SELECT last_value FROM %s;" % seq, conn) 148 newids[idkey] = data[0][0] 149 return newids 141 150 142 151 … … 420 429 return data, coldefs 421 430 422 def _grab_new_ids(self, table, idkeys, conn):423 newids = {}424 for idkey in idkeys:425 col = table[idkey]426 seq = col.sequence_name427 data, _ = self.fetch("SELECT last_value FROM %s;" % seq, conn)428 newids[idkey] = data[0][0]429 return newids430 431 431 def create_database(self): 432 432 self.lock("Creating database. Transactions not allowed.") trunk/geniusql/providers/pypgsql.py
r7 r8 126 126 127 127 indexsetclass = PgIndexSet 128 129 def _grab_new_ids(self, idkeys, conn): 130 newids = {} 131 for idkey in idkeys: 132 col = self[idkey] 133 seq = col.sequence_name 134 data, _ = self.db.fetch("SELECT last_value FROM %s;" % seq, conn) 135 newids[idkey] = data[0][0] 136 return newids 128 137 129 138 … … 386 395 return data, columns 387 396 388 def _grab_new_ids(self, table, idkeys, conn):389 """Insert a row using the table's SERIAL field."""390 newids = {}391 for idkey in idkeys:392 col = table[idkey]393 seq = col.sequence_name394 data, _ = self.fetch("SELECT last_value FROM %s;" % seq, conn)395 newids[idkey] = data[0][0]396 return newids397 398 397 def create_database(self): 399 398 self.lock("Creating database. Transactions not allowed.") trunk/geniusql/providers/sqlite.py
r7 r8 412 412 finally: 413 413 db.unlock() 414 415 def _grab_new_ids(self, idkeys, conn): 416 if _lastrowid_support: 417 new_id = conn.lastrowid 418 else: 419 new_id = conn.sqlite_last_insert_rowid() 420 return {idkeys[0]: new_id} 414 421 415 422 … … 615 622 % column.sequence_name) 616 623 column.sequence_name = None 617 618 def _grab_new_ids(self, table, idkeys, conn):619 if _lastrowid_support:620 new_id = conn.lastrowid621 else:622 new_id = conn.sqlite_last_insert_rowid()623 return {idkeys[0]: new_id}624 624 625 625 def columnclause(self, column): trunk/geniusql/test/zoo_fixture.py
r7 r8 141 141 142 142 def test_2_populate(self): 143 newids = db.insert('Zoo',Name='Wild Animal Park',143 wap = db['Zoo'].insert(Name='Wild Animal Park', 144 144 Founded=datetime.date(2000, 1, 1), 145 145 # 59 can give rounding errors with divmod, which … … 148 148 LastEscape=datetime.datetime(2004, 7, 29, 5, 6, 7), 149 149 Admission=4.95, 150 ) 151 wap = int(newids['ID']) 152 153 newids = db.insert('Zoo', Name = 'San Diego Zoo', 150 )['ID'] 151 152 sdz = db['Zoo'].insert(Name = 'San Diego Zoo', 154 153 # This early date should play havoc with a number 155 154 # of implementations. … … 157 156 Opens = datetime.time(9, 0, 0), 158 157 Admission = 0, 159 ) 160 sdz = int(newids['ID']) 161 162 db.insert('Zoo', Name = u'Montr\xe9al Biod\xf4me', 158 )['ID'] 159 160 db['Zoo'].insert(Name = u'Montr\xe9al Biod\xf4me', 163 161 Founded = datetime.date(1992, 6, 19), 164 162 Opens = datetime.time(9, 0, 0), … … 166 164 ) 167 165 168 newids = db.insert('Zoo', Name = 'Sea_World', Admission = 60) 169 seaworld = int(newids['ID']) 166 seaworld = db['Zoo'].insert(Name = 'Sea_World', Admission = 60)['ID'] 170 167 171 168 # Animals 172 newids = db.insert('Animal', Species='Leopard', Lifespan=73.5) 173 leopardid = int(newids['ID']) 169 leopardid = db['Animal'].insert(Species='Leopard', Lifespan=73.5)['ID'] 174 170 self.assertEqual(leopardid, 1) 175 db .save('Animal',ID=leopardid, ZooID=wap,171 db['Animal'].save(ID=leopardid, ZooID=wap, 176 172 LastEscape=datetime.datetime(2004, 12, 21, 8, 15, 0, 999907)) 177 173 178 lion = db .insert('Animal',Species='Lion', ZooID=wap)['ID']179 db .insert('Animal',Species='Slug', Legs=1, Lifespan=.75,174 lion = db['Animal'].insert(Species='Lion', ZooID=wap)['ID'] 175 db['Animal'].insert(Species='Slug', Legs=1, Lifespan=.75, 180 176 # Test our 8000-byte limit 181 177 PreviousZoos=["f" * (8000 - 14)]) 182 178 183 newids = db.insert('Animal', Species='Tiger', ZooID=sdz, 184 PreviousZoos=['animal\\universe']) 185 tiger = int(newids['ID']) 179 tiger = db['Animal'].insert(Species='Tiger', ZooID=sdz, 180 PreviousZoos=['animal\\universe'])['ID'] 186 181 187 182 # Override Legs.default with itself just to make sure it works. 188 db .insert('Animal',Species='Bear', Legs=4)183 db['Animal'].insert(Species='Bear', Legs=4) 189 184 # Notice that ostrich.PreviousZoos is [], whereas leopard is None. 190 db .insert('Animal',Species='Ostrich', Legs=2, PreviousZoos=[],185 db['Animal'].insert(Species='Ostrich', Legs=2, PreviousZoos=[], 191 186 Lifespan=103.2) 192 db.insert('Animal', Species='Centipede', Legs=100) 193 194 emp = db.insert('Animal', Species='Emperor Penguin', Legs=2, ZooID=seaworld) 195 emp = int(emp['ID']) 196 adelie = db.insert('Animal', Species='Adelie Penguin', Legs=2, ZooID=seaworld) 197 adelie = int(adelie['ID']) 198 199 db.insert('Animal', Species='Millipede', Legs=1000000, ZooID=sdz, 187 db['Animal'].insert(Species='Centipede', Legs=100) 188 189 emp = db['Animal'].insert(Species='Emperor Penguin', Legs=2, ZooID=seaworld)['ID'] 190 adelie = db['Animal'].insert(Species='Adelie Penguin', Legs=2, ZooID=seaworld)['ID'] 191 192 db['Animal'].insert(Species='Millipede', Legs=1000000, ZooID=sdz, 200 193 PreviousZoos=['Wild Animal Park']) 201 194 202 ## # Add a mother and child to test relationships 203 ## bai_yun = Animal(Species='Ape', Legs=2) 204 ## box.memorize(bai_yun) # ID = 11 205 ## self.assertEqual(bai_yun.ID, 11) 206 ## hua_mei = Animal(Species='Ape', Legs=2, MotherID=bai_yun.ID) 207 ## box.memorize(hua_mei) # ID = 12 208 ## self.assertEqual(hua_mei.ID, 12) 195 # Add a mother and child to test relationships 196 bai_yun = db['Animal'].insert(Species='Ape', Legs=2)['ID'] 197 db['Animal'].insert(Species='Ape', Legs=2, MotherID=bai_yun) 209 198 210 199 # Exhibits 211 db .insert('Exhibit',Name = 'The Penguin Encounter',212 ZooID = seaworld,213 Animals = [emp, adelie],214 PettingAllowed = True,215 Acreage = 3.1,216 # See ticket #45217 Creators = (u'Richard F\xfcrst', u'Sonja Martin'),218 )219 220 db .insert('Exhibit',Name = 'Tiger River',200 db['Exhibit'].insert(Name = 'The Penguin Encounter', 201 ZooID = seaworld, 202 Animals = [emp, adelie], 203 PettingAllowed = True, 204 Acreage = 3.1, 205 # See ticket #45 206 Creators = (u'Richard F\xfcrst', u'Sonja Martin'), 207 ) 208 209 db['Exhibit'].insert(Name = 'Tiger River', 221 210 ZooID = sdz, 222 211 Animals = [tiger], … … 226 215 227 216 # Vets 228 newids = db.insert('Vet', Name = 'Charles Schroeder', ZooID = sdz) 229 cs = int(newids['ID']) 217 cs = db['Vet'].insert(Name = 'Charles Schroeder', ZooID = sdz)['ID'] 230 218 self.assertEqual(cs, db['Vet']['ID'].initial) 231 219 232 newids = db.insert('Vet', Name = 'Jim McBain', ZooID = seaworld) 233 jm = int(newids['ID']) 220 jm = db['Vet'].insert(Name = 'Jim McBain', ZooID = seaworld)['ID'] 234 221 235 222 # Visits 236 223 for d in every13days: 237 db .insert('Visit',VetID=cs, AnimalID=tiger, Date=d)224 db['Visit'].insert(VetID=cs, AnimalID=tiger, Date=d) 238 225 for d in every17days: 239 db .insert('Visit',VetID=jm, AnimalID=emp, Date=d)226 db['Visit'].insert(VetID=jm, AnimalID=emp, Date=d) 240 227 241 228 # Foods 242 dead_fish = db .insert('Food',Name="Dead Fish", Nutrition=5)['ID']243 live_fish = db .insert('Food',Name="Live Fish", Nutrition=10)['ID']244 bunnies = db .insert('Food',Name="Live Bunny Wabbit", Nutrition=10)['ID']245 steak = db .insert('Food',Name="T-Bone", Nutrition=7)['ID']229 dead_fish = db['Food'].insert(Name="Dead Fish", Nutrition=5)['ID'] 230 live_fish = db['Food'].insert(Name="Live Fish", Nutrition=10)['ID'] 231 bunnies = db['Food'].insert(Name="Live Bunny Wabbit", Nutrition=10)['ID'] 232 steak = db['Food'].insert(Name="T-Bone", Nutrition=7)['ID'] 246 233 247 234 # Foods --> add preferred and alternate foods 248 db .save('Animal',ID=lion,235 db['Animal'].save(ID=lion, 249 236 PreferredFoodID=steak, AlternateFoodID=bunnies) 250 db .save('Animal',ID=tiger,237 db['Animal'].save(ID=tiger, 251 238 PreferredFoodID=bunnies, AlternateFoodID=steak) 252 db .save('Animal',ID=emp,239 db['Animal'].save(ID=emp, 253 240 PreferredFoodID=live_fish, AlternateFoodID=dead_fish) 254 db .save('Animal',ID=adelie,241 db['Animal'].save(ID=adelie, 255 242 PreferredFoodID=live_fish, AlternateFoodID=dead_fish) 256 243 257 244 def test_3_Properties(self): 258 245 # Zoos 259 data = db.select(db['Zoo'], ('Founded', 'Opens', 'Admission'), 260 logic.Expression(lambda z: z.Name == 'Wild Animal Park')) 261 wap = data.next() 262 self.assertEqual(wap[0], datetime.date(2000, 1, 1)) 263 self.assertEqual(wap[1], datetime.time(8, 15, 59)) 246 WAP = db['Zoo'].select_one(Name='Wild Animal Park') 247 self.assertEqual(WAP['Founded'], datetime.date(2000, 1, 1)) 248 self.assertEqual(WAP['Opens'], datetime.time(8, 15, 59)) 264 249 if typerefs.fixedpoint: 265 self.assertEqual( wap[2], typerefs.fixedpoint.FixedPoint("4.95"))250 self.assertEqual(WAP['Admission'], typerefs.fixedpoint.FixedPoint("4.95")) 266 251 else: 267 self.assertEqual( wap[2], 4.95)268 269 SDZ = db['Zoo'].select_one( lambda z: z.Founded ==datetime.date(1835, 9, 13))252 self.assertEqual(WAP['Admission'], 4.95) 253 254 SDZ = db['Zoo'].select_one(Founded=datetime.date(1835, 9, 13)) 270 255 self.assertEqual(SDZ['Founded'], datetime.date(1835, 9, 13)) 271 256 self.assertEqual(SDZ['Opens'], datetime.time(9, 0, 0)) … … 273 258 self.assertEqual(float(SDZ['Admission']), 0) 274 259 275 Biodome = db['Zoo'].select_one( lambda z: z.Name ==u'Montr\xe9al Biod\xf4me')260 Biodome = db['Zoo'].select_one(Name=u'Montr\xe9al Biod\xf4me') 276 261 self.assertEqual(Biodome['Name'], u'Montr\xe9al Biod\xf4me') 277 262 self.assertEqual(Biodome['Founded'], datetime.date(1992, 6, 19)) … … 286 271 seaworld = db['Zoo'].select_one(lambda z: z.Admission == float(60)) 287 272 self.assertEqual(seaworld['Name'], u'Sea_World') 288 ## 289 ## # Animals 290 ## leopard = box.unit(Animal, Species='Leopard') 291 ## self.assertEqual(leopard.Species, 'Leopard') 292 ## self.assertEqual(leopard.Legs, 4) 293 ## self.assertEqual(leopard.Lifespan, 73.5) 294 ## self.assertEqual(leopard.ZooID, WAP.ID) 295 ## self.assertEqual(leopard.PreviousZoos, None) 296 #### self.assertEqual(leopard.LastEscape, 297 #### datetime.datetime(2004, 12, 21, 8, 15, 0, 999907)) 298 ## 299 ## ostrich = box.unit(Animal, Species='Ostrich') 300 ## self.assertEqual(ostrich.Species, 'Ostrich') 301 ## self.assertEqual(ostrich.Legs, 2) 302 ## self.assertEqual(ostrich.ZooID, None) 303 ## self.assertEqual(ostrich.PreviousZoos, []) 304 ## self.assertEqual(ostrich.LastEscape, None) 305 ## 306 ## millipede = box.unit(Animal, Legs=1000000) 307 ## self.assertEqual(millipede.Species, 'Millipede') 308 ## self.assertEqual(millipede.Legs, 1000000) 309 ## self.assertEqual(millipede.ZooID, SDZ.ID) 310 ## self.assertEqual(millipede.PreviousZoos, [WAP.Name]) 311 ## self.assertEqual(millipede.LastEscape, None) 312 ## 313 ## # Test that strings in a list get decoded correctly. 314 ## # See http://projects.amor.org/dejavu/ticket/50 315 ## tiger = box.unit(Animal, Species='Tiger') 316 ## self.assertEqual(tiger.PreviousZoos, ["animal\\universe"]) 317 ## 318 ## # Test our 8000-byte limit. 319 ## # len(pickle.dumps(["f" * (8000 - 14)]) == 8000 320 ## slug = box.unit(Animal, Species='Slug') 321 ## self.assertEqual(len(slug.PreviousZoos[0]), 8000 - 14) 322 ## 323 ## # Exhibits 324 ## exes = box.recall(Exhibit) 325 ## self.assertEqual(len(exes), 2) 326 ## if exes[0].Name == 'The Penguin Encounter': 327 ## pe = exes[0] 328 ## tr = exes[1] 329 ## else: 330 ## pe = exes[1] 331 ## tr = exes[0] 332 ## self.assertEqual(pe.ZooID, seaworld.ID) 333 ## self.assertEqual(len(pe.Animals), 2) 334 ## self.assertEqual(float(pe.Acreage), 3.1) 335 ## self.assertEqual(pe.PettingAllowed, True) 336 ## self.assertEqual(pe.Creators, (u'Richard F\xfcrst', u'Sonja Martin')) 337 ## 338 ## self.assertEqual(tr.ZooID, SDZ.ID) 339 ## self.assertEqual(len(tr.Animals), 1) 340 ## self.assertEqual(float(tr.Acreage), 4) 341 ## self.assertEqual(tr.PettingAllowed, False) 342 ## 343 ## def test_4_Expressions(self): 344 ## box = arena.new_sandbox() 345 ## try: 346 ## def matches(lam, cls=Animal): 347 ## # We flush_all to ensure a DB hit each time. 348 ## box.flush_all() 349 ## return len(box.recall(cls, (lam))) 350 ## 351 ## zoos = box.recall(Zoo) 352 ## self.assertEqual(zoos[0].dirty(), False) 353 ## self.assertEqual(len(zoos), 4) 354 ## self.assertEqual(matches(lambda x: True), 12) 355 ## self.assertEqual(matches(lambda x: x.Legs == 4), 4) 356 ## self.assertEqual(matches(lambda x: x.Legs == 2), 5) 357 ## self.assertEqual(matches(lambda x: x.Legs >= 2 and x.Legs < 20), 9) 358 ## self.assertEqual(matches(lambda x: x.Legs > 10), 2) 359 ## self.assertEqual(matches(lambda x: x.Lifespan > 70), 2) 360 ## self.assertEqual(matches(lambda x: x.Species.startswith('L')), 2) 361 ## self.assertEqual(matches(lambda x: x.Species.endswith('pede')), 2) 362 ## self.assertEqual(matches(lambda x: x.LastEscape != None), 1) 363 ## self.assertEqual(matches(lambda x: x.LastEscape is not None), 1) 364 ## self.assertEqual(matches(lambda x: None == x.LastEscape), 11) 365 ## 366 ## # In operator (containedby) 367 ## self.assertEqual(matches(lambda x: 'pede' in x.Species), 2) 368 ## self.assertEqual(matches(lambda x: x.Species in ('Lion', 'Tiger', 'Bear')), 3) 369 ## 370 ## # Try In with cell references 371 ## class thing(object): pass 372 ## pet, pet2 = thing(), thing() 373 ## pet.Name, pet2.Name = 'Slug', 'Ostrich' 374 ## self.assertEqual(matches(lambda x: x.Species in (pet.Name, pet2.Name)), 2) 375 ## 376 ## # logic and other functions 377 ## self.assertEqual(matches(lambda x: dejavu.ieq(x.Species, 'slug')), 1) 378 ## self.assertEqual(matches(lambda x: dejavu.icontains(x.Species, 'PEDE')), 2) 379 ## self.assertEqual(matches(lambda x: dejavu.icontains(('Lion', 'Banana'), x.Species)), 1) 380 ## f = lambda x: dejavu.icontainedby(x.Species, ('Lion', 'Bear', 'Leopard')) 381 ## self.assertEqual(matches(f), 3) 382 ## name = 'Lion' 383 ## self.assertEqual(matches(lambda x: len(x.Species) == len(name)), 3) 384 ## 385 ## # This broke sometime in 2004. Rev 32 seems to have fixed it. 386 ## self.assertEqual(matches(lambda x: 'i' in x.Species), 7) 387 ## 388 ## # Test now(), today(), year(), month(), day() 389 ## self.assertEqual(matches(lambda x: x.Founded != None 390 ## and x.Founded < dejavu.today(), Zoo), 3) 391 ## self.assertEqual(matches(lambda x: x.LastEscape == dejavu.now()), 0) 392 ## self.assertEqual(matches(lambda x: dejavu.year(x.LastEscape) == 2004), 1) 393 ## self.assertEqual(matches(lambda x: dejavu.month(x.LastEscape) == 12), 1) 394 ## self.assertEqual(matches(lambda x: dejavu.day(x.LastEscape) == 21), 1) 395 ## 396 ## 397 ## # Test AND, OR with cannot_represent. 398 ## # Notice that we reference a method ('count') which no 399 ## # known SM handles, so it will default back to Expr.eval(). 400 ## self.assertEqual(matches(lambda x: 'p' in x.Species 401 ## and x.Species.count('e') > 1), 3) 402 ## 403 ## # This broke in MSAccess (storeado) in April 2005, due to a bug in 404 ## # db.SQLDecompiler.visit_CALL_FUNCTION (append TOS, not replace!). 405 ## box.flush_all() 406 ## e = logic.Expression(lambda x, **kw: x.LastEscape != None 407 ## and x.LastEscape >= datetime.datetime(kw['Year'], 12, 1) 408 ## and x.LastEscape < datetime.datetime(kw['Year'], 12, 31) 409 ## ) 410 ## e.bind_args(Year=2004) 411 ## units = box.recall(Animal, e) 412 ## self.assertEqual(len(units), 1) 413 ## 414 ## # Test wildcards in LIKE. This fails with SQLite <= 3.0.8, 415 ## # so make sure it's always at the end of this method so 416 ## # it doesn't preclude running the other tests. 417 ## box.flush_all() 418 ## units = box.recall(Zoo, lambda x: "_" in x.Name) 419 ## self.assertEqual(len(units), 1) 420 ## finally: 421 ## box.flush_all() 422 ## 273 274 # Animals 275 leopard = db['Animal'].select_one(lambda a: a.Species == 'Leopard') 276 self.assertEqual(leopard['Species'], 'Leopard') 277 self.assertEqual(leopard['Legs'], 4) 278 self.assertEqual(leopard['Lifespan'], 73.5) 279 self.assertEqual(leopard['ZooID'], WAP['ID']) 280 self.assertEqual(leopard['PreviousZoos'], None) 281 282 ostrich = db['Animal'].select_one(Species='Ostrich') 283 self.assertEqual(ostrich['Species'], 'Ostrich') 284 self.assertEqual(ostrich['Legs'], 2) 285 self.assertEqual(ostrich['ZooID'], None) 286 self.assertEqual(ostrich['PreviousZoos'], []) 287 self.assertEqual(ostrich['LastEscape'], None) 288 289 millipede = db['Animal'].select_one(Legs=1000000) 290 self.assertEqual(millipede['Species'], 'Millipede') 291 self.assertEqual(millipede['Legs'], 1000000) 292 self.assertEqual(millipede['ZooID'], SDZ['ID']) 293 self.assertEqual(millipede['PreviousZoos'], [WAP['Name']]) 294 self.assertEqual(millipede['LastEscape'], None) 295 296 # Test that strings in a list get decoded correctly. 297 # See http://projects.amor.org/dejavu/ticket/50 298 tiger = db['Animal'].select_one(Species='Tiger') 299 self.assertEqual(tiger['PreviousZoos'], ["animal\\universe"]) 300 301 # Test our 8000-byte limit. 302 # len(pickle.dumps(["f" * (8000 - 14)]) == 8000 303 slug = db['Animal'].select_one(Species='Slug') 304 self.assertEqual(len(slug['PreviousZoos'][0]), 8000 - 14) 305 306 # Exhibits 307 exes = list(db['Exhibit'].select_all()) 308 self.assertEqual(len(exes), 2) 309 if exes[0]['Name'] == 'The Penguin Encounter': 310 pe = exes[0] 311 tr = exes[1] 312 else: 313 pe = exes[1] 314 tr = exes[0] 315 self.assertEqual(pe['ZooID'], seaworld['ID']) 316 self.assertEqual(len(pe['Animals']), 2) 317 self.assertEqual(float(pe['Acreage']), 3.1) 318 self.assertEqual(pe['PettingAllowed'], True) 319 self.assertEqual(pe['Creators'], (u'Richard F\xfcrst', u'Sonja Martin')) 320 321 self.assertEqual(tr['ZooID'], SDZ['ID']) 322 self.assertEqual(len(tr['Animals']), 1) 323 self.assertEqual(float(tr['Acreage']), 4) 324 self.assertEqual(tr['PettingAllowed'], False) 325 326 def test_4_Expressions(self): 327 def matches(lam, tkey='Animal'): 328 return len(list(db[tkey].select_all(lam))) 329 330 self.assertEqual(matches(None, 'Zoo'), 4) 331 self.assertEqual(matches(lambda x: True), 12) 332 self.assertEqual(matches(lambda x: x.Legs == 4), 4) 333 self.assertEqual(matches(lambda x: x.Legs == 2), 5) 334 self.assertEqual(matches(lambda x: x.Legs >= 2 and x.Legs < 20), 9) 335 self.assertEqual(matches(lambda x: x.Legs > 10), 2) 336 self.assertEqual(matches(lambda x: x.Lifespan > 70), 2) 337 self.assertEqual(matches(lambda x: x.Species.startswith('L')), 2) 338 self.assertEqual(matches(lambda x: x.Species.endswith('pede')), 2) 339 self.assertEqual(matches(lambda x: x.LastEscape != None), 1) 340 self.assertEqual(matches(lambda x: x.LastEscape is not None), 1) 341 self.assertEqual(matches(lambda x: None == x.LastEscape), 11) 342 343 # In operator (containedby) 344 self.assertEqual(matches(lambda x: 'pede' in x.Species), 2) 345 self.assertEqual(matches(lambda x: x.Species in ('Lion', 'Tiger', 'Bear')), 3) 346 347 # Try In with cell references 348 class thing(object): pass 349 pet, pet2 = thing(), thing() 350 pet.Name, pet2.Name = 'Slug', 'Ostrich' 351 self.assertEqual(matches(lambda x: x.Species in (pet.Name, pet2.Name)), 2) 352 353 # logic and other functions 354 import dejavu 355 self.assertEqual(matches(lambda x: dejavu.ieq(x.Species, 'slug')), 1) 356 self.assertEqual(matches(lambda x: dejavu.icontains(x.Species, 'PEDE')), 2) 357 self.assertEqual(matches(lambda x: dejavu.icontains(('Lion', 'Banana'), x.Species)), 1) 358 f = lambda x: dejavu.icontainedby(x.Species, ('Lion', 'Bear', 'Leopard')) 359 self.assertEqual(matches(f), 3) 360 name = 'Lion' 361 self.assertEqual(matches(lambda x: len(x.Species) == len(name)), 3) 362 363 # This broke sometime in 2004. Rev 32 seems to have fixed it. 364 self.assertEqual(matches(lambda x: 'i' in x.Species), 7) 365 366 # Test now(), today(), year(), month(), day() 367 self.assertEqual(matches(lambda x: x.Founded != None 368 and x.Founded < dejavu.today(), 'Zoo'), 3) 369 self.assertEqual(matches(lambda x: x.LastEscape == dejavu.now()), 0) 370 self.assertEqual(matches(lambda x: dejavu.year(x.LastEscape) == 2004), 1) 371 self.assertEqual(matches(lambda x: dejavu.month(x.LastEscape) == 12), 1) 372 self.assertEqual(matches(lambda x: dejavu.day(x.LastEscape) == 21), 1) 373 374 # Test AND, OR with cannot_represent. 375 # Notice that we reference a method ('count') which no 376 # known SM handles, so it will default back to Expr.eval(). 377 self.assertEqual(matches(lambda x: 'p' in x.Species 378 and x.Species.count('e') > 1), 3) 379 380 # This broke in MSAccess (storeado) in April 2005, due to a bug in 381 # SQLDecompiler.visit_CALL_FUNCTION (append TOS, not replace!). 382 e = logic.Expression(lambda x, **kw: x.LastEscape != None 383 and x.LastEscape >= datetime.datetime(kw['Year'], 12, 1) 384 and x.LastEscape < datetime.datetime(kw['Year'], 12, 31) 385 ) 386 e.bind_args(Year=2004) 387 self.assertEqual(matches(e), 1) 388 389 # Test wildcards in LIKE. This fails with SQLite <= 3.0.8, 390 # so make sure it's always at the end of this method so 391 # it doesn't preclude running the other tests. 392 self.assertEqual(matches(lambda x: "_" in x.Name, 'Zoo'), 1) 393 423 394 ## def test_5_Aggregates(self): 424 395 ## box = arena.new_sandbox() … … 474 445 ## finally: 475 446 ## box.flush_all() 476 ## 477 ## def test_6_Editing(self): 478 ## # Edit 479 ## box = arena.new_sandbox() 480 ## try: 481 ## SDZ = box.unit(Zoo, Name='San Diego Zoo') 482 ## SDZ.Name = 'The San Diego Zoo' 483 ## SDZ.Founded = datetime.date(1900, 1, 1) 484 ## SDZ.Opens = datetime.time(7, 30, 0) 485 ## SDZ.Admission = "35.00" 486 ## finally: 487 ## box.flush_all() 488 ## 489 ## # Test edits 490 ## box = arena.new_sandbox() 491 ## try: 492 ## SDZ = box.unit(Zoo, Name='The San Diego Zoo') 493 ## self.assertEqual(SDZ.Name, 'The San Diego Zoo') 494 ## self.assertEqual(SDZ.Founded, datetime.date(1900, 1, 1)) 495 ## self.assertEqual(SDZ.Opens, datetime.time(7, 30, 0)) 496 ## if typerefs.fixedpoint: 497 ## self.assertEqual(SDZ.Admission, typerefs.fixedpoint.FixedPoint(35, 2)) 498 ## else: 499 ## self.assertEqual(SDZ.Admission, 35.0) 500 ## finally: 501 ## box.flush_all() 502 ## 503 ## # Change it back 504 ## box = arena.new_sandbox() 505 ## try: 506 ## SDZ = box.unit(Zoo, Name='The San Diego Zoo') 507 ## SDZ.Name = 'San Diego Zoo' 508 ## SDZ.Founded = datetime.date(1835, 9, 13) 509 ## SDZ.Opens = datetime.time(9, 0, 0) 510 ## SDZ.Admission = "0" 511 ## finally: 512 ## box.flush_all() 513 ## 514 ## # Test re-edits 515 ## box = arena.new_sandbox() 516 ## try: 517 ## SDZ = box.unit(Zoo, Name='San Diego Zoo') 518 ## self.assertEqual(SDZ.Name, 'San Diego Zoo') 519 ## self.assertEqual(SDZ.Founded, datetime.date(1835, 9, 13)) 520 ## self.assertEqual(SDZ.Opens, datetime.time(9, 0, 0)) 521 ## if typerefs.fixedpoint: 522 ## self.assertEqual(SDZ.Admission, typerefs.fixedpoint.FixedPoint(0, 2)) 523 ## else: 524 ## self.assertEqual(SDZ.Admission, 0.0) 525 ## finally: 526 ## box.flush_all() 527 ## 447 448 def test_6_Editing(self): 449 # Edit 450 SDZ = db['Zoo'].select_one(Name='San Diego Zoo') 451 db['Zoo'].save(ID=SDZ['ID'], Name='The San Diego Zoo', 452 Founded = datetime.date(1900, 1, 1), 453 Opens = datetime.time(7, 30, 0), 454 Admission = "35.00") 455 456 # Test edits 457 SDZ = db['Zoo'].select_one(Name='The San Diego Zoo') 458 self.assertEqual(SDZ['Name'], 'The San Diego Zoo') 459 self.assertEqual(SDZ['Founded'], datetime.date(1900, 1, 1)) 460 self.assertEqual(SDZ['Opens'], datetime.time(7, 30, 0)) 461 if typerefs.fixedpoint: 462 self.assertEqual(SDZ['Admission'], typerefs.fixedpoint.FixedPoint(35, 2)) 463 else: 464 self.assertEqual(SDZ['Admission'], 35.0) 465 466 # Change it back 467 db['Zoo'].save(ID=SDZ['ID'], Name = 'San Diego Zoo', 468 Founded = datetime.date(1835, 9, 13), 469 Opens = datetime.time(9, 0, 0), 470 Admission = "0") 471 472 # Test re-edits 473 SDZ = db['Zoo'].select_one(Name='San Diego Zoo') 474 self.assertEqual(SDZ['Name'], 'San Diego Zoo') 475 self.assertEqual(SDZ['Founded'], datetime.date(1835, 9, 13)) 476 self.assertEqual(SDZ['Opens'], datetime.time(9, 0, 0)) 477 if typerefs.fixedpoint: 478 self.assertEqual(SDZ['Admission'], typerefs.fixedpoint.FixedPoint(0, 2)) 479 else: 480 self.assertEqual(SDZ['Admission'], 0.0) 481 528 482 ## def test_7_Multirecall(self): 529 483 ## box = arena.new_sandbox()
