Contact: fumanchu@aminus.org

Log in as guest/geniusql to create tickets

Changeset 10

Show
Ignore:
Timestamp:
02/15/07 01:12:02
Author:
fumanchu
Message:

Separated all connection/transaction logic into a new 'connectionmanager' delegate:

  1. Changed 'db.get_transaction' to be the default behavior (of db.connections.get) and hid the old bare connection() call as '_factory()'.
  2. New Database.execute_ddl method to hide all the lock/unlock calls.
  3. New "in_transaction" method for autocommitting db's.
  4. Other name changes:
    • is_lock_error to is_timeout_error
    • disconnect to shutdown
    • transaction_key to id
    • lock_contention to contention

Also consolidated most of the two postgres modules into a single base. Also fixed a SQLiteTypeless bug.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/geniusql/__init__.py

    r9 r10  
    105105        t = self.table 
    106106        if t.created: 
    107             t.db.lock("Creating index. Transactions not allowed.") 
    108             try: 
    109                 t.db.execute('CREATE INDEX %s ON %s (%s);' % 
     107            t.db.execute_ddl('CREATE INDEX %s ON %s (%s);' % 
    110108                             (index.qname, t.qname, 
    111109                              t.db.quote(index.colname))) 
    112             finally: 
    113                 t.db.unlock() 
    114110        dict.__setitem__(self, key, index) 
    115111     
     
    118114        t = self.table 
    119115        if t.created: 
    120             t.db.lock("Dropping index. Transactions not allowed.") 
    121             try: 
    122                 t.db.execute('DROP INDEX %s ON %s;' % (self[key].qname, t.qname)) 
    123             finally: 
    124                 t.db.unlock() 
     116            t.db.execute_ddl('DROP INDEX %s ON %s;' % 
     117                             (self[key].qname, t.qname)) 
    125118        dict.__delitem__(self, key) 
    126119 
     
    271264            del self[key] 
    272265         
    273         self.db.lock("Adding property. Transactions not allowed.") 
    274         try: 
    275             if column.autoincrement: 
    276                 # This may or may not be a no-op, depending on the DB. 
    277                 self.db.create_sequence(self, column) 
    278             self._add_column(column) 
    279             dict.__setitem__(self, key, column) 
    280         finally: 
    281             self.db.unlock() 
     266        if column.autoincrement: 
     267            # This may or may not be a no-op, depending on the DB. 
     268            self.db.create_sequence(self, column) 
     269        self._add_column(column) 
     270        dict.__setitem__(self, key, column) 
    282271     
    283272    def _drop_column(self, column): 
    284273        """Internal function to drop the column from the database.""" 
    285         self.db.execute("ALTER TABLE %s DROP COLUMN %s;" % 
    286                         (self.qname, column.qname)) 
     274        self.db.execute_ddl("ALTER TABLE %s DROP COLUMN %s;" % 
     275                            (self.qname, column.qname)) 
    287276     
    288277    def __delitem__(self, key): 
     
    294283            return 
    295284         
    296         self.db.lock("Dropping property. Transactions not allowed.") 
    297         try: 
    298             column = self[key] 
    299             self._drop_column(column) 
    300             if column.autoincrement: 
    301                 # This may or may not be a no-op, depending on the DB. 
    302                 self.db.drop_sequence(column) 
    303             dict.__delitem__(self, key) 
    304         finally: 
    305             self.db.unlock() 
     285        column = self[key] 
     286        self._drop_column(column) 
     287        if column.autoincrement: 
     288            # This may or may not be a no-op, depending on the DB. 
     289            self.db.drop_sequence(column) 
     290        dict.__delitem__(self, key) 
    306291     
    307292    def _rename(self, oldcol, newcol): 
    308293        # Override this to do the actual rename at the DB level. 
    309         self.db.execute("ALTER TABLE %s RENAME COLUMN %s TO %s;" % 
    310                         (self.qname, oldcol.qname, newcol.qname)) 
     294        self.db.execute_ddl("ALTER TABLE %s RENAME COLUMN %s TO %s;" % 
     295                            (self.qname, oldcol.qname, newcol.qname)) 
    311296     
    312297    def rename(self, oldkey, newkey): 
     
    326311            newcol.name = newname 
    327312            newcol.qname = self.db.quote(newname) 
    328             self.db.lock("Renaming property. Transactions not allowed.") 
    329             try: 
    330                 self._rename(oldcol, newcol) 
    331             finally: 
    332                 self.db.unlock() 
     313            self._rename(oldcol, newcol) 
    333314         
    334315        # Use the superclass calls to avoid DROP COLUMN/ADD COLUMN. 
     
    391372                values.append(val) 
    392373         
    393         transconn = self.db.get_transaction() 
     374        conn = self.db.connections.get() 
    394375         
    395376        fields = ", ".join(fields) 
    396377        values = ", ".join(values) 
    397378        self.db.execute('INSERT INTO %s (%s) VALUES (%s);' % 
    398                         (self.qname, fields, values), transconn) 
     379                        (self.qname, fields, values), conn) 
    399380         
    400381        if idkeys: 
    401             newids = self._grab_new_ids(idkeys, transconn) 
     382            newids = self._grab_new_ids(idkeys, conn) 
    402383            for key in newids.keys(): 
    403384                col = self[key] 
     
    427408            sql = ('UPDATE %s SET %s WHERE %s;' % 
    428409                   (self.qname, ", ".join(parms), self.id_clause(**inputs))) 
    429             self.db.execute(sql, self.db.get_transaction()
     410            self.db.execute(sql
    430411     
    431412    use_asterisk_to_delete_all = False 
     
    438419            star = "" 
    439420        self.db.execute('DELETE%s FROM %s WHERE %s;' % 
    440                         (star, self.qname, self.id_clause(**inputs)), 
    441                         self.db.get_transaction()) 
     421                        (star, self.qname, self.id_clause(**inputs))) 
    442422     
    443423    def delete_all(self, **inputs): 
     
    448428            star = "" 
    449429        self.db.execute('DELETE%s FROM %s WHERE %s;' % 
    450                         (star, self.qname, self.whereclause(**inputs)), 
    451                         self.db.get_transaction()) 
     430                        (star, self.qname, self.whereclause(**inputs))) 
    452431     
    453432    def select_all(self, restriction=None, **kwargs): 
     
    511490    selectwriter = SelectWriter 
    512491    tableclass = Table 
     492    connectionmanager = ConnectionManager 
    513493     
    514494    def __new__(cls, name, **kwargs): 
     
    524504        self.name = self.sql_name(name) 
    525505        self.qname = self.quote(self.name) 
    526         self.transactions = {} 
    527         self.connect() 
     506         
     507        poolsize = kwargs.get('poolsize', 10) 
     508        self.connections = self.connectionmanager(self, poolsize) 
     509         
    528510        self.discover_dbinfo() 
    529511     
     
    717699        table.created = True 
    718700         
    719         self.lock("Creating storage. Transactions not allowed.") 
    720         try: 
    721             fields = [] 
    722             pk = [] 
    723             for column in table.itervalues(): 
    724                 if column.autoincrement: 
    725                     # This may or may not be a no-op, depending on the DB. 
    726                     self.create_sequence(table, column) 
    727                  
    728                 fields.append(self.columnclause(column)) 
    729                 if column.key: 
    730                     pk.append(column.qname) 
     701        fields = [] 
     702        pk = [] 
     703        for column in table.itervalues(): 
     704            if column.autoincrement: 
     705                # This may or may not be a no-op, depending on the DB. 
     706                self.create_sequence(table, column) 
    731707             
    732             if pk: 
    733                 pk = ", PRIMARY KEY (%s)" % ", ".join(pk) 
    734             else: 
    735                 pk = "" 
    736              
    737             self.execute('CREATE TABLE %s (%s%s);' % 
     708            fields.append(self.columnclause(column)) 
     709            if column.key: 
     710                pk.append(column.qname) 
     711         
     712        if pk: 
     713            pk = ", PRIMARY KEY (%s)" % ", ".join(pk) 
     714        else: 
     715            pk = "" 
     716         
     717        self.execute_ddl('CREATE TABLE %s (%s%s);' % 
    738718                         (table.qname, ", ".join(fields), pk)) 
    739              
    740             for index in table.indices.itervalues(): 
    741                 self.execute('CREATE INDEX %s ON %s (%s);' % 
     719         
     720        for index in table.indices.itervalues(): 
     721            self.execute_ddl('CREATE INDEX %s ON %s (%s);' % 
    742722                             (index.qname, table.qname, 
    743723                              self.quote(index.colname))) 
    744             dict.__setitem__(self, key, table) 
    745         finally: 
    746             self.unlock() 
     724         
     725        dict.__setitem__(self, key, table) 
    747726     
    748727    def __delitem__(self, key): 
    749         self.lock("Dropping storage. Transactions not allowed.") 
    750         try: 
    751             table = self[key] 
    752             self.execute('DROP TABLE %s;' % table.qname) 
    753             for col in table.itervalues(): 
    754                 if col.autoincrement: 
    755                     self.drop_sequence(col) 
    756             dict.__delitem__(self, key) 
    757         finally: 
    758             self.unlock() 
     728        table = self[key] 
     729        self.execute_ddl('DROP TABLE %s;' % table.qname) 
     730        for col in table.itervalues(): 
     731            if col.autoincrement: 
     732                self.drop_sequence(col) 
     733        dict.__delitem__(self, key) 
    759734     
    760735    def _rename(self, oldtable, newtable): 
     
    774749            newtable.name = newname 
    775750            newtable.qname = self.quote(newname) 
    776             self.lock("Renaming storage. Transactions not allowed.") 
    777             try: 
    778                 self._rename(oldtable, newname) 
    779             finally: 
    780                 self.unlock() 
     751            self._rename(oldtable, newname) 
    781752         
    782753        # Use the superclass calls to avoid DROP TABLE/CREATE TABLE. 
     
    840811        return self.tableclass(name, self.quote(name), self) 
    841812     
    842     #                             Connecting                              # 
    843      
    844     poolsize = 10 
    845      
    846     def connect(self): 
    847         if self.poolsize > 0: 
    848             self.connection = ConnectionPool(self._get_conn, self._del_conn, 
    849                                              self.poolsize) 
    850         else: 
    851             self.connection = ConnectionFactory(self._get_conn, self._del_conn) 
    852      
    853     def _get_conn(self): 
    854         """Create and return a connection object.""" 
    855         # Override this with the connection call for your DB. Example: 
    856         #     return libpq.PQconnectdb(self.connstring) 
    857         raise NotImplementedError 
    858      
    859     def _del_conn(self, conn): 
    860         """Close a connection object.""" 
    861         # Override this with the close call (if any) for your DB. 
    862         conn.close() 
    863      
    864     def disconnect(self): 
    865         """Release all database connections.""" 
    866         self.connection.shutdown() 
     813    def is_timeout_error(self, exc): 
     814        """If the given exception instance is a lock timeout, return True. 
     815         
     816        This should return True for errors which arise from locking 
     817        timeouts; for example, if the database prevents 'dirty reads' 
     818        by raising an error. 
     819        """ 
     820        # You should definitely override this for your database. 
     821        return False 
    867822     
    868823    def execute(self, query, conn=None): 
    869824        """Return a native response for the given query.""" 
    870825        if conn is None: 
    871             conn = self.connection() 
     826            conn = self.connections.get() 
    872827        if isinstance(query, unicode): 
    873828            query = query.encode(self.adaptertosql.encoding) 
     
    875830        return conn.query(query) 
    876831     
     832    def execute_ddl(self, query, conn=None): 
     833        """Return a native response for the given DDL statement. 
     834         
     835        In general, DDL statements should lock out other statements 
     836        (especially those isolated in other transactions). Use this 
     837        method to perform a locked DDL statement. 
     838        """ 
     839        self.connections.lock("Transaction denied due to DDL: %r" % query) 
     840        try: 
     841##            # Must shut down all connections to avoid 
     842##            # "being accessed by other users" error? 
     843##            self.connections.shutdown() 
     844            if conn is None: 
     845                # Important: use _factory(), not get(), to avoid the lock 
     846                conn = self.connections._factory() 
     847            self.execute(query, conn) 
     848        finally: 
     849            self.connections.unlock() 
     850     
    877851    def fetch(self, query, conn=None): 
    878852        """Return rowdata, columns (name, type) for the given query. 
     
    890864        """Yield matching data, coerced to Python types (where known).""" 
    891865        sel = self.selectwriter(self, relation, attributes, restriction) 
    892         data, _ = self.fetch(sel.sql(distinct), self.get_transaction()
     866        data, _ = self.fetch(sel.sql(distinct)
    893867        return ResultSet(data, sel.columns, sel.imperfect) 
    894868     
    895869    def create_database(self): 
    896         self.lock("Creating database. Transactions not allowed.") 
    897         try: 
    898             self.execute("CREATE DATABASE %s;" % self.qname) 
    899             self.clear() 
    900         finally: 
    901             self.unlock() 
     870        self.execute_ddl("CREATE DATABASE %s;" % self.qname) 
     871        self.clear() 
    902872     
    903873    def drop_database(self): 
    904         self.lock("Dropping database. Transactions not allowed.") 
    905         try: 
    906             # Must shut down all connections to avoid 
    907             # "being accessed by other users" error. 
    908             self.connection.shutdown() 
    909             self.execute("DROP DATABASE %s;" % self.qname) 
    910             self.clear() 
    911         finally: 
    912             self.unlock() 
    913      
    914     #                            Transactions                             # 
    915      
    916     transaction_key = threading._get_ident 
    917     implicit_trans = False 
    918      
    919     # The "default_isolation" value should be a value native to the DB. 
    920     default_isolation = None 
    921      
    922     # The values in "isolation_levels" should match the names of 
    923     # IsolationLevel objects in isolation.py 
    924     isolation_levels = ["READ UNCOMMITTED", "READ COMMITTED", 
    925                         "REPEATABLE READ", "SERIALIZABLE"] 
    926      
    927     def get_transaction(self, new=False, isolation=None): 
    928         """Return the (possibly new) connection for the current transaction. 
    929          
    930         If we are already in a transaction, this returns the connection for 
    931         that transaction. The "current transaction" is determined by a key 
    932         (obtained by a call to self.transaction_key); by default, the key 
    933         is the current thread ID (but subclasses are free to change this). 
    934          
    935         If there is no "current transaction", a new connection object is 
    936         obtained by calling self.connection (which is usually a connection 
    937         pool object). If self.implicit_trans is True, new connections will 
    938         be associated with self.transaction_key(), and repeated calls to 
    939         get_transaction will then return the same connection object. 
    940         If self.implicit_trans is False, you'll get a new connection 
    941         (from the pool) each time. 
    942         """ 
    943         key = self.transaction_key() 
    944         if key in self.transactions: 
    945             conn = self.transactions[key] 
    946             if isinstance(conn, errors.TransactionLock): 
    947                 raise conn 
    948         else: 
    949             conn = self.connection() 
    950             if self.implicit_trans or new: 
    951                 self.transactions[key] = conn 
    952                 if not new: 
    953                     self.start(isolation) 
    954         return conn 
    955      
    956     def is_lock_error(self, exc): 
    957         """If the given exception instance is a lock timeout, return True. 
    958          
    959         This should return True for errors which arise from transaction 
    960         locking timeouts; for example, if the database prevents 'dirty 
    961         reads' by raising an error. 
    962         """ 
    963         # You should definitely override this for your database. 
    964         return False 
    965      
    966     def isolate(self, conn, isolation=None): 
    967         """Set the isolation level of the given connection. 
    968          
    969         If 'isolation' is None, our default_isolation will be used for new 
    970         connections. Valid values for the 'isolation' argument may be native 
    971         values for your particular database. However, it is recommended you 
    972         pass items from the global 'levels' list instead; these will be 
    973         automatically replaced with native values. 
    974          
    975         For many databases, this must be executed after START TRANSACTION. 
    976         """ 
    977         if isolation is None: 
    978             isolation = self.default_isolation 
    979          
    980         if isinstance(isolation, IsolationLevel): 
    981             # Map the given IsolationLevel object to a native value. 
    982             isolation = isolation.name 
    983             if isolation not in self.isolation_levels: 
    984                 raise ValueError("IsolationLevel %r not allowed by %s. " 
    985                                  "Try one of %r instead." 
    986                                  % (isolation, self.__class__.__name__, 
    987                                     self.isolation_levels)) 
    988          
    989         # This is SQL92 syntax, and should work with most DB's. 
    990         self.execute("SET TRANSACTION ISOLATION LEVEL %s;" % isolation, conn) 
    991      
    992     def start(self, isolation=None): 
    993         """Start a transaction. Not needed if self.implicit_trans is True.""" 
    994         conn = self.get_transaction(new=True) 
    995         self.execute("START TRANSACTION;", conn) 
    996         self.isolate(conn, isolation) 
    997      
    998     def rollback(self): 
    999         """Roll back the current transaction, if any.""" 
    1000         key = self.transaction_key() 
    1001         if key in self.transactions: 
    1002             self.execute("ROLLBACK;", self.transactions[key]) 
    1003             del self.transactions[key] 
    1004         else: 
    1005             # This is critical in order to support polygonal SM structures 
    1006             # (same store being called twice by separate proxies). 
    1007             pass 
    1008      
    1009     def commit(self): 
    1010         """Commit the current transaction, if any.""" 
    1011         key = self.transaction_key() 
    1012         try: 
    1013             conn = self.transactions.pop(key) 
    1014         except KeyError: 
    1015             # This is critical in order to support polygonal SM structures 
    1016             # (same store being called twice by separate proxies). 
    1017             pass 
    1018         else: 
    1019             self.execute("COMMIT;", conn) 
    1020      
    1021     # Change this to 'error' if you don't want autocommit on schema ops. 
    1022     lock_contention = 'commit' 
    1023      
    1024     def lock(self, msg=None): 
    1025         """Deny transactions during schema operations (DDL statements). 
    1026          
    1027         Any code which calls this should also call 'unlock' in a try/finally: 
    1028          
    1029         db.lock('dropping storage') 
    1030         try: 
    1031             drop_storage(cls) 
    1032         finally: 
    1033             db.unlock() 
    1034         """ 
    1035         key = self.transaction_key() 
    1036         if key in self.transactions: 
    1037             if isinstance(self.transactions[key], errors.TransactionLock): 
    1038                 return 
    1039             if self.lock_contention == 'error': 
    1040                 raise errors.TransactionLock("Schema operations are not " 
    1041                                              "allowed inside transactions.") 
    1042             self.commit() 
    1043          
    1044         if msg is None: 
    1045             msg = "Transactions not allowed at the moment." 
    1046         self.transactions[key] = errors.TransactionLock(msg) 
    1047      
    1048     def unlock(self): 
    1049         """Allow transactions.""" 
    1050         key = self.transaction_key() 
    1051         trans = self.transactions.get(key, None) 
    1052         if trans is None: 
    1053             return 
    1054         if not isinstance(trans, errors.TransactionLock): 
    1055             raise errors.TransactionLock("Unlock called inside transaction.") 
    1056         del self.transactions[key] 
     874        # Must shut down all connections to avoid 
     875        # "being accessed by other users" error. 
     876        self.connections.shutdown() 
     877        self.execute_ddl("DROP DATABASE %s;" % self.qname) 
     878        self.clear() 
    1057879 
    1058880 
  • trunk/geniusql/conn.py

    r9 r10  
    1 """Objects for managing database connections with the geniusql package.""" 
     1"""Objects for managing database connections and transactions with Geniusql.""" 
    22 
    33__all__ = [ 
     4    'ConnectionManager', 
    45    'ConnectionFactory', 'ConnectionPool', 'ConnectionWrapper', 
    56    'SingleConnection', 
     
    78 
    89import Queue 
     10import threading 
    911import time 
    1012import weakref 
    1113 
    12 from geniusql import errors 
     14from geniusql import errors, isolation as _isolation 
    1315 
    1416 
     
    143145            self._conn = None 
    144146 
     147 
     148class ConnectionManager(object): 
     149     
     150    implicit_trans = False 
     151     
     152    # Change this to 'error' if you don't want autocommit on schema ops. 
     153    contention = 'commit' 
     154     
     155    # The "default_isolation" value should be a value native to the DB. 
     156    default_isolation = None 
     157     
     158    # The values in "isolation_levels" should match the names of 
     159    # IsolationLevel objects in isolation.py 
     160    isolation_levels = ["READ UNCOMMITTED", "READ COMMITTED", 
     161                        "REPEATABLE READ", "SERIALIZABLE"] 
     162     
     163    def __init__(self, db, poolsize=10): 
     164        self.transactions = {} 
     165        self.db = db 
     166        self.poolsize = poolsize 
     167        if poolsize > 0: 
     168            self._factory = ConnectionPool(self._get_conn, self._del_conn, 
     169                                           poolsize) 
     170        else: 
     171            self._factory = ConnectionFactory(self._get_conn, self._del_conn) 
     172     
     173    def shutdown(self): 
     174        """Release all database connections.""" 
     175        self._factory.shutdown() 
     176     
     177    def _get_conn(self): 
     178        """Create and return a connection object.""" 
     179        # Override this with the connection call for your DB. Example: 
     180        #     return libpq.PQconnectdb(self.connstring) 
     181        raise NotImplementedError 
     182     
     183    def _del_conn(self, conn): 
     184        """Close a connection object.""" 
     185        # Override this with the close call (if any) for your DB. 
     186        conn.close() 
     187     
     188    def get(self, started=False, isolation=None): 
     189        """Return the (possibly new) connection for the current transaction. 
     190         
     191        If we are already in a transaction, this returns the connection for 
     192        that transaction. The "current transaction" context is determined by 
     193        self.id(); by default, this is the current thread ID (but subclasses 
     194        are free to change this). If there is no "current transaction", then 
     195        a new connection object is obtained (usually from a pool). 
     196 
     197        If self.implicit_trans is True, a new connection will automatically 
     198        call "START TRANSACTION". It will also be associated with self.id(), 
     199        and any subsequent calls to this method will then return the same 
     200        connection object. If self.implicit_trans is False, new connections 
     201        won't be STARTed or stored; the only exception to this is if the 
     202        'started' argument is True, in which case the connection will be 
     203        stored but not automatically STARTed. 
     204        """ 
     205        key = self.id() 
     206        if key in self.transactions: 
     207            conn = self.transactions[key] 
     208            if isinstance(conn, errors.TransactionLock): 
     209                raise conn 
     210        else: 
     211            conn = self._factory() 
     212            if self.implicit_trans or started: 
     213                self.transactions[key] = conn 
     214                if not started: 
     215                    self.start(isolation) 
     216        return conn 
     217     
     218    def id(self): 
     219        """The current transaction id.""" 
     220        return threading._get_ident() 
     221     
     222    def isolate(self, conn, isolation=None): 
     223        """Set the isolation level of the given connection. 
     224         
     225        If 'isolation' is None, our default_isolation will be used for new 
     226        connections. Valid values for the 'isolation' argument may be native 
     227        values for your particular database. However, it is recommended you 
     228        pass items from the global 'levels' list instead; these will be 
     229        automatically replaced with native values. 
     230         
     231        For many databases, this must be executed after START TRANSACTION. 
     232        """ 
     233        if isolation is None: 
     234            isolation = self.default_isolation 
     235         
     236        if isinstance(isolation, _isolation.IsolationLevel): 
     237            # Map the given IsolationLevel object to a native value. 
     238            isolation = isolation.name 
     239            if isolation not in self.isolation_levels: 
     240                raise ValueError("IsolationLevel %r not allowed by %s. " 
     241                                 "Try one of %r instead." 
     242                                 % (isolation, self.__class__.__name__, 
     243                                    self.isolation_levels)) 
     244         
     245        # This is SQL92 syntax, and should work with most DB's. 
     246        self.db.execute("SET TRANSACTION ISOLATION LEVEL %s;" % isolation, conn) 
     247     
     248    def start(self, isolation=None): 
     249        """Start a transaction. Not needed if self.implicit_trans is True.""" 
     250        conn = self.get(started=True) 
     251        self.db.execute("START TRANSACTION;", conn) 
     252        self.isolate(conn, isolation) 
     253     
     254    def rollback(self): 
     255        """Roll back the current transaction, if any.""" 
     256        key = self.id() 
     257        if key in self.transactions: 
     258            self.db.execute("ROLLBACK;", self.transactions[key]) 
     259            del self.transactions[key] 
     260        else: 
     261            # This is critical in order to support polygonal SM structures 
     262            # (same store being called twice by separate proxies). 
     263            pass 
     264     
     265    def commit(self): 
     266        """Commit the current transaction, if any.""" 
     267        try: 
     268            conn = self.transactions.pop(self.id()) 
     269        except KeyError: 
     270            # This is critical in order to support polygonal SM structures 
     271            # (same store being called twice by separate proxies). 
     272            pass 
     273        else: 
     274            self.db.execute("COMMIT;", conn) 
     275     
     276    def lock(self, msg=None): 
     277        """Deny transactions during schema operations (DDL statements). 
     278         
     279        Any code which calls this should also call 'unlock' in a try/finally: 
     280         
     281        db.connections.lock('dropping storage') 
     282        try: 
     283            db.execute("DROP TABLE %s;" % tablename) 
     284        finally: 
     285            db.connections.unlock() 
     286        """ 
     287        key = self.id() 
     288        if key in self.transactions: 
     289            if isinstance(self.transactions[key], errors.TransactionLock): 
     290                return 
     291            if self.contention == 'error': 
     292                raise errors.TransactionLock("Schema operations are not " 
     293                                             "allowed inside transactions.") 
     294            self.commit() 
     295         
     296        if msg is None: 
     297            msg = "Transactions not allowed at the moment." 
     298        self.transactions[key] = errors.TransactionLock(msg) 
     299     
     300    def unlock(self): 
     301        """Allow transactions.""" 
     302        key = self.id() 
     303        trans = self.transactions.get(key, None) 
     304        if trans is None: 
     305            return 
     306        if not isinstance(trans, errors.TransactionLock): 
     307            raise errors.TransactionLock("Unlock called inside transaction.") 
     308        del self.transactions[key] 
     309     
     310    def in_transaction(self): 
     311        """Return True if the current context is in a transaction. 
     312         
     313        This also returns True if the current context is executing DDL 
     314        statements, or is barred from starting a transaction for some 
     315        other reason. 
     316        """ 
     317        trans = self.transactions.get(self.id()) 
     318        if trans is None or isinstance(trans, errors.TransactionLock): 
     319            return False 
     320        return True 
     321 
  • trunk/geniusql/isolation.py

    r9 r10  
    1 """Isolation Level definitions for Dejavu 
     1"""Isolation Level definitions for Geniusql 
    22 
    33We follow the terminology of ANSI for specifying isolation levels, 
  • trunk/geniusql/providers/__init__.py

    r9 r10  
    4747    "mysql": "geniusql.providers.mysql.MySQLDatabase", 
    4848     
    49     "postgres": "geniusql.providers.pypgsql.PgDatabase", 
    50     "postgresql": "geniusql.providers.pypgsql.PgDatabase", 
    51     "pypgsql": "geniusql.providers.pypgsql.PgDatabase", 
     49    "postgres": "geniusql.providers.pypgsql.PyPgDatabase", 
     50    "postgresql": "geniusql.providers.pypgsql.PyPgDatabase", 
     51    "pypgsql": "geniusql.providers.pypgsql.PyPgDatabase", 
    5252     
    5353    "psycopg": "geniusql.providers.psycopg.PsycoPgDatabase", 
  • trunk/geniusql/providers/ado.py

    r9 r10  
    365365        coldef = self.db.columnclause(column) 
    366366        # SQL Server doesn't use the "COLUMN" keyword with "ADD" 
    367         self.db.execute("ALTER TABLE %s ADD %s;" % (self.qname, coldef)) 
     367        self.db.execute_ddl("ALTER TABLE %s ADD %s;" % (self.qname, coldef)) 
    368368     
    369369    def _rename(self, oldcol, newcol): 
    370         conn = self.db.connection() 
     370        conn = self.db.connections.get() 
    371371        try: 
    372372            cat = win32com.client.Dispatch(r'ADOX.Catalog') 
     
    387387 
    388388 
     389class ADOConnectionManager(geniusql.ConnectionManager): 
     390     
     391    # the amount of time to try to close the db connection 
     392    # before raising an exception 
     393    shutdowntimeout = 1 # sec. 
     394     
     395    ConnectionTimeout = None 
     396    CommandTimeout = None 
     397     
     398    def _get_conn(self, master=False): 
     399        if master: 
     400            # Must shut down all connections to avoid 
     401            # "being accessed by other users" error. 
     402            self.shutdown() 
     403             
     404            atoms = connatoms(self.Connect) 
     405            atoms['INITIAL CATALOG'] = "tempdb" 
     406            connectstr = "; ".join(["%s=%s" % (k, v) 
     407                                    for k, v in atoms.iteritems()]) 
     408        else: 
     409            connectstr = self.Connect 
     410         
     411        conn = win32com.client.Dispatch(r'ADODB.Connection') 
     412        conn.Open(connectstr) 
     413        if self.ConnectionTimeout is not None: 
     414            conn.ConnectionTimeout = self.ConnectionTimeout 
     415        if self.CommandTimeout is not None: 
     416            conn.CommandTimeout = self.CommandTimeout 
     417        return conn 
     418     
     419    def _del_conn(self, conn): 
     420        for trial in xrange(self.shutdowntimeout * 10): 
     421            try: 
     422                # This may raise "Operation cannot be performed 
     423                # while executing asynchronously" 
     424                # if a prior operation has not yet completed. 
     425                conn.Close() 
     426                return 
     427            except pywintypes.com_error, e: 
     428                try: 
     429                    ecode = e.args[2][-1] 
     430                except IndexError: 
     431                    ecode = None 
     432                if ecode == -2146824577: 
     433                    # "Operation cannot be performed while executing asynchronously" 
     434                    # Try again... 
     435                    time.sleep(0.1) 
     436                    continue 
     437                raise 
     438     
     439    #                            Transactions                             # 
     440     
     441    def start(self, isolation=None): 
     442        """Start a transaction. Not needed if self.implicit_trans is True.""" 
     443        conn = self.get(started=True) 
     444        self.db.execute("BEGIN TRANSACTION;", conn) 
     445        self.isolate(conn, isolation) 
     446 
     447 
    389448class ADODatabase(geniusql.Database): 
    390449     
     
    393452    tableclass = ADOTable 
    394453     
    395     # the amount of time to try to close the db connection 
    396     # before raising an exception 
    397     shutdowntimeout = 1 # sec. 
    398      
     454    def __init__(self, name, **kwargs): 
     455        geniusql.Database.__init__(self, name, **kwargs) 
     456        self.connections.Connect = self.Connect 
    399457     
    400458    #                              Discovery                              # 
     
    587645     
    588646    def _rename(self, oldtable, newtable): 
    589         conn = self.connection() 
     647        conn = self.connections.get() 
    590648        try: 
    591649            cat = win32com.client.Dispatch(r'ADOX.Catalog') 
     
    602660        return '[' + name + ']' 
    603661     
    604     #                             Connecting                              # 
    605      
    606     ConnectionTimeout = None 
    607     CommandTimeout = None 
    608      
    609     def _get_conn(self): 
    610         conn = win32com.client.Dispatch(r'ADODB.Connection') 
    611         conn.Open(self.Connect) 
    612         if self.ConnectionTimeout is not None: 
    613             conn.ConnectionTimeout = self.ConnectionTimeout 
    614         if self.CommandTimeout is not None: 
    615             conn.CommandTimeout = self.CommandTimeout 
    616         return conn 
    617      
    618     def _del_conn(self, conn): 
    619         for trial in xrange(self.shutdowntimeout * 10): 
    620             try: 
    621                 # This may raise "Operation cannot be performed 
    622                 # while executing asynchronously" 
    623                 # if a prior operation has not yet completed. 
    624                 conn.Close() 
    625                 return 
    626             except pywintypes.com_error, e: 
    627                 try: 
    628                     ecode = e.args[2][-1] 
    629                 except IndexError: 
    630                     ecode = None 
    631                 if ecode == -2146824577: 
    632                     # "Operation cannot be performed while executing asynchronously" 
    633                     # Try again... 
    634                     time.sleep(0.1) 
    635                     continue 
    636                 raise 
    637      
    638662    def execute(self, query, conn=None): 
    639663        if conn is None: 
    640             conn = self.connection() 
     664            conn = self.connections.get() 
    641665        if isinstance(query, unicode): 
    642666            query = query.encode(self.adaptertosql.encoding) 
    643667         
    644         self.log(query
     668        self.log(repr((id(conn), id(getattr(conn, "conn", None)), query))
    645669        try: 
    646             if isinstance(conn, geniusql.ConnectionWrapper): 
     670            bareconn = conn 
     671            if hasattr(conn, 'conn'): 
    647672                # 'conn' is a ConnectionWrapper object, which .Open 
    648673                # won't accept. Pass the unwrapped connection instead. 
    649                 conn = conn.conn 
     674                # Note that we CANNOT write "conn = conn.conn", because 
     675                # if we called get() above, we'd lose our only 
     676                # reference to the wrapper and our weakref callback 
     677                # would close the conn before we've executed the SQL. 
     678                bareconn = conn.conn 
    650679             
    651680            # Call Execute directly, skipping win32com overhead. 
    652             conn._oleobj_.InvokeTypes(6, 0, 1, (9, 0), 
    653                                       ((8, 1), (16396, 18), (3, 49)), 
    654                                       query, pythoncom.Missing, -1) 
     681            bareconn._oleobj_.InvokeTypes(6, 0, 1, (9, 0), 
     682                                          ((8, 1), (16396, 18), (3, 49)), 
     683                                          query, pythoncom.Missing, -1) 
    655684        except pywintypes.com_error, x: 
    656685            x.args += (query, ) 
     
    661690        """fetch(query, conn=None) -> rowdata, columns.""" 
    662691        if conn is None: 
    663             conn = self.connection() 
     692            conn = self.connections.get() 
    664693         
    665694        try: 
     
    670699                                                query, Empty, Empty) 
    671700            else: 
    672                 self.log(query) 
    673                 if isinstance(conn, geniusql.ConnectionWrapper): 
     701                self.log(repr((id(conn), id(getattr(conn, "conn", None)), query))) 
     702                bareconn = conn 
     703                if hasattr(conn, 'conn'): 
    674704                    # 'conn' is a ConnectionWrapper object, which .Open 
    675705                    # won't accept. Pass the unwrapped connection instead. 
    676                     conn = conn.conn 
     706                    bareconn = conn.conn 
    677707                 
    678708                # Call conn.Open(query) directly, skipping win32com overhead. 
    679                 res, rows_affected = conn._oleobj_.InvokeTypes(6, 0, 1, (9, 0), 
     709                res, rows_affected = bareconn._oleobj_.InvokeTypes(6, 0, 1, (9, 0), 
    680710                                                ((8, 1), (16396, 18), (3, 49)), 
    681711                                                # *args = 
     
    725755         
    726756        return data, columns 
    727      
    728      
    729     #                            Transactions                             # 
    730      
    731     def start(self, isolation=None): 
    732         """Start a transaction. Not needed if self.implicit_trans is True.""" 
    733         conn = self.get_transaction(new=True) 
    734         self.execute("BEGIN TRANSACTION;", conn) 
    735         self.isolate(conn, isolation) 
    736757 
    737758 
     
    840861     
    841862    def _rename(self, oldcol, newcol): 
    842         self.db.execute("EXEC sp_rename '%s.%s', '%s', 'COLUMN'" % 
    843                         (self.name, oldcol.name, newcol.name)) 
     863        self.db.execute_ddl("EXEC sp_rename '%s.%s', '%s', 'COLUMN'" % 
     864                            (self.name, oldcol.name, newcol.name)) 
    844865     
    845866    def _grab_new_ids(self, idkeys, conn): 
     
    853874 
    854875 
     876class SQLServerConnectionManager(ADOConnectionManager): 
     877     
     878    default_isolation = "READ COMMITTED" 
     879 
     880 
    855881class SQLServerDatabase(ADODatabase): 
    856882     
     
    859885    adaptertosql = AdapterToADOSQL_SQLServer() 
    860886    typeadapter = TypeAdapter_SQLServer() 
     887    connectionmanager = SQLServerConnectionManager 
    861888     
    862889    def __init__(self, name, **kwargs): 
    863890        ADODatabase.__init__(self, name, **kwargs) 
    864891        if "2005" in self.version(): 
    865             self.isolation_levels.append("SNAPSHOT") 
     892            self.connections.isolation_levels.append("SNAPSHOT") 
    866893     
    867894    def version(self): 
    868         adoconn = self._template_conn(
    869         adov = adoconn.Version 
    870         data, coldefs = self.fetch("SELECT @@VERSION;", adoconn) 
     895        conn = self.connections._get_conn(master=True
     896        adov = conn.Version 
     897        data, coldefs = self.fetch("SELECT @@VERSION;", conn) 
    871898        sqlv, = data[0] 
    872         adoconn.Close() 
    873         del adoconn 
     899        conn.Close() 
     900        del conn 
    874901        return "ADO Version: %s\n%s" % (adov, sqlv) 
    875902     
    876     def _template_conn(self): 
    877         adoconn = win32com.client.Dispatch(r'ADODB.Connection') 
    878         atoms = connatoms(self.Connect) 
    879         atoms['INITIAL CATALOG'] = "tempdb" 
    880         adoconn.Open("; ".join(["%s=%s" % (k, v) for k, v in atoms.iteritems()])) 
    881         return adoconn 
    882      
    883903    def create_database(self): 
    884         self.lock("Creating database. Transactions not allowed.") 
    885         try: 
    886             adoconn = self._template_conn() 
    887             adoconn.Execute("CREATE DATABASE %s" % self.qname) 
    888             adoconn.Close() 
    889             self.clear() 
    890         finally: 
    891             self.unlock() 
     904        conn = self.connections._get_conn(master=True) 
     905        self.execute_ddl("CREATE DATABASE %s;" % self.qname, conn) 
     906        conn.Close() 
     907        self.clear() 
    892908     
    893909    def drop_database(self): 
    894         self.lock("Dropping database. Transactions not allowed.") 
    895         try: 
    896             # Must shut down all connections to avoid 
    897             # "being accessed by other users" error. 
    898             self.connection.shutdown() 
    899              
    900             adoconn = self._template_conn() 
    901             adoconn.Execute("DROP DATABASE %s;" % self.qname) 
    902             adoconn.Close() 
    903             self.clear() 
    904         finally: 
    905             self.unlock() 
     910        conn = self.connections._get_conn(master=True) 
     911        self.execute_ddl("DROP DATABASE %s;" % self.qname, conn) 
     912        conn.Close() 
     913        self.clear() 
    906914     
    907915    def columnclause(self, column): 
     
    929937        return '%s %s%s' % (column.qname, dbtype, clause) 
    930938     
    931     #                            Transactions                             # 
    932      
    933     default_isolation = "READ COMMITTED" 
    934      
    935     def is_lock_error(self, exc): 
     939    def is_timeout_error(self, exc): 
    936940        """If the given exception instance is a lock timeout, return True. 
    937941         
     
    10551059 
    10561060 
     1061class MSAccessConnectionManager(ADOConnectionManager): 
     1062     
     1063    poolsize = 0 
     1064    default_isolation = "READ UNCOMMITTED" 
     1065    isolation_levels = ["READ UNCOMMITTED",] 
     1066     
     1067    def __init__(self, db, poolsize=0): 
     1068        self.transactions = {} 
     1069        self.db = db 
     1070        self.poolsize = 0 
     1071        # MS Access can't use a pool, because there doesn't seem 
     1072        # to be a commit timeout. See http://support.microsoft.com/kb/200300 
     1073        # for additional synchronization issues. 
     1074        self._factory = geniusql.SingleConnection(self._get_conn, self._del_conn) 
     1075     
     1076    def isolate(self, conn, isolation=None): 
     1077        """Set the isolation level of the given connection. 
     1078         
     1079        If 'isolation' is None, our default_isolation will be used for new 
     1080        connections. Valid values for the 'isolation' argument may be native 
     1081        values for your particular database. However, it is recommended you 
     1082        pass items from the global 'levels' list instead; these will be 
     1083        automatically replaced with native values. 
     1084         
     1085        For many databases, this must be executed after START TRANSACTION. 
     1086        """ 
     1087        if isolation is None: 
     1088            isolation = self.default_isolation 
     1089         
     1090        if isinstance(isolation, geniusql.IsolationLevel): 
     1091            # Map the given IsolationLevel object to a native value. 
     1092            isolation = isolation.name 
     1093            if isolation not in self.isolation_levels: 
     1094                raise ValueError("IsolationLevel %r not allowed by %s." 
     1095                                 % (isolation, self.__class__.__name__)) 
     1096         
     1097        # No action to take, since you can't actually set iso level. 
     1098        pass 
     1099 
     1100 
    10571101class MSAccessDatabase(ADODatabase): 
    10581102     
     
    10611105    typeadapter = TypeAdapter_MSAccess() 
    10621106    tableclass = MSAccessTable 
     1107    connectionmanager = MSAccessConnectionManager 
    10631108     
    10641109    def version(self): 
    1065         adoconn = win32com.client.Dispatch(r'ADODB.Connection') 
    1066         v = adoconn.Version 
    1067         del adoconn 
     1110        conn = win32com.client.Dispatch(r'ADODB.Connection') 
     1111        v = conn.Version 
     1112        del conn 
    10681113        return "ADO Version: %s" % v 
    10691114     
     
    10711116        cols = ADODatabase._get_columns(self, tablename, conn) 
    10721117        if conn is None: 
    1073             conn = self.connection() 
     1118            conn = self.connections._factory() 
    10741119         
    10751120        try: 
    10761121            # Horrible hack to get autoincrement property 
    10771122            query = "SELECT * FROM %s WHERE FALSE" % self.quote(tablename) 
    1078             if isinstance(conn, geniusql.ConnectionWrapper): 
     1123            bareconn = conn 
     1124            if hasattr(conn, 'conn'): 
    10791125                # 'conn' is a ConnectionWrapper object, which .Open 
    10801126                # won't accept. Pass the unwrapped connection instead. 
    1081                 conn = conn.conn 
     1127                bareconn = conn.conn 
    10821128             
    10831129            # Call conn.Open(query) directly, skipping win32com overhead. 
     
    11491195        return '%s %s' % (column.qname, dbtype) 
    11501196     
    1151     #                             Connecting                              # 
    1152      
    1153     poolsize = 0 
    1154      
    1155     def connect(self): 
    1156         # MS Access can't use a pool, because there doesn't seem 
    1157         # to be a commit timeout. See http://support.microsoft.com/kb/200300 
    1158         # for additional synchronization issues. 
    1159         self.connection = geniusql.SingleConnection(self._get_conn, self._del_conn) 
    1160      
    11611197    def create_database(self): 
    1162         self.lock("Creating database. Transactions not allowed.") 
    1163         try: 
    1164             # By not providing an Engine Type, it defaults to 5 = Access 2000. 
    1165             cat = win32com.client.Dispatch(r'ADOX.Catalog') 
    1166             cat.Create(self.Connect) 
    1167             cat.ActiveConnection.Close() 
    1168             self.clear() 
    1169         finally: 
    1170             self.unlock() 
     1198        # By not providing an Engine Type, it defaults to 5 = Access 2000. 
     1199        cat = win32com.client.Dispatch(r'ADOX.Catalog') 
     1200        cat.Create(self.Connect) 
     1201        cat.ActiveConnection.Close() 
     1202        self.clear() 
    11711203     
    11721204    def drop_database(self): 
    1173         self.lock("Dropping database. Transactions not allowed.") 
    1174         try: 
    1175             # Must shut down our only connection to avoid 
    1176             # "Permission denied" error on os.remove call below. 
    1177             self.connection.shutdown() 
    1178              
    1179             import os 
    1180             # This should accept relative or absolute paths 
    1181             if os.path.exists(self.name): 
    1182                 os.remove(self.name) 
    1183             self.clear() 
    1184         finally: 
    1185             self.unlock() 
    1186      
    1187     default_isolation = "READ UNCOMMITTED" 
    1188     isolation_levels = ["READ UNCOMMITTED",] 
    1189      
    1190     def isolate(self, conn, isolation=None): 
    1191         """Set the isolation level of the given connection. 
    1192          
    1193         If 'isolation' is None, our default_isolation will be used for new 
    1194         connections. Valid values for the 'isolation' argument may be native 
    1195         values for your particular database. However, it is recommended you 
    1196         pass items from the global 'levels' list instead; these will be 
    1197         automatically replaced with native values. 
    1198          
    1199         For many databases, this must be executed after START TRANSACTION. 
    1200         """ 
    1201         if isolation is None: 
    1202             isolation = self.default_isolation 
    1203          
    1204         if isinstance(isolation, geniusql.IsolationLevel): 
    1205             # Map the given IsolationLevel object to a native value. 
    1206             isolation = isolation.name 
    1207             if isolation not in self.isolation_levels: 
    1208                 raise ValueError("IsolationLevel %r not allowed by %s." 
    1209                                  % (isolation, self.__class__.__name__)) 
    1210          
    1211         # No action to take, since you can't actually set iso level. 
    1212         pass 
     1205        # Must shut down our only connection to avoid 
     1206        # "Permission denied" error on os.remove call below. 
     1207        self.connections.shutdown() 
     1208         
     1209        import os 
     1210        # This should accept relative or absolute paths 
     1211        if os.path.exists(self.name): 
     1212            os.remove(self.name) 
     1213        self.clear() 
    12131214 
    12141215 
  • trunk/geniusql/providers/firebird.py

    r9 r10  
    243243        coldef = self.db.columnclause(column) 
    244244        # FB doesn't recognize the keyword "COLUMN" in "ADD". 
    245         self.db.execute("ALTER TABLE %s ADD %s;" % (self.qname, coldef)) 
     245        self.db.execute_ddl("ALTER TABLE %s ADD %s;" % (self.qname, coldef)) 
    246246     
    247247    def _drop_column(self, column): 
    248248        """Internal function to drop the column from the database.""" 
    249249        # FB doesn't recognize the keyword "COLUMN" in "DROP". 
    250         self.db.execute("ALTER TABLE %s DROP %s;" % 
    251                         (self.qname, column.qname)) 
     250        self.db.execute_ddl("ALTER TABLE %s DROP %s;" % 
     251                            (self.qname, column.qname)) 
    252252     
    253253    def _rename(self, oldcol, newcol): 
    254254        # FB doesn't use the keyword "RENAME". 
    255         self.db.execute("ALTER TABLE %s ALTER COLUMN %s TO %s;" % 
    256                         (self.qname, oldcol.qname, newcol.qname)) 
     255        self.db.execute_ddl("ALTER TABLE %s ALTER COLUMN %s TO %s;" % 
     256                            (self.qname, oldcol.qname, newcol.qname)) 
    257257     
    258258    def insert(self, **inputs): 
     
    283283        values = ", ".join(values) 
    284284        self.db.execute('INSERT INTO %s (%s) VALUES (%s);' 
    285                         % (self.qname, fields, values), 
    286                         self.db.get_transaction()) 
     285                        % (self.qname, fields, values)) 
    287286         
    288287        return newids 
     288 
     289 
     290class FirebirdConnectionManager(geniusql.ConnectionManager): 
     291     
     292    default_isolation = kinterbasdb.isc_tpb_read_committed 
     293    _no_iso_tpb = ( 
     294        kinterbasdb.isc_tpb_version3 
     295        + kinterbasdb.isc_tpb_shared 
     296        + kinterbasdb.isc_tpb_nowait 
     297        + kinterbasdb.isc_tpb_write 
     298        + kinterbasdb.isc_tpb_rec_version 
     299        ) 
     300    isolation_levels = ["READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE"] 
     301     
     302    def _get_conn(self): 
     303        conn = kinterbasdb.connect(host=self.db.host, 
     304                                   database=self.db.name, 
     305                                   user=self.db.user, 
     306                                   password=self.db.password, 
     307                                   charset=self.db.encoding, 
     308                                   ) 
     309        # Set the default TPB (for implicit transactions). 
     310        conn.default_tpb = self._no_iso_tpb + self.default_isolation 
     311         
     312        # Remove converters for FIXED so we can mix fixedpoint and decimal 
     313        conn.set_type_trans_in({'FIXED': None}) 
     314        conn.set_type_trans_out({'FIXED': None}) 
     315        return conn 
     316     
     317    def isolate(self, isolation=None): 
     318        """isolate() is not implemented for Firebird.""" 
     319        raise NotImplementedError("Firebird does not allow arbitrary re-isolation.") 
     320     
     321    def start(self, isolation=None): 
     322        """Start a transaction. Not needed if self.implicit_trans is True.""" 
     323        if isolation is None: 
     324            isolation = self.default_isolation 
     325         
     326        if isinstance(isolation, geniusql.IsolationLevel): 
     327            # Map the given IsolationLevel object to a native value. 
     328            # This base class uses the four ANSI names as native values. 
     329            isolation = isolation.name 
     330            if isolation not in self.isolation_levels: 
     331                raise ValueError("IsolationLevel %r not allowed by %s. " 
     332                                 "Try one of %r instead." 
     333                                 % (isolation, self.__class__.__name__, 
     334                                    self.isolation_levels)) 
     335             
     336            if isolation == "READ COMMITTED": 
     337                isolation = kinterbasdb.isc_tpb_read_committed 
     338            elif isolation == "REPEATABLE READ": 
     339                isolation = kinterbasdb.isc_tpb_concurrency 
     340            else: 
     341                isolation = kinterbasdb.isc_tpb_consistency 
     342         
     343        conn = self.get(started=True) 
     344        self.db.log("START TRANSACTION;") 
     345        conn.begin(self._no_iso_tpb + isolation) 
     346     
     347    def rollback(self): 
     348        """Roll back the current transaction, if any.""" 
     349        try: 
     350            conn = self.transactions.pop(self.id()) 
     351        except KeyError: 
     352            pass 
     353        else: 
     354            self.db.log("ROLLBACK;") 
     355            conn.rollback() 
     356     
     357    def commit(self): 
     358        """Commit the current transaction, if any.""" 
     359        try: 
     360            conn = self.transactions.pop(self.id()) 
     361        except KeyError: 
     362            pass 
     363        else: 
     364            self.db.log("COMMIT;") 
     365            conn.commit() 
    289366 
    290367 
     
    298375    typeadapter = TypeAdapterFirebird() 
    299376    tableclass = FirebirdTable 
     377    connectionmanager = FirebirdConnectionManager 
    300378     
    301379    sql_name_max_length = 31 
     
    315393         
    316394        self.qname = self.quote(self.name) 
    317         self.transactions = {} 
    318         self.connect() 
     395        self.connections = self.connectionmanager(self, kwargs.get('poolsize', 10)) 
    319396        self.discover_dbinfo() 
    320397     
     
    478555            sname = self.quote("%s_%s_seq" % (table.name, column.name)) 
    479556            column.sequence_name = sname 
    480         self.execute("CREATE GENERATOR %s;" % sname) 
    481         self.execute("SET GENERATOR %s TO %s;" % (sname, column.initial - 1)) 
     557        self.execute_ddl("CREATE GENERATOR %s;" % sname) 
     558        self.execute_ddl("SET GENERATOR %s TO %s;" % (sname, column.initial - 1)) 
    482559     
    483560    def drop_sequence(self, column): 
    484561        """Drop a SEQUENCE for the given column and remove its sequence_name.""" 
    485562        if column.sequence_name is not None: 
    486             self.execute("DROP GENERATOR %s;" % column.sequence_name) 
     563            self.execute_ddl("DROP GENERATOR %s;" % column.sequence_name) 
    487564            column.sequence_name = None 
    488565     
     
    493570        return '"' + name.replace('"', '""') + '"' 
    494571     
    495     def _get_conn(self): 
    496         conn = kinterbasdb.connect(host=self.host, 
    497                                    database=self.name, 
    498                                    user=self.user, 
    499                                    password=self.password, 
    500                                    charset=self.encoding, 
    501                                    ) 
    502         # Set the default TPB (for implicit transactions). 
    503         conn.default_tpb = self._no_iso_tpb + self.default_isolation 
    504          
    505         # Remove converters for FIXED so we can mix fixedpoint and decimal 
    506         conn.set_type_trans_in({'FIXED': None}) 
    507         conn.set_type_trans_out({'FIXED': None}) 
    508         return conn 
    509      
    510572    deadlock_timeout = 10 
    511573     
     
    513575        try: 
    514576            if conn is None: 
    515                 conn = self.connection() 
     577                conn = self.connections.get() 
    516578            if isinstance(query, unicode): 
    517579                query = query.encode(self.adaptertosql.encoding) 
     
    526588                    # If we're not in a transaction, we need to auto-commit. 
    527589                    # This prevents "Previous transaction still active" errors. 
    528                     key = self.transaction_key() 
    529                     trans = self.transactions.get(key) 
    530                     if trans is None or isinstance(trans, errors.TransactionLock): 
     590                    if not self.connections.in_transaction(): 
    531591                        conn.commit() 
    532592                     
    533593                    return 
    534594                except Exception, x: 
    535                     if self.is_lock_error(x) and self.deadlock_timeout: 
     595                    if self.is_timeout_error(x) and self.deadlock_timeout: 
    536596                        if time.time() - start < self.deadlock_timeout: 
    537597                            time.sleep(0.000001) 
     
    553613        try: 
    554614            if conn is None: 
    555                 conn = self.connection() 
     615                conn = self.connections.get() 
    556616            if isinstance(query, unicode): 
    557617                query = query.encode(self.adaptertosql.encoding) 
     
    566626            # If we're not in a transaction, we need to auto-commit. 
    567627            # This prevents "Previous transaction still active" errors. 
    568             key = self.transaction_key() 
    569             trans = self.transactions.get(key) 
    570             if trans is None or isinstance(trans, errors.TransactionLock): 
     628            if not self.connections.in_transaction(): 
    571629                conn.commit() 
    572630             
     
    581639     
    582640    def create_database(self): 
    583         self.lock("Creating database. Transactions not allowed.") 
    584         try: 
    585             # Firebird DB 'names' are actually filesystem paths. 
    586             sql = ("CREATE DATABASE %s USER '%s' PASSWORD '%s';" 
    587                    % (self.qname, self.user, self.password)) 
    588              
    589             # Use the kinterbasdb helper methods for cleaner create and drop. 
    590             # We also use dialect 3 *always* to help with quoted identifiers. 
    591             conn = kinterbasdb.create_database(sql, 3) 
    592             conn.close() 
    593              
    594             self.clear() 
    595         finally: 
    596             self.unlock() 
     641        # Firebird DB 'names' are actually filesystem paths. 
     642        sql = ("CREATE DATABASE %s USER '%s' PASSWORD '%s';" 
     643               % (self.qname, self.user, self.password)) 
     644         
     645        # Use the kinterbasdb helper methods for cleaner create and drop. 
     646        # We also use dialect 3 *always* to help with quoted identifiers. 
     647        conn = kinterbasdb.create_database(sql, 3) 
     648        conn.close() 
     649         
     650        self.clear() 
    597651     
    598652    def drop_database(self): 
    599         self.lock("Dropping database. Transactions not allowed.") 
    600         try: 
    601             # Must shut down all connections to avoid 
    602             # "being accessed by other users" error. 
    603             self.connection.shutdown() 
    604              
    605             conn = self._get_conn() 
    606             conn.drop_database() 
    607              
    608             self.clear() 
    609         finally: 
    610             self.unlock() 
    611      
    612     #                            Transactions                             # 
    613      
    614     default_isolation = kinterbasdb.isc_tpb_read_committed 
    615     _no_iso_tpb = ( 
    616         kinterbasdb.isc_tpb_version3 
    617         + kinterbasdb.isc_tpb_shared 
    618         + kinterbasdb.isc_tpb_nowait 
    619         + kinterbasdb.isc_tpb_write 
    620         + kinterbasdb.isc_tpb_rec_version 
    621         ) 
    622     isolation_levels = ["READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE"] 
    623      
    624     def is_lock_error(self, exc): 
     653        # Must shut down all connections to avoid 
     654        # "being accessed by other users" error. 
     655        self.connections.shutdown() 
     656         
     657        conn = self.connections._get_conn() 
     658        conn.drop_database() 
     659        # For some reason, the conn is already closed... 
     660##        conn.close() 
     661        self.clear() 
     662     
     663    def is_timeout_error(self, exc): 
    625664        """If the given exception instance is a lock timeout, return True. 
    626665         
     
    636675        return "lock conflict" in exc.args[1] 
    637676     
    638     def isolate(self, isolation=None): 
    639         """isolate() is not implemented for Firebird.""" 
    640         raise NotImplementedError("Firebird does not allow arbitrary re-isolation.") 
    641      
    642     def start(self, isolation=None): 
    643         """Start a transaction. Not needed if self.implicit_trans is True.""" 
    644         if isolation is None: 
    645             isolation = self.default_isolation 
    646          
    647         if isinstance(isolation, geniusql.IsolationLevel): 
    648             # Map the given IsolationLevel object to a native value. 
    649             # This base class uses the four ANSI names as native values. 
    650             isolation = isolation.name 
    651             if isolation not in self.isolation_levels: 
    652                 raise ValueError("IsolationLevel %r not allowed by %s. " 
    653                                  "Try one of %r instead." 
    654                                  % (isolation, self.__class__.__name__, 
    655                                     self.isolation_levels)) 
    656              
    657             if isolation == "READ COMMITTED": 
    658                 isolation = kinterbasdb.isc_tpb_read_committed 
    659             elif isolation == "REPEATABLE READ": 
    660                 isolation = kinterbasdb.isc_tpb_concurrency 
    661             else: 
    662                 isolation = kinterbasdb.isc_tpb_consistency 
    663          
    664         conn = self.get_transaction(new=True) 
    665         conn.begin(self._no_iso_tpb + isolation) 
    666      
    667     def rollback(self): 
    668         """Roll back the current transaction, if any.""" 
    669         key = self.transaction_key() 
    670         try: 
    671             conn = self.transactions.pop(key) 
    672         except KeyError: 
    673             pass 
    674         else: 
    675             conn.rollback() 
    676      
    677     def commit(self): 
    678         """Commit the current transaction, if any.""" 
    679         key = self.transaction_key() 
    680         try: 
    681             conn = self.transactions.pop(key) 
    682         except KeyError: 
    683             pass 
    684         else: 
    685             conn.commit() 
    686      
    687677    def version(self): 
    688678        import kinterbasdb.services 
  • trunk/geniusql/providers/mysql.py

    r9 r10  
    141141    def __delitem__(self, key): 
    142142        t = self.table 
    143         t.db.lock("Dropping index. Transactions not allowed.") 
    144         try: 
    145             # MySQL might rename multiple-column indices to "PRIMARY" 
    146             for i in t.db._get_indices(t.name): 
    147                 if i.colname == self[key].colname: 
    148                     t.db.execute('DROP INDEX %s ON %s;' % (i.qname, t.qname)) 
    149         finally: 
    150             t.db.unlock() 
     143        # MySQL might rename multiple-column indices to "PRIMARY" 
     144        for i in t.db._get_indices(t.name): 
     145            if i.colname == self[key].colname: 
     146                t.db.execute_ddl('DROP INDEX %s ON %s;' % (i.qname, t.qname)) 
    151147 
    152148 
     
    156152     
    157153    def _rename(self, oldcol, newcol): 
    158         self.db.execute("ALTER TABLE %s CHANGE %s %s %s;" % 
    159                         (self.qname, oldcol.qname, newcol.qname, 
    160                          oldcol.dbtype)) 
     154        self.db.execute_ddl("ALTER TABLE %s CHANGE %s %s %s;" % 
     155                            (self.qname, oldcol.qname, newcol.qname, 
     156                             oldcol.dbtype)) 
    161157     
    162158    def _grab_new_ids(self, idkeys, conn): 
    163159        return {idkeys[0]: conn.insert_id()} 
     160 
    164161 
    165162 
     
    169166            "cursorclass", "client_flag", 
    170167            ] 
     168 
     169class MySQLConnectionManager(geniusql.ConnectionManager): 
     170     
     171    # InnoDB default 
     172    default_isolation = "REPEATABLE READ" 
     173     
     174    def _get_conn(self, master=False): 
     175        if master: 
     176            args = self.connargs.copy() 
     177            args['db'] = 'mysql' 
     178        else: 
     179            args = self.connargs 
     180         
     181        try: 
     182            conn = _mysql.connect(**args) 
     183        except _mysql.OperationalError, x: 
     184            if x.args[0] == 1040:   # Too many connections 
     185                raise errors.OutOfConnectionsError 
     186            raise 
     187        return conn 
     188 
    171189 
    172190class MySQLDatabase(geniusql.Database): 
     
    187205    tableclass = MySQLTable 
    188206    indexsetclass = MySQLIndexSet 
    189      
    190     # InnoDB default 
    191     default_isolation = "REPEATABLE READ" 
     207    connectionmanager = MySQLConnectionManager 
    192208     
    193209    def __init__(self, name, **kwargs): 
    194210        geniusql.Database.__init__(self, name, **kwargs) 
    195211         
    196         self.connargs = dict([(k, v) for k, v in kwargs.iteritems() 
    197                               if k in connargs]) 
     212        self.connections.connargs = dict([(k, v) for k, v in kwargs.iteritems() 
     213                                          if k in connargs]) 
    198214         
    199215        self.decompiler = MySQLDecompiler 
     
    201217        # Get the version string from MySQL, to see if we need 
    202218        # a different decompiler. 
    203         conn = self._template_conn(
     219        conn = self.connections._get_conn(master=True
    204220        rowdata, cols = self.fetch("SELECT version();", conn) 
    205221        conn.close() 
     
    274290            encoding = " CHARACTER SET %s" % encoding 
    275291         
    276         self.lock("Creating storage. Transactions not allowed.") 
    277         try: 
    278             self.execute('CREATE TABLE %s (%s%s)%s;' % 
     292        self.execute_ddl('CREATE TABLE %s (%s%s)%s;' % 
    279293                         (table.qname, ", ".join(fields), pk, encoding)) 
    280              
    281             if incr_fields: 
    282                 # Wow, what a hack. We have to INSERT a dummy row to set the 
    283                 # autoincrement initial value(s), and we can't delete it until 
    284                 # after the CREATE INDEX statements (or the counter will revert). 
    285                 fields = ", ".join([col.qname for col in incr_fields]) 
    286                 values = ", ".join([str(col.initial - 1) for col in incr_fields]) 
    287                 self.execute("INSERT INTO %s (%s) VALUES (%s);" 
     294         
     295        if incr_fields: 
     296            # Wow, what a hack. We have to INSERT a dummy row to set the 
     297            # autoincrement initial value(s), and we can't delete it until 
     298            # after the CREATE INDEX statements (or the counter will revert). 
     299            fields = ", ".join([col.qname for col in incr_fields]) 
     300            values = ", ".join([str(col.initial - 1) for col in incr_fields]) 
     301            self.execute_ddl("INSERT INTO %s (%s) VALUES (%s);" 
    288302                             % (table.qname, fields, values)) 
    289              
    290             for k, index in table.indices.iteritems(): 
    291                 dbtype = table[k].dbtype 
    292                 if dbtype.endswith('BLOB') or dbtype == 'TEXT': 
    293                     # MySQL won't allow indexes on a BLOB field without a 
    294                     # specific index prefix length. We choose 255 just for fun. 
    295                     self.execute('CREATE INDEX %s ON %s (%s(255));' % 
     303         
     304        for k, index in table.indices.iteritems(): 
     305            dbtype = table[k].dbtype 
     306            if dbtype.endswith('BLOB') or dbtype == 'TEXT': 
     307                # MySQL won't allow indexes on a BLOB field without a 
     308                # specific index prefix length. We choose 255 just for fun. 
     309                self.execute_ddl('CREATE INDEX %s ON %s (%s(255));' % 
    296310                                 (index.qname, table.qname, q(index.colname))) 
    297                 else: 
    298                     self.execute('CREATE INDEX %s ON %s (%s);' % 
     311            else: 
     312                self.execute_ddl('CREATE INDEX %s ON %s (%s);' % 
    299313                                 (index.qname, table.qname, q(index.colname))) 
    300              
    301             if incr_fields: 
    302                 self.execute("DELETE FROM %s" % table.qname) 
    303         finally: 
    304             self.unlock() 
     314         
     315        if incr_fields: 
     316            self.execute_ddl("DELETE FROM %s" % table.qname) 
    305317         
    306318        dict.__setitem__(self, key, table) 
     
    432444        return '`' + name.replace('`', '``') + '`' 
    433445     
    434     def _get_conn(self): 
    435         try: 
    436             conn = _mysql.connect(**self.connargs) 
    437         except _mysql.OperationalError, x: 
    438             if x.args[0] == 1040:   # Too many connections 
    439                 raise errors.OutOfConnectionsError 
    440             raise 
    441         return conn 
    442      
    443     def _template_conn(self): 
    444         tmplconn = self.connargs.copy() 
    445         tmplconn['db'] = 'mysql' 
    446         return _mysql.connect(**tmplconn) 
    447      
    448446    def execute(self, query, conn=None): 
    449447        """execute(query, conn=None) -> result set.""" 
    450448        if conn is None: 
    451             conn = self.connection() 
     449            conn = self.connections.get() 
    452450        if isinstance(query, unicode): 
    453451            query = query.encode(self.adaptertosql.encoding) 
     
    469467        """ 
    470468        if conn is None: 
    471             conn = self.connection() 
     469            conn = self.connections.get() 
    472470        self.execute(query, conn) 
    473471         
     
    489487     
    490488    def create_database(self): 
    491         self.lock("Creating database. Transactions not allowed.") 
    492         try: 
    493             # _mysql has create_db and drop_db commands, but they're deprecated. 
    494             encoding = self.encoding 
    495             if encoding: 
    496                 encoding = " CHARACTER SET %s" % encoding 
    497             sql = 'CREATE DATABASE %s%s;' % (self.qname, encoding) 
    498             conn = self._template_conn() 
    499             self.execute(sql, conn) 
    500             conn.close() 
    501             self.clear() 
    502         finally: 
    503             self.unlock() 
     489        # _mysql has create_db and drop_db commands, but they're deprecated. 
     490        encoding = self.encoding 
     491        if encoding: 
     492            encoding = " CHARACTER SET %s" % encoding 
     493        sql = 'CREATE DATABASE %s%s;' % (self.qname, encoding) 
     494        conn = self.connections._get_conn(master=True) 
     495        self.execute_ddl(sql, conn) 
     496        conn.close() 
     497        self.clear() 
    504498     
    505499    def drop_database(self): 
    506         self.lock("Dropping database. Transactions not allowed.") 
    507         try: 
    508             sql = 'DROP DATABASE %s;' % self.qname 
    509             conn = self._template_conn() 
    510             self.execute(sql, conn) 
    511             conn.close() 
    512             self.clear() 
    513         finally: 
    514             self.unlock() 
    515      
    516     def is_lock_error(self, exc): 
     500        conn = self.connections._get_conn(master=True) 
     501        self.execute_ddl('DROP DATABASE %s;' % self.qname, conn) 
     502        conn.close() 
     503        self.clear() 
     504     
     505    def is_timeout_error(self, exc): 
    517506        # OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction') 
    518507        if not isinstance(exc, _mysql.OperationalError): 
  • trunk/geniusql/providers/psycopg.py

    r9 r10  
    88 
    99 
    10 import datetime 
    11 try: 
    12     import cPickle as pickle 
    13 except ImportError: 
    14     import pickle 
    15 import re 
    16 seq_name = re.compile(r"nextval\('([^:]+)'.*\)") 
    17 escape_oct = re.compile(r"[\000-\037\177-\377]") 
    18 replace_oct = lambda m: r"\\%03o" % ord(m.group(0)) 
    19 unescape_oct = re.compile(r"\\(\d\d\d)") 
    20 replace_unoct = lambda m: chr(int(m.group(1), 8)) 
    21  
    2210import geniusql 
    23 from geniusql import errors, typerefs 
     11from geniusql import errors 
     12from geniusql.providers import postgres 
    2413 
    2514 
    26 class AdapterToPsycoPg(geniusql.AdapterToSQL): 
    27      
    28     like_escapes = [("%", r"\\%"), ("_", r"\\_")] 
    29      
    30     # Do these need to know if "SHOW DateStyle;" != "ISO, MDY" ? 
    31     def coerce_datetime_datetime_to_any(self, value): 
    32         return ("'%04d-%02d-%02d %02d:%02d:%02d.%06d'" % 
    33                 (value.year, value.month, value.day, 
    34                  value.hour, value.minute, value.second, 
    35                  value.microsecond)) 
    36      
    37     def coerce_datetime_date_to_any(self, value): 
    38         return "'%04d-%02d-%02d'" % (value.year, value.month, value.day) 
    39      
    40     def coerce_datetime_time_to_any(self, value): 
    41         return ("'%02d:%02d:%02d.%06d'" % 
    42                 (value.hour, value.minute, value.second, value.microsecond)) 
    43      
    44     def coerce_any_to_bytea(self, value): 
    45         # See http://www.postgresql.org/docs/8.1/interactive/datatype-binary.html 
    46         value = pickle.dumps(value, 2) 
    47         def repl(char): 
    48             o = ord(char) 
    49             if o <= 31 or o == 39 or o == 92 or o >= 127: 
    50                 return r"\\%03d" % int(oct(o)) 
    51             return char 
    52         return "'%s'::bytea" % "".join(map(repl, value)) 
    53      
    54     def do_pickle(self, value): 
    55         value = pickle.dumps(value, 2) 
    56         value = self.coerce_str_to_any(value, skip_encoding=False) 
    57         return value 
    58     coerce_dict_to_any = do_pickle 
    59     coerce_list_to_any = do_pickle 
    60     coerce_tuple_to_any = do_pickle 
    61      
    62     def coerce_str_to_any(self, value, skip_encoding=False): 
    63         if not skip_encoding and not isinstance(value, str): 
    64             value = value.encode(self.encoding) 
    65         for pat, repl in self.escapes: 
    66             value = value.replace(pat, repl) 
    67          
    68         # Escape octal sequences 
    69         value = escape_oct.sub(replace_oct, value) 
    70         return "'" + value + "'" 
    71      
    72     def coerce_float_to_REAL(self, value): 
    73         # Use quotes to restrict the value to single precision, so that 
    74         # comparisons work between existing values and supplied constants. 
    75         # See http://archives.postgresql.org/pgsql-bugs/2004-02/msg00062.php 
    76         return "'%r'" % value 
    77     coerce_float_to_FLOAT4 = coerce_float_to_REAL 
    78  
    79  
    80 class AdapterFromPsycoPg(geniusql.AdapterFromDB): 
    81      
    82     def coerce_any_to_str(self, value): 
    83         # Unescape octal sequences 
    84         value = unescape_oct.sub(replace_unoct, value) 
    85         if isinstance(value, unicode): 
    86             return value.encode(self.encoding) 
    87         else: 
    88             return str(value) 
     15class AdapterFromPsycoPg(postgres.AdapterFromPg): 
    8916     
    9017    def coerce_any_to_datetime_datetime(self, value): 
     
    9825 
    9926 
    100 class PsycoPgDecompiler(geniusql.SQLDecompiler): 
     27class PsycoPgConnectionManager(geniusql.ConnectionManager): 
    10128     
    102     def dejavu_icontainedby(self, op1, op2): 
    103         if isinstance(op1, geniusql.ConstWrapper): 
    104             # Looking for text in a field. Use ILike (reverse terms). 
    105             return op2 + " ILIKE '%" + self.adapter.escape_like(op1) + "%'" 
     29    default_isolation = "READ COMMITTED" 
     30     
     31    def _get_conn(self, master=False): 
     32        if master: 
     33            # Must shut down all connections to avoid 
     34            # "being accessed by other users" error. 
     35            self.shutdown() 
     36             
     37            connstr = "" 
     38            for atom in self.Connect.split(" "): 
     39                k, v = atom.split("=", 1) 
     40                if k == 'dbname': 
     41                    v = 'template1' 
     42                connstr += "%s=%s " % (k, v) 
    10643        else: 
    107             # Looking for field in (a, b, c). 
    108             # Force all args to lowercase for case-insensitive comparison. 
    109             atoms = [self.adapter.coerce(x).lower() for x in op2.basevalue] 
    110             return "LOWER(%s) IN (%s)" % (op1, ", ".join(atoms)) 
     44            connstr = self.Connect 
     45         
     46        try: 
     47            c = _psycopg.connect(connstr) 
     48            # Allow statements like CREATE DATABASE to run outside a transaction. 
     49            c.set_isolation_level(0) 
     50            return c 
     51        except _psycopg.DatabaseError, x: 
     52            if x.args[0].startswith('could not connect'): 
     53                raise errors.OutOfConnectionsError() 
     54            raise 
    11155     
    112     def dejavu_istartswith(self, x, y): 
    113         return x + " ILIKE '" + self.adapter.escape_like(y) + "%'" 
    114      
    115     def dejavu_iendswith(self, x, y): 
    116         return x + " ILIKE '%" + self.adapter.escape_like(y) + "'" 
    117      
    118     def dejavu_ieq(self, x, y): 
    119         # ILIKE with no wildcards should behave like ieq. 
    120         return x + " ILIKE '" + self.adapter.escape_like(y) + "'" 
    121      
    122     def dejavu_year(self, x): 
    123         return "date_part('year', " + x + ")" 
    124      
    125     def dejavu_month(self, x): 
    126         return "date_part('month', " + x + ")" 
    127      
    128     def dejavu_day(self, x): 
    129         return "date_part('day', " + x + ")" 
     56    def _del_conn(self, conn): 
     57        conn.close() 
    13058 
    13159 
    132 class PsycoPgIndexSet(geniusql.IndexSet): 
     60class PsycoPgDatabase(postgres.PgDatabase): 
    13361     
    134     def __delitem__(self, key): 
    135         """Drop the specified index.""" 
    136         t = self.table 
    137         t.db.lock("Dropping index. Transactions not allowed.") 
    138         try: 
    139             # PG doesn't use DROP INDEX .. ON .. 
    140             t.db.execute('DROP INDEX %s;' % self[key].qname) 
    141         finally: 
    142             t.db.unlock() 
    143  
    144  
    145 class PsycoPgTable(geniusql.Table): 
    146      
    147     indexsetclass = PsycoPgIndexSet 
    148      
    149     def _grab_new_ids(self, idkeys, conn): 
    150         newids = {} 
    151         for idkey in idkeys: 
    152             col = self[idkey] 
    153             seq = col.sequence_name 
    154             data, _ = self.db.fetch("SELECT last_value FROM %s;" % seq, conn) 
    155             newids[idkey] = data[0][0] 
    156         return newids 
    157  
    158  
    159 class PsycoPgDatabase(geniusql.Database): 
    160      
    161     sql_name_max_length = 63 
    162     quote_all = True 
    163     poolsize = 10 
    164     encoding = 'SQL_ASCII' 
    165      
    166     decompiler = PsycoPgDecompiler 
    167     adaptertosql = AdapterToPsycoPg() 
    16862    adapterfromdb = AdapterFromPsycoPg() 
    169     tableclass = PsycoPgTable 
     63    connectionmanager = PsycoPgConnectionManager 
    17064     
    17165    def _get_dbinfo(self, conn=None): 
     
    18074        return dbinfo 
    18175     
    182     def _get_tables(self, conn=None): 
    183         data, _ = self.fetch("SELECT tablename FROM pg_tables WHERE schemaname" 
    184                              " not in ('information_schema', 'pg_catalog')", 
    185                              conn=conn) 
    186         return [self.tableclass(row[0], self.quote(row[0]), 
    187                                 self, created=True) 
    188                 for row in data] 
    189      
    190     def _get_table(self, tablename, conn=None): 
    191         data, _ = self.fetch("SELECT tablename FROM pg_tables WHERE " 
    192                              "tablename = '%s'" % tablename, 
    193                              conn=conn) 
    194         for name, in data: 
    195             if name == tablename: 
    196                 return self.tableclass(name, self.quote(name), 
    197                                        self, created=True) 
    198         raise errors.MappingError(tablename) 
    199      
    200     def _get_columns(self, tablename, conn=None): 
    201         # Get the OID of the table 
    202         data, _ = self.fetch("SELECT oid FROM pg_class WHERE relname = '%s'" 
    203                              % tablename, conn=conn) 
    204         table_OID = data[0][0] 
    205          
    206         # Get index data so we can set col.key if pg_index.indisprimary 
    207         data, _ = self.fetch("SELECT indkey FROM pg_index WHERE indrelid " 
    208                              "= %s AND indisprimary" % table_OID, conn=conn) 
    209         if data: 
    210             # indkey is an "array" (we get a space-separated string of ints). 
    211             # These will equal pg_attribute.attnum, below. 
    212             indices = map(int, data[0][0].split(" ")) 
    213         else: 
    214             indices = [] 
    215          
    216         # Get column data 
    217         sql = ("SELECT attname, atttypid, attnum, attlen, atttypmod " 
    218                "FROM pg_attribute WHERE attisdropped = False AND " 
    219                "attrelid = %s" % table_OID) 
    220         data, _ = self.fetch(sql, conn=conn) 
    221         cols = [] 
    222         for row in data: 
    223             name = row[0] 
    224             if name in ('tableoid', 'cmax', 'xmax', 'cmin', 'xmin', 
    225                         'oid', 'ctid'): 
    226                 # This is a column which PostgreSQL defines automatically 
    227                 continue 
    228              
    229             # Data type 
    230             dbtype, _ = self.fetch("SELECT typname, typlen FROM pg_type " 
    231                                    "WHERE oid = %s" % row[1]) 
    232             if dbtype: 
    233                 dbtype = dbtype[0][0].upper() 
    234             else: 
    235                 dbtype = None 
    236             c = geniusql.Column(self.python_type(dbtype), dbtype, 
    237                                 None, {}, row[2] in indices, 
    238                                 row[0], self.quote(row[0])) 
    239              
    240             if dbtype in ('FLOAT4', 'FLOAT8'): 
    241                 c.hints['precision'] = row[3] 
    242             elif dbtype in ('MONEY', 'NUMERIC'): 
    243                 c.hints['precision'] = (row[4] >> 16) & 65535 
    244                 c.hints['scale'] = (row[4] & 65535) - 4 
    245              
    246             # Default value 
    247             default, _ = self.fetch("SELECT adsrc FROM pg_attrdef " 
    248                                     "WHERE adnum = %s AND adrelid = %s" 
    249                                     % (row[2], table_OID)) 
    250             if default: 
    251                 default = default[0][0] 
    252                 if default.startswith("nextval("): 
    253                     # Grab seqname from "nextval(seqname::[text|regclass])" 
    254                     c.autoincrement = True 
    255                     c.sequence_name = seq_name.search(default).group(1) 
    256                     c.initial = self.fetch("SELECT min_value FROM %s" % 
    257                                            c.sequence_name)[0][0] 
    258                     c.default = None 
    259                 else: 
    260                     # adsrc is always a string, so we must cast 
    261                     # it using our guessed type. 
    262                     c.default = self.python_type(dbtype)(default) 
    263             else: 
    264                 c.default = None 
    265              
    266             if dbtype.startswith('BPCHAR') or dbtype.startswith('VARCHAR'): 
    267                 # See http://archives.postgresql.org/pgsql-interfaces/2004-07/msg00021.php 
    268                 c.hints['bytes'] = row[4] - 4 
    269             else: 
    270                 bytes = row[3] 
    271                 if bytes > 0: 
    272                     c.hints['bytes'] = bytes 
    273                 elif dbtype == 'TEXT': 
    274                     c.hints['bytes'] = 0 
    275              
    276             cols.append(c) 
    277         return cols 
    278      
    279     def _get_indices(self, tablename, conn=None): 
    280         # Get the OID of the parent table. 
    281         data, _ = self.fetch("SELECT oid FROM pg_class WHERE relname = '%s'" 
    282                              % tablename, conn=conn) 
    283         if not data: 
    284             return [] 
    285          
    286         table_OID = data[0][0] 
    287         indices = [] 
    288         data, _ = self.fetch("SELECT pg_class.relname, indkey, indisprimary, " 
    289                              "indisunique FROM pg_index LEFT JOIN pg_class " 
    290                              "ON pg_index.indexrelid = pg_class.oid WHERE " 
    291                              "pg_index.indrelid = %s" % table_OID, conn=conn) 
    292         for row in data: 
    293             # indkey is an "array" (we get a space-separated string of ints). 
    294             cols = map(int, row[1].split(" ")) 
    295             for col in cols: 
    296                 d, _ = self.fetch("SELECT attname FROM pg_attribute " 
    297                                   "WHERE attrelid = %s AND attnum = %s" 
    298                                   % (table_OID, col), conn=conn) 
    299                 i = geniusql.Index(row[0], self.quote(row[0]), tablename, 
    300                                    d[0][0], bool(row[3])) 
    301                 indices.append(i) 
    302          
    303         return indices 
    304      
    305     def python_type(self, dbtype): 
    306         """Return a Python type which can store values of the given dbtype.""" 
    307         dbtype = dbtype.upper() 
    308         if dbtype in ('INT2', 'INT4', 'INTEGER', 'SMALLINT'): 
    309             return int 
    310         elif dbtype in ('BOOL', 'BOOLEAN'): 
    311             return bool 
    312         elif dbtype in ('INT8', 'BIGINT'): 
    313             return long 
    314         elif dbtype in ('FLOAT4', 'FLOAT8', 'MONEY', 'DOUBLE PRECISION', 'REAL'): 
    315             return float 
    316         elif dbtype.startswith('NUMERIC'): 
    317             if typerefs.decimal: 
    318                 return typerefs.decimal.Decimal 
    319             elif typerefs.fixedpoint: 
    320                 return typerefs.fixedpoint.FixedPoint 
    321         elif dbtype == 'DATE': 
    322             return datetime.date 
    323         elif dbtype in ('TIMESTAMP', 'TIMESTAMPTZ'): 
    324             return datetime.datetime 
    325         elif dbtype in ('TIME', 'TIMETZ'): 
    326             return datetime.time 
    327         elif dbtype in ('BYTEA'): 
    328             return str 
    329         for t in ('CHAR', 'VARCHAR', 'BPCHAR', 'TEXT'): 
    330             if dbtype.startswith(t): 
    331                 return str 
    332          
    333         raise TypeError("Database type %r could not be converted " 
    334                         "to a Python type." % dbtype) 
    335      
    336     def columnclause(self, column): 
    337         """Return a clause for the given column for CREATE or ALTER TABLE. 
    338          
    339         This will be of the form "name type [DEFAULT [x | nextval('seq')]]". 
    340          
    341         PostgreSQL creates the sequence in a separate statement. 
    342         """ 
    343         if column.autoincrement: 
    344             default = "nextval('%s')" % column.sequence_name 
    345         else: 
    346             default = column.default or "" 
    347             if not isinstance(default, str): 
    348                 default = self.adaptertosql.coerce(default, column.dbtype) 
    349          
    350         if default: 
    351             default = " DEFAULT %s" % default 
    352          
    353         return '%s %s%s' % (column.qname, column.dbtype, default) 
    354      
    355     def create_sequence(self, table, column): 
    356         """Create a SEQUENCE for the given column and set its sequence_name.""" 
    357         sname = column.sequence_name 
    358         if sname is None: 
    359             sname = self.quote("%s_%s_seq" % (table.name, column.name)) 
    360             column.sequence_name = sname 
    361         self.execute("CREATE SEQUENCE %s START %s;" % (sname, column.initial)) 
    362      
    363     def drop_sequence(self, column): 
    364         """Drop a SEQUENCE for the given column and remove its sequence_name.""" 
    365         if column.sequence_name is not None: 
    366             self.execute("DROP SEQUENCE %s;" % column.sequence_name) 
    367             column.sequence_name = None 
    368      
    369     def quote(self, name): 
    370         if self.quote_all: 
    371             name = '"' + name.replace('"', '""') + '"' 
    372         return name 
    373      
    374     def sql_name(self, name): 
    375         name = geniusql.Database.sql_name(self, name) 
    376         if not self.quote_all: 
    377             name = name.lower() 
    378         return name 
    379      
    380     default_isolation = "READ COMMITTED" 
    381      
    382     def _get_conn(self): 
    383         try: 
    384             c = _psycopg.connect(self.Connect) 
    385             c.set_isolation_level(0) 
    386             return c 
    387         except _psycopg.DatabaseError, x: 
    388             if x.args[0].startswith('could not connect'): 
    389                 raise errors.OutOfConnectionsError() 
    390             raise 
    391      
    392     def _del_conn(self, conn): 
    393         conn.close() 
    394      
    395     def _template_conn(self): 
    396         atoms = self.Connect.split(" ") 
    397         tmplconn = "" 
    398         for atom in atoms: 
    399             k, v = atom.split("=", 1) 
    400             if k == 'dbname': v = 'template1' 
    401             tmplconn += "%s=%s " % (k, v) 
    402         c = _psycopg.connect(tmplconn) 
    403         # Allow statements like CREATE DATABASE to run outside a transaction. 
    404         c.set_isolation_level(0) 
    405         return c 
     76    def version(self): 
     77        c = self.connections._get_conn(master=True) 
     78        data, _ = self.fetch("SELECT version();", c) 
     79        v, = data[0] 
     80        c.close() 
     81        return "%s\npsycopg version: %s" % (v, _psycopg.__version__) 
    40682     
    40783    def execute(self, query, conn=None): 
    40884        """execute(query, conn=None) -> result set.""" 
    40985        if conn is None: 
    410             conn = self.connection() 
     86            conn = self.connections.get() 
    41187        if isinstance(query, unicode): 
    41288            query = query.encode(self.adaptertosql.encoding) 
     
    42197        """fetch(query, conn=None) -> rowdata, columns.""" 
    42298        if conn is None: 
    423             conn = self.connection() 
     99            conn = self.connections.get() 
    424100        if isinstance(query, unicode): 
    425101            query = query.encode(self.adaptertosql.encoding) 
     
    435111         
    436112        return data, coldefs 
    437      
    438     def create_database(self): 
    439         self.lock("Creating database. Transactions not allowed.") 
    440         try: 
    441             # Must shut down all connections to avoid 
    442             # "being accessed by other users" error. 
    443             self.connection.shutdown() 
    444              
    445             c = self._template_conn() 
    446             encoding = self.encoding 
    447             if encoding: 
    448                 encoding = " WITH ENCODING '%s'" % encoding 
    449             self.execute("CREATE DATABASE %s%s" % (self.qname, encoding), c) 
    450             c.close() 
    451             del c 
    452             self.clear() 
    453         finally: 
    454             self.unlock() 
    455      
    456     def drop_database(self): 
    457         self.lock("Dropping database. Transactions not allowed.") 
    458         try: 
    459             # Must shut down all connections to avoid 
    460             # "being accessed by other users" error. 
    461             self.connection.shutdown() 
    462              
    463             c = self._template_conn() 
    464             self.execute("DROP DATABASE %s;" % self.qname, c) 
    465             c.close() 
    466             del c 
    467             self.clear() 
    468         finally: 
    469             self.unlock() 
    470      
    471     def version(self): 
    472         c = self._template_conn() 
    473         data, _ = self.fetch("SELECT version();", c) 
    474         v, = data[0] 
    475         c.close() 
    476         return "%s\npsycopg version: %s" % (v, _psycopg.__version__) 
    477113 
  • trunk/geniusql/providers/pypgsql.py

    r9 r10  
    22from pyPgSQL import libpq 
    33 
    4 import datetime 
    5 try: 
    6     import cPickle as pickle 
    7 except ImportError: 
    8     import pickle 
    9  
    10 import re 
    11 seq_name = re.compile(r"nextval\('([^:]+)'.*\)") 
    12 escape_oct = re.compile(r"[\000-\037\177-\377]") 
    13 replace_oct = lambda m: r"\\%03o" % ord(m.group(0)) 
    14 unescape_oct = re.compile(r"\\(\d\d\d)") 
    15 replace_unoct = lambda m: chr(int(m.group(1), 8)) 
     4import geniusql 
     5from geniusql import errors, typerefs 
     6from geniusql.providers import postgres 
    167 
    178 
    18 import geniusql 
    19 from geniusql import errors, typerefs 
     9class PyPgConnectionManager(geniusql.ConnectionManager): 
     10     
     11    default_isolation = "READ COMMITTED" 
     12     
     13    def _get_conn(self, master=False): 
     14        if master: 
     15            # Must shut down all connections to avoid 
     16            # "being accessed by other users" error. 
     17            self.shutdown() 
     18             
     19            connstr = "" 
     20            for atom in self.Connect.split(" "): 
     21                k, v = atom.split("=", 1) 
     22                if k == 'dbname': 
     23                    v = 'template1' 
     24                connstr += "%s=%s " % (k, v) 
     25        else: 
     26            connstr = self.Connect 
     27         
     28        try: 
     29            return libpq.PQconnectdb(connstr) 
     30        except libpq.DatabaseError, x: 
     31            if x.args[0].startswith('could not connect'): 
     32                raise errors.OutOfConnectionsError() 
     33            raise 
     34     
     35    def _del_conn(self, conn): 
     36        conn.finish() 
    2037 
    2138 
    22 class AdapterToPgSQL(geniusql.AdapterToSQL): 
     39class PyPgDatabase(postgres.PgDatabase): 
    2340     
    24     like_escapes = [("%", r"\\%"), ("_", r"\\_")] 
    25      
    26     # Do these need to know if "SHOW DateStyle;" != "ISO, MDY" ? 
    27     def coerce_datetime_datetime_to_any(self, value): 
    28         return ("'%04d-%02d-%02d %02d:%02d:%02d.%06d'" % 
    29                 (value.year, value.month, value.day, 
    30                  value.hour, value.minute, value.second, 
    31                  value.microsecond)) 
    32      
    33     def coerce_datetime_date_to_any(self, value): 
    34         return "'%04d-%02d-%02d'" % (value.year, value.month, value.day) 
    35      
    36     def coerce_datetime_time_to_any(self, value): 
    37         return ("'%02d:%02d:%02d.%06d'" % 
    38                 (value.hour, value.minute, value.second, value.microsecond)) 
    39      
    40     def coerce_any_to_bytea(self, value): 
    41         # See http://www.postgresql.org/docs/8.1/interactive/datatype-binary.html 
    42         value = pickle.dumps(value, 2) 
    43         def repl(char): 
    44             o = ord(char) 
    45             if o <= 31 or o == 39 or o == 92 or o >= 127: 
    46                 return r"\\%03d" % int(oct(o)) 
    47             return char 
    48         return "'%s'::bytea" % "".join(map(repl, value)) 
    49      
    50     def do_pickle(self, value): 
    51         value = pickle.dumps(value, 2) 
    52         value = self.coerce_str_to_any(value, skip_encoding=False) 
    53         return value 
    54     coerce_dict_to_any = do_pickle 
    55     coerce_list_to_any = do_pickle 
    56     coerce_tuple_to_any = do_pickle 
    57      
    58     def coerce_str_to_any(self, value, skip_encoding=False): 
    59         if not skip_encoding and not isinstance(value, str): 
    60             value = value.encode(self.encoding) 
    61         for pat, repl in self.escapes: 
    62             value = value.replace(pat, repl) 
    63          
    64         # Escape octal sequences 
    65         value = escape_oct.sub(replace_oct, value) 
    66         return "'" + value + "'" 
    67      
    68     def coerce_float_to_REAL(self, value): 
    69         # Use quotes to restrict the value to single precision, so that 
    70         # comparisons work between existing values and supplied constants. 
    71         # See http://archives.postgresql.org/pgsql-bugs/2004-02/msg00062.php 
    72         return "'%r'" % value 
    73     coerce_float_to_FLOAT4 = coerce_float_to_REAL 
    74  
    75  
    76 class AdapterFromPgSQL(geniusql.AdapterFromDB): 
    77      
    78     def coerce_any_to_str(self, value): 
    79         # Unescape octal sequences 
    80         value = unescape_oct.sub(replace_unoct, value) 
    81         if isinstance(value, unicode): 
    82             return value.encode(self.encoding) 
    83         else: 
    84             return str(value) 
    85  
    86  
    87 class PgSQLDecompiler(geniusql.SQLDecompiler): 
    88      
    89     def dejavu_icontainedby(self, op1, op2): 
    90         if isinstance(op1, geniusql.ConstWrapper): 
    91             # Looking for text in a field. Use ILike (reverse terms). 
    92             return op2 + " ILIKE '%" + self.adapter.escape_like(op1) + "%'" 
    93         else: 
    94             # Looking for field in (a, b, c). 
    95             # Force all args to lowercase for case-insensitive comparison. 
    96             atoms = [self.adapter.coerce(x).lower() for x in op2.basevalue] 
    97             return "LOWER(%s) IN (%s)" % (op1, ", ".join(atoms)) 
    98      
    99     def dejavu_istartswith(self, x, y): 
    100         return x + " ILIKE '" + self.adapter.escape_like(y) + "%'" 
    101      
    102     def dejavu_iendswith(self, x, y): 
    103         return x + " ILIKE '%" + self.adapter.escape_like(y) + "'" 
    104      
    105     def dejavu_ieq(self, x, y): 
    106         # ILIKE with no wildcards should behave like ieq. 
    107         return x + " ILIKE '" + self.adapter.escape_like(y) + "'" 
    108      
    109     def dejavu_year(self, x): 
    110         return "date_part('year', " + x + ")" 
    111      
    112     def dejavu_month(self, x): 
    113         return "date_part('month', " + x + ")" 
    114      
    115     def dejavu_day(self, x): 
    116         return "date_part('day', " + x + ")" 
    117  
    118  
    119 class PgIndexSet(geniusql.IndexSet): 
    120      
    121     def __delitem__(self, key): 
    122         """Drop the specified index.""" 
    123         t = self.table 
    124         t.db.lock("Dropping index. Transactions not allowed.") 
    125         try: 
    126             # PG doesn't use DROP INDEX .. ON .. 
    127             t.db.execute('DROP INDEX %s;' % self[key].qname) 
    128         finally: 
    129             t.db.unlock() 
    130  
    131  
    132 class PgTable(geniusql.Table): 
    133      
    134     indexsetclass = PgIndexSet 
    135      
    136     def _grab_new_ids(self, idkeys, conn): 
    137         newids = {} 
    138         for idkey in idkeys: 
    139             col = self[idkey] 
    140             seq = col.sequence_name 
    141             data, _ = self.db.fetch("SELECT last_value FROM %s;" % seq, conn) 
    142             newids[idkey] = data[0][0] 
    143         return newids 
    144  
    145  
    146 class PgDatabase(geniusql.Database): 
    147      
    148     sql_name_max_length = 63 
    149     quote_all = True 
    150     poolsize = 10 
    151     encoding = 'SQL_ASCII' 
    152      
    153     decompiler = PgSQLDecompiler 
    154     adaptertosql = AdapterToPgSQL() 
    155     adapterfromdb = AdapterFromPgSQL() 
    156     tableclass = PgTable 
     41    connectionmanager = PyPgConnectionManager 
    15742     
    15843    def _get_dbinfo(self, conn=None): 
     
    16651                raise 
    16752        return dbinfo 
    168      
    169     def _get_tables(self, conn=None): 
    170         data, _ = self.fetch("SELECT tablename FROM pg_tables WHERE schemaname" 
    171                              " not in ('information_schema', 'pg_catalog')", 
    172                              conn=conn) 
    173         return [self.tableclass(row[0], self.quote(row[0]), 
    174                                 self, created=True) 
    175                 for row in data] 
    176      
    177     def _get_table(self, tablename, conn=None): 
    178         data, _ = self.fetch("SELECT tablename FROM pg_tables WHERE " 
    179                              "tablename = '%s'" % tablename, 
    180                              conn=conn) 
    181         for name, in data: 
    182             if name == tablename: 
    183                 return self.tableclass(name, self.quote(name), 
    184                                        self, created=True) 
    185         raise errors.MappingError(tablename) 
    186      
    187     def _get_columns(self, tablename, conn=None): 
    188         # Get the OID of the table 
    189         data, _ = self.fetch("SELECT oid FROM pg_class WHERE relname = '%s'" 
    190                              % tablename, conn=conn) 
    191         table_OID = data[0][0] 
    192          
    193         # Get index data so we can set col.key if pg_index.indisprimary 
    194         data, _ = self.fetch("SELECT indkey FROM pg_index WHERE indrelid " 
    195                              "= %s AND indisprimary" % table_OID, conn=conn) 
    196         if data: 
    197             # indkey is an "array" (we get a space-separated string of ints). 
    198             # These will equal pg_attribute.attnum, below. 
    199             indices = map(int, data[0][0].split(" ")) 
    200         else: 
    201             indices = [] 
    202          
    203         # Get column data 
    204         sql = ("SELECT attname, atttypid, attnum, attlen, atttypmod " 
    205                "FROM pg_attribute WHERE attisdropped = False AND " 
    206                "attrelid = %s" % table_OID) 
    207         data, _ = self.fetch(sql, conn=conn) 
    208         cols = [] 
    209         for row in data: 
    210             name = row[0] 
    211             if name in ('tableoid', 'cmax', 'xmax', 'cmin', 'xmin', 
    212                         'oid', 'ctid'): 
    213                 # This is a column which PostgreSQL defines automatically 
    214                 continue 
    215              
    216             # Data type 
    217             dbtype, _ = self.fetch("SELECT typname, typlen FROM pg_type " 
    218                                    "WHERE oid = %s" % row[1]) 
    219             if dbtype: 
    220                 dbtype = dbtype[0][0].upper() 
    221             else: 
    222                 dbtype = None 
    223             c = geniusql.Column(self.python_type(dbtype), dbtype, 
    224                                 None, {}, row[2] in indices, 
    225                                 row[0], self.quote(row[0])) 
    226              
    227             if dbtype in ('FLOAT4', 'FLOAT8'): 
    228                 c.hints['precision'] = row[3] 
    229             elif dbtype in ('MONEY', 'NUMERIC'): 
    230                 c.hints['precision'] = (row[4] >> 16) & 65535 
    231                 c.hints['scale'] = (row[4] & 65535) - 4 
    232              
    233             # Default value 
    234             default, _ = self.fetch("SELECT adsrc FROM pg_attrdef " 
    235                                     "WHERE adnum = %s AND adrelid = %s" 
    236                                     % (row[2], table_OID)) 
    237             if default: 
    238                 default = default[0][0] 
    239                 if default.startswith("nextval("): 
    240                     # Grab seqname from "nextval(seqname::[text|regclass])" 
    241                     c.autoincrement = True 
    242                     c.sequence_name = seq_name.search(default).group(1) 
    243                     c.initial = self.fetch("SELECT min_value FROM %s" % 
    244                                            c.sequence_name)[0][0] 
    245                     c.default = None 
    246                 else: 
    247                     # adsrc is always a string, so we must cast 
    248                     # it using our guessed type. 
    249                     c.default = self.python_type(dbtype)(default) 
    250             else: 
    251                 c.default = None 
    252              
    253             if dbtype.startswith('BPCHAR') or dbtype.startswith('VARCHAR'): 
    254                 # See http://archives.postgresql.org/pgsql-interfaces/2004-07/msg00021.php 
    255                 c.hints['bytes'] = row[4] - 4 
    256             else: 
    257                 bytes = row[3] 
    258                 if bytes > 0: 
    259                     c.hints['bytes'] = bytes 
    260                 elif dbtype == 'TEXT': 
    261                     c.hints['bytes'] = 0 
    262              
    263             cols.append(c) 
    264         return cols 
    265      
    266     def _get_indices(self, tablename, conn=None): 
    267         # Get the OID of the parent table. 
    268         data, _ = self.fetch("SELECT oid FROM pg_class WHERE relname = '%s'" 
    269                              % tablename, conn=conn) 
    270         if not data: 
    271             return [] 
    272          
    273         table_OID = data[0][0] 
    274         indices = [] 
    275         data, _ = self.fetch("SELECT pg_class.relname, indkey, indisprimary, " 
    276                              "indisunique FROM pg_index LEFT JOIN pg_class " 
    277                              "ON pg_index.indexrelid = pg_class.oid WHERE " 
    278                              "pg_index.indrelid = %s" % table_OID, conn=conn) 
    279         for row in data: 
    280             # indkey is an "array" (we get a space-separated string of ints). 
    281             cols = map(int, row[1].split(" ")) 
    282             for col in cols: 
    283                 d, _ = self.fetch("SELECT attname FROM pg_attribute " 
    284                                   "WHERE attrelid = %s AND attnum = %s" 
    285                                   % (table_OID, col), conn=conn) 
    286                 i = geniusql.Index(row[0], self.quote(row[0]), tablename, 
    287                                    d[0][0], bool(row[3])) 
    288                 indices.append(i) 
    289          
    290         return indices 
    291      
    292     def python_type(self, dbtype): 
    293         """Return a Python type which can store values of the given dbtype.""" 
    294         dbtype = dbtype.upper() 
    295         if dbtype in ('INT2', 'INT4', 'INTEGER', 'SMALLINT'): 
    296             return int 
    297         elif dbtype in ('BOOL', 'BOOLEAN'): 
    298             return bool 
    299         elif dbtype in ('INT8', 'BIGINT'): 
    300             return long 
    301         elif dbtype in ('FLOAT4', 'FLOAT8', 'MONEY', 'DOUBLE PRECISION', 'REAL'): 
    302             return float 
    303         elif dbtype.startswith('NUMERIC'): 
    304             if typerefs.decimal: 
    305                 return typerefs.decimal.Decimal 
    306             elif typerefs.fixedpoint: 
    307                 return typerefs.fixedpoint.FixedPoint 
    308         elif dbtype == 'DATE': 
    309             return datetime.date 
    310         elif dbtype in ('TIMESTAMP', 'TIMESTAMPTZ'): 
    311             return datetime.datetime 
    312         elif dbtype in ('TIME', 'TIMETZ'): 
    313             return datetime.time 
    314         elif dbtype in ('BYTEA'): 
    315             return str 
    316         for t in ('CHAR', 'VARCHAR', 'BPCHAR', 'TEXT'): 
    317             if dbtype.startswith(t): 
    318                 return str 
    319          
    320         raise TypeError("Database type %r could not be converted " 
    321                         "to a Python type." % dbtype) 
    322      
    323     def columnclause(self, column): 
    324         """Return a clause for the given column for CREATE or ALTER TABLE. 
    325          
    326         This will be of the form "name type [DEFAULT [x | nextval('seq')]]". 
    327          
    328         PostgreSQL creates the sequence in a separate statement. 
    329         """ 
    330         if column.autoincrement: 
    331             default = "nextval('%s')" % column.sequence_name 
    332         else: 
    333             default = column.default or "" 
    334             if not isinstance(default, str): 
    335                 default = self.adaptertosql.coerce(default, column.dbtype) 
    336          
    337         if default: 
    338             default = " DEFAULT %s" % default 
    339          
    340         return '%s %s%s' % (column.qname, column.dbtype, default) 
    341      
    342     def create_sequence(self, table, column): 
    343         """Create a SEQUENCE for the given column and set its sequence_name.""" 
    344         sname = column.sequence_name 
    345         if sname is None: 
    346             sname = self.quote("%s_%s_seq" % (table.name, column.name)) 
    347             column.sequence_name = sname 
    348         self.execute("CREATE SEQUENCE %s START %s;" % (sname, column.initial)) 
    349      
    350     def drop_sequence(self, column): 
    351         """Drop a SEQUENCE for the given column and remove its sequence_name.""" 
    352         if column.sequence_name is not None: 
    353             self.execute("DROP SEQUENCE %s;" % column.sequence_name) 
    354             column.sequence_name = None 
    355      
    356     def quote(self, name): 
    357         if self.quote_all: 
    358             name = '"' + name.replace('"', '""') + '"' 
    359         return name 
    360      
    361     def sql_name(self, name): 
    362         name = geniusql.Database.sql_name(self, name) 
    363         if not self.quote_all: 
    364             name = name.lower() 
    365         return name 
    366      
    367     default_isolation = "READ COMMITTED" 
    368      
    369     def _get_conn(self): 
    370         try: 
    371             return libpq.PQconnectdb(self.Connect) 
    372         except libpq.DatabaseError, x: 
    373             if x.args[0].startswith('could not connect'): 
    374                 raise errors.OutOfConnectionsError() 
    375             raise 
    376      
    377     def _del_conn(self, conn): 
    378         conn.finish() 
    379      
    380     def _template_conn(self): 
    381         atoms = self.Connect.split(" ") 
    382         tmplconn = "" 
    383         for atom in atoms: 
    384             k, v = atom.split("=", 1) 
    385             if k == 'dbname': v = 'template1' 
    386             tmplconn += "%s=%s " % (k, v) 
    387         return libpq.PQconnectdb(tmplconn) 
    38853     
    38954    def fetch(self, query, conn=None): 
     
    40267        return data, columns 
    40368     
    404     def create_database(self): 
    405         self.lock("Creating database. Transactions not allowed.") 
    406         try: 
    407             c = self._template_conn() 
    408             encoding = self.encoding 
    409             if encoding: 
    410                 encoding = " WITH ENCODING '%s'" % encoding 
    411             self.execute("CREATE DATABASE %s%s" % (self.qname, encoding), c) 
    412             c.finish() 
    413             self.clear() 
    414         finally: 
    415             self.unlock() 
    416      
    417     def drop_database(self): 
    418         self.lock("Dropping database. Transactions not allowed.") 
    419         try: 
    420             # Must shut down all connections to avoid 
    421             # "being accessed by other users" error. 
    422             self.connection.shutdown() 
    423              
    424             c = self._template_conn() 
    425             self.execute("DROP DATABASE %s;" % self.qname, c) 
    426             c.finish() 
    427             self.clear() 
    428         finally: 
    429             self.unlock() 
    430      
    43169    def version(self): 
    432         c = self._template_conn(
     70        c = self.connections._get_conn(master=True
    43371        v = c.version 
    434         c.finish(
     72        self.connections._del_conn(c
    43573        return v 
    43674 
  • trunk/geniusql/providers/sqlite.py

    r9 r10  
    5656            return '1' 
    5757        return '0' 
     58 
     59 
     60class AdapterToSQLiteTypeless(AdapterToSQLite): 
     61     
     62    def cast_any_to_object(self, colref): 
     63        return colref 
    5864 
    5965 
     
    318324                db.create_sequence(self, column) 
    319325             
    320             db.lock("Adding property. Transactions not allowed.") 
    321             try: 
    322                 # Make a temporary copy. 
    323                 tempkey, temptable = self._temp_copy() 
    324                 # Add the new column to the copy. 
    325                 dict.__setitem__(temptable, key, column) 
    326                 # Bind the temp table to the DB. 
    327                 db[tempkey] = temptable 
    328                  
    329                 # Copy data from the old table to the temp table. 
    330                 selfields = [] 
    331                 for k, c in temptable.iteritems(): 
    332                     qname = c.qname 
    333                     if k == key: 
    334                         # This is a new column. Populate with NULL. 
    335                         qname = "NULL AS %s" % qname 
    336                     selfields.append(qname) 
    337                 db.execute("INSERT INTO %s SELECT %s FROM %s;" % 
    338                            (temptable.qname, ", ".join(selfields), self.qname)) 
    339                  
    340                 # Copy data from the temp table to a new table for self. 
    341                 self._copy_from_temp(temptable, self._parent_key(), tempkey) 
    342             finally: 
    343                 db.unlock() 
    344      
    345     def __delitem__(self, key): 
    346         if key in self.indices: 
    347             del self.indices[key] 
    348          
    349         if not self.created: 
    350             dict.__delitem__(self, key) 
    351             return 
    352          
    353         db = self.db 
    354         db.lock("Dropping property. Transactions not allowed.") 
    355         try: 
    356             column = self[key] 
    357              
    358326            # Make a temporary copy. 
    359327            tempkey, temptable = self._temp_copy() 
    360             # Drop the column from the copy. 
    361             dict.__delitem__(temptable, key
     328            # Add the new column to the copy. 
     329            dict.__setitem__(temptable, key, column
    362330            # Bind the temp table to the DB. 
    363331            db[tempkey] = temptable 
     
    367335            for k, c in temptable.iteritems(): 
    368336                qname = c.qname 
     337                if k == key: 
     338                    # This is a new column. Populate with NULL. 
     339                    qname = "NULL AS %s" % qname 
    369340                selfields.append(qname) 
    370             db.execute("INSERT INTO %s SELECT %s FROM %s;" % 
     341            db.execute_ddl("INSERT INTO %s SELECT %s FROM %s;" % 
     342                           (temptable.qname, ", ".join(selfields), self.qname)) 
     343             
     344            # Copy data from the temp table to a new table for self. 
     345            self._copy_from_temp(temptable, self._parent_key(), tempkey) 
     346     
     347    def __delitem__(self, key): 
     348        if key in self.indices: 
     349            del self.indices[key] 
     350         
     351        if not self.created: 
     352            dict.__delitem__(self, key) 
     353            return 
     354         
     355        db = self.db 
     356        column = self[key] 
     357         
     358        # Make a temporary copy. 
     359        tempkey, temptable = self._temp_copy() 
     360        # Drop the column from the copy. 
     361        dict.__delitem__(temptable, key) 
     362        # Bind the temp table to the DB. 
     363        db[tempkey] = temptable 
     364         
     365        # Copy data from the old table to the temp table. 
     366        selfields = [] 
     367        for k, c in temptable.iteritems(): 
     368            qname = c.qname 
     369            selfields.append(qname) 
     370        db.execute_ddl("INSERT INTO %s SELECT %s FROM %s;" % 
    371371                       (temptable.qname, ", ".join(selfields), self.qname)) 
    372              
    373             self._copy_from_temp(temptable, self._parent_key(), tempkey) 
    374              
    375             if column.autoincrement: 
    376                 # This may or may not be a no-op, depending on the DB. 
    377                 db.drop_sequence(column) 
    378         finally: 
    379             db.unlock() 
     372         
     373        self._copy_from_temp(temptable, self._parent_key(), tempkey) 
     374         
     375        if column.autoincrement: 
     376            # This may or may not be a no-op, depending on the DB. 
     377            db.drop_sequence(column) 
    380378     
    381379    def rename(self, oldkey, newkey): 
     
    387385         
    388386        if oldname != newname: 
    389             db.lock("Renaming property. Transactions not allowed.") 
    390             try: 
    391                 dict.__delitem__(self, oldkey) 
    392                 dict.__setitem__(self, newkey, oldcol) 
    393                 oldcol.name = newname 
    394                 oldcol.qname = db.quote(newname) 
    395                  
    396                 # Make a temporary copy. 
    397                 tempkey, temptable = self._temp_copy() 
    398                 # Bind the temp table to the DB. 
    399                 db[tempkey] = temptable 
    400                  
    401                 # Copy data from the old table to the temp table. 
    402                 selfields = [] 
    403                 for k, c in temptable.iteritems(): 
    404                     qname = c.qname 
    405                     if k == newkey: 
    406                         qname = "%s AS %s" % (db.quote(oldname), qname) 
    407                     selfields.append(qname) 
    408                 db.execute("INSERT INTO %s SELECT %s FROM %s;" % 
     387            dict.__delitem__(self, oldkey) 
     388            dict.__setitem__(self, newkey, oldcol) 
     389            oldcol.name = newname 
     390            oldcol.qname = db.quote(newname) 
     391             
     392            # Make a temporary copy. 
     393            tempkey, temptable = self._temp_copy() 
     394            # Bind the temp table to the DB. 
     395            db[tempkey] = temptable 
     396             
     397            # Copy data from the old table to the temp table. 
     398            selfields = [] 
     399            for k, c in temptable.iteritems(): 
     400                qname = c.qname 
     401                if k == newkey: 
     402                    qname = "%s AS %s" % (db.quote(oldname), qname) 
     403                selfields.append(qname) 
     404            db.execute_ddl("INSERT INTO %s SELECT %s FROM %s;" % 
    409405                           (temptable.qname, ", ".join(selfields), self.qname)) 
    410                  
    411                 self._copy_from_temp(temptable, self._parent_key(), tempkey) 
    412             finally: 
    413                 db.unlock() 
     406             
     407            self._copy_from_temp(temptable, self._parent_key(), tempkey) 
    414408     
    415409    def _grab_new_ids(self, idkeys, conn): 
     
    479473 
    480474 
     475class SQLiteConnectionManager(geniusql.ConnectionManager): 
     476     
     477    default_isolation = "SERIALIZABLE" 
     478    isolation_levels = ["SERIALIZABLE"] 
     479     
     480    def __init__(self, db, poolsize=10): 
     481        self.transactions = {} 
     482        self.db = db 
     483        self.poolsize = poolsize 
     484        if self.db.name == ":memory:": 
     485            # "Multiple connections to ":memory:" within a single process 
     486            # create a fresh database each time" 
     487            # http://www.sqlite.org/cvstrac/wiki?p=InMemoryDatabase 
     488            # So we need to give :memory: databases a SingleConnection. 
     489            self._factory = geniusql.SingleConnection(self._get_conn, self._del_conn) 
     490        else: 
     491            return geniusql.ConnectionManager.__init__(self, db, poolsize) 
     492     
     493    if _cursor_required: 
     494        def _get_conn(self): 
     495            # SQLite should create the DB if missing. 
     496            # valid _sqlite3 kwargs: "database", "timeout", "detect_types", 
     497            # "isolation_level", "check_same_thread", "factory", 
     498            # "cached_statements". 
     499            # Instead of "timeout", we re-use the old 
     500            # deadlock_timeout code inside execute. 
     501            conn = _sqlite.connect(database=self.db.name, check_same_thread=False) 
     502##            # None sets autocommit mode on. 
     503##            conn.isolation_level = None 
     504            conn.text_factory = str 
     505            return conn.cursor() 
     506    else: 
     507        def _get_conn(self): 
     508            return _sqlite.connect(self.db.name, self.db.mode) 
     509     
     510    def isolate(self, conn, isolation=None): 
     511        """Set the isolation level of the given connection. 
     512         
     513        If 'isolation' is None, our default_isolation will be used for new 
     514        connections. Valid values for the 'isolation' argument may be native 
     515        values for your particular database. However, it is recommended you 
     516        pass items from the global 'levels' list instead; these will be 
     517        automatically replaced with native values. 
     518         
     519        For many databases, this must be executed after START TRANSACTION. 
     520        """ 
     521        if isolation is None: 
     522            isolation = self.default_isolation 
     523         
     524        if isinstance(isolation, geniusql.IsolationLevel): 
     525            # Map the given IsolationLevel object to a native value. 
     526            # This base class uses the four ANSI names as native values. 
     527            isolation = isolation.name 
     528         
     529        if isolation not in self.isolation_levels: 
     530            raise ValueError("IsolationLevel %r not allowed by %s. " 
     531                             "Try one of %r instead." 
     532                             % (isolation, self.__class__.__name__, 
     533                                self.isolation_levels)) 
     534         
     535        # Nothing to do here, since we only allow one level. 
     536        pass 
     537     
     538    def start(self, isolation=None): 
     539        """Start a transaction.""" 
     540        conn = self.get(started=True) 
     541        self.db.execute("BEGIN;", conn) 
     542        self.isolate(conn, isolation) 
     543     
     544    def rollback(self): 
     545        """Roll back the current transaction.""" 
     546        key = self.id() 
     547        if key in self.transactions: 
     548            self.db.execute("ROLLBACK;", self.transactions[key]) 
     549            del self.transactions[key] 
     550     
     551    def commit(self): 
     552        """Commit the current transaction.""" 
     553        key = self.id() 
     554        if key in self.transactions: 
     555            self.db.execute("COMMIT;", self.transactions[key]) 
     556            del self.transactions[key] 
     557 
     558 
    481559class SQLiteDatabase(geniusql.Database): 
    482560     
     
    488566    adapterfromdb = AdapterFromSQLite() 
    489567    typeadapter = TypeAdapterSQLite() 
     568    connectionmanager = SQLiteConnectionManager 
    490569     
    491570    tableclass = SQLiteTable 
     
    512591     
    513592    def _get_tables(self, conn=None): 
    514         data, _ = self.fetch("SELECT name FROM sqlite_master WHERE type = 'table';") 
     593        data, _ = self.fetch("SELECT name FROM sqlite_master " 
     594                             "WHERE type = 'table';", conn) 
    515595        # Note that we set Table.created here, since these already exist in the DB. 
    516596        return [self.tableclass(row[0], self.quote(row[0]), 
     
    519599     
    520600    def _get_table(self, tablename, conn=None): 
    521         data, _ = self.fetch("SELECT name FROM sqlite_master WHERE
    522                              "name = '%s' AND type = 'table';" % tablename
     601        data, _ = self.fetch("SELECT name FROM sqlite_master WHERE name =
     602                             "'%s' AND type = 'table';" % tablename, conn
    523603        # Note that we set Table.created here, since these already exist in the DB. 
    524604        for name, in data: 
     
    568648    def _get_indices(self, tablename, conn=None): 
    569649        data, _ = self.fetch("SELECT name, tbl_name, sql FROM sqlite_master " 
    570                              "WHERE type = 'index';"
     650                             "WHERE type = 'index';", conn
    571651        indices = [] 
    572652        for row in data: 
     
    670750            pk = "" 
    671751         
    672         self.lock("Creating storage. Transactions not allowed.") 
    673         try: 
    674             self.execute('CREATE TABLE %s (%s%s);' % 
     752        self.execute_ddl('CREATE TABLE %s (%s%s);' % 
    675753                         (table.qname, ", ".join(fields), pk)) 
    676              
    677             for index in table.indices.itervalues(): 
    678                 self.execute('CREATE INDEX %s ON %s (%s);' % 
     754         
     755        for index in table.indices.itervalues(): 
     756            self.execute_ddl('CREATE INDEX %s ON %s (%s);' % 
    679757                             (index.qname, table.qname, 
    680758                              self.quote(index.colname))) 
    681              
    682             if autoincr_col: 
    683                 self.create_sequence(table, autoincr_col) 
    684              
    685             dict.__setitem__(self, key, table) 
    686         finally: 
    687             self.unlock() 
     759         
     760        if autoincr_col: 
     761            self.create_sequence(table, autoincr_col) 
     762         
     763        dict.__setitem__(self, key, table) 
    688764     
    689765    def _rename(self, oldtable, newtable): 
    690766        if _rename_table_support: 
    691             self.execute("ALTER TABLE %s RENAME TO %s" % 
    692                          (oldtable.qname, newtable.qname)) 
     767            self.execute_ddl("ALTER TABLE %s RENAME TO %s" % 
     768                             (oldtable.qname, newtable.qname)) 
    693769        else: 
    694770            raise NotImplementedError 
     
    710786        return "[" + name + "]" 
    711787     
    712     def connect(self): 
    713         if self.name == ":memory:": 
    714             # "Multiple connections to ":memory:" within a single process 
    715             # create a fresh database each time" 
    716             # http://www.sqlite.org/cvstrac/wiki?p=InMemoryDatabase 
    717             # So we need to give :memory: databases a SingleConnection. 
    718             self.connection = geniusql.SingleConnection(self._get_conn, self._del_conn) 
    719         else: 
    720             return geniusql.Database.connect(self) 
    721      
    722     if _cursor_required: 
    723         def _get_conn(self): 
    724             # SQLite should create the DB if missing. 
    725             # valid _sqlite3 kwargs: "database", "timeout", "detect_types", 
    726             # "isolation_level", "check_same_thread", "factory", 
    727             # "cached_statements". 
    728             # Instead of "timeout", we re-use the old 
    729             # deadlock_timeout code inside execute. 
    730             conn = _sqlite.connect(database=self.name, check_same_thread=False) 
    731 ##            # None sets autocommit mode on. 
    732 ##            conn.isolation_level = None 
    733             conn.text_factory = str 
    734             return conn.cursor() 
    735     else: 
    736         def _get_conn(self): 
    737             return _sqlite.connect(self.name, self.mode) 
    738      
    739     create_database = _get_conn 
     788    def create_database(self): 
     789        self.connections.get() 
    740790     
    741791    def drop_database(self): 
    742         self.disconnect() 
     792        self.connections.shutdown() 
    743793        if self.name != ":memory:": 
    744794            # This should accept relative or absolute paths 
     
    746796        self.clear() 
    747797     
    748      
    749     #                            Transactions                             # 
    750      
    751     default_isolation = "SERIALIZABLE" 
    752     isolation_levels = ["SERIALIZABLE"] 
    753      
    754     def is_lock_error(self, exc): 
     798    def is_timeout_error(self, exc): 
    755799        if not isinstance(exc, _sqlite.OperationalError): 
    756800            return False 
    757801        return exc.args[0] == 'database is locked' 
    758802     
    759     def isolate(self, conn, isolation=None): 
    760         """Set the isolation level of the given connection. 
    761          
    762         If 'isolation' is None, our default_isolation will be used for new 
    763         connections. Valid values for the 'isolation' argument may be native 
    764         values for your particular database. However, it is recommended you 
    765         pass items from the global 'levels' list instead; these will be 
    766         automatically replaced with native values. 
    767          
    768         For many databases, this must be executed after START TRANSACTION. 
    769         """ 
    770         if isolation is None: 
    771             isolation = self.default_isolation 
    772          
    773         if isinstance(isolation, geniusql.IsolationLevel): 
    774             # Map the given IsolationLevel object to a native value. 
    775             # This base class uses the four ANSI names as native values. 
    776             isolation = isolation.name 
    777          
    778         if isolation not in self.isolation_levels: 
    779             raise ValueError("IsolationLevel %r not allowed by %s. " 
    780                              "Try one of %r instead." 
    781                              % (isolation, self.__class__.__name__, 
    782                                 self.isolation_levels)) 
    783          
    784         # Nothing to do here, since we only allow one level. 
    785         pass 
    786      
    787     def start(self, isolation=None): 
    788         """Start a transaction.""" 
    789         conn = self.get_transaction(new=True) 
    790         self.execute("BEGIN;", conn) 
    791         self.isolate(conn, isolation) 
    792      
    793     def rollback(self): 
    794         """Roll back the current transaction.""" 
    795         key = self.transaction_key() 
    796         if key in self.transactions: 
    797             self.execute("ROLLBACK;", self.transactions[key]) 
    798             del self.transactions[key] 
    799      
    800     def commit(self): 
    801         """Commit the current transaction.""" 
    802         key = self.transaction_key() 
    803         if key in self.transactions: 
    804             self.execute("COMMIT;", self.transactions[key]) 
    805             del self.transactions[key] 
    806      
    807803    deadlock_timeout = 20 
    808804     
     
    810806        try: 
    811807            if conn is None: 
    812                 conn = self.connection() 
     808                conn = self.connections.get() 
    813809            if isinstance(query, unicode): 
    814810                query = query.encode(self.adaptertosql.encoding) 
     
    822818                    if ((msg.startswith("no such") or 
    823819                         msg == "database schema has changed")): 
    824                         tx = self.transactions.get(self.transaction_key()) 
    825                         if tx is None or isinstance(tx, errors.TransactionLock): 
     820                        if not self.connections.in_transaction(): 
    826821                            # Bah. Shut down all connections and get a new one, 
    827822                            # since some previous connection changed the schema. 
    828                             self.connection.shutdown() 
    829                             conn = self.connection() 
     823                            self.connections.shutdown() 
     824                            conn = self.connections._factory() 
    830825                            continue 
    831                     if self.is_lock_error(x) and self.deadlock_timeout: 
     826                    if self.is_timeout_error(x) and self.deadlock_timeout: 
    832827                        if time.time() - start < self.deadlock_timeout: 
    833828                            time.sleep(0.000001) 
  • trunk/geniusql/select.py

    r9 r10  
    22 
    33from geniusql import errors 
    4  
    54from dejavu import logic, codewalk 
    65 
     
    275274     
    276275    def _compare_constants(self, op1, op2): 
    277         """Coerce/cast compared types (or mark imperfect).""" 
     276        """Coerce/cast compared types. 
     277         
     278        If a column value is compared to a constant and no coerce or cast 
     279        adapter function is available, a TypeError is raised. 
     280        """ 
    278281        col = getattr(op1, "col", None) 
    279282        if col: 
  • trunk/geniusql/test/zoo_fixture.py

    r9 r10  
    11111111    except (AttributeError, NotImplementedError): 
    11121112        pass 
    1113     db.disconnect() 
     1113    db.connections.shutdown() 
    11141114 
    11151115def run(DB_class, name, opts):