Contact: fumanchu@aminus.org

Log in as guest/geniusql to create tickets

Changeset 240

Show
Ignore:
Timestamp:
11/26/07 16:59:56
Author:
fumanchu
Message:

Initial work on auto-reconnecting to DB (pypgsql and mysql). The remaining providers need support, and pypgsql should try to use conn.reset where possible. It would also be nice to move the ConnUnreachable? test out of the main suite. Finally, the call of _set_factory needs to be moved eventually so that config entries like 'connections.retry' are applied.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/geniusql/conns.py

    r198 r240  
    2525 
    2626 
    27 class ConnectionFactory(object): 
    28     """A connection factory which creates a new connection for each request.""" 
     27class ConnectionFactoryBase(object): 
     28    """A base class for connection factories.""" 
    2929     
    3030    def __init__(self, open, close, retry=5): 
    3131        self.open = open 
    3232        self.close = close 
    33         self.retry = retry 
    34         self.refs = {} 
    35      
    36     def __call__(self): 
    37         """Return a connection.""" 
     33        if isinstance(retry, (list, tuple)): 
     34            self.iterations = retry 
     35        else: 
     36            self.iterations = [x + 1 for x in range(retry)] 
     37     
     38    def _create(self): 
     39        """Return an (unwrapped) connection.""" 
    3840        exc = None 
    39         for i in xrange(self.retry)
     41        for i in self.iterations
    4042            try: 
    41                 conn = self.open() 
    42                 w = ConnectionWrapper(conn) 
    43                 self.refs[weakref.ref(w, self._release)] = w.conn 
    44                 return w 
     43                return self.open() 
    4544            except errors.OutOfConnectionsError, exc: 
    46                 time.sleep(i + 1
    47                 conn = None 
     45                time.sleep(i
     46         
    4847        if exc: 
    4948            args = exc.args 
    5049        else: 
    51             args = ["No connection found in %r iterations." % self.retry
     50            args = ["No connection found in %r iterations." % self.iterations
    5251        raise errors.OutOfConnectionsError(*args) 
     52 
     53 
     54class ConnectionFactory(ConnectionFactoryBase): 
     55    """A connection factory which creates a new connection for each request.""" 
     56     
     57    def __init__(self, open, close, retry=5): 
     58        ConnectionFactoryBase.__init__(self, open, close, retry) 
     59        self.refs = {} 
     60     
     61    def __call__(self): 
     62        """Return a (wrapped) connection.""" 
     63        conn = self._create() 
     64        w = ConnectionWrapper(conn) 
     65        self.refs[weakref.ref(w, self._release)] = w.conn 
     66        return w 
    5367     
    5468    def _release(self, ref): 
    5569        """Release a connection.""" 
    5670        self.close(self.refs.pop(ref)) 
     71     
     72    def reset(self, conn): 
     73        """Reset a (failed) connection.""" 
     74        self.close(conn.conn) 
     75        conn.conn = self._create() 
    5776     
    5877    def shutdown(self): 
     
    6483 
    6584 
    66 class ConnectionPool(object): 
     85class ConnectionPool(ConnectionFactoryBase): 
    6786    """A database connection factory which keeps a pool of connections.""" 
    6887     
    69     def __init__(self, open, close, size=10, retry=5): 
    70         self.open = open 
    71         self.close = close 
     88    def __init__(self, open, close, retry=5, size=10): 
     89        ConnectionFactoryBase.__init__(self, open, close, retry) 
    7290        self.refs = {} 
    7391        self.pool = Queue.Queue(size) 
    74         self.retry = retry 
    75         if isinstance(self.retry, (list, tuple)): 
    76             self.iterations = self.retry 
    77         else: 
    78             self.iterations = [x + 1 for x in range(self.retry)] 
    7992     
    8093    def __call__(self): 
    81         """Return a connection from the pool.""" 
    82         exc = None 
    83         for i in self.iterations: 
    84             try: 
    85                 conn = self.pool.get_nowait() 
    86                 # Okay, this is freaky. If we wrap here, all goes well. 
    87                 # If we wrap on Queue.put(), mysql crashes after 1700 
    88                 # or so inserts (when migrating Access tables to MySQL). 
    89                 # Go figure. 
    90                 w = ConnectionWrapper(conn) 
    91                 self.refs[weakref.ref(w, self._release)] = w.conn 
    92                 return w 
    93             except Queue.Empty: 
    94                 pass 
    95              
    96             try: 
    97                 conn = self.open() 
    98                 w = ConnectionWrapper(conn) 
    99                 self.refs[weakref.ref(w, self._release)] = w.conn 
    100                 return w 
    101             except errors.OutOfConnectionsError, exc: 
    102                 time.sleep(i) 
    103                 conn = None 
    104          
    105         if exc: 
    106             args = exc.args 
    107         else: 
    108             args = ["No connection found in %r iterations." % self.iterations] 
    109         raise errors.OutOfConnectionsError(*args) 
     94        """Return a (wrapped) connection from the pool.""" 
     95        try: 
     96            conn = self.pool.get_nowait() 
     97        except Queue.Empty: 
     98            conn = self._create() 
     99         
     100        # Okay, this is freaky. If we wrap here, all goes well. 
     101        # If we wrap on Queue.put(), mysql crashes after 1700 
     102        # or so inserts (when migrating Access tables to MySQL). 
     103        # Go figure. 
     104        w = ConnectionWrapper(conn) 
     105        self.refs[weakref.ref(w, self._release)] = w.conn 
     106        return w 
    110107     
    111108    def _release(self, ref): 
     
    119116        self.close(conn) 
    120117     
     118    def reset(self, conn): 
     119        """Reset a (failed) connection.""" 
     120        refkey = None 
     121        for ref, bareconn in self.refs.items(): 
     122            if bareconn is conn.conn: 
     123                refkey = ref 
     124                break 
     125         
     126        # The 'conn' is actually a ConnectionWrapper instance. 
     127        self.close(conn.conn) 
     128        # Pop the weakref out in case _create raises OutOfConnectionsError. 
     129        self.refs.pop(refkey, None) 
     130         
     131        conn.conn = self._create() 
     132         
     133        # Replace the bare conn in self.refs. 
     134        if refkey is None: 
     135            refkey = weakref.ref(conn, self._release) 
     136        self.refs[refkey] = conn.conn 
     137     
    121138    def shutdown(self): 
    122139        """Release all database connections.""" 
     
    134151 
    135152 
    136 class ConnectionPerThread(object): 
     153class ConnectionPerThread(ConnectionFactoryBase): 
    137154    """A database connection factory which uses one connection per thread. 
    138155     
     
    155172     
    156173    def __init__(self, open, close, retry=5): 
    157         self.open = open 
    158         self.close = close 
    159         self.retry = retry 
    160         self.conns = {} 
     174        ConnectionFactoryBase.__init__(self, open, close, retry) 
     175        self.refs = {} 
    161176     
    162177    def __call__(self): 
     
    164179        threadid = threading._get_ident() 
    165180        try: 
    166             return self.conns[threadid] 
     181            return self.refs[threadid] 
    167182        except KeyError: 
    168             exc = None 
    169             for i in xrange(self.retry): 
    170                 try: 
    171                     conn = self.open() 
    172                     self.conns[threadid] = conn 
    173                     return conn 
    174                 except errors.OutOfConnectionsError, exc: 
    175                     conn = None 
    176                     time.sleep(i + 1) 
    177             if exc: 
    178                 args = exc.args 
    179             else: 
    180                 args = ["No connection found in %r iterations." % self.retry] 
    181             raise errors.OutOfConnectionsError(*args) 
     183            conn = self._create() 
     184            self.refs[threadid] = conn 
     185            return ConnectionWrapper(conn) 
     186     
     187    def reset(self, conn): 
     188        """Reset a (failed) connection.""" 
     189        refkey = None 
     190        for ref, bareconn in self.refs.items(): 
     191            if bareconn is conn.conn: 
     192                refkey = ref 
     193                break 
     194         
     195        # The 'conn' is actually a ConnectionWrapper instance. 
     196        self.close(conn.conn) 
     197        conn.conn = self._create() 
     198         
     199        # Replace the bare conn in self.refs. 
     200        if refkey is not None: 
     201            self.refs[refkey] = conn.conn 
    182202     
    183203    def shutdown(self): 
    184204        """Release all database connections.""" 
    185205        # Empty the conn map. 
    186         while self.conns: 
    187             threadid, conn = self.conns.popitem() 
     206        while self.refs: 
     207            threadid, conn = self.refs.popitem() 
    188208            self.close(conn) 
    189209 
    190210 
    191 class SingleConnection(object): 
     211class SingleConnection(ConnectionFactoryBase): 
    192212    """A single database connection for all consumers. 
    193213     
     
    196216    """ 
    197217     
    198     def __init__(self, open, close): 
    199         self.open = open 
    200         self.close = close 
     218    def __init__(self, open, close, retry=5): 
     219        ConnectionFactoryBase.__init__(self, open, close, retry) 
    201220        # Delay opening the connection, because the 
    202221        # SM may need to create the database first. 
     
    206225        """Return our only connection.""" 
    207226        if self._conn is None: 
    208             self._conn = self.open() 
    209         return self._conn 
     227            self._conn = self._create() 
     228        return ConnectionWrapper(self._conn) 
     229     
     230    def reset(self, conn): 
     231        """Reset a (failed) connection.""" 
     232        # The 'conn' is actually a ConnectionWrapper instance. 
     233        self.close(conn.conn) 
     234        conn.conn = self._conn = self._create() 
    210235     
    211236    def shutdown(self): 
     
    218243class ConnectionManager(object): 
    219244     
     245    retry = 5 
    220246    poolsize = 10 
    221247    implicit_trans = False 
     
    244270        if self.poolsize > 0: 
    245271            self._factory = ConnectionPool(self._get_conn, self._del_conn, 
    246                                            self.poolsize) 
    247         else: 
    248             self._factory = ConnectionFactory(self._get_conn, self._del_conn) 
     272                                           self.retry, self.poolsize) 
     273        else: 
     274            self._factory = ConnectionFactory(self._get_conn, self._del_conn, 
     275                                              self.retry) 
    249276     
    250277    def shutdown(self): 
     
    263290        conn.close() 
    264291     
    265     def get(self, started=False, isolation=None): 
     292    def get(self, isolation=None): 
    266293        """Return the (possibly new) connection for the current transaction. 
    267294         
     
    276303        and any subsequent calls to this method will then return the same 
    277304        connection object. If self.implicit_trans is False, new connections 
    278         won't be STARTed or stored; the only exception to this is if the 
    279         'started' argument is True, in which case the connection will be 
    280         stored but not automatically STARTed. 
     305        won't be STARTed or stored. 
    281306        """ 
    282307        key = self.id() 
     
    287312        else: 
    288313            conn = self._factory() 
    289             if self.implicit_trans or started: 
     314            if self.implicit_trans: 
     315                self._start_transaction(conn, isolation) 
     316                # We MUST execute START before putting the conn in 
     317                # self.transactions so that dead connections have a chance 
     318                # to reconnect. 
    290319                self.transactions[key] = conn 
    291                 if not started: 
    292                     self.start(isolation) 
    293320        return conn 
     321     
     322    def reset(self, conn): 
     323        """Reset the given (failed) connection.""" 
     324        # If in a transaction, error, but first remove the conn from 
     325        # self.transactions (in a thread-safe way). 
     326        for key, txconn in self.transactions.items(): 
     327            if txconn is conn: 
     328                self.transactions.pop(key, None) 
     329                raise errors.TransactionDisconnected() 
     330         
     331        self._factory.reset(conn) 
    294332     
    295333    def id(self): 
    296334        """The current transaction id.""" 
    297335        return threading._get_ident() 
     336     
     337    def start(self, isolation=None): 
     338        """Start a transaction. Not needed if self.implicit_trans is True.""" 
     339        key = self.id() 
     340        if key in self.transactions: 
     341            conn = self.transactions[key] 
     342            if isinstance(conn, errors.TransactionLock): 
     343                raise conn 
     344        else: 
     345            conn = self._factory() 
     346            self._start_transaction(conn, isolation) 
     347            # We MUST execute START before putting the conn in 
     348            # self.transactions so that dead connections have a chance 
     349            # to reconnect. 
     350            self.transactions[key] = conn 
     351     
     352    def _start_transaction(self, conn, isolation=None): 
     353        """Start a transaction.""" 
     354        self.db.execute("START TRANSACTION;", conn) 
     355        self.isolate(conn, isolation) 
    298356     
    299357    def isolate(self, conn, isolation=None): 
     
    322380        # This is SQL92 syntax, and should work with most DB's. 
    323381        self.db.execute("SET TRANSACTION ISOLATION LEVEL %s;" % isolation, conn) 
    324      
    325     def start(self, isolation=None): 
    326         """Start a transaction. Not needed if self.implicit_trans is True.""" 
    327         conn = self.get(started=True) 
    328         self.db.execute("START TRANSACTION;", conn) 
    329         self.isolate(conn, isolation) 
    330382     
    331383    def rollback(self): 
  • trunk/geniusql/errors.py

    r172 r240  
    4343    pass 
    4444 
     45class TransactionDisconnected(GeniusqlError): 
     46    """Exception raised when a connection has been lost during a transaction. 
     47     
     48    Normally, connections are automatically reset when errors occur. However, 
     49    when a connection is lost during a transaction, it is assumed that any 
     50    statements were rolled back when the connection was dropped; therefore, 
     51    it is almost always unsafe to retry the current statement or proceed 
     52    with the remaining statements; instead, this exception is raised. 
     53    """ 
     54    pass 
     55 
    4556 
    4657class FeatureWarning(UserWarning): 
  • trunk/geniusql/objects.py

    r239 r240  
    11441144        return False 
    11451145     
     1146    def is_connection_error(self, exc): 
     1147        """If the given exception instance is a connection error, return True. 
     1148         
     1149        This should return True for errors which arise from broken connections; 
     1150        for example, if the database server has dropped the connection socket, 
     1151        or is unreachable. 
     1152        """ 
     1153        # You should definitely override this for your database. 
     1154        return False 
     1155     
    11461156    def execute(self, sql, conn=None): 
    11471157        """Return a native response for the given SQL.""" 
    1148         if conn is None: 
    1149             conn = self.connections.get() 
    11501158        if isinstance(sql, unicode): 
    11511159            sql = sql.encode(self.typeset.encoding) 
    11521160        self.log(sql) 
    1153         return conn.query(sql) 
     1161         
     1162        if conn is None: 
     1163            conn = self.connections.get() 
     1164         
     1165        try: 
     1166            return conn.query(sql) 
     1167        except Exception, x: 
     1168            if self.is_connection_error(x): 
     1169                self.connections.reset(conn) 
     1170                return conn.query(sql) 
     1171            raise 
    11541172     
    11551173    def execute_ddl(self, sql, conn=None): 
  • trunk/geniusql/providers/msaccess.py

    r237 r240  
    395395        # to be a commit timeout. See http://support.microsoft.com/kb/200300 
    396396        # for additional synchronization issues. 
    397         self._factory = conns.SingleConnection(self._get_conn, self._del_conn) 
     397        self._factory = conns.SingleConnection(self._get_conn, self._del_conn, 
     398                                               self.retry) 
    398399     
    399400    def isolate(self, conn, isolation=None): 
  • trunk/geniusql/providers/mysql.py

    r236 r240  
    490490        return conn 
    491491     
    492     def start(self, isolation=None): 
    493         """Start a transaction. Not needed if self.implicit_trans is True.""" 
    494         conn = self.get(started=True) 
     492    def _del_conn(self, conn): 
     493        """Close a connection object.""" 
     494        try: 
     495            conn.close() 
     496        except _mysql.ProgrammingError, exc: 
     497            # ProgrammingError: closing a closed connection 
     498            if exc.args == ('closing a closed connection',): 
     499                pass 
     500            else: 
     501                raise 
     502     
     503    def _start_transaction(self, conn, isolation=None): 
     504        """Start a transaction.""" 
    495505        # http://dev.mysql.com/doc/refman/5.1/en/set-transaction.html 
    496506        # "The default behavior of SET TRANSACTION is to set the 
     
    636646        return '`' + name.replace('`', '``') + '`' 
    637647     
     648    def is_connection_error(self, exc): 
     649        """If the given exception instance is a connection error, return True. 
     650         
     651        This should return True for errors which arise from broken connections; 
     652        for example, if the database server has dropped the connection socket, 
     653        or is unreachable. 
     654        """ 
     655        if isinstance(exc, _mysql.OperationalError): 
     656            # OperationalError: (2006, 'MySQL server has gone away') 
     657            return exc.args[0] == 2006 
     658        print exc 
     659        return False 
     660     
    638661    def execute(self, sql, conn=None): 
    639662        """Return a native response for the given SQL.""" 
    640         if conn is None: 
    641             conn = self.connections.get() 
    642         if isinstance(sql, unicode): 
    643             sql = sql.encode(self.encoding) 
    644         self.log(sql) 
    645663        try: 
    646             return conn.query(sql
     664            return geniusql.Database.execute(self, sql, conn=conn
    647665        except _mysql.OperationalError, x: 
    648666            if x.args[0] == 1030 and x.args[1] == 'Got error 139 from storage engine': 
  • trunk/geniusql/providers/pypgsql.py

    r198 r240  
    4747            else: 
    4848                raise 
    49      
    50     def _simulate_unreachable(self, callback): 
    51         oldconnect = self.Connect 
    52         oldretry = self._factory.retry 
    53         try: 
    54             connstr = "connect_timeout=1 " 
    55             for atom in self.Connect.split(" "): 
    56                 k, v = atom.split("=", 1) 
    57                 if k == 'host': 
    58                     v = 'www.example.com' 
    59                 connstr += "%s=%s " % (k, v) 
    60             self.Connect = connstr 
    61              
    62             self._factory.retry = [0] 
    63              
    64             callback() 
    65         finally: 
    66             self.Connect = oldconnect 
    67             self._factory.retry = oldretry 
    6849 
    6950 
     
    9273                raise 
    9374        return dbinfo 
     75     
     76    def is_connection_error(self, exc): 
     77        """If the given exception instance is a connection error, return True. 
     78         
     79        This should return True for errors which arise from broken connections; 
     80        for example, if the database server has dropped the connection socket, 
     81        or is unreachable. 
     82        """ 
     83        if isinstance(exc, libpq.OperationalError): 
     84            # OperationalError: server closed the connection unexpectedly 
     85            #   This probably means the server terminated abnormally 
     86            #   before or while processing the request. 
     87            # OperationalError: no connection to the server\n 
     88            msg = exc.args[0] 
     89            return (msg.startswith('no connection to the server') or 
     90                    msg.startswith('server closed the connection unexpectedly')) 
     91        elif isinstance(exc, libpq.InterfaceError): 
     92            # InterfaceError: PgConnection object is closed 
     93            msg = exc.args[0] 
     94            return msg.startswith('PgConnection object is closed') 
     95        return False 
    9496     
    9597    def execute_ddl(self, sql, conn=None): 
  • trunk/geniusql/providers/sqlite.py

    r237 r240  
    633633            # http://www.sqlite.org/cvstrac/wiki?p=InMemoryDatabase 
    634634            # So we need to give :memory: databases a SingleConnection. 
    635             self._factory = conns.SingleConnection(self._get_conn, self._del_conn) 
     635            self._factory = conns.SingleConnection(self._get_conn, self._del_conn, 
     636                                                   self.retry) 
    636637        elif not self.db.threadsafe: 
    637             self._factory = conns.ConnectionPerThread(self._get_conn, self._del_conn) 
     638            self._factory = conns.ConnectionPerThread(self._get_conn, self._del_conn, 
     639                                                      self.retry) 
    638640        else: 
    639641            # Use the default behavior (pool) 
  • trunk/geniusql/test/zoo_fixture.py

    r238 r240  
    14031403        Animal = schema['Animal'] 
    14041404         
    1405         # Simulate an unreachable host during the grabbing of a connection. 
    1406         # This is easy to simulate by just munging the hostname for the conn. 
    1407         def callback(): 
    1408             fourlegs = len(Animal.select_all(lambda x: x.Legs == 4)) 
    1409             self.assertEqual(fourlegs, 4) 
    1410          
    1411         db.connections.shutdown() 
    1412         db.connections._simulate_unreachable(callback) 
     1405        conn = db.connections.get() 
     1406        data, _ = db.fetch("SELECT 42;", conn=conn) 
     1407        self.assertEqual(int(data[0][0]), 42) 
     1408         
     1409        raw_input("Disable the server and hit Enter.") 
     1410        try: 
     1411            db.fetch("SELECT 42;", conn=conn) 
     1412        except: 
     1413            pass 
     1414        else: 
     1415            self.fail("db fetch did not raise an error.") 
     1416         
     1417        raw_input("Enable the server and hit Enter.") 
     1418        data, _ = db.fetch("SELECT 42;", conn=conn) 
     1419        self.assertEqual(int(data[0][0]), 42) 
    14131420 
    14141421