Contact: fumanchu@aminus.org

Log in as guest/geniusql to create tickets

root/trunk/geniusql/test/zoo_fixture.py

Revision 325 (checked in by lakin, 2 months ago)

As of sqlite3 2.6.0 (python module version) - we can put ON clauses for each join in a multiple join. In fact, not doing so gets incorrect results.

  • Property svn:eol-style set to native
Line 
1 """Test fixture for Geniusql."""
2
3 import datetime
4
5
6 try:
7     set
8 except NameError:
9     from sets import Set as set
10
11 import sys
12 import threading
13 import time
14 import traceback
15 import unittest
16 import warnings
17
18
19 import geniusql
20 from geniusql import errors, typerefs
21 from geniusql.test import tools, test
22
23 from geniusql import logic, logicfuncs
24 logicfuncs.init()
25
26
27 Jan_1_2001 = datetime.date(2001, 1, 1)
28 every13days = [Jan_1_2001 + datetime.timedelta(x * 13) for x in range(20)]
29 every17days = [Jan_1_2001 + datetime.timedelta(x * 17) for x in range(20)]
30 del x
31
32
33 # TODO STRABS: Evaluate if we want this or not, I think we do.
34 warnings.filterwarnings("ignore", "The given precision and scale*")
35
36 class ZooTestsBaseClass(unittest.TestCase):
37     db = None
38     schema = None
39     provider = None
40
41 class Phase1_ZooTests(ZooTestsBaseClass):
42     """
43     The ZooTests setup the schema, and create the tables.
44
45     The rest of the tests depend on them, so we do this one as a first step
46     and the rest as a second step.  Because nose orders its tests alphabetically
47     we will call them "Phase1" and "Phase2" respectively.
48     """
49
50     def assertEqualSet(self, a, b):
51         self.assertEqual(set(a), set(b))
52
53     def test_01_create_tables(self):
54         Animal = self.schema.table('Animal')
55         Animal['ID'] = self.schema.column(int, autoincrement=True, key=True)
56 ##        Animal.add_index('ID')
57         Animal['ZooID'] = self.schema.column(int)
58         Animal['Name'] = self.schema.column(hints={'bytes': 100})
59         Animal['Species'] = self.schema.column(hints={'bytes': 100})
60         Animal['Legs'] = self.schema.column(int, default=4)
61         Animal['PreviousZoos'] = self.schema.column(list, hints={'bytes': 8000})
62         Animal['LastEscape'] = self.schema.column(datetime.datetime)
63         Animal['Birthdate'] = self.schema.column(datetime.datetime, hints={'timezone_aware': True})
64         Animal['Lifespan'] = self.schema.column(float, hints={'precision': 4})
65         Animal['Age'] = self.schema.column(datetime.timedelta)
66         Animal['MotherID'] = self.schema.column(int)
67         Animal['PreferredFoodID'] = self.schema.column(int)
68         Animal['AlternateFoodID'] = self.schema.column(int)
69         Animal.add_index('ZooID')
70         Animal.references['Animal'] = ('ID', 'Animal', 'MotherID')
71         Animal.references['Visit'] = ('ID', 'Visit', 'AnimalID')
72         self.schema['Animal'] = Animal
73
74         Zoo = self.schema.table('Zoo')
75         Zoo['ID'] = self.schema.column(int, autoincrement=True, key=True)
76         Zoo.add_index('ID')
77         Zoo['Name'] = self.schema.column()
78
79         Zoo['Founded'] = self.schema.column(datetime.date)
80         Zoo['Opens'] = self.schema.column(datetime.time)
81         Zoo['LastEscape'] = self.schema.column(datetime.datetime)
82
83         if typerefs.fixedpoint:
84             # Explicitly set precision and scale so test_msaccess
85             # can test CURRENCY type
86             Zoo['Admission'] = self.schema.column(typerefs.fixedpoint.FixedPoint,
87                                          hints={'precision': 6, 'scale': 2})
88         else:
89             Zoo['Admission'] = self.schema.column(float)
90
91         Zoo.references['Animal'] = ('ID', 'Animal', 'ZooID')
92         self.schema['Zoo'] = Zoo
93
94         Food = self.schema.table('Food')
95         Food['ID'] = self.schema.column(int, autoincrement=True, key=True)
96         Food.add_index('ID')
97         Food['Name'] = self.schema.column()
98         Food['NutritionValue'] = self.schema.column(int)
99         Food.references['Animal'] = ('ID', 'Animal', 'PreferredFoodID')
100         Animal.references['Alternate Food'] = ('AlternateFoodID', 'Food', 'ID')
101         self.schema['Food'] = Food
102
103         Vet = self.schema.table('Vet')
104         Vet['ID'] = c = self.schema.column(int, autoincrement=True, key=True)
105         c.initial = 200
106         Vet.add_index('ID')
107         Vet['Name'] = self.schema.column()
108         Vet['NameLen'] = self.schema.column(int)
109         Vet['ZooID'] = self.schema.column(int)
110         Vet.add_index('ZooID')
111         Vet['City'] = self.schema.column()
112         Vet.references['Zoo'] = ('ZooID', 'Zoo', 'ID')
113         Vet.references['Visit'] = ('ID', 'Visit', 'VetID')
114         self.schema['Vet'] = Vet
115
116         Visit = self.schema.table('Visit')
117         Visit['ID'] = self.schema.column(int, autoincrement=True, key=True)
118         Visit.add_index('ID')
119         Visit['VetID'] = self.schema.column(int)
120         Visit.add_index('VetID')
121         Visit['ZooID'] = self.schema.column(int)
122         Visit.add_index('ZooID')
123         Visit['AnimalID'] = self.schema.column(int)
124         Visit.add_index('AnimalID')
125         Visit['Date'] = self.schema.column(datetime.date)
126         Visit.references['Animal'] = ('AnimalID', 'Animal', 'ID')
127         self.schema['Visit'] = Visit
128
129         Exhibit = self.schema.table('Exhibit')
130         # Make this a string to help test vs unicode.
131         # Also, use a smaller field size so Firebird doesn't choke
132         # when forming the primary key.
133         # See http://www.volny.cz/iprenosil/interbase/ip_ib_indexcalculator.htm
134         Exhibit['Name'] = self.schema.column(str, key=True, hints={'bytes': 192})
135         Exhibit.add_index('Name')
136         Exhibit['ZooID'] = self.schema.column(int, key=True)
137         Exhibit.add_index('ZooID')
138         Exhibit['Animals'] = self.schema.column(list)
139         Exhibit['PettingAllowed'] = self.schema.column(bool)
140         Exhibit['Creators'] = self.schema.column(tuple)
141
142         if typerefs.decimal:
143             Exhibit['Acreage'] = self.schema.column(typerefs.decimal.Decimal)
144         else:
145             Exhibit['Acreage'] = self.schema.column(float)
146
147         Exhibit.references['Zoo'] = ('ZooID', 'Zoo', 'ID')
148         self.schema['Exhibit'] = Exhibit
149
150         t = self.schema.table('NothingToDoWithZoos')
151         t['ALong'] = self.schema.column(long, hints={'bytes': 1})
152         t['AFloat'] = self.schema.column(float, hints={'precision': 1})
153         if typerefs.decimal:
154             t['ADecimal'] = self.schema.column(typerefs.decimal.Decimal,
155                                       hints={'precision': 1, 'scale': 1})
156         if typerefs.fixedpoint:
157             t['AFixed'] = self.schema.column(typerefs.fixedpoint.FixedPoint,
158                                     hints={'precision': 1, 'scale': 1})
159         self.schema['NothingToDoWithZoos'] = t
160
161     def test_02_populate(self):
162         wap = self.schema['Zoo'].insert(Name='Wild Animal Park',
163                            Founded=datetime.date(2000, 1, 1),
164                            # 59 can give rounding errors with divmod, which
165                            # AdapterFromADO needs to correct.
166                            Opens=datetime.time(8, 15, 59),
167                            LastEscape=datetime.datetime(2004, 7, 29, 5, 6, 7),
168                            Admission=4.95,
169                            )['ID']
170
171         sdz = self.schema['Zoo'].insert(Name = 'San Diego Zoo',
172                            # This early date should play havoc with a number
173                            # of implementations.
174                            Founded = datetime.date(1835, 9, 13),
175                            Opens = datetime.time(9, 0, 0),
176                            Admission = 0,
177                            )['ID']
178
179         self.schema['Zoo'].insert(Name = u'Montr\xe9al Biod\xf4me',
180                   Founded = datetime.date(1992, 6, 19),
181                   Opens = datetime.time(9, 0, 0),
182                   Admission = 11.75,
183                   )
184
185         seaworld = self.schema['Zoo'].insert(Name = 'Sea_World', Admission = 60)['ID']
186
187         # Let's add a crazy futuristic Zoo to test large date values.
188         lp = self.schema['Zoo'].insert(Name = 'Luna Park',
189                                   Founded = datetime.date(2072, 7, 17),
190                                   Opens = datetime.time(0, 0, 0),
191                                   Admission = 134.95,
192                                   )['ID']
193
194         # Animals
195         leopardid = self.schema['Animal'].insert(Species='Leopard', Lifespan=73.5,
196                                             Age=datetime.timedelta(365 * 10))['ID']
197         self.assertEqual(leopardid, 1)
198         self.schema['Animal'].save(ID=leopardid, ZooID=wap,
199                 LastEscape=datetime.datetime(2004, 12, 21, 8, 15, 0, 999907))
200
201         lion = self.schema['Animal'].insert(Species='Lion', ZooID=wap)['ID']
202         self.schema['Animal'].insert(Species='Slug', Legs=1, Lifespan=.75,
203                                 # Test our 8000-byte limit (ok, 7900)
204                                 PreviousZoos=["f" * (7900 - 14)])
205
206         tiger = self.schema['Animal'].insert(Species='Tiger', ZooID=sdz,
207                                     PreviousZoos=['animal\\universe'])['ID']
208
209         # Override Legs.default with itself just to make sure it works.
210         self.schema['Animal'].insert(Species='Bear', Legs=4)
211         # Notice that ostrich.PreviousZoos is [], whereas leopard is None.
212         self.schema['Animal'].insert(Species='Ostrich', Legs=2, PreviousZoos=[],
213                             Lifespan=103.2)
214         self.schema['Animal'].insert(Species='Centipede', Legs=100)
215
216         emp = self.schema['Animal'].insert(Species='Emperor Penguin', Legs=2, ZooID=seaworld)['ID']
217         adelie = self.schema['Animal'].insert(Species='Adelie Penguin', Legs=2, ZooID=seaworld)['ID']
218
219         self.schema['Animal'].insert(Species='Millipede', Legs=1000000, ZooID=sdz,
220                   PreviousZoos=['Wild Animal Park'])
221
222         # Add a mother and child to test relationships
223         bai_yun = self.schema['Animal'].insert(Species='Ape', Name='Bai Yun', Legs=2)
224         self.schema['Animal'].insert(Species='Ape', Name='Hua Mei', Legs=2,
225                                 MotherID=bai_yun['ID'])
226
227         # Exhibits
228         self.schema['Exhibit'].insert(Name = 'The Penguin Encounter',
229                                  ZooID = seaworld,
230                                  Animals = [emp, adelie],
231                                  PettingAllowed = True,
232                                  Acreage = 3.1,
233                                  # See http://www.aminus.net/dejavu/ticket/45
234                                  Creators = (u'Richard F\xfcrst',
235                                              u'Sonja Martin'),
236                                  )
237
238         self.schema['Exhibit'].insert(Name = 'Tiger River',
239                                  ZooID = sdz,
240                                  Animals = [tiger],
241                                  PettingAllowed = False,
242                                  Acreage = 4,
243                                  )
244
245         # Vets
246         cs = self.schema['Vet'].insert(Name = 'Charles Schroeder', ZooID = sdz)
247         self.assertEqual(cs['ID'], self.schema['Vet']['ID'].initial)
248
249         jm = self.schema['Vet'].insert(Name = 'Jim McBain', ZooID = seaworld)['ID']
250
251         # Visits
252         for d in every13days:
253             self.schema['Visit'].insert(VetID=cs['ID'], AnimalID=tiger, Date=d)
254         for d in every17days:
255             self.schema['Visit'].insert(VetID=jm, AnimalID=emp, Date=d)
256
257         # Foods
258         dead_fish = self.schema['Food'].insert(Name="Dead Fish", Nutrition=5)['ID']
259         live_fish = self.schema['Food'].insert(Name="Live Fish", Nutrition=10)['ID']
260         bunnies = self.schema['Food'].insert(Name="Live Bunny Wabbit", Nutrition=10)['ID']
261         steak = self.schema['Food'].insert(Name="T-Bone", Nutrition=7)['ID']
262
263         # Foods --> add preferred and alternate foods
264         self.schema['Animal'].save(ID=lion,
265                 PreferredFoodID=steak, AlternateFoodID=bunnies)
266         self.schema['Animal'].save(ID=tiger,
267                 PreferredFoodID=bunnies, AlternateFoodID=steak)
268         self.schema['Animal'].save(ID=emp,
269                 PreferredFoodID=live_fish, AlternateFoodID=dead_fish)
270         self.schema['Animal'].save(ID=adelie,
271                 PreferredFoodID=live_fish, AlternateFoodID=dead_fish)
272
273     def test_03_Properties(self):
274         # Zoos
275         WAP = self.schema['Zoo'].select(Name='Wild Animal Park')
276         self.assertEqual(WAP['Founded'], datetime.date(2000, 1, 1))
277         self.assertEqual(WAP['Opens'], datetime.time(8, 15, 59))
278         if typerefs.fixedpoint:
279             self.assertEqual(WAP['Admission'], typerefs.fixedpoint.FixedPoint("4.95"))
280         else:
281             self.assertEqual(WAP['Admission'], 4.95)
282
283         SDZ = self.schema['Zoo'].select(Founded=datetime.date(1835, 9, 13))
284         self.assertEqual(SDZ['Founded'], datetime.date(1835, 9, 13))
285         self.assertEqual(SDZ['Opens'], datetime.time(9, 0, 0))
286         self.assertEqual(SDZ['LastEscape'], None)
287         self.assertEqual(float(SDZ['Admission']), 0)
288
289         Biodome = self.schema['Zoo'].select(Name=u'Montr\xe9al Biod\xf4me')
290         self.assertEqual(Biodome['Name'], u'Montr\xe9al Biod\xf4me')
291         self.assertEqual(Biodome['Founded'], datetime.date(1992, 6, 19))
292         self.assertEqual(Biodome['Opens'], datetime.time(9, 0, 0))
293         self.assertEqual(Biodome['LastEscape'], None)
294         self.assertEqual(float(Biodome['Admission']), 11.75)
295
296         if typerefs.fixedpoint:
297             seaworld = self.schema['Zoo'].select(lambda z: z.Admission ==
298                                             typerefs.fixedpoint.FixedPoint(60))
299         else:
300             seaworld = self.schema['Zoo'].select(lambda z: z.Admission == float(60))
301         self.assertEqual(seaworld['Name'], u'Sea_World')
302
303         # Animals
304         leopard = self.schema['Animal'].select(lambda a: a.Species == 'Leopard')
305         self.assertEqual(leopard['Species'], 'Leopard')
306         self.assertEqual(leopard['Legs'], 4)
307         self.assertEqual(leopard['Lifespan'], 73.5)
308         self.assertEqual(leopard['ZooID'], WAP['ID'])
309         self.assertEqual(leopard['PreviousZoos'], None)
310
311         ostrich = self.schema['Animal'].select(Species='Ostrich')
312         self.assertEqual(ostrich['Species'], 'Ostrich')
313         self.assertEqual(ostrich['Legs'], 2)
314         self.assertEqual(ostrich['ZooID'], None)
315         self.assertEqual(ostrich['PreviousZoos'], [])
316         self.assertEqual(ostrich['LastEscape'], None)
317
318         millipede = self.schema['Animal'].select(Legs=1000000)
319         self.assertEqual(millipede['Species'], 'Millipede')
320         self.assertEqual(millipede['Legs'], 1000000)
321         self.assertEqual(millipede['ZooID'], SDZ['ID'])
322         self.assertEqual(millipede['PreviousZoos'], [WAP['Name']])
323         self.assertEqual(millipede['LastEscape'], None)
324
325         # Test that strings in a list get decoded correctly.
326         # See http://projects.amor.org/dejavu/ticket/50
327         tiger = self.schema['Animal'].select(Species='Tiger')
328         self.assertEqual(tiger['PreviousZoos'], ["animal\\universe"])
329
330         # Test our 8000-byte limit (ok, 7900; the row is too long)
331         # len(pickle.dumps(["f" * (7900 - 14)]) == 7900
332         slug = self.schema['Animal'].select(Species='Slug')
333         self.assertEqual(len(slug['PreviousZoos'][0]), 7900 - 14)
334
335         # Exhibits
336         exes = self.schema['Exhibit'].select_all()
337         self.assertEqual(len(exes), 2)
338         if exes[0]['Name'] == 'The Penguin Encounter':
339             pe = exes[0]
340             tr = exes[1]
341         else:
342             pe = exes[1]
343             tr = exes[0]
344         self.assertEqual(pe['ZooID'], seaworld['ID'])
345         self.assertEqual(len(pe['Animals']), 2)
346         self.assertEqual(float(pe['Acreage']), 3.1)
347         self.assertEqual(pe['PettingAllowed'], True)
348         self.assertEqual(pe['Creators'], (u'Richard F\xfcrst', u'Sonja Martin'))
349
350         self.assertEqual(tr['ZooID'], SDZ['ID'])
351         self.assertEqual(len(tr['Animals']), 1)
352         self.assertEqual(float(tr['Acreage']), 4)
353         self.assertEqual(tr['PettingAllowed'], False)
354
355     def test_04_Expressions(self):
356         def matches(lam, tkey='Animal'):
357             data = self.schema[tkey].select_all(lam)
358             return len(data)
359
360         self.assertEqual(matches(None, 'Zoo'), 5)
361         self.assertEqual(matches(lambda x: True), 12)
362         self.assertEqual(matches(lambda x: x.Legs == 4), 4)
363         self.assertEqual(matches(lambda x: x.Legs == 2), 5)
364         self.assertEqual(matches(lambda x: x.Legs >= 2 and x.Legs < 20), 9)
365         self.assertEqual(matches(lambda x: x.Legs > 10), 2)
366         self.assertEqual(matches(lambda x: x.Lifespan > 70), 2)
367         self.assertEqual(matches(lambda x: x.Species.startswith('L')), 2)
368         self.assertEqual(matches(lambda x: x.Species.endswith('pede')), 2)
369
370         self.assertEqual(matches(lambda x: x.LastEscape != None), 1)
371         self.assertEqual(matches(lambda x: x.LastEscape is not None), 1)
372         self.assertEqual(matches(lambda x: None == x.LastEscape), 11)
373
374         # In operator (containedby)
375         self.assertEqual(matches(lambda x: 'pede' in x.Species), 2)
376         self.assertEqual(matches(lambda x: x.Species in ('Lion', 'Tiger', 'Bear')), 3)
377
378         # Try In with cell references
379         class thing(object): pass
380         pet, pet2 = thing(), thing()
381         pet.Name, pet2.Name = 'Slug', 'Ostrich'
382         self.assertEqual(matches(lambda x: x.Species in (pet.Name, pet2.Name)), 2)
383
384         # logic and other functions
385         self.assertEqual(matches(lambda x: ieq(x.Species, 'slug')), 1)
386         self.assertEqual(matches(lambda x: icontains(x.Species, 'PEDE')), 2)
387         self.assertEqual(matches(lambda x: icontains(('Lion', 'Banana'), x.Species)), 1)
388         f = lambda x: icontainedby(x.Species, ('Lion', 'Bear', 'Leopard'))
389         self.assertEqual(matches(f), 3)
390         name = 'Lion'
391         self.assertEqual(matches(lambda x: len(x.Species) == len(name)), 3)
392
393         # This broke sometime in 2004. Rev 32 seems to have fixed it.
394         self.assertEqual(matches(lambda x: 'i' in x.Species), 7)
395
396         # Test now(), today(), year(), month(), day()
397         self.assertEqual(matches(lambda x: x.Founded != None
398                                  and x.Founded < today(), 'Zoo'), 3)
399         self.assertEqual(matches(lambda x: x.LastEscape == now()), 0)
400         self.assertEqual(matches(lambda x: year(x.LastEscape) == 2004), 1)
401         self.assertEqual(matches(lambda x: month(x.LastEscape) == 12), 1)
402         self.assertEqual(matches(lambda x: day(x.LastEscape) == 21), 1)
403
404         # Test AND, OR with CannotRepresent.
405         # Notice that we reference a method ('count') which no
406         # known SM handles, so it will default back to Expr.eval().
407         # [u'Leopard', u'Slug', u'Bear', u'Ostrich', u'Centipede',
408         #  u'Millipede', u'Ape', u'Ape', u'Lion', u'Tiger',
409         #  u'Emperor Penguin', u'Adelie Penguin']
410         e = lambda x: 'p' in x.Species and x.Species.count('e') > 1
411         self.assertEqual(set([a['Species'] for a in self.schema['Animal'].select_all(e)]),
412                          set(['Centipede', 'Millipede', 'Emperor Penguin']))
413
414         # This broke in MSAccess (storeado) in April 2005, due to a bug in
415         # SQLDecompiler.visit_CALL_FUNCTION (append TOS, not replace!).
416         e = logic.Expression(lambda x, **kw: x.LastEscape != None
417                              and x.LastEscape >= datetime.datetime(kw['Year'], 12, 1)
418                              and x.LastEscape < datetime.datetime(kw['Year'], 12, 31)
419                              )
420         e.bind_args(Year=2004)
421         self.assertEqual(matches(e), 1)
422
423         # Test wildcards in LIKE. This fails with SQLite <= 3.0.8,
424         # so make sure it's always at the end of this method so
425         # it doesn't preclude running the other tests.
426         self.assertEqual(matches(lambda x: "_" in x.Name, 'Zoo'), 1)
427
428         # Temporarily insert a zoo we can test the escaping of
429         # % symbols in LIKE queries
430         some_other_zoo = self.schema['Zoo'].insert(Name = 'Other%%Zoo', Admission = 60)['ID']
431         try:
432             self.assertEqual(matches(lambda x: "%" in x.Name, 'Zoo'), 1)
433         finally:
434             self.schema['Zoo'].delete(ID=some_other_zoo)
435
436         # I noticed this failed on PostgreSQL when testing Table.delete_all.
437         # Granted, not all float comparisons should work perfectly
438         # (and we should mark more of them imperfect), but a straight
439         # comparison with a known INSERTed value should probably work.
440         self.assertEqual(matches(lambda x: x.Lifespan == 103.2), 1)
441
442         # Test scalar columns
443         data = self.db.select((self.schema['Zoo'], lambda z: [z.ID, 'foo']))
444         self.assertEqual(list(data), [[1, 'foo'],
445                                       [2, 'foo'],
446                                       [3, 'foo'],
447                                       [4, 'foo'],
448                                       [5, 'foo'],
449                                       ])
450         # Test SELECT from no table at all
451         data = self.db.select((self.schema, lambda: ['bar'])).scalar()
452         self.assertEqual(data, 'bar')
453
454     def assertEqualSecs(self, dt1, dt2, tolerance=1):
455         """Assert that the two datetimes are within the given tolerance (secs)."""
456         diff = abs(dt1 - dt2)
457         self.assert_(diff < datetime.timedelta(0, tolerance),
458                      "%r and %r should be less than %s seconds apart, "
459                      "but differ by %r instead." % (dt1, dt2, tolerance, diff))
460
461     def test_04a_Binary_Expressions(self):
462
463         def results(tablekey, fieldlist, restriction=None):
464             data = list(self.db.select((self.schema[tablekey], fieldlist, restriction)))
465             data.sort()
466             return data
467
468         td = datetime.timedelta
469
470         # --------------------------- datetime --------------------------- #
471
472         lastescape = self.schema['Zoo'].select(Name='Wild Animal Park')['LastEscape']
473
474         # datetime + timedelta -> datetime
475         nextmonth = results('Zoo', lambda z: [z.LastEscape + td(28)],
476                             lambda z: z.LastEscape != None)
477         self.assertEqual(nextmonth, [[lastescape + td(28)]])
478
479         # datetime - timedelta -> datetime
480         ago = results('Zoo', lambda z: [z.LastEscape - td(14, 32)],
481                       lambda z: z.LastEscape != None)
482         self.assertEqual(ago, [[lastescape - td(14, 32)]])
483
484         # timedelta + datetime -> datetime
485         ago = results('Zoo', lambda z: [td(965, 86333, 300) + z.LastEscape],
486                       lambda z: z.LastEscape != None)
487         self.assertEqualSecs(ago[0][0], td(965, 86333, 300) + lastescape)
488
489         # datetime - datetime -> timedelta  (including the 'now' func)
490         elapsed = results('Zoo', lambda z: (z.Name, now() - z.LastEscape))
491         present = datetime.datetime.now()
492         self.assertEqual(elapsed[:4],
493                          [[u'Luna Park', None],
494                           [u'Montr\xe9al Biod\xf4me', None],
495                           [u'San Diego Zoo', None],
496                           [u'Sea_World', None],
497                           ]
498                          )
499         self.assertEqualSecs(elapsed[4][1], present - lastescape, tolerance=2)
500
501         # -------------------------- timedelta -------------------------- #
502
503         # timedelta + timedelta -> timedelta
504         leo_age = self.schema['Animal'].select(Species='Leopard')['Age']
505         for days in [1000, 10, 1, 0, -1, -10, -500, -1000]:
506             for secs in (0, 1, 33, 100, 1000, 32767, 86399):
507                 diff = td(days, secs)
508                 data = results('Animal',
509                                lambda z: [z.Age + diff],
510                                lambda z: z.LastEscape != None)
511                 self.assertEqual(data[0][0], leo_age + diff)
512
513         # timedelta - timedelta -> timedelta
514         leo_age = self.schema['Animal'].select(Species='Leopard')['Age']
515         for days in [1000, 10, 1, 0, -1, -10, -500, -1000]:
516             for secs in (0, 1, 33, 100, 1000, 32767, 86399):
517                 diff = td(days, secs)
518                 data = results('Animal',
519                                lambda z: [z.Age - diff],
520                                lambda z: z.LastEscape != None)
521                 self.assertEqual(data[0][0], leo_age - diff)
522
523         # ----------------------------- date ----------------------------- #
524
525         founded = results('Zoo', ['Name', 'Founded'], lambda z: z.Founded != None)
526
527         # date + timedelta -> date
528         data = results('Zoo', lambda z: (z.Name, z.Founded + td(365)),
529                        lambda z: z.Founded != None)
530         self.assertEqual(data, [[name, f + td(365)] for name, f in founded])
531
532         # date - timedelta -> date
533         data = results('Zoo', lambda z: (z.Name, z.Founded - td(14, 32)),
534                        lambda z: z.Founded != None)
535         self.assertEqual(data, [[name, f - td(14, 32)] for name, f in founded])
536
537         # timedelta + date -> date
538         data = results('Zoo', lambda z: (z.Name, td(965, 86333, 300) + z.Founded),
539                        lambda z: z.Founded != None)
540         self.assertEqual(data, [[name, td(965, 86333, 300) + f]
541                                 for name, f in founded])
542
543         # date - date -> timedelta
544         data = results('Zoo',
545                        lambda z: (z.Name, z.Founded - datetime.date(1999, 12, 31)),
546                        lambda z: z.Founded != None)
547         self.assertEqual(data, [[name, f - datetime.date(1999, 12, 31)]
548                                 for name, f in founded])
549
550         # date - date -> timedelta  (including the 'today' func)
551         # If this fails only after midnight GMT, then your provider's
552         # "today" function is probably using UTC, not local time.
553         data = results('Zoo', lambda z: (z.Name, today() - z.Founded),
554                        lambda z: z.Founded != None)
555         self.assertEqual(data, [[name, datetime.date.today() - f]
556                                 for name, f in founded])
557
558         # ----------------------------- time ----------------------------- #
559         # TODO - STRABS: Get the time tests working.
560         #opens = results('Zoo', ['Name', 'Opens'], lambda z: z.Opens != None)
561
562         # time + timedelta -> time
563         #data = results('Zoo', lambda z: (z.Name, z.Opens + td(0, 3600*4)),
564                        #lambda z: z.Opens != None)
565         #self.assertEqual(data, [[name, o + td(0, 3600*4)] for name, o in opens])
566
567         # time - timedelta -> time
568         #data = results('Zoo', lambda z: (z.Name, z.Opens - td(0, 60*45)),
569                        #lambda z: z.Opens != None)
570         #self.assertEqual(data, [[name, o - td(0, 60*45)] for name, o in opens])
571
572         # timedelta + time -> time
573         #data = results('Zoo', lambda z: (z.Name, td(0, 300) + z.Opens),
574                        #lambda z: z.Opens != None)
575         #self.assertEqual(data, [[name, td(0, 300) + o]
576                                 #for name, o in opens])
577
578         # time - time -> timedelta
579         #data = results('Zoo',
580                        #lambda z: (z.Name, z.Opens - datetime.time(1999, 12, 31)),
581                        #lambda z: z.Opens != None)
582         #self.assertEqual(data, [[name, f - datetime.time(1999, 12, 31)]
583                                 #for name, f in opens])
584
585     def test_05_Aggregates(self):
586         try:
587             Animal = self.schema['Animal']
588             Visit = self.schema['Visit']
589             Vet = self.schema['Vet']
590             Zoo = self.schema['Zoo']
591
592             # Subset of columns
593             legs = [l for l, in self.db.select((Animal, ['Legs']))]
594             legs.sort()
595             self.assertEqual(legs, [1, 2, 2, 2, 2, 2, 4, 4, 4, 4, 100, 1000000])
596
597             expected = {'Leopard': 73.5,
598                         'Slug': .75,
599                         'Tiger': None,
600                         'Lion': None,
601                         'Bear': None,
602                         'Ostrich': 103.2,
603                         'Centipede': None,
604                         'Emperor Penguin': None,
605                         'Adelie Penguin': None,
606                         'Millipede': None,
607                         'Ape': None,
608                         }
609             for species, lifespan in self.db.select((Animal, ['Species', 'Lifespan'])):
610                 if expected[species] is None:
611                     self.assertEqual(lifespan, None)
612                 else:
613                     self.assertAlmostEqual(expected[species], lifespan, places=5)
614
615             expected = [u'Montr\xe9al Biod\xf4me', 'Wild Animal Park']
616             e = (lambda x: x.Founded != None
617                  and x.Founded <= today()
618                  and x.Founded >= datetime.date(1990, 1, 1))
619             values = [val[0] for val in
620                       self.db.select((self.schema['Zoo'], ['Name'], e))]
621             for name in expected:
622                 self.assert_(name in values)
623
624             # distinct
625             legs = [x[0] for x in
626                     self.db.select((Animal, ['Legs']), distinct=True)]
627             legs.sort()
628             self.assertEqual(legs, [1, 2, 4, 100, 1000000])
629
630             # This may raise a warning on some DB's.
631             f = (lambda x: x.Species == 'Lion')
632             lionlegs = self.db.select((Animal, ['Legs'], f), distinct=True)
633             self.assertEqual(list(lionlegs), [[4]])
634
635             # Test attribute lambdas
636             visits = self.db.select((Animal << Visit << Vet,
637                                  lambda a, v, vet: (a.Species, vet.Name),
638                                  lambda a, v, vet: vet.Name != None),
639                                distinct = True)
640             visits = list(visits)
641             visits.sort()
642             self.assertEqual(visits, [[u'Emperor Penguin', u'Jim McBain'],
643                                       [u'Tiger', u'Charles Schroeder']])
644
645             # Test attribute lambdas with GROUP BY
646             firstvisit = self.db.select((Animal << Visit,
647                                      lambda a, v: (a.Species, min(v.Date))))
648             firstvisit = list(firstvisit)
649             firstvisit.sort()
650             self.assertEqual(firstvisit,
651                              [[u'Adelie Penguin', None],
652                               [u'Ape', None],
653                               [u'Bear', None],
654                               [u'Centipede', None],
655                               [u'Emperor Penguin', datetime.date(2001, 1, 1)],
656                               [u'Leopard', None],
657                               [u'Lion', None],
658                               [u'Millipede', None],
659                               [u'Ostrich', None],
660                               [u'Slug', None],
661                               [u'Tiger', datetime.date(2001, 1, 1)]]
662                              )
663
664             # Test implicit global funcs
665             escapes = self.db.select((Animal, lambda a: (day(a.LastEscape),),
666                                   lambda a: a.LastEscape != None))
667             escapes = list(escapes)
668             escapes.sort()
669             self.assertEqual(escapes, [[21]])
670             # If Python 2.4+, the above 3 lines could just be written:
671             # self.assertEqual(sorted(escapes), [[21]])
672
673             # 'count' agg func
674             visits = self.db.select((Animal << Visit,
675                                  lambda a, v: (a.Species, count(v.Date))))
676             visits = list(visits)
677             visits.sort()
678             self.assertEqual(visits,
679                              [[u'Adelie Penguin', 0],
680                               [u'Ape', 0],
681                               [u'Bear', 0],
682                               [u'Centipede', 0],
683                               [u'Emperor Penguin', 20],
684                               [u'Leopard', 0],
685                               [u'Lion', 0],
686                               [u'Millipede', 0],
687                               [u'Ostrich', 0],
688                               [u'Slug', 0],
689                               [u'Tiger', 20]]
690                              )
691
692             # count agg func with no grouping
693             animals = self.db.select((Animal, lambda a: [count(a.ID)]))
694             self.assertEqual(list(animals), [[12]])
695
696             # 'now' func
697             wap = Zoo.select(Name='Wild Animal Park')
698             WAP_elapsed = (datetime.datetime.now() - wap['LastEscape'])
699             elapsed = self.db.select((Zoo, lambda z: (z.Name, now() - z.LastEscape)))
700             elapsed = list(elapsed)
701             elapsed.sort()
702             self.assertEqual(elapsed[:4],
703                              [[u'Luna Park', None],
704                               [u'Montr\xe9al Biod\xf4me', None],
705                               [u'San Diego Zoo', None],
706                               [u'Sea_World', None],
707                               ]
708                              )
709             # Assert that the expected interval and actual interval are
710             # no more than 5 second apart.
711             diff = abs(elapsed[4][1] - WAP_elapsed)
712             self.assert_(diff <= datetime.timedelta(0, 5), diff)
713         finally:
714             self.db.connections.commit()
715
716     def test_06_Editing(self):
717         Zoo = self.schema['Zoo']
718
719         # Edit
720         SDZ = Zoo.select(Name='San Diego Zoo')
721         Zoo.save(ID=SDZ['ID'],
722                  Name='The San Diego Zoo',
723                  Founded = datetime.date(1900, 1, 1),
724                  Opens = datetime.time(7, 30, 0),
725                  Admission = "35.00")
726
727         # Test edits
728         SDZ = Zoo.select(Name='The San Diego Zoo')
729         self.assertEqual(SDZ['Name'], 'The San Diego Zoo')
730         self.assertEqual(SDZ['Founded'], datetime.date(1900, 1, 1))
731         self.assertEqual(SDZ['Opens'], datetime.time(7, 30, 0))
732         if typerefs.fixedpoint:
733             self.assertEqual(SDZ['Admission'], typerefs.fixedpoint.FixedPoint(35, 2))
734         else:
735             self.assertEqual(SDZ['Admission'], 35.0)
736
737         # Change it back
738         Zoo.save(ID=SDZ['ID'],
739                  Name = 'San Diego Zoo',
740                  Founded = datetime.date(1835, 9, 13),
741                  Opens = datetime.time(9, 0, 0),
742                  Admission = "0")
743
744         # Test re-edits
745         SDZ = Zoo.select(Name='San Diego Zoo')
746         self.assertEqual(SDZ['Name'], 'San Diego Zoo')
747         self.assertEqual(SDZ['Founded'], datetime.date(1835, 9, 13))
748         self.assertEqual(SDZ['Opens'], datetime.time(9, 0, 0))
749         if typerefs.fixedpoint:
750             self.assertEqual(SDZ['Admission'], typerefs.fixedpoint.FixedPoint(0, 2))
751         else:
752             self.assertEqual(SDZ['Admission'], 0.0)
753
754         # Test a range of dates near 1900. I had some trouble with date
755         # values near MSAccess' epoch (1899, 12, 30).
756         # TODO: add an 'epoch' attribute to each DB with native dates
757         # and test them all like this.
758         WAPID = Zoo.select(Name='Wild Animal Park')['ID']
759         epoch = datetime.datetime(1899, 12, 30)
760         for days in range(-5, +6):
761             for secs in (0, 1, 33, 100, 1000, 32767, 86399):
762                 d = epoch + datetime.timedelta(days, secs)
763                 Zoo.save(ID=WAPID, LastEscape=d)
764                 data = self.db.select((Zoo, ['LastEscape'], lambda z: z.ID == WAPID))
765                 self.assertEqual(list(data)[0][0], d)
766
767     def test_06a_Edit_expressions(self):
768         Vet = self.schema['Vet']
769
770         # Set all Vet.NameLen using a lambda
771         for vet in Vet.select_all():
772             del vet['NameLen']
773             Vet.save(NameLen=lambda v: len(v.Name), **vet)
774         for vet in Vet.select_all():
775             self.assertEqual(vet['NameLen'], len(vet['Name']))
776
777         # Try a lambda with insert
778         SDZ = self.schema['Zoo'].select(Name='San Diego Zoo')
779         gp = Vet.insert(Name = 'Geoff Pye', ZooID = SDZ['ID'],
780                         NameLen = lambda v: 2 + 3)
781         gp = Vet.select(ID=gp['ID'])
782         self.assertEqual(gp['NameLen'], 5)
783
784     def test_07_Multiselect(self):
785         try:
786             f = (lambda z, a: z.Name == 'San Diego Zoo')
787             zooed_animals = list(self.db.select((self.schema['Zoo'] & self.schema['Animal'],
788                                              [('ID',), self.schema['Animal'].keys()],
789                                             f)))
790             self.assertEqual(len(zooed_animals), 2)
791
792             SDZ = self.schema['Zoo'].select(Name='San Diego Zoo')
793             d = []
794             for row in zooed_animals:
795                 self.assertEqual(row[0], SDZ['ID'])
796                 self.assertNotEqual(row, d)
797                 d = row
798
799             # Assert that multiselects with no matching related units returns
800             # no matches for the initial class (if joins are INNER).
801             # We're also going to test that you can combine a one-arg expr
802             # with a two-arg expr.
803             sdexpr = logic.filter(Name='San Diego Zoo')
804             leo = lambda z, a: a.Species == 'Leopard'
805             zooed_animals = list(self.db.select((self.schema['Zoo'] & self.schema['Animal'],
806                                              [('ID',), ('ID', )], sdexpr + leo)))
807             self.assertEqual(len(zooed_animals), 0)
808
809             # Now try the same query with INNER, LEFT, and RIGHT JOINs.
810             zooed_animals = list(self.db.select((self.schema['Zoo'] & self.schema['Animal'],
811                                              [('Name', ), ('Species', )])))
812             self.assertEqual(len(zooed_animals), 6)
813             self.assertEqualSet([tuple(row) for row in zooed_animals],
814                                 [("Wild Animal Park", "Leopard"),
815                                  ("Wild Animal Park", "Lion"),
816                                  ("San Diego Zoo", "Tiger"),
817                                  ("San Diego Zoo", "Millipede"),
818                                  ("Sea_World", "Emperor Penguin"),
819                                  ("Sea_World", "Adelie Penguin")])
820
821             zooed_animals = list(self.db.select((self.schema['Zoo'] >> self.schema['Animal'],
822                                              [('Name', ), ('Species', )])))
823             self.assertEqual(len(zooed_animals), 12)
824             self.assertEqualSet([tuple(row) for row in zooed_animals],
825                                 [("Wild Animal Park", "Leopard"),
826                                  ("Wild Animal Park", "Lion"),
827                                  ("San Diego Zoo", "Tiger"),
828                                  ("San Diego Zoo", "Millipede"),
829                                  ("Sea_World", "Emperor Penguin"),
830                                  ("Sea_World", "Adelie Penguin"),
831                                  (None, "Slug"), (None, "Bear"),
832                                  (None, "Ostrich"), (None, "Centipede"),
833                                  (None, "Ape"), (None, "Ape"),
834                                  ])
835
836             zooed_animals = list(self.db.select((self.schema['Zoo'] << self.schema['Animal'],
837                                              [('Name', ), ('Species', )])))
838             self.assertEqual(len(zooed_animals), 8)
839             self.assertEqualSet([tuple(row) for row in zooed_animals],
840                                 [("Wild Animal Park", "Leopard"),
841                                  ("Wild Animal Park", "Lion"),
842                                  ("Luna Park", None),
843                                  ("San Diego Zoo", "Tiger"),
844                                  ("San Diego Zoo", "Millipede"),
845                                  ("Sea_World", "Emperor Penguin"),
846                                  ("Sea_World", "Adelie Penguin"),
847                                  (u'Montr\xe9al Biod\xf4me', None),
848                                  ])
849
850             # Try a multiple-arg expression
851             f = (lambda a, z: a.Legs >= 4 and z.Admission < 10)
852             animal_zoos = list(self.db.select((self.schema['Animal'] & self.schema['Zoo'],
853                                            [('Species',), self.schema['Zoo'].keys()],
854                                            f), strict=False))
855             self.assertEqual(len(animal_zoos), 4)
856             names = [row[0] for row in animal_zoos]
857             names.sort()
858             self.assertEqual(names, ['Leopard', 'Lion', 'Millipede', 'Tiger'])
859
860             # Let's try three joined classes just for the sadistic fun of it.
861             tree = (self.schema['Animal'] >> self.schema['Zoo']) >> self.schema['Vet']
862             f = (lambda a, z, v: z.Name == 'Sea_World')
863             entries = list(self.db.select((tree, [('ID',), ('ID',), ('ID', 'Name')], f)))
864             self.assertEqual(len(entries), 2)
865
866             # Let's try three joined classes just for the sadistic fun of it.
867             tree = (self.schema['Animal'] & self.schema['Zoo']) & self.schema['Vet']
868             f = (lambda a, z, v: z.Name == 'Sea_World')
869             entries = list(self.db.select((tree, [('ID',), ('ID',), ('ID', 'Name')], f)))
870             self.assertEqual(len(entries), 2)
871
872             tree = self.schema['Animal'] << self.schema['Zoo']
873             f = (lambda a, z, v: z.Name == 'Sea_World')
874             entries = list(self.db.select((tree, [('ID',), ('ID',), ('ID', 'Name')], f)))
875             # THere are only two animals in sea world, so this join will return 2 rows.
876             self.assertEqual(len(entries), 2)
877
878             # Try mentioning the same class twice.
879             tree = (self.schema['Animal'] << self.schema['Animal'])
880             f = (lambda anim, mother: mother.ID != None)
881             animals = list(self.db.select((tree, [(), ('Name', )], f)))
882             self.assertEqual(animals, [['Hua Mei']])
883         finally:
884             self.db.connections.commit()
885
886     def test_08_CustomAssociations(self):
887         try:
888             # Try different association paths
889             std_expected = ['Live Bunny Wabbit', 'Live Fish', 'Live Fish', 'T-Bone']
890             cus_expected = ['Dead Fish', 'Dead Fish', 'Live Bunny Wabbit', 'T-Bone']
891             uj = self.schema['Animal'] & self.schema['Food']
892             for path, expected in [# standard path
893                                    (None, std_expected),
894                                    # custom path
895                                    ('Alternate Food', cus_expected)]:
896                 uj.path = path
897                 foods = list(self.db.select((uj, [(), ('Name',)])))
898                 self.assertEqualSet([name for name, in foods], expected)
899         finally:
900             self.db.connections.commit()
901
902     def test_09_delete(self):
903         ostrich = self.schema['Animal'].select(Species='Ostrich')
904         self.assert_(ostrich is not None)
905
906         self.schema['Animal'].delete(**ostrich)
907
908         ostrich = self.schema['Animal'].select(Species='Ostrich')
909         self.assertEqual(ostrich, None)
910
911         # Re-create the ostrich and try deleting it with a non-ID kwarg.
912         self.schema['Animal'].insert(Species='Ostrich', Legs=2, PreviousZoos=[],
913                             Lifespan=103.2)
914         ostrich = self.schema['Animal'].select(Species='Ostrich')
915         self.assert_(ostrich is not None)
916
917         self.schema['Animal'].delete_all(Species='Ostrich')
918
919         ostrich = self.schema['Animal'].select(Species='Ostrich')
920         self.assertEqual(ostrich, None)
921
922     def test_10_timezone_aware(self):
923
924         if self.db.__class__.__name__.endswith('PgDatabase'):
925             # Exercise the timezone aware dbtype supported by postgresql.
926             class Timezone(datetime.tzinfo):
927                 def __init__(self, offset=0):
928                     self._offset = offset
929                     self._timedelta = datetime.timedelta(seconds=offset * 60)
930                 def utcoffset(self, dt):
931                     return self._timedelta
932                 def dst(self, dt):
933                     return datetime.timedelta(0)
934
935             self.schema['Animal'].insert(
936                 Species='Eagle',
937                 Birthdate=datetime.datetime(2011, 01, 01, 07, 31, 12, 100, tzinfo=Timezone(240))
938             )
939             self.schema['Animal'].insert(
940                 Species='Beagle',
941                 Birthdate=datetime.datetime(2011, 01, 01, 07, 31, 12, 100, tzinfo=Timezone(-240))
942             )
943
944             eagle = self.schema['Animal'].select(Species='Eagle')
945             d = eagle['Birthdate'].astimezone(Timezone(240))
946             self.assertEqual(2011, d.year)
947             self.assertEqual(01, d.month)
948             self.assertEqual(01, d.day)
949             self.assertEqual(07, d.hour)
950             self.assertEqual(31, d.minute)
951             self.assertEqual(12, d.second)
952             self.assertEqual(100, d.microsecond)
953
954             beagle = self.schema['Animal'].select(Species='Beagle')
955             d = beagle['Birthdate'].astimezone(Timezone(-240))
956             self.assertEqual(2011, d.year)
957             self.assertEqual(01, d.month)
958             self.assertEqual(01, d.day)
959             self.assertEqual(07, d.hour)
960             self.assertEqual(31, d.minute)
961             self.assertEqual(12, d.second)
962             self.assertEqual(100, d.microsecond)
963
964             # Ensure that different timezones between server and client work for selects.
965             data, _ = self.db.fetch("SELECT age(now() at time zone 'UTC', now())", self.db.connections.get())
966             server_offset = data[0][0].seconds / 60
967
968             dtz5 = datetime.datetime(2011, 1, 1, 13, 30, 30, 0, tzinfo=Timezone(-server_offset))
969             dtz6 = datetime.datetime(2011, 1, 1, 12, 30, 30, 0, tzinfo=Timezone(-server_offset - 120))
970             dtzn = datetime.datetime(2011, 1, 1, 12, 30, 30, 0)
971             self.schema['Animal'].insert(Species='Cows', Birthdate=dtz5)
972
973             # Using a timezone aware filter should exclude all cows.
974             expr = logic.Expression(lambda x: x.Birthdate >= dtz6 and x.Species == 'Cows')
975             self.assertEqual(None, self.schema['Animal'].select(expr))
976
977             # Using a timezone naive filter should include all cows.
978             expr = logic.Expression(lambda x: x.Birthdate >= dtzn and x.Species == 'Cows')
979             cow = self.schema['Animal'].select(expr)
980             self.assertEqual('Cows', cow['Species'])
981
982     def test_primary_key_support(self):
983         # Drop and re-add the PK on, oh, how about the Animal table?
984         Animal = self.schema['Animal']
985         Animal.set_primary()
986
987         Animal = self.schema.discover(Animal.name)
988         if self.schema.db.pks_must_be_indexed:
989             self.assertEqual(len(Animal.indices), 2)
990         else:
991             # Since we did not add an index on Animal.ID...
992             self.assertEqual(len(Animal.indices), 1)
993
994     def test_insert_into(self):
995         newtable = self.db.insert_into('fishers',
996                                   (self.schema['Animal'] << self.schema['Food'],
997                                     [('ID', 'Species', 'ZooID'),
998                                      ('ID', 'Name')],
999                                     lambda a, f: f.Name == 'Live Fish'))
1000         self.assertEqualSet(newtable.keys(),
1001                             ['ID', 'Species', 'ZooID', 'Food_ID', 'Name'])
1002         results = newtable.select_all()
1003         results.sort()
1004         self.assertEqual(results,
1005                          [{'Food_ID': 2, 'ZooID': 4, 'Name': u'Live Fish',
1006                            'ID': 8, 'Species': u'Emperor Penguin'},
1007                           {'Food_ID': 2, 'ZooID': 4, 'Name': u'Live Fish',
1008                            'ID': 9, 'Species': u'Adelie Penguin'}]
1009                          )
1010
1011     def test_order_by(self):
1012         d = datetime.date
1013
1014         # We're testing several things, here:
1015         # 1. The basic self.db.select call
1016         # 2. ORDER BY with projection
1017         # 3. ORDER BY when projection is in a different column order
1018         self.assertEqual(list(self.db.select((self.schema['Zoo'], ['Founded', 'Name'], None),
1019                                         order=lambda z: [z.Name])),
1020                          [[d(2072, 7, 17), u'Luna Park'],
1021                           [d(1992, 6, 19), u'Montr\xe9al Biod\xf4me'],
1022                           [d(1835, 9, 13), u'San Diego Zoo'],
1023                           [None,           u'Sea_World'],
1024                           [d(2000, 1, 1),  u'Wild Animal Park'],
1025                           ])
1026
1027         # Test ORDER BY for the basic select_all method
1028         zoos = self.schema['Zoo'].select_all(order=lambda z: [z.Name])
1029         self.assertEqual([z['Name'] for z in zoos],
1030                          [u'Luna Park', u'Montr\xe9al Biod\xf4me',
1031                           u'San Diego Zoo', u'Sea_World',
1032                           u'Wild Animal Park'])
1033
1034         # Test reversed()
1035         zoos = self.schema['Zoo'].select_all(order=lambda z: [reversed(z.Name)])
1036         self.assertEqual([z['Name'] for z in zoos],
1037                          [u'Wild Animal Park',
1038                           u'Sea_World', u'San Diego Zoo',
1039                           u'Montr\xe9al Biod\xf4me', u'Luna Park'])
1040
1041         # Test the list format (instead of lambda) for query.order
1042         zoos = self.schema['Zoo'].select_all(order=['Name'])
1043         self.assertEqual([z['Name'] for z in zoos],
1044                          [u'Luna Park', u'Montr\xe9al Biod\xf4me',
1045                           u'San Diego Zoo', u'Sea_World',
1046                           u'Wild Animal Park'])
1047
1048         # Test the 'limit' option
1049         zoos = self.schema['Zoo'].select_all(order=['Name'], limit=3)
1050         self.assertEqual([z['Name'] for z in zoos],
1051                          [u'Luna Park', u'Montr\xe9al Biod\xf4me',
1052                           u'San Diego Zoo'])
1053
1054         # Test limit with CannotRepresent.
1055         # Notice that we reference a method ('count') which no
1056         # known SM handles, so it will default back to Expr.eval().
1057         # Since the expr is imperfect, the provider MUST return
1058         # more rows than specified by limit, which means the
1059         # select_all method must do its own limiting.
1060         # TODO - STRABS: This appears to have been incorrectly ported from
1061         #        dejavu to here - according to the docs, this isn't
1062         #        supported yet.  What it's trying to do is to use as
1063         #        much of the SQL expression as is possible and passing that
1064         #        off to the DB, but overselecting the rows.  IE, we'll get
1065         #        all the rows we wanted, but also potentially more rows.
1066         #        somewhere along the lines we are supposed to be using the
1067         #        expression object to evaluate each row individually and
1068         #        discarding those that don't pass.  For now, this test is
1069         #        invalid.
1070         #data = self.schema['Animal'].select_all(
1071             #lambda x: 'p' in x.Species and x.Species.count('e') > 1,
1072             #limit=2)
1073         #self.assertEqual(len(data), 2)
1074
1075     def test_view_objects(self):
1076         # Create a View
1077         s = geniusql.Statement((self.schema['Visit'] << self.schema['Vet'] << self.schema['Animal'],
1078                                 lambda v, vet, a: [v.ID, v.Date, vet.Name, a.Species],
1079                                 lambda v, vet, a: a.Legs == 4 and month(v.Date) > 5))
1080         if self.db.ordered_views:
1081             s.order = lambda v, vet, a: [reversed(v.Date)]
1082
1083         self.schema['VisitView'] = VisitView = self.schema.view('VisitView', s)
1084
1085         # Read from it. Note the restriction attenuates the one in the view.
1086         if self.db.ordered_views:
1087             data = VisitView.select_all(lambda v: month(v.Date) < 9)
1088         else:
1089             # Microsoft Access doesn't allow ORDER BY in a VIEW.
1090             data = VisitView.select_all(lambda v: month(v.Date) < 9, order=["Date DESC"])
1091         self.assertEqual(data,
1092             [
1093 ##            [{'Date': datetime.date(2001, 9, 5), 'Name': u'Charles Schroeder',
1094 ##              'Species': 'Tiger', 'ID': 20},
1095              {'Date': datetime.date(2001, 8, 23), 'Name': u'Charles Schroeder',
1096               'Species': 'Tiger', 'ID': 19},
1097              {'Date': datetime.date(2001, 8, 10), 'Name': u'Charles Schroeder',
1098               'Species': 'Tiger', 'ID': 18},
1099              {'Date': datetime.date(2001, 7, 28), 'Name': u'Charles Schroeder',
1100               'Species': 'Tiger', 'ID': 17},
1101              {'Date': datetime.date(2001, 7, 15), 'Name': u'Charles Schroeder',
1102               'Species': 'Tiger', 'ID': 16},
1103              {'Date': datetime.date(2001, 7, 2), 'Name': u'Charles Schroeder',
1104               'Species': 'Tiger', 'ID': 15},
1105              {'Date': datetime.date(2001, 6, 19), 'Name': u'Charles Schroeder',
1106               'Species': 'Tiger', 'ID': 14},
1107              {'Date': datetime.date(2001, 6, 6), 'Name': u'Charles Schroeder',
1108               'Species': 'Tiger', 'ID': 13},
1109 ##             {'Date': datetime.date(2001, 5, 24), 'Name': u'Charles Schroeder',
1110 ##              'Species': 'Tiger', 'ID': 12}, ...
1111              ])
1112 ##
1113 ##    def test_zzz_Schema_Upgrade(self):
1114 ##        # Must run last.
1115 ##        zs = ZooSchema(arena)
1116 ##
1117 ##        # In this first upgrade, we simulate the case where the code was
1118 ##        # upgraded, and the database self.schema upgrade performed afterward.
1119 ##        # The Schema.latest property is set, and upgrade() is called with
1120 ##        # no argument (which should upgrade us to "latest").
1121 ##        Animal.set_property("ExhibitID")
1122 ##        # Test numeric default (see hack in storeado for MS Access).
1123 ##        prop = Animal.set_property("Stomachs", int)
1124 ##        prop.default = 1
1125 ##        zs.latest = 2
1126 ##        zs.upgrade()
1127 ##
1128 ##        # In this example, we simulate the developer who wants to put
1129 ##        # model changes inline with database changes (see upgrade_to_3).
1130 ##        # We do not set latest, but instead supply an arg to upgrade().
1131 ##        zs.upgrade(3)
1132 ##
1133 ##        # Test that Animals have a new "Family" property, and an ExhibitID.
1134 ##        box = arena.new_sandbox()
1135 ##        try:
1136 ##            emp = box.unit(Animal, Family='Emperor Penguin')
1137 ##            self.assertEqual(emp.ExhibitID, 'The Penguin Encounter')
1138 ##        finally:
1139 ##            box.flush_all()
1140
1141
1142 class Phase2_IsolationTests(ZooTestsBaseClass):
1143
1144     verbose = False
1145     _transid = 0
1146
1147     def setUp(self):
1148         try:
1149             self.old_implicit = self.db.connections.implicit_trans
1150             self.db.connections.implicit_trans = False
1151             self.old_tkey = self.db.connections.id
1152             # Use an explicit 'transid' for the transaction key
1153             self.db.connections.id = lambda: self.transid
1154         except AttributeError:
1155             self.old_implicit = None
1156
1157     def tearDown(self):
1158         if self.old_implicit is not None:
1159             self.db.connections.implicit_trans = self.old_implicit
1160             self.db.connections.id = self.old_tkey
1161
1162     def restore(self):
1163         self.transid = 0
1164
1165         self.db.connections.start()
1166         try:
1167             jim = self.schema['Vet'].select(Name = 'Jim McBain')
1168             self.schema['Vet'].save(ID = jim['ID'], City = None)
1169         except:
1170             self.db.connections.rollback()
1171             raise
1172         else:
1173             self.db.connections.commit()
1174
1175     def cleanup_boxes(self):
1176         try:
1177             self.transid = 1
1178             self.db.connections.rollback()
1179         except: pass
1180         try:
1181             self.transid = 2
1182             self.db.connections.rollback()
1183         except: pass
1184
1185     def attempt(self, testfunc, anomaly_name, level):
1186         print ".",
1187         self.restore()
1188
1189         self.transid = 1
1190         self.db.connections.start(level)
1191
1192         self.transid = 2
1193         self.db.connections.start(level)
1194
1195         try:
1196             testfunc(level)
1197         except AssertionError:
1198             self.cleanup_boxes()
1199             if level.forbids(anomaly_name):
1200                 warnings.warn("%r allowed anomaly %r." %
1201                               (level, anomaly_name))
1202         except:
1203             if self.db.is_timeout_error(sys.exc_info()[1]):
1204                 self.cleanup_boxes()
1205                 if not level.forbids(anomaly_name):
1206                     warnings.warn("%r prevented anomaly %r with an error." %
1207                                   (level, anomaly_name))
1208             else:
1209                 self.cleanup_boxes()
1210                 raise
1211         else:
1212             self.cleanup_boxes()
1213             if not level.forbids(anomaly_name):
1214                 warnings.warn("%r prevented anomaly %r." %
1215                               (level, anomaly_name))
1216
1217     def _get_transid(self):
1218         return self._transid
1219     def _set_transid(self, val):
1220         if self.verbose:
1221             print val,
1222         self._transid = val
1223     transid = property(_get_transid, _set_transid)
1224
1225     def test_dirty_read(self):
1226         def dirty_read(level):
1227             # Write City 1
1228             self.transid = 1
1229             jim1 = self.schema['Vet'].select(Name = 'Jim McBain')
1230             self.schema['Vet'].save(ID = jim1['ID'], City = "Addis Ababa")
1231
1232             # Read City 2.
1233             self.transid = 2
1234             jim2 = self.schema['Vet'].select(Name = 'Jim McBain')
1235             # If READ UNCOMMITTED or lower, this should fail
1236             assert jim2['City'] is None
1237
1238         for level in geniusql.isolation.levels:
1239             if self.verbose:
1240                 print
1241                 print level,
1242             if level.name in self.db.connections.isolation_levels:
1243                 self.attempt(dirty_read, "Dirty Read", level)
1244
1245     def test_nonrepeatable_read(self):
1246         def nonrepeatable_read(level):
1247             # Read City 1
1248             self.transid = 1
1249             jim1 = self.schema['Vet'].select(Name = 'Jim McBain')
1250             val1 = jim1['City']
1251             assert val1 is None
1252
1253             # Write City 2.
1254             self.transid = 2
1255             jim2 = self.schema['Vet'].select(Name = 'Jim McBain')
1256             self.schema['Vet'].save(ID=jim2['ID'], City = "Tehachapi")
1257             self.db.connections.commit()
1258
1259             # Re-read City 1
1260             self.transid = 1
1261             jim1 = self.schema['Vet'].select(Name = 'Jim McBain')
1262             # If READ COMMITTED or lower, this should fail
1263             assert jim1['City'] == val1
1264
1265         for level in geniusql.isolation.levels:
1266             if self.verbose:
1267                 print
1268                 print level,
1269             if level.name in self.db.connections.isolation_levels:
1270                 self.attempt(nonrepeatable_read, "Nonrepeatable Read", level)
1271
1272     def test_phantom(self):
1273         def phantom(level):
1274             # Read City 1
1275             self.transid = 1
1276             pvets = self.schema['Vet'].select_all(City = 'Poughkeepsie')
1277             assert len(pvets) == 0
1278
1279             # Write City 2.
1280             self.transid = 2
1281             jim2 = self.schema['Vet'].select(Name = 'Jim McBain')
1282             self.schema['Vet'].save(ID = jim2['ID'], City = "Poughkeepsie")
1283             self.db.connections.commit()
1284
1285             # Re-read City 1
1286             self.transid = 1
1287             pvets = self.schema['Vet'].select_all(City = 'Poughkeepsie')
1288             # If REPEATABLE READ or lower, this should fail
1289             assert len(pvets) == 0
1290
1291         for level in geniusql.isolation.levels:
1292             if self.verbose:
1293                 print
1294                 print level,
1295             if level.name in self.db.connections.isolation_levels:
1296                 self.attempt(phantom, "Phantom", level)
1297
1298
1299 class Phase2_ConcurrencyTests(ZooTestsBaseClass):
1300
1301     def test_Multithreading(self):
1302         if self.db.__class__.__name__ == 'SQLiteDatabase':
1303             print "These multithreading tests will often times segfault python with sqlite."
1304             if raw_input("Are you sure you want to continue (Y/n)").lower().startswith("n"):
1305                 print "Skipping"
1306                 return
1307
1308         f = lambda x: x.Legs == 4 and x.Lifespan is not None
1309         def box_per_thread():
1310             # Notice that, although we write changes in each thread,
1311             # we only assert the unchanged data, since the order of
1312             # thread execution can not be guaranteed.
1313             quadrupeds = self.schema['Animal'].select_all(f)
1314             self.assertEqual(len(quadrupeds), 1)
1315             first = quadrupeds[0]
1316             self.schema['Animal'].save(ID=first['ID'], Lifespan = first['Lifespan'] + 1.0)
1317
1318         ts = []
1319         # PostgreSQL, for example, has a default max_connections of 100.
1320         for x in range(99):
1321             t = threading.Thread(target=box_per_thread)
1322             t.start()
1323             ts.append(t)
1324         for t in ts:
1325             t.join()
1326
1327
1328 class Phase2_TransactionTests(ZooTestsBaseClass):
1329
1330     def test_Implicit_Transactions(self):
1331         old_implicit = self.db.connections.implicit_trans
1332         try:
1333             def commit_test():
1334                 """Test transaction commit."""
1335                 try:
1336                     try:
1337                         now = datetime.time(8, 18, 28)
1338                         WAP = self.schema['Zoo'].select(Name='Wild Animal Park')
1339                         self.schema['Zoo'].save(ID=WAP['ID'], Opens = now)
1340                         self.db.connections.commit()
1341
1342                         WAP = self.schema['Zoo'].select(Name='Wild Animal Park')
1343                         self.assertEqual(WAP['Opens'], now)
1344                     except:
1345                         traceback.print_exc()
1346                         raise
1347                 finally:
1348                     self.db.connections.commit()
1349
1350             def rollback_test():
1351                 """Test transaction rollback."""
1352                 try:
1353                     SDZ = self.schema['Zoo'].select(Name='San Diego Zoo')
1354                     self.schema['Zoo'].save(ID=SDZ['ID'],
1355                                        Name = 'The One and Only San Diego Zoo',
1356                                        Founded = datetime.date(2039, 9, 13))
1357                     self.db.connections.rollback()
1358
1359                     SDZ = self.schema['Zoo'].select(Name='San Diego Zoo')
1360                     self.assertEqual(SDZ['Name'], 'San Diego Zoo')
1361                     self.assertEqual(SDZ['Founded'], datetime.date(1835, 9, 13))
1362                 finally:
1363                     self.db.connections.commit()
1364
1365             self.db.connections.implicit_trans = True
1366             commit_test()
1367             rollback_test()
1368
1369             self.db.connections.implicit_trans = False
1370
1371             self.db.connections.start()
1372             commit_test()
1373
1374             self.db.connections.start()
1375             rollback_test()
1376         finally:
1377             self.db.connections.implicit_trans = old_implicit
1378
1379
1380 class Phase2_NumericTests(ZooTestsBaseClass):
1381
1382     def test_numbers(self):
1383         float_prec = 53
1384         box = arena.new_sandbox()
1385         try:
1386             print "precision:",
1387             # PostgreSQL should be able to go up to 1000 decimal digits (~= 2 ** 10),
1388             # but SQL constants don't actually overflow until 2 ** 15. Meh.
1389             self.db = getattr(arena.stores['testSM'], "self.db", None)
1390             if self.db:
1391                 import math
1392                 maxprec = self.db.typeset.numeric_max_precision
1393                 if maxprec == 0:
1394                     # SQLite, for example, must always use TEXT.
1395                     # So we might as well try... oh... how about 3?
1396                     overflow_prec = 3
1397                 else:
1398                     overflow_prec = int(math.log(maxprec, 2)) + 1
1399             else:
1400                 overflow_prec = 8
1401
1402             dc = typerefs.decimal.getcontext()
1403
1404             for prec in xrange(overflow_prec + 1):
1405                 p = 2 ** prec
1406                 print p,
1407                 if p > dc.prec:
1408                     dc.prec = p
1409
1410                 # We don't need to test <type long> at different 'scales'.
1411                 long_done = False
1412                 # Test scales at both extremes and the median
1413                 for s in (0, int(prec/2), max(prec-1, 0)):
1414                     s = 2 ** s
1415
1416                     # Modify the model and storage
1417                     if not long_done:
1418                         arena.drop_property(NothingToDoWithZoos, 'ALong')
1419                         NothingToDoWithZoos.ALong.hints['bytes'] = p
1420                         arena.add_property(NothingToDoWithZoos, 'ALong')
1421                     if p <= float_prec:
1422                         arena.drop_property(NothingToDoWithZoos, 'AFloat')
1423                         NothingToDoWithZoos.AFloat.hints['precision'] = p
1424                         arena.add_property(NothingToDoWithZoos, 'AFloat')
1425                     if typerefs.decimal:
1426                         arena.drop_property(NothingToDoWithZoos, 'ADecimal')
1427                         NothingToDoWithZoos.ADecimal.hints['precision'] = p
1428                         NothingToDoWithZoos.ADecimal.hints['scale'] = s
1429                         arena.add_property(NothingToDoWithZoos, 'ADecimal')
1430                     if typerefs.fixedpoint:
1431                         arena.drop_property(NothingToDoWithZoos, 'AFixed')
1432                         NothingToDoWithZoos.AFixed.hints['precision'] = p
1433                         NothingToDoWithZoos.AFixed.hints['scale'] = s
1434                         arena.add_property(NothingToDoWithZoos, 'AFixed')
1435
1436                     # Create an instance and set the specified precision/scale
1437                     nothing = NothingToDoWithZoos()
1438                     if not long_done:
1439                         Lval = (16 ** p) - 1
1440                         setattr(nothing, 'ALong', Lval)
1441                     if p <= float_prec:
1442                         fval = float(((2 ** p) - 1) / (2 ** s))
1443                         setattr(nothing, 'AFloat', fval)
1444                     nval = "1" * p
1445                     nval = nval[:-s] + "." + nval[-s:]
1446                     if typerefs.decimal:
1447                         dval = typerefs.decimal.Decimal(nval)
1448                         setattr(nothing, 'ADecimal', dval)
1449                     if typerefs.fixedpoint:
1450                         # fixedpoint uses "precision" where we use "scale";
1451                         # that is, number of digits after the decimal point.
1452                         fpval = typerefs.fixedpoint.FixedPoint(nval, s)
1453                         setattr(nothing, 'AFixed', fpval)
1454                     box.memorize(nothing)
1455
1456                     # Flush and retrieve the object. Use comparisons to test
1457                     # decompilation of imperfect_type when using large numbers.
1458                     if not long_done:
1459                         box.flush_all()
1460                         nothing = box.unit(NothingToDoWithZoos, ALong=Lval)
1461                         if nothing is None:
1462                             self.fail("Unit not found by long property. "
1463                                       "prec=%s scale=%s" % (p, s))
1464                     if p <= float_prec:
1465                         box.flush_all()
1466                         nothing = box.unit(NothingToDoWithZoos, AFloat=fval)
1467                         if nothing is None:
1468                             self.fail("Unit not found by float property. "
1469                                       "prec=%s scale=%s" % (p, s))
1470                     if typerefs.decimal:
1471                         box.flush_all()
1472                         nothing = box.unit(NothingToDoWithZoos, ADecimal=dval)
1473                         if nothing is None:
1474                             self.fail("Unit not found by decimal property. "
1475                                       "prec=%s scale=%s" % (p, s))
1476                     if typerefs.fixedpoint:
1477                         box.flush_all()
1478                         nothing = box.unit(NothingToDoWithZoos, AFixed=fpval)
1479                         if nothing is None:
1480                             self.fail("Unit not found by fixedpoint property. "
1481                                       "prec=%s scale=%s" % (p, s))
1482
1483                     # Test retrieved values.
1484                     if not long_done:
1485                         if nothing.ALong != Lval:
1486                             self.fail("%s != %s prec=%s scale=%s" %
1487                                       (`nothing.ALong`, `Lval`, p, s))
1488                     if p <= float_prec:
1489                         if nothing.AFloat != fval:
1490                             self.fail("%s != %s prec=%s scale=%s" %
1491                                       (`nothing.AFloat`, `fval`, p, s))
1492                     if typerefs.decimal:
1493                         if nothing.ADecimal != dval:
1494                             self.fail("%s != %s prec=%s scale=%s" %
1495                                       (`nothing.ADecimal`, `dval`, p, s))
1496                     if typerefs.fixedpoint:
1497                         if nothing.AFixed != fpval:
1498                             self.fail("%s != %s prec=%s scale=%s" %
1499                                       (`nothing.AFixed`, `fpval`, p, s))
1500                     nothing.forget()
1501                     box.flush_all()
1502                     long_done = True
1503         finally:
1504             box.flush_all()
1505
1506
1507 class Phase2_ConnectionTests(ZooTestsBaseClass):
1508
1509     def test_ConnClose(self):
1510         print "skipped (this feature is not yet implemented in all stores)",
1511         return
1512
1513         Animal = self.schema['Animal']
1514
1515         # Start a sticky transaction so we use the same conn.
1516         connmgr = self.db.connections
1517         connmgr.start()
1518         # Test a normal query.
1519         self.assertEqual(len(Animal.select_all(lambda x: x.Legs == 4)), 4)
1520
1521         # Take the connection down on our side.
1522         txkey = connmgr.id()
1523         conn = connmgr.transactions[txkey]
1524         connmgr._del_conn(conn)
1525
1526         # Try another query.
1527         try:
1528             fourlegs = len(Animal.select_all(lambda x: x.Legs == 4))
1529             self.assertEqual(fourlegs, 4)
1530         except:
1531             del connmgr.transactions[txkey]
1532         else:
1533             raise AssertionError("Connection did not fail.")
1534
1535     def test_ConnUnreachable(self):
1536         if self.db.__class__.__name__ == 'SQLiteDatabase' and self.db.name == ":memory:":
1537             print "In memory :sqlite: databases cannot perform this test. Skipping"
1538             return
1539
1540         if self.db.__class__.__name__ == 'FirebirdDatabase':
1541             test_sql = "SELECT 42 FROM rdb$database;"
1542         else:
1543             test_sql = "SELECT 42;"
1544
1545         conn = self.db.connections.get()
1546         data, _ = self.db.fetch(test_sql, conn=conn)
1547         self.assertEqual(int(data[0][0]), 42)
1548
1549         perform = raw_input("Disable the server and hit Enter (or 'N' to skip).")
1550         if perform.lower().startswith('n'):
1551             print "skipped",
1552             return
1553
1554         try:
1555             self.db.fetch(test_sql, conn=conn)
1556         except:
1557             pass
1558         else:
1559             self.fail("db fetch did not raise an error.")
1560
1561         raw_input("Enable the server and hit Enter.")
1562         data, _ = self.db.fetch(test_sql, conn=conn)
1563         self.assertEqual(int(data[0][0]), 42)
1564
1565
1566 class Phase2_SQLInjectionTests(ZooTestsBaseClass):
1567
1568     debug = False
1569
1570     def test_Typing(self):
1571         from geniusql.deparse import CannotRepresent
1572
1573         try:
1574             data = self.db.select((self.schema['Zoo'], ['ID'], lambda z: z.Name > 12))
1575         except ValueError, x:
1576             if "safely be translated to SQL" not in x.args[0]:
1577                 raise
1578         else:
1579             self.fail("Type mismatch did not raise an error.")
1580
1581         try:
1582             data = self.db.select((self.schema['Zoo'], lambda z: [z.Founded + 13.0]))
1583         except CannotRepresent, x:
1584             self.assert_(x.args[0].startswith("No binary function '+' between "),
1585                          x.args[0])
1586         else:
1587             self.fail("Type mismatch did not raise an error.")
1588
1589         # Test attribute queries that are missing the containing list
1590         try:
1591             data = self.db.select((self.schema['Zoo'], lambda z: z.Founded + 13.0))
1592         except ValueError, x:
1593             self.assertEqual(x.args[0],
1594                              "Attribute AST roots must be Tuple or List, not Add")
1595         else:
1596             self.fail("Missing attribute list brackets did not raise ValueError.")
1597
1598     def test_Escaping(self):
1599         # Test broadening of results
1600         for val in [r"\'' or 1=1",
1601                     r"\'' or 1=1 or '",
1602                     r"\' or 1=1 or ",
1603                     r"\' or 1=1 or '",
1604                     r"Ape' or 1=1 or '",
1605                     r"Ape' or 1=1--",
1606                     ]:
1607             data = self.db.select((self.schema['Animal'], ['ID'],
1608                               lambda a: a.Species == val))
1609             self.assertEqual(len(list(data)), 0,
1610                              "%r was illegally allowed to broaden query results.")
1611
1612         try:
1613             span = "4.0 or 1=1"
1614             data = self.db.select((self.schema['Animal'], ['ID'],
1615                               lambda a: a.Lifespan == span))
1616         except Exception, x:
1617             if self.debug:
1618                 print x
1619         else:
1620             self.assertEqual(len(list(data)), 0)
1621
1622         # Test multiple statement injection with mismatched types
1623         try:
1624             data = self.db.select((self.schema['Animal'], ['ID'],
1625                               lambda a: a.Lifespan > '4; DELETE * FROM animal'))
1626         except Exception, x:
1627             if self.debug:
1628                 print x
1629         else:
1630             self.fail("Injection of multiple statements did not error.")
1631
1632         # Test multiple statement injection with bare scalars
1633         try:
1634             data = self.db.select((self.schema['Animal'], ['ID'],
1635                               lambda a: a.Lifespan > 4 and 'DELETE * FROM animal'))
1636         except Exception, x:
1637             if self.debug:
1638                 print x
1639         else:
1640             self.fail("Injection of multiple statements did not error. Returned %r" % list(data))
1641
1642
1643 ##class ZooSchema(dejavu.Schema):
1644 ##
1645 ##    # We set "latest" to 1 so we can test upgrading manually.
1646 ##    latest = 1
1647 ##
1648 ##    def upgrade_to_2(self):
1649 ##        self.arena.add_property(Animal, "Stomachs")
1650 ##        self.arena.add_property(Animal, "ExhibitID")
1651 ##        box = self.arena.new_sandbox()
1652 ##        for exhibit in box.recall(Exhibit):
1653 ##            for animalID in exhibit.Animals:
1654 ##                # Use the Sandbox magic recaller method.
1655 ##                a = box.Animal(animalID)
1656 ##                if a:
1657 ##                    # Exhibits are identified by ZooID and Name
1658 ##                    a.ZooID = exhibit.ZooID
1659 ##                    a.ExhibitID = exhibit.Name
1660 ##        box.flush_all()
1661 ##
1662 ##    def upgrade_to_3(self):
1663 ##        Animal.remove_property("Species")
1664 ##        Animal.set_property("Family")
1665 ##
1666 ##        # Note that we drop this column in a separate step from step 2.
1667 ##        # If we had mixed model properties and SM properties in step 2,
1668 ##        # we could have done this all in one step. But this is a better
1669 ##        # demonstration of the possibilities. ;)
1670 ##        Exhibit.remove_property("Animals")
1671 ##        self.arena.drop_property(Exhibit, "Animals")
1672 ##
1673 ##        self.arena.rename_property(Animal, "Species", "Family")
1674
Note: See TracBrowser for help on using the browser.