Contact: fumanchu@aminus.org

Log in as guest/geniusql to create tickets

root/trunk/geniusql/test/test.py

Revision 311 (checked in by lakin, 7 months ago)

further fixes to the whitespace.

  • Property svn:eol-style set to native
Line 
1 import datetime
2 import getopt
3 import os
4 localDir = os.path.dirname(__file__)
5 import sys
6 import unittest
7
8 import geniusql
9
10
class TestHarness(object):
    """A command-line driven test harness for Geniusql.

    Typical use is ``TestHarness(list_of_test_module_names).run()``:
    ``run`` parses the command line with ``load`` and then imports and
    runs each selected test module, optionally under a coverage tool
    (``--cover``) or with pyconquer call tracing (``--conquer``).
    """

    # Set to True by --cover: run the tests under coverage.py.
    cover = False
    # Set to True by --conquer: trace calls with pyconquer during the run.
    conquer = False
    # Coverage object in use. Stays None until start_coverage() finds one,
    # so stop_coverage() is safe to call even if coverage never started.
    coverage = None

    def __init__(self, available_tests):
        """Constructor to populate the TestHarness instance.

        available_tests should be a list of module names (strings).
        """
        self.available_tests = available_tests
        self.tests = []

    def load(self, args=None):
        """Populate a TestHarness from command-line style arguments.

        args defaults to sys.argv[1:] (read at call time, not at import
        time), but you can provide a different list of args if you like.

        Each name in self.available_tests is accepted as a long option
        ("--<name>") and selects that test. If no test is selected, all
        available tests are run. "--skip_*" options are accepted here
        but otherwise ignored by this method.

        Calls sys.exit() after printing help for "--help" (status 0) or
        for an unrecognized option (status 2).
        """
        if args is None:
            args = sys.argv[1:]

        longopts = ['help', 'cover', 'conquer']
        longopts.extend(self.available_tests)
        longopts.extend(['skip_zoo', 'skip_concurrency', 'skip_isolation',
                         'skip_connection'])
        try:
            opts, args = getopt.getopt(args, "", longopts)
        except getopt.GetoptError:
            # Tell the user what was wrong before dumping the help text.
            # sys.exc_info() is used so this parses on any Python version.
            sys.stderr.write("%s\n" % sys.exc_info()[1])
            self.help()
            sys.exit(2)

        self.tests = []

        for opt, _value in opts:
            if opt == '--help':
                self.help()
                sys.exit()
            elif opt == '--cover':
                self.cover = True
            elif opt == '--conquer':
                self.conquer = True
            elif opt.startswith('--skip'):
                # Accepted so getopt does not reject them; not used here.
                pass
            else:
                name = opt[2:]
                if name in self.available_tests and name not in self.tests:
                    self.tests.append(name)

        if not self.tests:
            # Nothing selected explicitly: run everything (on a copy, so
            # later changes to self.tests leave available_tests alone).
            self.tests = self.available_tests[:]

    def help(self):
        """Print help for test.py command-line options."""

        print("""
Geniusql Test Program
    Usage:
        test.py --<testname> --cover --conquer --skip_zoo --skip_concurrency
                --skip_isolation --skip_connection

        --cover: Run coverage tools
        --conquer: use pyconquer to trace calls

        tests:""")
        for name in self.available_tests:
            print('        --' + name)

    def run(self):
        """Parse the command line and run the selected tests.

        With --cover the tests run between start_coverage() and
        stop_coverage(). Otherwise, with --conquer, they run while a
        pyconquer Logger writes to "conquer.log" next to this file.
        --cover takes precedence when both are given.
        """
        self.load()

        if self.cover:
            self.start_coverage()
            try:
                self._run()
            finally:
                self.stop_coverage()
        elif self.conquer:
            import pyconquer
            f = os.path.join(os.path.dirname(__file__), "conquer.log")
            tr = pyconquer.Logger("geniusql", events=pyconquer.all_events)
            tr.out = open(f, "wb")
            try:
                tr.start()
                self._run()
            finally:
                tr.stop()
                tr.out.close()
        else:
            self._run()

    def _run(self):
        """Import and run every module named in self.tests.

        A module exposing a run() callable is run through it; any other
        module has its tests loaded by name with unittest and run by
        geniusql.test.tools.TestRunner. If the module has an 'opts'
        mapping, its 'Prefix' entry is set to 'test' first.
        """
        # Delay the import of any geniusql module so coverage can work
        # on the import (including class and func defs) as well.
        from geniusql.test import tools
        tools.prefer_parent_path()

        import geniusql
        print("Python version: %s" % (sys.version.split()[0],))
        print("Geniusql version: %s" % (geniusql.__version__,))

        for testmod in self.tests:
            print("")
            # The first five characters are dropped for display
            # (presumably a "test_" prefix -- confirm against callers).
            print("Testing %s..." % testmod[5:])

            # A non-empty fromlist makes __import__ return the named
            # module itself rather than its top-level package.
            mod = __import__(testmod, globals(), locals(), [''])
            if hasattr(mod, 'opts'):
                mod.opts['Prefix'] = 'test'

            if hasattr(mod, 'run'):
                mod.run()
            else:
                suite = unittest.TestLoader().loadTestsFromName(testmod)
                tools.TestRunner.run(suite)

    def start_coverage(self):
        """Start the coverage tool.

        To use this feature, you need to download 'coverage.py',
        either Gareth Rees' original implementation:
        http://www.garethrees.org/2001/12/04/python-coverage/

        or Ned Batchelder's enhanced version:
        http://www.nedbatchelder.com/code/modules/coverage.html

        If neither module is found in PYTHONPATH,
        coverage is silently(!) disabled.

        Any existing "coverage.cache" file next to this module is
        removed before coverage starts.
        """
        try:
            from coverage import the_coverage as coverage
            c = os.path.join(localDir, "coverage.cache")
            coverage.cache_default = c
            if c and os.path.exists(c):
                os.remove(c)
            coverage.start()
        except ImportError:
            coverage = None
        self.coverage = coverage

    def stop_coverage(self):
        """Stop the coverage tool, save results, and report.

        Does nothing when coverage is disabled or was never started.
        """
        if self.coverage:
            self.coverage.save()
            self.report_coverage()

    def report_coverage(self):
        """Print a summary from the code coverage tool.

        Only files located under the parent directory of this module
        are considered, and any file whose path contains 'test' is
        skipped. Files that cannot be analysed are ignored.
        """

        # Assume we want to cover everything in "../../geniusql/"
        basedir = os.path.normpath(os.path.join(os.getcwd(), localDir, '../'))
        basedir = basedir.lower()
        self.coverage.get_ready()

        morfs = [x for x in self.coverage.cexecuted
                 if x.lower().startswith(basedir)]

        total_statements = 0
        total_executed = 0

        print("")
        # No trailing newline: the progress dots below continue this line.
        sys.stdout.write("CODE COVERAGE (this might take a while)")
        for morf in morfs:
            sys.stdout.write(".")
            sys.stdout.flush()
            if 'test' in morf:
                continue
            try:
                _, statements, _, missing, _ = self.coverage.analysis2(morf)
                n = len(statements)
                m = n - len(missing)
                total_statements = total_statements + n
                total_executed = total_executed + m
            except KeyboardInterrupt:
                raise
            except Exception:
                # No, really! We truly want to ignore any other errors
                # (a file that cannot be analysed must not kill the report).
                pass

        pc = 100.0
        if total_statements > 0:
            pc = 100.0 * total_executed / total_statements

        print("\nTotal: %s Covered: %s Percent: %2d%%"
              % (total_statements, total_executed, pc))
