Changeset 240
- Timestamp:
- 11/26/07 16:59:56
- Files:
-
- trunk/geniusql/conns.py (modified) (14 diffs)
- trunk/geniusql/errors.py (modified) (1 diff)
- trunk/geniusql/objects.py (modified) (1 diff)
- trunk/geniusql/providers/msaccess.py (modified) (1 diff)
- trunk/geniusql/providers/mysql.py (modified) (2 diffs)
- trunk/geniusql/providers/pypgsql.py (modified) (2 diffs)
- trunk/geniusql/providers/sqlite.py (modified) (1 diff)
- trunk/geniusql/test/zoo_fixture.py (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/geniusql/conns.py
r198 r240 25 25 26 26 27 class ConnectionFactory (object):28 """A connection factory which creates a new connection for each request."""27 class ConnectionFactoryBase(object): 28 """A base class for connection factories.""" 29 29 30 30 def __init__(self, open, close, retry=5): 31 31 self.open = open 32 32 self.close = close 33 self.retry = retry 34 self.refs = {} 35 36 def __call__(self): 37 """Return a connection.""" 33 if isinstance(retry, (list, tuple)): 34 self.iterations = retry 35 else: 36 self.iterations = [x + 1 for x in range(retry)] 37 38 def _create(self): 39 """Return an (unwrapped) connection.""" 38 40 exc = None 39 for i in xrange(self.retry):41 for i in self.iterations: 40 42 try: 41 conn = self.open() 42 w = ConnectionWrapper(conn) 43 self.refs[weakref.ref(w, self._release)] = w.conn 44 return w 43 return self.open() 45 44 except errors.OutOfConnectionsError, exc: 46 time.sleep(i + 1)47 conn = None45 time.sleep(i) 46 48 47 if exc: 49 48 args = exc.args 50 49 else: 51 args = ["No connection found in %r iterations." % self. retry]50 args = ["No connection found in %r iterations." % self.iterations] 52 51 raise errors.OutOfConnectionsError(*args) 52 53 54 class ConnectionFactory(ConnectionFactoryBase): 55 """A connection factory which creates a new connection for each request.""" 56 57 def __init__(self, open, close, retry=5): 58 ConnectionFactoryBase.__init__(self, open, close, retry) 59 self.refs = {} 60 61 def __call__(self): 62 """Return a (wrapped) connection.""" 63 conn = self._create() 64 w = ConnectionWrapper(conn) 65 self.refs[weakref.ref(w, self._release)] = w.conn 66 return w 53 67 54 68 def _release(self, ref): 55 69 """Release a connection.""" 56 70 self.close(self.refs.pop(ref)) 71 72 def reset(self, conn): 73 """Reset a (failed) connection.""" 74 self.close(conn.conn) 75 conn.conn = self._create() 57 76 58 77 def shutdown(self): … … 64 83 65 84 66 class ConnectionPool( object):85 class ConnectionPool(ConnectionFactoryBase): 67 86 """A database connection factory which keeps a pool of connections.""" 68 87 69 def __init__(self, open, close, size=10, retry=5): 70 self.open = open 71 self.close = close 88 def __init__(self, open, close, retry=5, size=10): 89 ConnectionFactoryBase.__init__(self, open, close, retry) 72 90 self.refs = {} 73 91 self.pool = Queue.Queue(size) 74 self.retry = retry75 if isinstance(self.retry, (list, tuple)):76 self.iterations = self.retry77 else:78 self.iterations = [x + 1 for x in range(self.retry)]79 92 80 93 def __call__(self): 81 """Return a connection from the pool.""" 82 exc = None 83 for i in self.iterations: 84 try: 85 conn = self.pool.get_nowait() 86 # Okay, this is freaky. If we wrap here, all goes well. 87 # If we wrap on Queue.put(), mysql crashes after 1700 88 # or so inserts (when migrating Access tables to MySQL). 89 # Go figure. 90 w = ConnectionWrapper(conn) 91 self.refs[weakref.ref(w, self._release)] = w.conn 92 return w 93 except Queue.Empty: 94 pass 95 96 try: 97 conn = self.open() 98 w = ConnectionWrapper(conn) 99 self.refs[weakref.ref(w, self._release)] = w.conn 100 return w 101 except errors.OutOfConnectionsError, exc: 102 time.sleep(i) 103 conn = None 104 105 if exc: 106 args = exc.args 107 else: 108 args = ["No connection found in %r iterations." % self.iterations] 109 raise errors.OutOfConnectionsError(*args) 94 """Return a (wrapped) connection from the pool.""" 95 try: 96 conn = self.pool.get_nowait() 97 except Queue.Empty: 98 conn = self._create() 99 100 # Okay, this is freaky. If we wrap here, all goes well. 101 # If we wrap on Queue.put(), mysql crashes after 1700 102 # or so inserts (when migrating Access tables to MySQL). 103 # Go figure. 104 w = ConnectionWrapper(conn) 105 self.refs[weakref.ref(w, self._release)] = w.conn 106 return w 110 107 111 108 def _release(self, ref): … … 119 116 self.close(conn) 120 117 118 def reset(self, conn): 119 """Reset a (failed) connection.""" 120 refkey = None 121 for ref, bareconn in self.refs.items(): 122 if bareconn is conn.conn: 123 refkey = ref 124 break 125 126 # The 'conn' is actually a ConnectionWrapper instance. 127 self.close(conn.conn) 128 # Pop the weakref out in case _create raises OutOfConnectionsError. 129 self.refs.pop(refkey, None) 130 131 conn.conn = self._create() 132 133 # Replace the bare conn in self.refs. 134 if refkey is None: 135 refkey = weakref.ref(conn, self._release) 136 self.refs[refkey] = conn.conn 137 121 138 def shutdown(self): 122 139 """Release all database connections.""" … … 134 151 135 152 136 class ConnectionPerThread( object):153 class ConnectionPerThread(ConnectionFactoryBase): 137 154 """A database connection factory which uses one connection per thread. 138 155 … … 155 172 156 173 def __init__(self, open, close, retry=5): 157 self.open = open 158 self.close = close 159 self.retry = retry 160 self.conns = {} 174 ConnectionFactoryBase.__init__(self, open, close, retry) 175 self.refs = {} 161 176 162 177 def __call__(self): … … 164 179 threadid = threading._get_ident() 165 180 try: 166 return self. conns[threadid]181 return self.refs[threadid] 167 182 except KeyError: 168 exc = None 169 for i in xrange(self.retry): 170 try: 171 conn = self.open() 172 self.conns[threadid] = conn 173 return conn 174 except errors.OutOfConnectionsError, exc: 175 conn = None 176 time.sleep(i + 1) 177 if exc: 178 args = exc.args 179 else: 180 args = ["No connection found in %r iterations." % self.retry] 181 raise errors.OutOfConnectionsError(*args) 183 conn = self._create() 184 self.refs[threadid] = conn 185 return ConnectionWrapper(conn) 186 187 def reset(self, conn): 188 """Reset a (failed) connection.""" 189 refkey = None 190 for ref, bareconn in self.refs.items(): 191 if bareconn is conn.conn: 192 refkey = ref 193 break 194 195 # The 'conn' is actually a ConnectionWrapper instance. 196 self.close(conn.conn) 197 conn.conn = self._create() 198 199 # Replace the bare conn in self.refs. 200 if refkey is not None: 201 self.refs[refkey] = conn.conn 182 202 183 203 def shutdown(self): 184 204 """Release all database connections.""" 185 205 # Empty the conn map. 186 while self. conns:187 threadid, conn = self. conns.popitem()206 while self.refs: 207 threadid, conn = self.refs.popitem() 188 208 self.close(conn) 189 209 190 210 191 class SingleConnection( object):211 class SingleConnection(ConnectionFactoryBase): 192 212 """A single database connection for all consumers. 193 213 … … 196 216 """ 197 217 198 def __init__(self, open, close): 199 self.open = open 200 self.close = close 218 def __init__(self, open, close, retry=5): 219 ConnectionFactoryBase.__init__(self, open, close, retry) 201 220 # Delay opening the connection, because the 202 221 # SM may need to create the database first. … … 206 225 """Return our only connection.""" 207 226 if self._conn is None: 208 self._conn = self.open() 209 return self._conn 227 self._conn = self._create() 228 return ConnectionWrapper(self._conn) 229 230 def reset(self, conn): 231 """Reset a (failed) connection.""" 232 # The 'conn' is actually a ConnectionWrapper instance. 233 self.close(conn.conn) 234 conn.conn = self._conn = self._create() 210 235 211 236 def shutdown(self): … … 218 243 class ConnectionManager(object): 219 244 245 retry = 5 220 246 poolsize = 10 221 247 implicit_trans = False … … 244 270 if self.poolsize > 0: 245 271 self._factory = ConnectionPool(self._get_conn, self._del_conn, 246 self.poolsize) 247 else: 248 self._factory = ConnectionFactory(self._get_conn, self._del_conn) 272 self.retry, self.poolsize) 273 else: 274 self._factory = ConnectionFactory(self._get_conn, self._del_conn, 275 self.retry) 249 276 250 277 def shutdown(self): … … 263 290 conn.close() 264 291 265 def get(self, started=False,isolation=None):292 def get(self, isolation=None): 266 293 """Return the (possibly new) connection for the current transaction. 267 294 … … 276 303 and any subsequent calls to this method will then return the same 277 304 connection object. If self.implicit_trans is False, new connections 278 won't be STARTed or stored; the only exception to this is if the 279 'started' argument is True, in which case the connection will be 280 stored but not automatically STARTed. 305 won't be STARTed or stored. 281 306 """ 282 307 key = self.id() … … 287 312 else: 288 313 conn = self._factory() 289 if self.implicit_trans or started: 314 if self.implicit_trans: 315 self._start_transaction(conn, isolation) 316 # We MUST execute START before putting the conn in 317 # self.transactions so that dead connections have a chance 318 # to reconnect. 290 319 self.transactions[key] = conn 291 if not started:292 self.start(isolation)293 320 return conn 321 322 def reset(self, conn): 323 """Reset the given (failed) connection.""" 324 # If in a transaction, error, but first remove the conn from 325 # self.transactions (in a thread-safe way). 326 for key, txconn in self.transactions.items(): 327 if txconn is conn: 328 self.transactions.pop(key, None) 329 raise errors.TransactionDisconnected() 330 331 self._factory.reset(conn) 294 332 295 333 def id(self): 296 334 """The current transaction id.""" 297 335 return threading._get_ident() 336 337 def start(self, isolation=None): 338 """Start a transaction. Not needed if self.implicit_trans is True.""" 339 key = self.id() 340 if key in self.transactions: 341 conn = self.transactions[key] 342 if isinstance(conn, errors.TransactionLock): 343 raise conn 344 else: 345 conn = self._factory() 346 self._start_transaction(conn, isolation) 347 # We MUST execute START before putting the conn in 348 # self.transactions so that dead connections have a chance 349 # to reconnect. 350 self.transactions[key] = conn 351 352 def _start_transaction(self, conn, isolation=None): 353 """Start a transaction.""" 354 self.db.execute("START TRANSACTION;", conn) 355 self.isolate(conn, isolation) 298 356 299 357 def isolate(self, conn, isolation=None): … … 322 380 # This is SQL92 syntax, and should work with most DB's. 323 381 self.db.execute("SET TRANSACTION ISOLATION LEVEL %s;" % isolation, conn) 324 325 def start(self, isolation=None):326 """Start a transaction. Not needed if self.implicit_trans is True."""327 conn = self.get(started=True)328 self.db.execute("START TRANSACTION;", conn)329 self.isolate(conn, isolation)330 382 331 383 def rollback(self): trunk/geniusql/errors.py
r172 r240 43 43 pass 44 44 45 class TransactionDisconnected(GeniusqlError): 46 """Exception raised when a connection has been lost during a transaction. 47 48 Normally, connections are automatically reset when errors occur. However, 49 when a connection is lost during a transaction, it is assumed that any 50 statements were rolled back when the connection was dropped; therefore, 51 it is almost always unsafe to retry the current statement or proceed 52 with the remaining statements; instead, this exception is raised. 53 """ 54 pass 55 45 56 46 57 class FeatureWarning(UserWarning): trunk/geniusql/objects.py
r239 r240 1144 1144 return False 1145 1145 1146 def is_connection_error(self, exc): 1147 """If the given exception instance is a connection error, return True. 1148 1149 This should return True for errors which arise from broken connections; 1150 for example, if the database server has dropped the connection socket, 1151 or is unreachable. 1152 """ 1153 # You should definitely override this for your database. 1154 return False 1155 1146 1156 def execute(self, sql, conn=None): 1147 1157 """Return a native response for the given SQL.""" 1148 if conn is None:1149 conn = self.connections.get()1150 1158 if isinstance(sql, unicode): 1151 1159 sql = sql.encode(self.typeset.encoding) 1152 1160 self.log(sql) 1153 return conn.query(sql) 1161 1162 if conn is None: 1163 conn = self.connections.get() 1164 1165 try: 1166 return conn.query(sql) 1167 except Exception, x: 1168 if self.is_connection_error(x): 1169 self.connections.reset(conn) 1170 return conn.query(sql) 1171 raise 1154 1172 1155 1173 def execute_ddl(self, sql, conn=None): trunk/geniusql/providers/msaccess.py
r237 r240 395 395 # to be a commit timeout. See http://support.microsoft.com/kb/200300 396 396 # for additional synchronization issues. 397 self._factory = conns.SingleConnection(self._get_conn, self._del_conn) 397 self._factory = conns.SingleConnection(self._get_conn, self._del_conn, 398 self.retry) 398 399 399 400 def isolate(self, conn, isolation=None): trunk/geniusql/providers/mysql.py
r236 r240 490 490 return conn 491 491 492 def start(self, isolation=None): 493 """Start a transaction. Not needed if self.implicit_trans is True.""" 494 conn = self.get(started=True) 492 def _del_conn(self, conn): 493 """Close a connection object.""" 494 try: 495 conn.close() 496 except _mysql.ProgrammingError, exc: 497 # ProgrammingError: closing a closed connection 498 if exc.args == ('closing a closed connection',): 499 pass 500 else: 501 raise 502 503 def _start_transaction(self, conn, isolation=None): 504 """Start a transaction.""" 495 505 # http://dev.mysql.com/doc/refman/5.1/en/set-transaction.html 496 506 # "The default behavior of SET TRANSACTION is to set the … … 636 646 return '`' + name.replace('`', '``') + '`' 637 647 648 def is_connection_error(self, exc): 649 """If the given exception instance is a connection error, return True. 650 651 This should return True for errors which arise from broken connections; 652 for example, if the database server has dropped the connection socket, 653 or is unreachable. 654 """ 655 if isinstance(exc, _mysql.OperationalError): 656 # OperationalError: (2006, 'MySQL server has gone away') 657 return exc.args[0] == 2006 658 print exc 659 return False 660 638 661 def execute(self, sql, conn=None): 639 662 """Return a native response for the given SQL.""" 640 if conn is None:641 conn = self.connections.get()642 if isinstance(sql, unicode):643 sql = sql.encode(self.encoding)644 self.log(sql)645 663 try: 646 return conn.query(sql)664 return geniusql.Database.execute(self, sql, conn=conn) 647 665 except _mysql.OperationalError, x: 648 666 if x.args[0] == 1030 and x.args[1] == 'Got error 139 from storage engine': trunk/geniusql/providers/pypgsql.py
r198 r240 47 47 else: 48 48 raise 49 50 def _simulate_unreachable(self, callback):51 oldconnect = self.Connect52 oldretry = self._factory.retry53 try:54 connstr = "connect_timeout=1 "55 for atom in self.Connect.split(" "):56 k, v = atom.split("=", 1)57 if k == 'host':58 v = 'www.example.com'59 connstr += "%s=%s " % (k, v)60 self.Connect = connstr61 62 self._factory.retry = [0]63 64 callback()65 finally:66 self.Connect = oldconnect67 self._factory.retry = oldretry68 49 69 50 … … 92 73 raise 93 74 return dbinfo 75 76 def is_connection_error(self, exc): 77 """If the given exception instance is a connection error, return True. 78 79 This should return True for errors which arise from broken connections; 80 for example, if the database server has dropped the connection socket, 81 or is unreachable. 82 """ 83 if isinstance(exc, libpq.OperationalError): 84 # OperationalError: server closed the connection unexpectedly 85 # This probably means the server terminated abnormally 86 # before or while processing the request. 87 # OperationalError: no connection to the server\n 88 msg = exc.args[0] 89 return (msg.startswith('no connection to the server') or 90 msg.startswith('server closed the connection unexpectedly')) 91 elif isinstance(exc, libpq.InterfaceError): 92 # InterfaceError: PgConnection object is closed 93 msg = exc.args[0] 94 return msg.startswith('PgConnection object is closed') 95 return False 94 96 95 97 def execute_ddl(self, sql, conn=None): trunk/geniusql/providers/sqlite.py
r237 r240 633 633 # http://www.sqlite.org/cvstrac/wiki?p=InMemoryDatabase 634 634 # So we need to give :memory: databases a SingleConnection. 635 self._factory = conns.SingleConnection(self._get_conn, self._del_conn) 635 self._factory = conns.SingleConnection(self._get_conn, self._del_conn, 636 self.retry) 636 637 elif not self.db.threadsafe: 637 self._factory = conns.ConnectionPerThread(self._get_conn, self._del_conn) 638 self._factory = conns.ConnectionPerThread(self._get_conn, self._del_conn, 639 self.retry) 638 640 else: 639 641 # Use the default behavior (pool) trunk/geniusql/test/zoo_fixture.py
r238 r240 1403 1403 Animal = schema['Animal'] 1404 1404 1405 # Simulate an unreachable host during the grabbing of a connection. 1406 # This is easy to simulate by just munging the hostname for the conn. 1407 def callback(): 1408 fourlegs = len(Animal.select_all(lambda x: x.Legs == 4)) 1409 self.assertEqual(fourlegs, 4) 1410 1411 db.connections.shutdown() 1412 db.connections._simulate_unreachable(callback) 1405 conn = db.connections.get() 1406 data, _ = db.fetch("SELECT 42;", conn=conn) 1407 self.assertEqual(int(data[0][0]), 42) 1408 1409 raw_input("Disable the server and hit Enter.") 1410 try: 1411 db.fetch("SELECT 42;", conn=conn) 1412 except: 1413 pass 1414 else: 1415 self.fail("db fetch did not raise an error.") 1416 1417 raw_input("Enable the server and hit Enter.") 1418 data, _ = db.fetch("SELECT 42;", conn=conn) 1419 self.assertEqual(int(data[0][0]), 42) 1413 1420 1414 1421
