Contact: fumanchu@aminus.org

Log in as guest/geniusql to create tickets

root/trunk/geniusql/conns.py

Revision 310 (checked in by lakin, 4 months ago)

geniusql - fixing whitespcae

  • Property svn:eol-style set to native
Line 
1 """Objects for managing database connections and transactions with Geniusql."""
2
3 __all__ = [
4     'ConnectionFactory',
5     'ConnectionManager',
6     'ConnectionPool',
7     'ConnectionWrapper',
8     'SingleConnection',
9 ]
10
11 import Queue
12 import threading
13 import time
14 import weakref
15
16 from geniusql import errors, isolation as _isolation
17
18
19 class ConnectionWrapper(object):
20     """Connection object wrapper, so it can be used as a weak reference."""
21
22     def __init__(self, conn=None):
23         self.conn = conn
24
25     def __getattr__(self, attr):
26         return getattr(self.conn, attr)
27
28
29 class ConnectionFactoryBase(object):
30     """A base class for connection factories."""
31
32     def __init__(self, open, close, retry=5):
33         self.open = open
34         self.close = close
35         if isinstance(retry, (list, tuple)):
36             self.iterations = retry
37         else:
38             self.iterations = [x + 1 for x in range(retry)]
39
40     def _create(self):
41         """Return an (unwrapped) connection."""
42         exc = None
43         for i in self.iterations:
44             try:
45                 return self.open()
46             except errors.OutOfConnectionsError, exc:
47                 time.sleep(i)
48
49         if exc:
50             args = exc.args
51         else:
52             args = ["No connection found in %r iterations." % self.iterations]
53         raise errors.OutOfConnectionsError(*args)
54
55
56 class ConnectionFactory(ConnectionFactoryBase):
57     """A connection factory which creates a new connection for each request."""
58
59     def __init__(self, open, close, retry=5):
60         ConnectionFactoryBase.__init__(self, open, close, retry)
61         self.refs = {}
62
63     def __call__(self):
64         """Return a (wrapped) connection."""
65         conn = self._create()
66         w = ConnectionWrapper(conn)
67         self.refs[weakref.ref(w, self._release)] = w.conn
68         return w
69
70     def _release(self, ref):
71         """Release a connection."""
72         self.close(self.refs.pop(ref))
73
74     def reset(self, conn):
75         """Reset a (failed) connection."""
76         self.close(conn.conn)
77         conn.conn = self._create()
78
79     def shutdown(self):
80         """Release all database connections."""
81         # Empty self.refs.
82         while self.refs:
83             ref, conn = self.refs.popitem()
84             self.close(conn)
85
86
87 class ConnectionPool(ConnectionFactoryBase):
88     """A database connection factory which keeps a pool of connections."""
89
90     def __init__(self, open, close, retry=5, size=10):
91         ConnectionFactoryBase.__init__(self, open, close, retry)
92         self.refs = {}
93         self.pool = Queue.Queue(size)
94
95     def __call__(self):
96         """Return a (wrapped) connection from the pool."""
97         try:
98             conn = self.pool.get_nowait()
99         except Queue.Empty:
100             conn = self._create()
101
102         # Okay, this is freaky. If we wrap here, all goes well.
103         # If we wrap on Queue.put(), mysql crashes after 1700
104         # or so inserts (when migrating Access tables to MySQL).
105         # Go figure.
106         w = ConnectionWrapper(conn)
107         self.refs[weakref.ref(w, self._release)] = w.conn
108         return w
109
110     def _release(self, ref):
111         """Release a connection."""
112         conn = self.refs.pop(ref)
113         try:
114             self.pool.put_nowait(conn)
115             return
116         except Queue.Full:
117             pass
118         self.close(conn)
119
120     def reset(self, conn):
121         """Reset a (failed) connection."""
122         refkey = None
123         for ref, bareconn in self.refs.items():
124             if bareconn is conn.conn:
125                 refkey = ref
126                 break
127
128         # The 'conn' is actually a ConnectionWrapper instance.
129         self.close(conn.conn)
130         # Pop the weakref out in case _create raises OutOfConnectionsError.
131         self.refs.pop(refkey, None)
132
133         conn.conn = self._create()
134
135         # Replace the bare conn in self.refs.
136         if refkey is None:
137             refkey = weakref.ref(conn, self._release)
138         self.refs[refkey] = conn.conn
139
140     def shutdown(self):
141         """Release all database connections."""
142         # Empty the pool.
143         while True:
144             try:
145                 self.pool.get(block=False)
146             except Queue.Empty:
147                 break
148
149         # Empty self.refs.
150         while self.refs:
151             ref, conn = self.refs.popitem()
152             self.close(conn)
153
154
155 class ConnectionPerThread(ConnectionFactoryBase):
156     """A database connection factory which uses one connection per thread.
157
158     This is useful for SQLite; from http://www.sqlite.org/c_interface.html:
159
160         "If SQLite is compiled with the THREADSAFE preprocessor macro set
161         to 1, then it is safe to use SQLite from two or more threads of
162         the same process at the same time. But each thread should have
163         its own sqlite* pointer returned from sqlite_open. It is never safe
164         for two or more threads to access the same sqlite* pointer at the
165         same time.
166
167         In precompiled SQLite libraries available on the website, the Unix
168         versions are compiled with THREADSAFE turned off but the windows
169         versions are compiled with THREADSAFE turned on. If you need
170         something different that this you will have to recompile."
171
172     See also http://www.sqlite.org/faq.html#q8
173     """
174
175     def __init__(self, open, close, retry=5):
176         ConnectionFactoryBase.__init__(self, open, close, retry)
177         self.refs = {}
178
179     def __call__(self):
180         """Return the connection for the current thread."""
181         threadid = threading._get_ident()
182         try:
183             return self.refs[threadid]
184         except KeyError:
185             conn = self._create()
186             self.refs[threadid] = conn
187             return ConnectionWrapper(conn)
188
189     def reset(self, conn):
190         """Reset a (failed) connection."""
191         refkey = None
192         for ref, bareconn in self.refs.items():
193             if bareconn is conn.conn:
194                 refkey = ref
195                 break
196
197         # The 'conn' is actually a ConnectionWrapper instance.
198         self.close(conn.conn)
199         conn.conn = self._create()
200
201         # Replace the bare conn in self.refs.
202         if refkey is not None:
203             self.refs[refkey] = conn.conn
204
205     def shutdown(self):
206         """Release all database connections."""
207         # Empty the conn map.
208         while self.refs:
209             threadid, conn = self.refs.popitem()
210             self.close(conn)
211
212
213 class SingleConnection(ConnectionFactoryBase):
214     """A single database connection for all consumers.
215
216     Use this when your database cannot handle multiple connections at once,
217     but can handle multiple threads using the same connection.
218     """
219
220     def __init__(self, open, close, retry=5):
221         ConnectionFactoryBase.__init__(self, open, close, retry)
222         # Delay opening the connection, because the
223         # SM may need to create the database first.
224         self._conn = None
225
226     def __call__(self):
227         """Return our only connection."""
228         if self._conn is None:
229             self._conn = self._create()
230         return ConnectionWrapper(self._conn)
231
232     def reset(self, conn):
233         """Reset a (failed) connection."""
234         # The 'conn' is actually a ConnectionWrapper instance.
235         self.close(conn.conn)
236         conn.conn = self._conn = self._create()
237
238     def shutdown(self):
239         """Release all database connections."""
240         if self._conn is not None:
241             self.close(self._conn)
242             self._conn = None
243
244
245 class ConnectionManager(object):
246
247     retry = 5
248     poolsize = 10
249     implicit_trans = False
250
251     # Change this to 'error' if you don't want autocommit on schema ops.
252     contention = 'commit'
253
254     # The "default_isolation" value should be a value native to the DB.
255     default_isolation = None
256
257     # The values in "isolation_levels" should match the names of
258     # IsolationLevel objects in isolation.py
259     isolation_levels = ["READ UNCOMMITTED", "READ COMMITTED",
260                         "REPEATABLE READ", "SERIALIZABLE"]
261
262     # Any SQL to execute per connection (inside _get_conn)
263     # before returning the connection to the caller.
264     initial_sql = None
265
266     def __init__(self, db):
267         self.transactions = {}
268         self.db = db
269         self._set_factory()
270
271     def _set_factory(self):
272         if self.poolsize > 0:
273             self._factory = ConnectionPool(self._get_conn, self._del_conn,
274                                            self.retry, self.poolsize)
275         else:
276             self._factory = ConnectionFactory(self._get_conn, self._del_conn,
277                                               self.retry)
278
279     def shutdown(self):
280         """Release all database connections."""
281         self._factory.shutdown()
282
283     def _get_conn(self):
284         """Create and return a connection object."""
285         # Override this with the connection call for your DB. Example:
286         #     return libpq.PQconnectdb(self.connstring)
287         raise NotImplementedError
288
289     def _del_conn(self, conn):
290         """Close a connection object."""
291         # Override this with the close call (if any) for your DB.
292         conn.close()
293
294     def get(self, isolation=None):
295         """Return the (possibly new) connection for the current transaction.
296
297         If we are already in a transaction, this returns the connection for
298         that transaction. The "current transaction" context is determined by
299         self.id(); by default, this is the current thread ID (but subclasses
300         are free to change this). If there is no "current transaction", then
301         a new connection object is obtained (usually from a pool).
302
303         If self.implicit_trans is True, a new connection will automatically
304         call "START TRANSACTION". It will also be associated with self.id(),
305         and any subsequent calls to this method will then return the same
306         connection object. If self.implicit_trans is False, new connections
307         won't be STARTed or stored.
308         """
309         key = self.id()
310         if key in self.transactions:
311             conn = self.transactions[key]
312             if isinstance(conn, errors.TransactionLock):
313                 raise conn
314         else:
315             conn = self._factory()
316             if self.implicit_trans:
317                 self._start_transaction(conn, isolation)
318                 # We MUST execute START before putting the conn in
319                 # self.transactions so that dead connections have a chance
320                 # to reconnect.
321                 self.transactions[key] = conn
322         return conn
323
324     def reset(self, conn):
325         """Reset the given (failed) connection."""
326         # If in a transaction, error, but first remove the conn from
327         # self.transactions (in a thread-safe way).
328         for key, txconn in self.transactions.items():
329             if txconn is conn:
330                 self.transactions.pop(key, None)
331                 raise errors.TransactionDisconnected()
332
333         self._factory.reset(conn)
334
335     def id(self):
336         """The current transaction id."""
337         return threading._get_ident()
338
339     def start(self, isolation=None):
340         """Start a transaction. Not needed if self.implicit_trans is True."""
341         key = self.id()
342         if key in self.transactions:
343             conn = self.transactions[key]
344             if isinstance(conn, errors.TransactionLock):
345                 raise conn
346         else:
347             conn = self._factory()
348             self._start_transaction(conn, isolation)
349             # We MUST execute START before putting the conn in
350             # self.transactions so that dead connections have a chance
351             # to reconnect.
352             self.transactions[key] = conn
353
354     def _start_transaction(self, conn, isolation=None):
355         """Start a transaction."""
356         self.db.execute("START TRANSACTION;", conn)
357         self.isolate(conn, isolation)
358
359     def isolate(self, conn, isolation=None):
360         """Set the isolation level of the given connection.
361
362         If 'isolation' is None, our default_isolation will be used for new
363         connections. Valid values for the 'isolation' argument may be native
364         values for your particular database. However, it is recommended you
365         pass items from the global 'levels' list instead; these will be
366         automatically replaced with native values.
367
368         For many databases, this must be executed after START TRANSACTION.
369         """
370         if isolation is None:
371             isolation = self.default_isolation
372
373         if isinstance(isolation, _isolation.IsolationLevel):
374             # Map the given IsolationLevel object to a native value.
375             isolation = isolation.name
376             if isolation not in self.isolation_levels:
377                 raise ValueError("IsolationLevel %r not allowed by %s. "
378                                  "Try one of %r instead."
379                                  % (isolation, self.__class__.__name__,
380                                     self.isolation_levels))
381
382         # This is SQL92 syntax, and should work with most DB's.
383         self.db.execute("SET TRANSACTION ISOLATION LEVEL %s;" % isolation, conn)
384
385     def rollback(self):
386         """Roll back the current transaction, if any."""
387         key = self.id()
388         if key in self.transactions:
389             self.db.execute("ROLLBACK;", self.transactions[key])
390             del self.transactions[key]
391         else:
392             # This is critical in order to support polygonal SM structures
393             # (same store being called twice by separate proxies).
394             pass
395
396     def commit(self):
397         """Commit the current transaction, if any."""
398         try:
399             conn = self.transactions.pop(self.id())
400         except KeyError:
401             # This is critical in order to support polygonal SM structures
402             # (same store being called twice by separate proxies).
403             pass
404         else:
405             self.db.execute("COMMIT;", conn)
406
407     def lock(self, msg=None):
408         """Deny transactions during schema operations (DDL statements).
409
410         Any code which calls this should also call 'unlock' in a try/finally:
411
412         db.connections.lock('dropping storage')
413         try:
414             db.execute("DROP TABLE %s;" % tablename)
415         finally:
416             db.connections.unlock()
417         """
418         key = self.id()
419         if key in self.transactions:
420             if isinstance(self.transactions[key], errors.TransactionLock):
421                 return
422             if self.contention == 'error':
423                 raise errors.TransactionLock("Schema operations are not "
424                                              "allowed inside transactions.")
425             self.commit()
426
427         if msg is None:
428             msg = "Transactions not allowed at the moment."
429         self.transactions[key] = errors.TransactionLock(msg)
430
431     def unlock(self):
432         """Allow transactions."""
433         key = self.id()
434         trans = self.transactions.get(key, None)
435         if trans is None:
436             return
437         if not isinstance(trans, errors.TransactionLock):
438             raise errors.TransactionLock("Unlock called inside transaction.")
439         del self.transactions[key]
440
441     def in_transaction(self):
442         """Return True if the current context is in a transaction.
443
444         This also returns True if the current context is executing DDL
445         statements, or is barred from starting a transaction for some
446         other reason.
447         """
448         trans = self.transactions.get(self.id())
449         if trans is None or isinstance(trans, errors.TransactionLock):
450             return False
451         return True
452
Note: See TracBrowser for help on using the browser.