200
201
class Provider(object):
    """Creates and destroys the database and schema used by the tests.

    After setup(), the 'db' and 'schema' attributes hold the objects
    created through geniusql; before that they are None.
    """

    db = None
    schema = None
    # Log file written by geniusqllog(), placed next to this module.
    logname = os.path.join(localDir, "geniusqltest.log")

    def geniusqllog(self, message):
        """Geniusql logger (appends a timestamped line to self.logname)."""
        if isinstance(message, unicode):
            message = message.encode('utf8')
        s = "%s %s" % (datetime.datetime.now().isoformat(), message)
        f = open(self.logname, 'ab')
        try:
            f.write(s + '\n')
        finally:
            # Always release the handle, even if the write fails.
            f.close()

    def setup(self, provider, opts):
        """Set up storage for Zoo classes.

        provider and opts are passed to geniusql.db() as
        geniusql.db(provider, **opts). The database and then its schema
        are created, and geniusqllog is installed as the database's log
        callable.
        """
        self.db = geniusql.db(provider, **opts)
        self.db.log = self.geniusqllog
        if self.db.name != ":memory:":
            # sqlite in memory database will be created immediately upon connection.
            assert self.db.exists() == False
        self.db.create()
        assert self.db.exists() == True

        self.schema = self.db.schema()
        self.schema.create()

    def teardown(self):
        """Tear down storage for Zoo classes.

        Dropping the schema and the database is best-effort:
        AttributeError and NotImplementedError are ignored. Safe to
        call even if setup() never created a database.
        """
        try:
            self.schema.drop()
        except (AttributeError, NotImplementedError):
            pass
        try:
            self.db.drop()
        except (AttributeError, NotImplementedError):
            pass
        # db is still None when setup() did not get as far as creating it;
        # there are no connections to shut down in that case.
        if self.db is not None:
            self.db.connections.shutdown()
241
242 #-------------------------------------------------------------------------------
class DBConfig(object):
    """
    A helper object to ease the setup of tests at the module level in geniusql.

    Example:
        # Initialize an in memory sqlite database.
        db = test.DBConfig('sqlite', {'name': ':memory:'})

        # Register the setup/teardown functions at the module level for nose.
        setUp = db.setUp
        tearDown = db.tearDown


        # Instantiate a class that will need access to the db/schema
        # NOTE: the class must be registered (last line below); only
        # registered classes get 'provider', 'db' and 'schema' attached,
        # and only once setUp has run.
        class Phase1_ZooTests(zoo_fixture.Phase1_ZooTests):
            pass
        db.register_db_test_class(Phase1_ZooTests)

    """
    #---------------------------------------------------------------------------
    def __init__(self, db_type, opts):
        """
        Remember which database to create; nothing is created until setUp.

        db_type and opts are handed to Provider.setup() unchanged.
        """
        self.tests = []
        self.db_type = db_type
        self.opts = opts
        # Populated by setUp.
        self.db = None
        self.schema = None
        self.provider = None

    #---------------------------------------------------------------------------
    def register_db_test_class(self, test_class):
        """
        Registers the given class as needing access to the db/schema.

        When the setup is run, we will create a db/schema and dynamically
        attach it to each class registered in this manner as the 'db' and
        'schema' attributes respectively.
        """
        self.tests.append(test_class)

    #---------------------------------------------------------------------------
    def setUp(self, obj=None):
        """
        This method will setup the specified db class with the given options.

        It will also ensure that all registered test classes have access to the
        db/schema attributes (and to the provider itself, as 'provider').

        obj is accepted for callers that pass an argument to fixtures
        and is not used.
        """
        self.provider = Provider()
        self.provider.setup(self.db_type, self.opts)
        self.db = self.provider.db
        self.schema = self.provider.schema

        # Attach it to all of the tests that need access
        for test_class in self.tests:
            test_class.provider = self.provider
            test_class.db = self.provider.db
            test_class.schema = self.provider.schema

    #---------------------------------------------------------------------------
    def tearDown(self, obj=None):
        """
        Tear down the provider created by setUp.

        Does nothing if setUp has not created a provider. obj is
        accepted for callers that pass an argument to fixtures and is
        not used.
        """
        if self.provider is not None:
            self.provider.teardown()
306
307
Note: See TracBrowser for help on using the browser.