Changeset 10
- Timestamp:
- 02/15/07 01:12:02
- Files:
-
- trunk/geniusql/__init__.py (modified) (16 diffs)
- trunk/geniusql/conn.py (modified) (3 diffs)
- trunk/geniusql/isolation.py (modified) (1 diff)
- trunk/geniusql/providers/__init__.py (modified) (1 diff)
- trunk/geniusql/providers/ado.py (modified) (16 diffs)
- trunk/geniusql/providers/firebird.py (modified) (12 diffs)
- trunk/geniusql/providers/mysql.py (modified) (9 diffs)
- trunk/geniusql/providers/postgres.py (added)
- trunk/geniusql/providers/psycopg.py (modified) (5 diffs)
- trunk/geniusql/providers/pypgsql.py (modified) (3 diffs)
- trunk/geniusql/providers/sqlite.py (modified) (14 diffs)
- trunk/geniusql/select.py (modified) (2 diffs)
- trunk/geniusql/test/zoo_fixture.py (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/geniusql/__init__.py
r9 r10 105 105 t = self.table 106 106 if t.created: 107 t.db.lock("Creating index. Transactions not allowed.") 108 try: 109 t.db.execute('CREATE INDEX %s ON %s (%s);' % 107 t.db.execute_ddl('CREATE INDEX %s ON %s (%s);' % 110 108 (index.qname, t.qname, 111 109 t.db.quote(index.colname))) 112 finally:113 t.db.unlock()114 110 dict.__setitem__(self, key, index) 115 111 … … 118 114 t = self.table 119 115 if t.created: 120 t.db.lock("Dropping index. Transactions not allowed.") 121 try: 122 t.db.execute('DROP INDEX %s ON %s;' % (self[key].qname, t.qname)) 123 finally: 124 t.db.unlock() 116 t.db.execute_ddl('DROP INDEX %s ON %s;' % 117 (self[key].qname, t.qname)) 125 118 dict.__delitem__(self, key) 126 119 … … 271 264 del self[key] 272 265 273 self.db.lock("Adding property. Transactions not allowed.") 274 try: 275 if column.autoincrement: 276 # This may or may not be a no-op, depending on the DB. 277 self.db.create_sequence(self, column) 278 self._add_column(column) 279 dict.__setitem__(self, key, column) 280 finally: 281 self.db.unlock() 266 if column.autoincrement: 267 # This may or may not be a no-op, depending on the DB. 268 self.db.create_sequence(self, column) 269 self._add_column(column) 270 dict.__setitem__(self, key, column) 282 271 283 272 def _drop_column(self, column): 284 273 """Internal function to drop the column from the database.""" 285 self.db.execute ("ALTER TABLE %s DROP COLUMN %s;" %286 (self.qname, column.qname))274 self.db.execute_ddl("ALTER TABLE %s DROP COLUMN %s;" % 275 (self.qname, column.qname)) 287 276 288 277 def __delitem__(self, key): … … 294 283 return 295 284 296 self.db.lock("Dropping property. Transactions not allowed.") 297 try: 298 column = self[key] 299 self._drop_column(column) 300 if column.autoincrement: 301 # This may or may not be a no-op, depending on the DB. 302 self.db.drop_sequence(column) 303 dict.__delitem__(self, key) 304 finally: 305 self.db.unlock() 285 column = self[key] 286 self._drop_column(column) 287 if column.autoincrement: 288 # This may or may not be a no-op, depending on the DB. 289 self.db.drop_sequence(column) 290 dict.__delitem__(self, key) 306 291 307 292 def _rename(self, oldcol, newcol): 308 293 # Override this to do the actual rename at the DB level. 309 self.db.execute ("ALTER TABLE %s RENAME COLUMN %s TO %s;" %310 (self.qname, oldcol.qname, newcol.qname))294 self.db.execute_ddl("ALTER TABLE %s RENAME COLUMN %s TO %s;" % 295 (self.qname, oldcol.qname, newcol.qname)) 311 296 312 297 def rename(self, oldkey, newkey): … … 326 311 newcol.name = newname 327 312 newcol.qname = self.db.quote(newname) 328 self.db.lock("Renaming property. Transactions not allowed.") 329 try: 330 self._rename(oldcol, newcol) 331 finally: 332 self.db.unlock() 313 self._rename(oldcol, newcol) 333 314 334 315 # Use the superclass calls to avoid DROP COLUMN/ADD COLUMN. … … 391 372 values.append(val) 392 373 393 transconn = self.db.get_transaction()374 conn = self.db.connections.get() 394 375 395 376 fields = ", ".join(fields) 396 377 values = ", ".join(values) 397 378 self.db.execute('INSERT INTO %s (%s) VALUES (%s);' % 398 (self.qname, fields, values), transconn)379 (self.qname, fields, values), conn) 399 380 400 381 if idkeys: 401 newids = self._grab_new_ids(idkeys, transconn)382 newids = self._grab_new_ids(idkeys, conn) 402 383 for key in newids.keys(): 403 384 col = self[key] … … 427 408 sql = ('UPDATE %s SET %s WHERE %s;' % 428 409 (self.qname, ", ".join(parms), self.id_clause(**inputs))) 429 self.db.execute(sql , self.db.get_transaction())410 self.db.execute(sql) 430 411 431 412 use_asterisk_to_delete_all = False … … 438 419 star = "" 439 420 self.db.execute('DELETE%s FROM %s WHERE %s;' % 440 (star, self.qname, self.id_clause(**inputs)), 441 self.db.get_transaction()) 421 (star, self.qname, self.id_clause(**inputs))) 442 422 443 423 def delete_all(self, **inputs): … … 448 428 star = "" 449 429 self.db.execute('DELETE%s FROM %s WHERE %s;' % 450 (star, self.qname, self.whereclause(**inputs)), 451 self.db.get_transaction()) 430 (star, self.qname, self.whereclause(**inputs))) 452 431 453 432 def select_all(self, restriction=None, **kwargs): … … 511 490 selectwriter = SelectWriter 512 491 tableclass = Table 492 connectionmanager = ConnectionManager 513 493 514 494 def __new__(cls, name, **kwargs): … … 524 504 self.name = self.sql_name(name) 525 505 self.qname = self.quote(self.name) 526 self.transactions = {} 527 self.connect() 506 507 poolsize = kwargs.get('poolsize', 10) 508 self.connections = self.connectionmanager(self, poolsize) 509 528 510 self.discover_dbinfo() 529 511 … … 717 699 table.created = True 718 700 719 self.lock("Creating storage. Transactions not allowed.") 720 try: 721 fields = [] 722 pk = [] 723 for column in table.itervalues(): 724 if column.autoincrement: 725 # This may or may not be a no-op, depending on the DB. 726 self.create_sequence(table, column) 727 728 fields.append(self.columnclause(column)) 729 if column.key: 730 pk.append(column.qname) 701 fields = [] 702 pk = [] 703 for column in table.itervalues(): 704 if column.autoincrement: 705 # This may or may not be a no-op, depending on the DB. 706 self.create_sequence(table, column) 731 707 732 if pk: 733 pk = ", PRIMARY KEY (%s)" % ", ".join(pk) 734 else: 735 pk = "" 736 737 self.execute('CREATE TABLE %s (%s%s);' % 708 fields.append(self.columnclause(column)) 709 if column.key: 710 pk.append(column.qname) 711 712 if pk: 713 pk = ", PRIMARY KEY (%s)" % ", ".join(pk) 714 else: 715 pk = "" 716 717 self.execute_ddl('CREATE TABLE %s (%s%s);' % 738 718 (table.qname, ", ".join(fields), pk)) 739 740 for index in table.indices.itervalues():741 self.execute('CREATE INDEX %s ON %s (%s);' %719 720 for index in table.indices.itervalues(): 721 self.execute_ddl('CREATE INDEX %s ON %s (%s);' % 742 722 (index.qname, table.qname, 743 723 self.quote(index.colname))) 744 dict.__setitem__(self, key, table) 745 finally: 746 self.unlock() 724 725 dict.__setitem__(self, key, table) 747 726 748 727 def __delitem__(self, key): 749 self.lock("Dropping storage. Transactions not allowed.") 750 try: 751 table = self[key] 752 self.execute('DROP TABLE %s;' % table.qname) 753 for col in table.itervalues(): 754 if col.autoincrement: 755 self.drop_sequence(col) 756 dict.__delitem__(self, key) 757 finally: 758 self.unlock() 728 table = self[key] 729 self.execute_ddl('DROP TABLE %s;' % table.qname) 730 for col in table.itervalues(): 731 if col.autoincrement: 732 self.drop_sequence(col) 733 dict.__delitem__(self, key) 759 734 760 735 def _rename(self, oldtable, newtable): … … 774 749 newtable.name = newname 775 750 newtable.qname = self.quote(newname) 776 self.lock("Renaming storage. Transactions not allowed.") 777 try: 778 self._rename(oldtable, newname) 779 finally: 780 self.unlock() 751 self._rename(oldtable, newname) 781 752 782 753 # Use the superclass calls to avoid DROP TABLE/CREATE TABLE. … … 840 811 return self.tableclass(name, self.quote(name), self) 841 812 842 # Connecting # 843 844 poolsize = 10 845 846 def connect(self): 847 if self.poolsize > 0: 848 self.connection = ConnectionPool(self._get_conn, self._del_conn, 849 self.poolsize) 850 else: 851 self.connection = ConnectionFactory(self._get_conn, self._del_conn) 852 853 def _get_conn(self): 854 """Create and return a connection object.""" 855 # Override this with the connection call for your DB. Example: 856 # return libpq.PQconnectdb(self.connstring) 857 raise NotImplementedError 858 859 def _del_conn(self, conn): 860 """Close a connection object.""" 861 # Override this with the close call (if any) for your DB. 862 conn.close() 863 864 def disconnect(self): 865 """Release all database connections.""" 866 self.connection.shutdown() 813 def is_timeout_error(self, exc): 814 """If the given exception instance is a lock timeout, return True. 815 816 This should return True for errors which arise from locking 817 timeouts; for example, if the database prevents 'dirty reads' 818 by raising an error. 819 """ 820 # You should definitely override this for your database. 821 return False 867 822 868 823 def execute(self, query, conn=None): 869 824 """Return a native response for the given query.""" 870 825 if conn is None: 871 conn = self.connection ()826 conn = self.connections.get() 872 827 if isinstance(query, unicode): 873 828 query = query.encode(self.adaptertosql.encoding) … … 875 830 return conn.query(query) 876 831 832 def execute_ddl(self, query, conn=None): 833 """Return a native response for the given DDL statement. 834 835 In general, DDL statements should lock out other statements 836 (especially those isolated in other transactions). Use this 837 method to perform a locked DDL statement. 838 """ 839 self.connections.lock("Transaction denied due to DDL: %r" % query) 840 try: 841 ## # Must shut down all connections to avoid 842 ## # "being accessed by other users" error? 843 ## self.connections.shutdown() 844 if conn is None: 845 # Important: use _factory(), not get(), to avoid the lock 846 conn = self.connections._factory() 847 self.execute(query, conn) 848 finally: 849 self.connections.unlock() 850 877 851 def fetch(self, query, conn=None): 878 852 """Return rowdata, columns (name, type) for the given query. … … 890 864 """Yield matching data, coerced to Python types (where known).""" 891 865 sel = self.selectwriter(self, relation, attributes, restriction) 892 data, _ = self.fetch(sel.sql(distinct) , self.get_transaction())866 data, _ = self.fetch(sel.sql(distinct)) 893 867 return ResultSet(data, sel.columns, sel.imperfect) 894 868 895 869 def create_database(self): 896 self.lock("Creating database. Transactions not allowed.") 897 try: 898 self.execute("CREATE DATABASE %s;" % self.qname) 899 self.clear() 900 finally: 901 self.unlock() 870 self.execute_ddl("CREATE DATABASE %s;" % self.qname) 871 self.clear() 902 872 903 873 def drop_database(self): 904 self.lock("Dropping database. Transactions not allowed.") 905 try: 906 # Must shut down all connections to avoid 907 # "being accessed by other users" error. 908 self.connection.shutdown() 909 self.execute("DROP DATABASE %s;" % self.qname) 910 self.clear() 911 finally: 912 self.unlock() 913 914 # Transactions # 915 916 transaction_key = threading._get_ident 917 implicit_trans = False 918 919 # The "default_isolation" value should be a value native to the DB. 920 default_isolation = None 921 922 # The values in "isolation_levels" should match the names of 923 # IsolationLevel objects in isolation.py 924 isolation_levels = ["READ UNCOMMITTED", "READ COMMITTED", 925 "REPEATABLE READ", "SERIALIZABLE"] 926 927 def get_transaction(self, new=False, isolation=None): 928 """Return the (possibly new) connection for the current transaction. 929 930 If we are already in a transaction, this returns the connection for 931 that transaction. The "current transaction" is determined by a key 932 (obtained by a call to self.transaction_key); by default, the key 933 is the current thread ID (but subclasses are free to change this). 934 935 If there is no "current transaction", a new connection object is 936 obtained by calling self.connection (which is usually a connection 937 pool object). If self.implicit_trans is True, new connections will 938 be associated with self.transaction_key(), and repeated calls to 939 get_transaction will then return the same connection object. 940 If self.implicit_trans is False, you'll get a new connection 941 (from the pool) each time. 942 """ 943 key = self.transaction_key() 944 if key in self.transactions: 945 conn = self.transactions[key] 946 if isinstance(conn, errors.TransactionLock): 947 raise conn 948 else: 949 conn = self.connection() 950 if self.implicit_trans or new: 951 self.transactions[key] = conn 952 if not new: 953 self.start(isolation) 954 return conn 955 956 def is_lock_error(self, exc): 957 """If the given exception instance is a lock timeout, return True. 958 959 This should return True for errors which arise from transaction 960 locking timeouts; for example, if the database prevents 'dirty 961 reads' by raising an error. 962 """ 963 # You should definitely override this for your database. 964 return False 965 966 def isolate(self, conn, isolation=None): 967 """Set the isolation level of the given connection. 968 969 If 'isolation' is None, our default_isolation will be used for new 970 connections. Valid values for the 'isolation' argument may be native 971 values for your particular database. However, it is recommended you 972 pass items from the global 'levels' list instead; these will be 973 automatically replaced with native values. 974 975 For many databases, this must be executed after START TRANSACTION. 976 """ 977 if isolation is None: 978 isolation = self.default_isolation 979 980 if isinstance(isolation, IsolationLevel): 981 # Map the given IsolationLevel object to a native value. 982 isolation = isolation.name 983 if isolation not in self.isolation_levels: 984 raise ValueError("IsolationLevel %r not allowed by %s. " 985 "Try one of %r instead." 986 % (isolation, self.__class__.__name__, 987 self.isolation_levels)) 988 989 # This is SQL92 syntax, and should work with most DB's. 990 self.execute("SET TRANSACTION ISOLATION LEVEL %s;" % isolation, conn) 991 992 def start(self, isolation=None): 993 """Start a transaction. Not needed if self.implicit_trans is True.""" 994 conn = self.get_transaction(new=True) 995 self.execute("START TRANSACTION;", conn) 996 self.isolate(conn, isolation) 997 998 def rollback(self): 999 """Roll back the current transaction, if any.""" 1000 key = self.transaction_key() 1001 if key in self.transactions: 1002 self.execute("ROLLBACK;", self.transactions[key]) 1003 del self.transactions[key] 1004 else: 1005 # This is critical in order to support polygonal SM structures 1006 # (same store being called twice by separate proxies). 1007 pass 1008 1009 def commit(self): 1010 """Commit the current transaction, if any.""" 1011 key = self.transaction_key() 1012 try: 1013 conn = self.transactions.pop(key) 1014 except KeyError: 1015 # This is critical in order to support polygonal SM structures 1016 # (same store being called twice by separate proxies). 1017 pass 1018 else: 1019 self.execute("COMMIT;", conn) 1020 1021 # Change this to 'error' if you don't want autocommit on schema ops. 1022 lock_contention = 'commit' 1023 1024 def lock(self, msg=None): 1025 """Deny transactions during schema operations (DDL statements). 1026 1027 Any code which calls this should also call 'unlock' in a try/finally: 1028 1029 db.lock('dropping storage') 1030 try: 1031 drop_storage(cls) 1032 finally: 1033 db.unlock() 1034 """ 1035 key = self.transaction_key() 1036 if key in self.transactions: 1037 if isinstance(self.transactions[key], errors.TransactionLock): 1038 return 1039 if self.lock_contention == 'error': 1040 raise errors.TransactionLock("Schema operations are not " 1041 "allowed inside transactions.") 1042 self.commit() 1043 1044 if msg is None: 1045 msg = "Transactions not allowed at the moment." 1046 self.transactions[key] = errors.TransactionLock(msg) 1047 1048 def unlock(self): 1049 """Allow transactions.""" 1050 key = self.transaction_key() 1051 trans = self.transactions.get(key, None) 1052 if trans is None: 1053 return 1054 if not isinstance(trans, errors.TransactionLock): 1055 raise errors.TransactionLock("Unlock called inside transaction.") 1056 del self.transactions[key] 874 # Must shut down all connections to avoid 875 # "being accessed by other users" error. 876 self.connections.shutdown() 877 self.execute_ddl("DROP DATABASE %s;" % self.qname) 878 self.clear() 1057 879 1058 880 trunk/geniusql/conn.py
r9 r10 1 """Objects for managing database connections with the geniusql package."""1 """Objects for managing database connections and transactions with Geniusql.""" 2 2 3 3 __all__ = [ 4 'ConnectionManager', 4 5 'ConnectionFactory', 'ConnectionPool', 'ConnectionWrapper', 5 6 'SingleConnection', … … 7 8 8 9 import Queue 10 import threading 9 11 import time 10 12 import weakref 11 13 12 from geniusql import errors 14 from geniusql import errors, isolation as _isolation 13 15 14 16 … … 143 145 self._conn = None 144 146 147 148 class ConnectionManager(object): 149 150 implicit_trans = False 151 152 # Change this to 'error' if you don't want autocommit on schema ops. 153 contention = 'commit' 154 155 # The "default_isolation" value should be a value native to the DB. 156 default_isolation = None 157 158 # The values in "isolation_levels" should match the names of 159 # IsolationLevel objects in isolation.py 160 isolation_levels = ["READ UNCOMMITTED", "READ COMMITTED", 161 "REPEATABLE READ", "SERIALIZABLE"] 162 163 def __init__(self, db, poolsize=10): 164 self.transactions = {} 165 self.db = db 166 self.poolsize = poolsize 167 if poolsize > 0: 168 self._factory = ConnectionPool(self._get_conn, self._del_conn, 169 poolsize) 170 else: 171 self._factory = ConnectionFactory(self._get_conn, self._del_conn) 172 173 def shutdown(self): 174 """Release all database connections.""" 175 self._factory.shutdown() 176 177 def _get_conn(self): 178 """Create and return a connection object.""" 179 # Override this with the connection call for your DB. Example: 180 # return libpq.PQconnectdb(self.connstring) 181 raise NotImplementedError 182 183 def _del_conn(self, conn): 184 """Close a connection object.""" 185 # Override this with the close call (if any) for your DB. 186 conn.close() 187 188 def get(self, started=False, isolation=None): 189 """Return the (possibly new) connection for the current transaction. 190 191 If we are already in a transaction, this returns the connection for 192 that transaction. The "current transaction" context is determined by 193 self.id(); by default, this is the current thread ID (but subclasses 194 are free to change this). If there is no "current transaction", then 195 a new connection object is obtained (usually from a pool). 196 197 If self.implicit_trans is True, a new connection will automatically 198 call "START TRANSACTION". It will also be associated with self.id(), 199 and any subsequent calls to this method will then return the same 200 connection object. If self.implicit_trans is False, new connections 201 won't be STARTed or stored; the only exception to this is if the 202 'started' argument is True, in which case the connection will be 203 stored but not automatically STARTed. 204 """ 205 key = self.id() 206 if key in self.transactions: 207 conn = self.transactions[key] 208 if isinstance(conn, errors.TransactionLock): 209 raise conn 210 else: 211 conn = self._factory() 212 if self.implicit_trans or started: 213 self.transactions[key] = conn 214 if not started: 215 self.start(isolation) 216 return conn 217 218 def id(self): 219 """The current transaction id.""" 220 return threading._get_ident() 221 222 def isolate(self, conn, isolation=None): 223 """Set the isolation level of the given connection. 224 225 If 'isolation' is None, our default_isolation will be used for new 226 connections. Valid values for the 'isolation' argument may be native 227 values for your particular database. However, it is recommended you 228 pass items from the global 'levels' list instead; these will be 229 automatically replaced with native values. 230 231 For many databases, this must be executed after START TRANSACTION. 232 """ 233 if isolation is None: 234 isolation = self.default_isolation 235 236 if isinstance(isolation, _isolation.IsolationLevel): 237 # Map the given IsolationLevel object to a native value. 238 isolation = isolation.name 239 if isolation not in self.isolation_levels: 240 raise ValueError("IsolationLevel %r not allowed by %s. " 241 "Try one of %r instead." 242 % (isolation, self.__class__.__name__, 243 self.isolation_levels)) 244 245 # This is SQL92 syntax, and should work with most DB's. 246 self.db.execute("SET TRANSACTION ISOLATION LEVEL %s;" % isolation, conn) 247 248 def start(self, isolation=None): 249 """Start a transaction. Not needed if self.implicit_trans is True.""" 250 conn = self.get(started=True) 251 self.db.execute("START TRANSACTION;", conn) 252 self.isolate(conn, isolation) 253 254 def rollback(self): 255 """Roll back the current transaction, if any.""" 256 key = self.id() 257 if key in self.transactions: 258 self.db.execute("ROLLBACK;", self.transactions[key]) 259 del self.transactions[key] 260 else: 261 # This is critical in order to support polygonal SM structures 262 # (same store being called twice by separate proxies). 263 pass 264 265 def commit(self): 266 """Commit the current transaction, if any.""" 267 try: 268 conn = self.transactions.pop(self.id()) 269 except KeyError: 270 # This is critical in order to support polygonal SM structures 271 # (same store being called twice by separate proxies). 272 pass 273 else: 274 self.db.execute("COMMIT;", conn) 275 276 def lock(self, msg=None): 277 """Deny transactions during schema operations (DDL statements). 278 279 Any code which calls this should also call 'unlock' in a try/finally: 280 281 db.connections.lock('dropping storage') 282 try: 283 db.execute("DROP TABLE %s;" % tablename) 284 finally: 285 db.connections.unlock() 286 """ 287 key = self.id() 288 if key in self.transactions: 289 if isinstance(self.transactions[key], errors.TransactionLock): 290 return 291 if self.contention == 'error': 292 raise errors.TransactionLock("Schema operations are not " 293 "allowed inside transactions.") 294 self.commit() 295 296 if msg is None: 297 msg = "Transactions not allowed at the moment." 298 self.transactions[key] = errors.TransactionLock(msg) 299 300 def unlock(self): 301 """Allow transactions.""" 302 key = self.id() 303 trans = self.transactions.get(key, None) 304 if trans is None: 305 return 306 if not isinstance(trans, errors.TransactionLock): 307 raise errors.TransactionLock("Unlock called inside transaction.") 308 del self.transactions[key] 309 310 def in_transaction(self): 311 """Return True if the current context is in a transaction. 312 313 This also returns True if the current context is executing DDL 314 statements, or is barred from starting a transaction for some 315 other reason. 316 """ 317 trans = self.transactions.get(self.id()) 318 if trans is None or isinstance(trans, errors.TransactionLock): 319 return False 320 return True 321 trunk/geniusql/isolation.py
r9 r10 1 """Isolation Level definitions for Dejavu1 """Isolation Level definitions for Geniusql 2 2 3 3 We follow the terminology of ANSI for specifying isolation levels, trunk/geniusql/providers/__init__.py
r9 r10 47 47 "mysql": "geniusql.providers.mysql.MySQLDatabase", 48 48 49 "postgres": "geniusql.providers.pypgsql.P gDatabase",50 "postgresql": "geniusql.providers.pypgsql.P gDatabase",51 "pypgsql": "geniusql.providers.pypgsql.P gDatabase",49 "postgres": "geniusql.providers.pypgsql.PyPgDatabase", 50 "postgresql": "geniusql.providers.pypgsql.PyPgDatabase", 51 "pypgsql": "geniusql.providers.pypgsql.PyPgDatabase", 52 52 53 53 "psycopg": "geniusql.providers.psycopg.PsycoPgDatabase", trunk/geniusql/providers/ado.py
r9 r10 365 365 coldef = self.db.columnclause(column) 366 366 # SQL Server doesn't use the "COLUMN" keyword with "ADD" 367 self.db.execute ("ALTER TABLE %s ADD %s;" % (self.qname, coldef))367 self.db.execute_ddl("ALTER TABLE %s ADD %s;" % (self.qname, coldef)) 368 368 369 369 def _rename(self, oldcol, newcol): 370 conn = self.db.connection ()370 conn = self.db.connections.get() 371 371 try: 372 372 cat = win32com.client.Dispatch(r'ADOX.Catalog') … … 387 387 388 388 389 class ADOConnectionManager(geniusql.ConnectionManager): 390 391 # the amount of time to try to close the db connection 392 # before raising an exception 393 shutdowntimeout = 1 # sec. 394 395 ConnectionTimeout = None 396 CommandTimeout = None 397 398 def _get_conn(self, master=False): 399 if master: 400 # Must shut down all connections to avoid 401 # "being accessed by other users" error. 402 self.shutdown() 403 404 atoms = connatoms(self.Connect) 405 atoms['INITIAL CATALOG'] = "tempdb" 406 connectstr = "; ".join(["%s=%s" % (k, v) 407 for k, v in atoms.iteritems()]) 408 else: 409 connectstr = self.Connect 410 411 conn = win32com.client.Dispatch(r'ADODB.Connection') 412 conn.Open(connectstr) 413 if self.ConnectionTimeout is not None: 414 conn.ConnectionTimeout = self.ConnectionTimeout 415 if self.CommandTimeout is not None: 416 conn.CommandTimeout = self.CommandTimeout 417 return conn 418 419 def _del_conn(self, conn): 420 for trial in xrange(self.shutdowntimeout * 10): 421 try: 422 # This may raise "Operation cannot be performed 423 # while executing asynchronously" 424 # if a prior operation has not yet completed. 425 conn.Close() 426 return 427 except pywintypes.com_error, e: 428 try: 429 ecode = e.args[2][-1] 430 except IndexError: 431 ecode = None 432 if ecode == -2146824577: 433 # "Operation cannot be performed while executing asynchronously" 434 # Try again... 435 time.sleep(0.1) 436 continue 437 raise 438 439 # Transactions # 440 441 def start(self, isolation=None): 442 """Start a transaction. Not needed if self.implicit_trans is True.""" 443 conn = self.get(started=True) 444 self.db.execute("BEGIN TRANSACTION;", conn) 445 self.isolate(conn, isolation) 446 447 389 448 class ADODatabase(geniusql.Database): 390 449 … … 393 452 tableclass = ADOTable 394 453 395 # the amount of time to try to close the db connection 396 # before raising an exception 397 shutdowntimeout = 1 # sec. 398 454 def __init__(self, name, **kwargs): 455 geniusql.Database.__init__(self, name, **kwargs) 456 self.connections.Connect = self.Connect 399 457 400 458 # Discovery # … … 587 645 588 646 def _rename(self, oldtable, newtable): 589 conn = self.connection ()647 conn = self.connections.get() 590 648 try: 591 649 cat = win32com.client.Dispatch(r'ADOX.Catalog') … … 602 660 return '[' + name + ']' 603 661 604 # Connecting #605 606 ConnectionTimeout = None607 CommandTimeout = None608 609 def _get_conn(self):610 conn = win32com.client.Dispatch(r'ADODB.Connection')611 conn.Open(self.Connect)612 if self.ConnectionTimeout is not None:613 conn.ConnectionTimeout = self.ConnectionTimeout614 if self.CommandTimeout is not None:615 conn.CommandTimeout = self.CommandTimeout616 return conn617 618 def _del_conn(self, conn):619 for trial in xrange(self.shutdowntimeout * 10):620 try:621 # This may raise "Operation cannot be performed622 # while executing asynchronously"623 # if a prior operation has not yet completed.624 conn.Close()625 return626 except pywintypes.com_error, e:627 try:628 ecode = e.args[2][-1]629 except IndexError:630 ecode = None631 if ecode == -2146824577:632 # "Operation cannot be performed while executing asynchronously"633 # Try again...634 time.sleep(0.1)635 continue636 raise637 638 662 def execute(self, query, conn=None): 639 663 if conn is None: 640 conn = self.connection ()664 conn = self.connections.get() 641 665 if isinstance(query, unicode): 642 666 query = query.encode(self.adaptertosql.encoding) 643 667 644 self.log( query)668 self.log(repr((id(conn), id(getattr(conn, "conn", None)), query))) 645 669 try: 646 if isinstance(conn, geniusql.ConnectionWrapper): 670 bareconn = conn 671 if hasattr(conn, 'conn'): 647 672 # 'conn' is a ConnectionWrapper object, which .Open 648 673 # won't accept. Pass the unwrapped connection instead. 649 conn = conn.conn 674 # Note that we CANNOT write "conn = conn.conn", because 675 # if we called get() above, we'd lose our only 676 # reference to the wrapper and our weakref callback 677 # would close the conn before we've executed the SQL. 678 bareconn = conn.conn 650 679 651 680 # Call Execute directly, skipping win32com overhead. 652 conn._oleobj_.InvokeTypes(6, 0, 1, (9, 0),653 ((8, 1), (16396, 18), (3, 49)),654 query, pythoncom.Missing, -1)681 bareconn._oleobj_.InvokeTypes(6, 0, 1, (9, 0), 682 ((8, 1), (16396, 18), (3, 49)), 683 query, pythoncom.Missing, -1) 655 684 except pywintypes.com_error, x: 656 685 x.args += (query, ) … … 661 690 """fetch(query, conn=None) -> rowdata, columns.""" 662 691 if conn is None: 663 conn = self.connection ()692 conn = self.connections.get() 664 693 665 694 try: … … 670 699 query, Empty, Empty) 671 700 else: 672 self.log(query) 673 if isinstance(conn, geniusql.ConnectionWrapper): 701 self.log(repr((id(conn), id(getattr(conn, "conn", None)), query))) 702 bareconn = conn 703 if hasattr(conn, 'conn'): 674 704 # 'conn' is a ConnectionWrapper object, which .Open 675 705 # won't accept. Pass the unwrapped connection instead. 676 conn = conn.conn706 bareconn = conn.conn 677 707 678 708 # Call conn.Open(query) directly, skipping win32com overhead. 679 res, rows_affected = conn._oleobj_.InvokeTypes(6, 0, 1, (9, 0),709 res, rows_affected = bareconn._oleobj_.InvokeTypes(6, 0, 1, (9, 0), 680 710 ((8, 1), (16396, 18), (3, 49)), 681 711 # *args = … … 725 755 726 756 return data, columns 727 728 729 # Transactions #730 731 def start(self, isolation=None):732 """Start a transaction. Not needed if self.implicit_trans is True."""733 conn = self.get_transaction(new=True)734 self.execute("BEGIN TRANSACTION;", conn)735 self.isolate(conn, isolation)736 757 737 758 … … 840 861 841 862 def _rename(self, oldcol, newcol): 842 self.db.execute ("EXEC sp_rename '%s.%s', '%s', 'COLUMN'" %843 (self.name, oldcol.name, newcol.name))863 self.db.execute_ddl("EXEC sp_rename '%s.%s', '%s', 'COLUMN'" % 864 (self.name, oldcol.name, newcol.name)) 844 865 845 866 def _grab_new_ids(self, idkeys, conn): … … 853 874 854 875 876 class SQLServerConnectionManager(ADOConnectionManager): 877 878 default_isolation = "READ COMMITTED" 879 880 855 881 class SQLServerDatabase(ADODatabase): 856 882 … … 859 885 adaptertosql = AdapterToADOSQL_SQLServer() 860 886 typeadapter = TypeAdapter_SQLServer() 887 connectionmanager = SQLServerConnectionManager 861 888 862 889 def __init__(self, name, **kwargs): 863 890 ADODatabase.__init__(self, name, **kwargs) 864 891 if "2005" in self.version(): 865 self. isolation_levels.append("SNAPSHOT")892 self.connections.isolation_levels.append("SNAPSHOT") 866 893 867 894 def version(self): 868 adoconn = self._template_conn()869 adov = adoconn.Version870 data, coldefs = self.fetch("SELECT @@VERSION;", adoconn)895 conn = self.connections._get_conn(master=True) 896 adov = conn.Version 897 data, coldefs = self.fetch("SELECT @@VERSION;", conn) 871 898 sqlv, = data[0] 872 adoconn.Close()873 del adoconn899 conn.Close() 900 del conn 874 901 return "ADO Version: %s\n%s" % (adov, sqlv) 875 902 876 def _template_conn(self):877 adoconn = win32com.client.Dispatch(r'ADODB.Connection')878 atoms = connatoms(self.Connect)879 atoms['INITIAL CATALOG'] = "tempdb"880 adoconn.Open("; ".join(["%s=%s" % (k, v) for k, v in atoms.iteritems()]))881 return adoconn882 883 903 def create_database(self): 884 self.lock("Creating database. Transactions not allowed.") 885 try: 886 adoconn = self._template_conn() 887 adoconn.Execute("CREATE DATABASE %s" % self.qname) 888 adoconn.Close() 889 self.clear() 890 finally: 891 self.unlock() 904 conn = self.connections._get_conn(master=True) 905 self.execute_ddl("CREATE DATABASE %s;" % self.qname, conn) 906 conn.Close() 907 self.clear() 892 908 893 909 def drop_database(self): 894 self.lock("Dropping database. Transactions not allowed.") 895 try: 896 # Must shut down all connections to avoid 897 # "being accessed by other users" error. 898 self.connection.shutdown() 899 900 adoconn = self._template_conn() 901 adoconn.Execute("DROP DATABASE %s;" % self.qname) 902 adoconn.Close() 903 self.clear() 904 finally: 905 self.unlock() 910 conn = self.connections._get_conn(master=True) 911 self.execute_ddl("DROP DATABASE %s;" % self.qname, conn) 912 conn.Close() 913 self.clear() 906 914 907 915 def columnclause(self, column): … … 929 937 return '%s %s%s' % (column.qname, dbtype, clause) 930 938 931 # Transactions # 932 933 default_isolation = "READ COMMITTED" 934 935 def is_lock_error(self, exc): 939 def is_timeout_error(self, exc): 936 940 """If the given exception instance is a lock timeout, return True. 937 941 … … 1055 1059 1056 1060 1061 class MSAccessConnectionManager(ADOConnectionManager): 1062 1063 poolsize = 0 1064 default_isolation = "READ UNCOMMITTED" 1065 isolation_levels = ["READ UNCOMMITTED",] 1066 1067 def __init__(self, db, poolsize=0): 1068 self.transactions = {} 1069 self.db = db 1070 self.poolsize = 0 1071 # MS Access can't use a pool, because there doesn't seem 1072 # to be a commit timeout. See http://support.microsoft.com/kb/200300 1073 # for additional synchronization issues. 1074 self._factory = geniusql.SingleConnection(self._get_conn, self._del_conn) 1075 1076 def isolate(self, conn, isolation=None): 1077 """Set the isolation level of the given connection. 1078 1079 If 'isolation' is None, our default_isolation will be used for new 1080 connections. Valid values for the 'isolation' argument may be native 1081 values for your particular database. However, it is recommended you 1082 pass items from the global 'levels' list instead; these will be 1083 automatically replaced with native values. 1084 1085 For many databases, this must be executed after START TRANSACTION. 1086 """ 1087 if isolation is None: 1088 isolation = self.default_isolation 1089 1090 if isinstance(isolation, geniusql.IsolationLevel): 1091 # Map the given IsolationLevel object to a native value. 1092 isolation = isolation.name 1093 if isolation not in self.isolation_levels: 1094 raise ValueError("IsolationLevel %r not allowed by %s." 1095 % (isolation, self.__class__.__name__)) 1096 1097 # No action to take, since you can't actually set iso level. 1098 pass 1099 1100 1057 1101 class MSAccessDatabase(ADODatabase): 1058 1102 … … 1061 1105 typeadapter = TypeAdapter_MSAccess() 1062 1106 tableclass = MSAccessTable 1107 connectionmanager = MSAccessConnectionManager 1063 1108 1064 1109 def version(self): 1065 adoconn = win32com.client.Dispatch(r'ADODB.Connection')1066 v = adoconn.Version1067 del adoconn1110 conn = win32com.client.Dispatch(r'ADODB.Connection') 1111 v = conn.Version 1112 del conn 1068 1113 return "ADO Version: %s" % v 1069 1114 … … 1071 1116 cols = ADODatabase._get_columns(self, tablename, conn) 1072 1117 if conn is None: 1073 conn = self.connection ()1118 conn = self.connections._factory() 1074 1119 1075 1120 try: 1076 1121 # Horrible hack to get autoincrement property 1077 1122 query = "SELECT * FROM %s WHERE FALSE" % self.quote(tablename) 1078 if isinstance(conn, geniusql.ConnectionWrapper): 1123 bareconn = conn 1124 if hasattr(conn, 'conn'): 1079 1125 # 'conn' is a ConnectionWrapper object, which .Open 1080 1126 # won't accept. Pass the unwrapped connection instead. 1081 conn = conn.conn1127 bareconn = conn.conn 1082 1128 1083 1129 # Call conn.Open(query) directly, skipping win32com overhead. … … 1149 1195 return '%s %s' % (column.qname, dbtype) 1150 1196 1151 # Connecting #1152 1153 poolsize = 01154 1155 def connect(self):1156 # MS Access can't use a pool, because there doesn't seem1157 # to be a commit timeout. See http://support.microsoft.com/kb/2003001158 # for additional synchronization issues.1159 self.connection = geniusql.SingleConnection(self._get_conn, self._del_conn)1160 1161 1197 def create_database(self): 1162 self.lock("Creating database. Transactions not allowed.") 1163 try: 1164 # By not providing an Engine Type, it defaults to 5 = Access 2000. 1165 cat = win32com.client.Dispatch(r'ADOX.Catalog') 1166 cat.Create(self.Connect) 1167 cat.ActiveConnection.Close() 1168 self.clear() 1169 finally: 1170 self.unlock() 1198 # By not providing an Engine Type, it defaults to 5 = Access 2000. 1199 cat = win32com.client.Dispatch(r'ADOX.Catalog') 1200 cat.Create(self.Connect) 1201 cat.ActiveConnection.Close() 1202 self.clear() 1171 1203 1172 1204 def drop_database(self): 1173 self.lock("Dropping database. Transactions not allowed.") 1174 try: 1175 # Must shut down our only connection to avoid 1176 # "Permission denied" error on os.remove call below. 1177 self.connection.shutdown() 1178 1179 import os 1180 # This should accept relative or absolute paths 1181 if os.path.exists(self.name): 1182 os.remove(self.name) 1183 self.clear() 1184 finally: 1185 self.unlock() 1186 1187 default_isolation = "READ UNCOMMITTED" 1188 isolation_levels = ["READ UNCOMMITTED",] 1189 1190 def isolate(self, conn, isolation=None): 1191 """Set the isolation level of the given connection. 1192 1193 If 'isolation' is None, our default_isolation will be used for new 1194 connections. Valid values for the 'isolation' argument may be native 1195 values for your particular database. However, it is recommended you 1196 pass items from the global 'levels' list instead; these will be 1197 automatically replaced with native values. 1198 1199 For many databases, this must be executed after START TRANSACTION. 1200 """ 1201 if isolation is None: 1202 isolation = self.default_isolation 1203 1204 if isinstance(isolation, geniusql.IsolationLevel): 1205 # Map the given IsolationLevel object to a native value. 1206 isolation = isolation.name 1207 if isolation not in self.isolation_levels: 1208 raise ValueError("IsolationLevel %r not allowed by %s." 1209 % (isolation, self.__class__.__name__)) 1210 1211 # No action to take, since you can't actually set iso level. 1212 pass 1205 # Must shut down our only connection to avoid 1206 # "Permission denied" error on os.remove call below. 1207 self.connections.shutdown() 1208 1209 import os 1210 # This should accept relative or absolute paths 1211 if os.path.exists(self.name): 1212 os.remove(self.name) 1213 self.clear() 1213 1214 1214 1215 trunk/geniusql/providers/firebird.py
r9 r10 243 243 coldef = self.db.columnclause(column) 244 244 # FB doesn't recognize the keyword "COLUMN" in "ADD". 245 self.db.execute ("ALTER TABLE %s ADD %s;" % (self.qname, coldef))245 self.db.execute_ddl("ALTER TABLE %s ADD %s;" % (self.qname, coldef)) 246 246 247 247 def _drop_column(self, column): 248 248 """Internal function to drop the column from the database.""" 249 249 # FB doesn't recognize the keyword "COLUMN" in "DROP". 250 self.db.execute ("ALTER TABLE %s DROP %s;" %251 (self.qname, column.qname))250 self.db.execute_ddl("ALTER TABLE %s DROP %s;" % 251 (self.qname, column.qname)) 252 252 253 253 def _rename(self, oldcol, newcol): 254 254 # FB doesn't use the keyword "RENAME". 255 self.db.execute ("ALTER TABLE %s ALTER COLUMN %s TO %s;" %256 (self.qname, oldcol.qname, newcol.qname))255 self.db.execute_ddl("ALTER TABLE %s ALTER COLUMN %s TO %s;" % 256 (self.qname, oldcol.qname, newcol.qname)) 257 257 258 258 def insert(self, **inputs): … … 283 283 values = ", ".join(values) 284 284 self.db.execute('INSERT INTO %s (%s) VALUES (%s);' 285 % (self.qname, fields, values), 286 self.db.get_transaction()) 285 % (self.qname, fields, values)) 287 286 288 287 return newids 288 289 290 class FirebirdConnectionManager(geniusql.ConnectionManager): 291 292 default_isolation = kinterbasdb.isc_tpb_read_committed 293 _no_iso_tpb = ( 294 kinterbasdb.isc_tpb_version3 295 + kinterbasdb.isc_tpb_shared 296 + kinterbasdb.isc_tpb_nowait 297 + kinterbasdb.isc_tpb_write 298 + kinterbasdb.isc_tpb_rec_version 299 ) 300 isolation_levels = ["READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE"] 301 302 def _get_conn(self): 303 conn = kinterbasdb.connect(host=self.db.host, 304 database=self.db.name, 305 user=self.db.user, 306 password=self.db.password, 307 charset=self.db.encoding, 308 ) 309 # Set the default TPB (for implicit transactions). 310 conn.default_tpb = self._no_iso_tpb + self.default_isolation 311 312 # Remove converters for FIXED so we can mix fixedpoint and decimal 313 conn.set_type_trans_in({'FIXED': None}) 314 conn.set_type_trans_out({'FIXED': None}) 315 return conn 316 317 def isolate(self, isolation=None): 318 """isolate() is not implemented for Firebird.""" 319 raise NotImplementedError("Firebird does not allow arbitrary re-isolation.") 320 321 def start(self, isolation=None): 322 """Start a transaction. Not needed if self.implicit_trans is True.""" 323 if isolation is None: 324 isolation = self.default_isolation 325 326 if isinstance(isolation, geniusql.IsolationLevel): 327 # Map the given IsolationLevel object to a native value. 328 # This base class uses the four ANSI names as native values. 329 isolation = isolation.name 330 if isolation not in self.isolation_levels: 331 raise ValueError("IsolationLevel %r not allowed by %s. " 332 "Try one of %r instead." 333 % (isolation, self.__class__.__name__, 334 self.isolation_levels)) 335 336 if isolation == "READ COMMITTED": 337 isolation = kinterbasdb.isc_tpb_read_committed 338 elif isolation == "REPEATABLE READ": 339 isolation = kinterbasdb.isc_tpb_concurrency 340 else: 341 isolation = kinterbasdb.isc_tpb_consistency 342 343 conn = self.get(started=True) 344 self.db.log("START TRANSACTION;") 345 conn.begin(self._no_iso_tpb + isolation) 346 347 def rollback(self): 348 """Roll back the current transaction, if any.""" 349 try: 350 conn = self.transactions.pop(self.id()) 351 except KeyError: 352 pass 353 else: 354 self.db.log("ROLLBACK;") 355 conn.rollback() 356 357 def commit(self): 358 """Commit the current transaction, if any.""" 359 try: 360 conn = self.transactions.pop(self.id()) 361 except KeyError: 362 pass 363 else: 364 self.db.log("COMMIT;") 365 conn.commit() 289 366 290 367 … … 298 375 typeadapter = TypeAdapterFirebird() 299 376 tableclass = FirebirdTable 377 connectionmanager = FirebirdConnectionManager 300 378 301 379 sql_name_max_length = 31 … … 315 393 316 394 self.qname = self.quote(self.name) 317 self.transactions = {} 318 self.connect() 395 self.connections = self.connectionmanager(self, kwargs.get('poolsize', 10)) 319 396 self.discover_dbinfo() 320 397 … … 478 555 sname = self.quote("%s_%s_seq" % (table.name, column.name)) 479 556 column.sequence_name = sname 480 self.execute ("CREATE GENERATOR %s;" % sname)481 self.execute ("SET GENERATOR %s TO %s;" % (sname, column.initial - 1))557 self.execute_ddl("CREATE GENERATOR %s;" % sname) 558 self.execute_ddl("SET GENERATOR %s TO %s;" % (sname, column.initial - 1)) 482 559 483 560 def drop_sequence(self, column): 484 561 """Drop a SEQUENCE for the given column and remove its sequence_name.""" 485 562 if column.sequence_name is not None: 486 self.execute ("DROP GENERATOR %s;" % column.sequence_name)563 self.execute_ddl("DROP GENERATOR %s;" % column.sequence_name) 487 564 column.sequence_name = None 488 565 … … 493 570 return '"' + name.replace('"', '""') + '"' 494 571 495 def _get_conn(self):496 conn = kinterbasdb.connect(host=self.host,497 database=self.name,498 user=self.user,499 password=self.password,500 charset=self.encoding,501 )502 # Set the default TPB (for implicit transactions).503 conn.default_tpb = self._no_iso_tpb + self.default_isolation504 505 # Remove converters for FIXED so we can mix fixedpoint and decimal506 conn.set_type_trans_in({'FIXED': None})507 conn.set_type_trans_out({'FIXED': None})508 return conn509 510 572 deadlock_timeout = 10 511 573 … … 513 575 try: 514 576 if conn is None: 515 conn = self.connection ()577 conn = self.connections.get() 516 578 if isinstance(query, unicode): 517 579 query = query.encode(self.adaptertosql.encoding) … … 526 588 # If we're not in a transaction, we need to auto-commit. 527 589 # This prevents "Previous transaction still active" errors. 528 key = self.transaction_key() 529 trans = self.transactions.get(key) 530 if trans is None or isinstance(trans, errors.TransactionLock): 590 if not self.connections.in_transaction(): 531 591 conn.commit() 532 592 533 593 return 534 594 except Exception, x: 535 if self.is_ lock_error(x) and self.deadlock_timeout:595 if self.is_timeout_error(x) and self.deadlock_timeout: 536 596 if time.time() - start < self.deadlock_timeout: 537 597 time.sleep(0.000001) … … 553 613 try: 554 614 if conn is None: 555 conn = self.connection ()615 conn = self.connections.get() 556 616 if isinstance(query, unicode): 557 617 query = query.encode(self.adaptertosql.encoding) … … 566 626 # If we're not in a transaction, we need to auto-commit. 567 627 # This prevents "Previous transaction still active" errors. 568 key = self.transaction_key() 569 trans = self.transactions.get(key) 570 if trans is None or isinstance(trans, errors.TransactionLock): 628 if not self.connections.in_transaction(): 571 629 conn.commit() 572 630 … … 581 639 582 640 def create_database(self): 583 self.lock("Creating database. Transactions not allowed.") 584 try: 585 # Firebird DB 'names' are actually filesystem paths. 586 sql = ("CREATE DATABASE %s USER '%s' PASSWORD '%s';" 587 % (self.qname, self.user, self.password)) 588 589 # Use the kinterbasdb helper methods for cleaner create and drop. 590 # We also use dialect 3 *always* to help with quoted identifiers. 591 conn = kinterbasdb.create_database(sql, 3) 592 conn.close() 593 594 self.clear() 595 finally: 596 self.unlock() 641 # Firebird DB 'names' are actually filesystem paths. 642 sql = ("CREATE DATABASE %s USER '%s' PASSWORD '%s';" 643 % (self.qname, self.user, self.password)) 644 645 # Use the kinterbasdb helper methods for cleaner create and drop. 646 # We also use dialect 3 *always* to help with quoted identifiers. 647 conn = kinterbasdb.create_database(sql, 3) 648 conn.close() 649 650 self.clear() 597 651 598 652 def drop_database(self): 599 self.lock("Dropping database. Transactions not allowed.") 600 try: 601 # Must shut down all connections to avoid 602 # "being accessed by other users" error. 603 self.connection.shutdown() 604 605 conn = self._get_conn() 606 conn.drop_database() 607 608 self.clear() 609 finally: 610 self.unlock() 611 612 # Transactions # 613 614 default_isolation = kinterbasdb.isc_tpb_read_committed 615 _no_iso_tpb = ( 616 kinterbasdb.isc_tpb_version3 617 + kinterbasdb.isc_tpb_shared 618 + kinterbasdb.isc_tpb_nowait 619 + kinterbasdb.isc_tpb_write 620 + kinterbasdb.isc_tpb_rec_version 621 ) 622 isolation_levels = ["READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE"] 623 624 def is_lock_error(self, exc): 653 # Must shut down all connections to avoid 654 # "being accessed by other users" error. 655 self.connections.shutdown() 656 657 conn = self.connections._get_conn() 658 conn.drop_database() 659 # For some reason, the conn is already closed... 660 ## conn.close() 661 self.clear() 662 663 def is_timeout_error(self, exc): 625 664 """If the given exception instance is a lock timeout, return True. 626 665 … … 636 675 return "lock conflict" in exc.args[1] 637 676 638 def isolate(self, isolation=None):639 """isolate() is not implemented for Firebird."""640 raise NotImplementedError("Firebird does not allow arbitrary re-isolation.")641 642 def start(self, isolation=None):643 """Start a transaction. Not needed if self.implicit_trans is True."""644 if isolation is None:645 isolation = self.default_isolation646 647 if isinstance(isolation, geniusql.IsolationLevel):648 # Map the given IsolationLevel object to a native value.649 # This base class uses the four ANSI names as native values.650 isolation = isolation.name651 if isolation not in self.isolation_levels:652 raise ValueError("IsolationLevel %r not allowed by %s. "653 "Try one of %r instead."654 % (isolation, self.__class__.__name__,655 self.isolation_levels))656 657 if isolation == "READ COMMITTED":658 isolation = kinterbasdb.isc_tpb_read_committed659 elif isolation == "REPEATABLE READ":660 isolation = kinterbasdb.isc_tpb_concurrency661 else:662 isolation = kinterbasdb.isc_tpb_consistency663 664 conn = self.get_transaction(new=True)665 conn.begin(self._no_iso_tpb + isolation)666 667 def rollback(self):668 """Roll back the current transaction, if any."""669 key = self.transaction_key()670 try:671 conn = self.transactions.pop(key)672 except KeyError:673 pass674 else:675 conn.rollback()676 677 def commit(self):678 """Commit the current transaction, if any."""679 key = self.transaction_key()680 try:681 conn = self.transactions.pop(key)682 except KeyError:683 pass684 else:685 conn.commit()686 687 677 def version(self): 688 678 import kinterbasdb.services trunk/geniusql/providers/mysql.py
r9 r10 141 141 def __delitem__(self, key): 142 142 t = self.table 143 t.db.lock("Dropping index. Transactions not allowed.") 144 try: 145 # MySQL might rename multiple-column indices to "PRIMARY" 146 for i in t.db._get_indices(t.name): 147 if i.colname == self[key].colname: 148 t.db.execute('DROP INDEX %s ON %s;' % (i.qname, t.qname)) 149 finally: 150 t.db.unlock() 143 # MySQL might rename multiple-column indices to "PRIMARY" 144 for i in t.db._get_indices(t.name): 145 if i.colname == self[key].colname: 146 t.db.execute_ddl('DROP INDEX %s ON %s;' % (i.qname, t.qname)) 151 147 152 148 … … 156 152 157 153 def _rename(self, oldcol, newcol): 158 self.db.execute ("ALTER TABLE %s CHANGE %s %s %s;" %159 (self.qname, oldcol.qname, newcol.qname,160 oldcol.dbtype))154 self.db.execute_ddl("ALTER TABLE %s CHANGE %s %s %s;" % 155 (self.qname, oldcol.qname, newcol.qname, 156 oldcol.dbtype)) 161 157 162 158 def _grab_new_ids(self, idkeys, conn): 163 159 return {idkeys[0]: conn.insert_id()} 160 164 161 165 162 … … 169 166 "cursorclass", "client_flag", 170 167 ] 168 169 class MySQLConnectionManager(geniusql.ConnectionManager): 170 171 # InnoDB default 172 default_isolation = "REPEATABLE READ" 173 174 def _get_conn(self, master=False): 175 if master: 176 args = self.connargs.copy() 177 args['db'] = 'mysql' 178 else: 179 args = self.connargs 180 181 try: 182 conn = _mysql.connect(**args) 183 except _mysql.OperationalError, x: 184 if x.args[0] == 1040: # Too many connections 185 raise errors.OutOfConnectionsError 186 raise 187 return conn 188 171 189 172 190 class MySQLDatabase(geniusql.Database): … … 187 205 tableclass = MySQLTable 188 206 indexsetclass = MySQLIndexSet 189 190 # InnoDB default 191 default_isolation = "REPEATABLE READ" 207 connectionmanager = MySQLConnectionManager 192 208 193 209 def __init__(self, name, **kwargs): 194 210 geniusql.Database.__init__(self, name, **kwargs) 195 211 196 self.conn args = dict([(k, v) for k, v in kwargs.iteritems()197 if k in connargs])212 self.connections.connargs = dict([(k, v) for k, v in kwargs.iteritems() 213 if k in connargs]) 198 214 199 215 self.decompiler = MySQLDecompiler … … 201 217 # Get the version string from MySQL, to see if we need 202 218 # a different decompiler. 203 conn = self. _template_conn()219 conn = self.connections._get_conn(master=True) 204 220 rowdata, cols = self.fetch("SELECT version();", conn) 205 221 conn.close() … … 274 290 encoding = " CHARACTER SET %s" % encoding 275 291 276 self.lock("Creating storage. Transactions not allowed.") 277 try: 278 self.execute('CREATE TABLE %s (%s%s)%s;' % 292 self.execute_ddl('CREATE TABLE %s (%s%s)%s;' % 279 293 (table.qname, ", ".join(fields), pk, encoding)) 280 281 if incr_fields:282 # Wow, what a hack. We have to INSERT a dummy row to set the283 # autoincrement initial value(s), and we can't delete it until284 # after the CREATE INDEX statements (or the counter will revert).285 fields = ", ".join([col.qname for col in incr_fields])286 values = ", ".join([str(col.initial - 1) for col in incr_fields])287 self.execute("INSERT INTO %s (%s) VALUES (%s);"294 295 if incr_fields: 296 # Wow, what a hack. We have to INSERT a dummy row to set the 297 # autoincrement initial value(s), and we can't delete it until 298 # after the CREATE INDEX statements (or the counter will revert). 299 fields = ", ".join([col.qname for col in incr_fields]) 300 values = ", ".join([str(col.initial - 1) for col in incr_fields]) 301 self.execute_ddl("INSERT INTO %s (%s) VALUES (%s);" 288 302 % (table.qname, fields, values)) 289 290 for k, index in table.indices.iteritems():291 dbtype = table[k].dbtype292 if dbtype.endswith('BLOB') or dbtype == 'TEXT':293 # MySQL won't allow indexes on a BLOB field without a294 # specific index prefix length. We choose 255 just for fun.295 self.execute('CREATE INDEX %s ON %s (%s(255));' %303 304 for k, index in table.indices.iteritems(): 305 dbtype = table[k].dbtype 306 if dbtype.endswith('BLOB') or dbtype == 'TEXT': 307 # MySQL won't allow indexes on a BLOB field without a 308 # specific index prefix length. We choose 255 just for fun. 309 self.execute_ddl('CREATE INDEX %s ON %s (%s(255));' % 296 310 (index.qname, table.qname, q(index.colname))) 297 else:298 self.execute('CREATE INDEX %s ON %s (%s);' %311 else: 312 self.execute_ddl('CREATE INDEX %s ON %s (%s);' % 299 313 (index.qname, table.qname, q(index.colname))) 300 301 if incr_fields: 302 self.execute("DELETE FROM %s" % table.qname) 303 finally: 304 self.unlock() 314 315 if incr_fields: 316 self.execute_ddl("DELETE FROM %s" % table.qname) 305 317 306 318 dict.__setitem__(self, key, table) … … 432 444 return '`' + name.replace('`', '``') + '`' 433 445 434 def _get_conn(self):435 try:436 conn = _mysql.connect(**self.connargs)437 except _mysql.OperationalError, x:438 if x.args[0] == 1040: # Too many connections439 raise errors.OutOfConnectionsError440 raise441 return conn442 443 def _template_conn(self):444 tmplconn = self.connargs.copy()445 tmplconn['db'] = 'mysql'446 return _mysql.connect(**tmplconn)447 448 446 def execute(self, query, conn=None): 449 447 """execute(query, conn=None) -> result set.""" 450 448 if conn is None: 451 conn = self.connection ()449 conn = self.connections.get() 452 450 if isinstance(query, unicode): 453 451 query = query.encode(self.adaptertosql.encoding) … … 469 467 """ 470 468 if conn is None: 471 conn = self.connection ()469 conn = self.connections.get() 472 470 self.execute(query, conn) 473 471 … … 489 487 490 488 def create_database(self): 491 self.lock("Creating database. Transactions not allowed.") 492 try: 493 # _mysql has create_db and drop_db commands, but they're deprecated. 494 encoding = self.encoding 495 if encoding: 496 encoding = " CHARACTER SET %s" % encoding 497 sql = 'CREATE DATABASE %s%s;' % (self.qname, encoding) 498 conn = self._template_conn() 499 self.execute(sql, conn) 500 conn.close() 501 self.clear() 502 finally: 503 self.unlock() 489 # _mysql has create_db and drop_db commands, but they're deprecated. 490 encoding = self.encoding 491 if encoding: 492 encoding = " CHARACTER SET %s" % encoding 493 sql = 'CREATE DATABASE %s%s;' % (self.qname, encoding) 494 conn = self.connections._get_conn(master=True) 495 self.execute_ddl(sql, conn) 496 conn.close() 497 self.clear() 504 498 505 499 def drop_database(self): 506 self.lock("Dropping database. Transactions not allowed.") 507 try: 508 sql = 'DROP DATABASE %s;' % self.qname 509 conn = self._template_conn() 510 self.execute(sql, conn) 511 conn.close() 512 self.clear() 513 finally: 514 self.unlock() 515 516 def is_lock_error(self, exc): 500 conn = self.connections._get_conn(master=True) 501 self.execute_ddl('DROP DATABASE %s;' % self.qname, conn) 502 conn.close() 503 self.clear() 504 505 def is_timeout_error(self, exc): 517 506 # OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction') 518 507 if not isinstance(exc, _mysql.OperationalError): trunk/geniusql/providers/psycopg.py
r9 r10 8 8 9 9 10 import datetime11 try:12 import cPickle as pickle13 except ImportError:14 import pickle15 import re16 seq_name = re.compile(r"nextval\('([^:]+)'.*\)")17 escape_oct = re.compile(r"[\000-\037\177-\377]")18 replace_oct = lambda m: r"\\%03o" % ord(m.group(0))19 unescape_oct = re.compile(r"\\(\d\d\d)")20 replace_unoct = lambda m: chr(int(m.group(1), 8))21 22 10 import geniusql 23 from geniusql import errors, typerefs 11 from geniusql import errors 12 from geniusql.providers import postgres 24 13 25 14 26 class AdapterToPsycoPg(geniusql.AdapterToSQL): 27 28 like_escapes = [("%", r"\\%"), ("_", r"\\_")] 29 30 # Do these need to know if "SHOW DateStyle;" != "ISO, MDY" ? 31 def coerce_datetime_datetime_to_any(self, value): 32 return ("'%04d-%02d-%02d %02d:%02d:%02d.%06d'" % 33 (value.year, value.month, value.day, 34 value.hour, value.minute, value.second, 35 value.microsecond)) 36 37 def coerce_datetime_date_to_any(self, value): 38 return "'%04d-%02d-%02d'" % (value.year, value.month, value.day) 39 40 def coerce_datetime_time_to_any(self, value): 41 return ("'%02d:%02d:%02d.%06d'" % 42 (value.hour, value.minute, value.second, value.microsecond)) 43 44 def coerce_any_to_bytea(self, value): 45 # See http://www.postgresql.org/docs/8.1/interactive/datatype-binary.html 46 value = pickle.dumps(value, 2) 47 def repl(char): 48 o = ord(char) 49 if o <= 31 or o == 39 or o == 92 or o >= 127: 50 return r"\\%03d" % int(oct(o)) 51 return char 52 return "'%s'::bytea" % "".join(map(repl, value)) 53 54 def do_pickle(self, value): 55 value = pickle.dumps(value, 2) 56 value = self.coerce_str_to_any(value, skip_encoding=False) 57 return value 58 coerce_dict_to_any = do_pickle 59 coerce_list_to_any = do_pickle 60 coerce_tuple_to_any = do_pickle 61 62 def coerce_str_to_any(self, value, skip_encoding=False): 63 if not skip_encoding and not isinstance(value, str): 64 value = value.encode(self.encoding) 65 for pat, repl in self.escapes: 66 value = value.replace(pat, repl) 67 68 # Escape octal sequences 69 value = escape_oct.sub(replace_oct, value) 70 return "'" + value + "'" 71 72 def coerce_float_to_REAL(self, value): 73 # Use quotes to restrict the value to single precision, so that 74 # comparisons work between existing values and supplied constants. 75 # See http://archives.postgresql.org/pgsql-bugs/2004-02/msg00062.php 76 return "'%r'" % value 77 coerce_float_to_FLOAT4 = coerce_float_to_REAL 78 79 80 class AdapterFromPsycoPg(geniusql.AdapterFromDB): 81 82 def coerce_any_to_str(self, value): 83 # Unescape octal sequences 84 value = unescape_oct.sub(replace_unoct, value) 85 if isinstance(value, unicode): 86 return value.encode(self.encoding) 87 else: 88 return str(value) 15 class AdapterFromPsycoPg(postgres.AdapterFromPg): 89 16 90 17 def coerce_any_to_datetime_datetime(self, value): … … 98 25 99 26 100 class PsycoPg Decompiler(geniusql.SQLDecompiler):27 class PsycoPgConnectionManager(geniusql.ConnectionManager): 101 28 102 def dejavu_icontainedby(self, op1, op2): 103 if isinstance(op1, geniusql.ConstWrapper): 104 # Looking for text in a field. Use ILike (reverse terms). 105 return op2 + " ILIKE '%" + self.adapter.escape_like(op1) + "%'" 29 default_isolation = "READ COMMITTED" 30 31 def _get_conn(self, master=False): 32 if master: 33 # Must shut down all connections to avoid 34 # "being accessed by other users" error. 35 self.shutdown() 36 37 connstr = "" 38 for atom in self.Connect.split(" "): 39 k, v = atom.split("=", 1) 40 if k == 'dbname': 41 v = 'template1' 42 connstr += "%s=%s " % (k, v) 106 43 else: 107 # Looking for field in (a, b, c). 108 # Force all args to lowercase for case-insensitive comparison. 109 atoms = [self.adapter.coerce(x).lower() for x in op2.basevalue] 110 return "LOWER(%s) IN (%s)" % (op1, ", ".join(atoms)) 44 connstr = self.Connect 45 46 try: 47 c = _psycopg.connect(connstr) 48 # Allow statements like CREATE DATABASE to run outside a transaction. 49 c.set_isolation_level(0) 50 return c 51 except _psycopg.DatabaseError, x: 52 if x.args[0].startswith('could not connect'): 53 raise errors.OutOfConnectionsError() 54 raise 111 55 112 def dejavu_istartswith(self, x, y): 113 return x + " ILIKE '" + self.adapter.escape_like(y) + "%'" 114 115 def dejavu_iendswith(self, x, y): 116 return x + " ILIKE '%" + self.adapter.escape_like(y) + "'" 117 118 def dejavu_ieq(self, x, y): 119 # ILIKE with no wildcards should behave like ieq. 120 return x + " ILIKE '" + self.adapter.escape_like(y) + "'" 121 122 def dejavu_year(self, x): 123 return "date_part('year', " + x + ")" 124 125 def dejavu_month(self, x): 126 return "date_part('month', " + x + ")" 127 128 def dejavu_day(self, x): 129 return "date_part('day', " + x + ")" 56 def _del_conn(self, conn): 57 conn.close() 130 58 131 59 132 class PsycoPg IndexSet(geniusql.IndexSet):60 class PsycoPgDatabase(postgres.PgDatabase): 133 61 134 def __delitem__(self, key):135 """Drop the specified index."""136 t = self.table137 t.db.lock("Dropping index. Transactions not allowed.")138 try:139 # PG doesn't use DROP INDEX .. ON ..140 t.db.execute('DROP INDEX %s;' % self[key].qname)141 finally:142 t.db.unlock()143 144 145 class PsycoPgTable(geniusql.Table):146 147 indexsetclass = PsycoPgIndexSet148 149 def _grab_new_ids(self, idkeys, conn):150 newids = {}151 for idkey in idkeys:152 col = self[idkey]153 seq = col.sequence_name154 data, _ = self.db.fetch("SELECT last_value FROM %s;" % seq, conn)155 newids[idkey] = data[0][0]156 return newids157 158 159 class PsycoPgDatabase(geniusql.Database):160 161 sql_name_max_length = 63162 quote_all = True163 poolsize = 10164 encoding = 'SQL_ASCII'165 166 decompiler = PsycoPgDecompiler167 adaptertosql = AdapterToPsycoPg()168 62 adapterfromdb = AdapterFromPsycoPg() 169 tableclass = PsycoPgTable63 connectionmanager = PsycoPgConnectionManager 170 64 171 65 def _get_dbinfo(self, conn=None): … … 180 74 return dbinfo 181 75 182 def _get_tables(self, conn=None): 183 data, _ = self.fetch("SELECT tablename FROM pg_tables WHERE schemaname" 184 " not in ('information_schema', 'pg_catalog')", 185 conn=conn) 186 return [self.tableclass(row[0], self.quote(row[0]), 187 self, created=True) 188 for row in data] 189 190 def _get_table(self, tablename, conn=None): 191 data, _ = self.fetch("SELECT tablename FROM pg_tables WHERE " 192 "tablename = '%s'" % tablename, 193 conn=conn) 194 for name, in data: 195 if name == tablename: 196 return self.tableclass(name, self.quote(name), 197 self, created=True) 198 raise errors.MappingError(tablename) 199 200 def _get_columns(self, tablename, conn=None): 201 # Get the OID of the table 202 data, _ = self.fetch("SELECT oid FROM pg_class WHERE relname = '%s'" 203 % tablename, conn=conn) 204 table_OID = data[0][0] 205 206 # Get index data so we can set col.key if pg_index.indisprimary 207 data, _ = self.fetch("SELECT indkey FROM pg_index WHERE indrelid " 208 "= %s AND indisprimary" % table_OID, conn=conn) 209 if data: 210 # indkey is an "array" (we get a space-separated string of ints). 211 # These will equal pg_attribute.attnum, below. 212 indices = map(int, data[0][0].split(" ")) 213 else: 214 indices = [] 215 216 # Get column data 217 sql = ("SELECT attname, atttypid, attnum, attlen, atttypmod " 218 "FROM pg_attribute WHERE attisdropped = False AND " 219 "attrelid = %s" % table_OID) 220 data, _ = self.fetch(sql, conn=conn) 221 cols = [] 222 for row in data: 223 name = row[0] 224 if name in ('tableoid', 'cmax', 'xmax', 'cmin', 'xmin', 225 'oid', 'ctid'): 226 # This is a column which PostgreSQL defines automatically 227 continue 228 229 # Data type 230 dbtype, _ = self.fetch("SELECT typname, typlen FROM pg_type " 231 "WHERE oid = %s" % row[1]) 232 if dbtype: 233 dbtype = dbtype[0][0].upper() 234 else: 235 dbtype = None 236 c = geniusql.Column(self.python_type(dbtype), dbtype, 237 None, {}, row[2] in indices, 238 row[0], self.quote(row[0])) 239 240 if dbtype in ('FLOAT4', 'FLOAT8'): 241 c.hints['precision'] = row[3] 242 elif dbtype in ('MONEY', 'NUMERIC'): 243 c.hints['precision'] = (row[4] >> 16) & 65535 244 c.hints['scale'] = (row[4] & 65535) - 4 245 246 # Default value 247 default, _ = self.fetch("SELECT adsrc FROM pg_attrdef " 248 "WHERE adnum = %s AND adrelid = %s" 249 % (row[2], table_OID)) 250 if default: 251 default = default[0][0] 252 if default.startswith("nextval("): 253 # Grab seqname from "nextval(seqname::[text|regclass])" 254 c.autoincrement = True 255 c.sequence_name = seq_name.search(default).group(1) 256 c.initial = self.fetch("SELECT min_value FROM %s" % 257 c.sequence_name)[0][0] 258 c.default = None 259 else: 260 # adsrc is always a string, so we must cast 261 # it using our guessed type. 262 c.default = self.python_type(dbtype)(default) 263 else: 264 c.default = None 265 266 if dbtype.startswith('BPCHAR') or dbtype.startswith('VARCHAR'): 267 # See http://archives.postgresql.org/pgsql-interfaces/2004-07/msg00021.php 268 c.hints['bytes'] = row[4] - 4 269 else: 270 bytes = row[3] 271 if bytes > 0: 272 c.hints['bytes'] = bytes 273 elif dbtype == 'TEXT': 274 c.hints['bytes'] = 0 275 276 cols.append(c) 277 return cols 278 279 def _get_indices(self, tablename, conn=None): 280 # Get the OID of the parent table. 281 data, _ = self.fetch("SELECT oid FROM pg_class WHERE relname = '%s'" 282 % tablename, conn=conn) 283 if not data: 284 return [] 285 286 table_OID = data[0][0] 287 indices = [] 288 data, _ = self.fetch("SELECT pg_class.relname, indkey, indisprimary, " 289 "indisunique FROM pg_index LEFT JOIN pg_class " 290 "ON pg_index.indexrelid = pg_class.oid WHERE " 291 "pg_index.indrelid = %s" % table_OID, conn=conn) 292 for row in data: 293 # indkey is an "array" (we get a space-separated string of ints). 294 cols = map(int, row[1].split(" ")) 295 for col in cols: 296 d, _ = self.fetch("SELECT attname FROM pg_attribute " 297 "WHERE attrelid = %s AND attnum = %s" 298 % (table_OID, col), conn=conn) 299 i = geniusql.Index(row[0], self.quote(row[0]), tablename, 300 d[0][0], bool(row[3])) 301 indices.append(i) 302 303 return indices 304 305 def python_type(self, dbtype): 306 """Return a Python type which can store values of the given dbtype.""" 307 dbtype = dbtype.upper() 308 if dbtype in ('INT2', 'INT4', 'INTEGER', 'SMALLINT'): 309 return int 310 elif dbtype in ('BOOL', 'BOOLEAN'): 311 return bool 312 elif dbtype in ('INT8', 'BIGINT'): 313 return long 314 elif dbtype in ('FLOAT4', 'FLOAT8', 'MONEY', 'DOUBLE PRECISION', 'REAL'): 315 return float 316 elif dbtype.startswith('NUMERIC'): 317 if typerefs.decimal: 318 return typerefs.decimal.Decimal 319 elif typerefs.fixedpoint: 320 return typerefs.fixedpoint.FixedPoint 321 elif dbtype == 'DATE': 322 return datetime.date 323 elif dbtype in ('TIMESTAMP', 'TIMESTAMPTZ'): 324 return datetime.datetime 325 elif dbtype in ('TIME', 'TIMETZ'): 326 return datetime.time 327 elif dbtype in ('BYTEA'): 328 return str 329 for t in ('CHAR', 'VARCHAR', 'BPCHAR', 'TEXT'): 330 if dbtype.startswith(t): 331 return str 332 333 raise TypeError("Database type %r could not be converted " 334 "to a Python type." % dbtype) 335 336 def columnclause(self, column): 337 """Return a clause for the given column for CREATE or ALTER TABLE. 338 339 This will be of the form "name type [DEFAULT [x | nextval('seq')]]". 340 341 PostgreSQL creates the sequence in a separate statement. 342 """ 343 if column.autoincrement: 344 default = "nextval('%s')" % column.sequence_name 345 else: 346 default = column.default or "" 347 if not isinstance(default, str): 348 default = self.adaptertosql.coerce(default, column.dbtype) 349 350 if default: 351 default = " DEFAULT %s" % default 352 353 return '%s %s%s' % (column.qname, column.dbtype, default) 354 355 def create_sequence(self, table, column): 356 """Create a SEQUENCE for the given column and set its sequence_name.""" 357 sname = column.sequence_name 358 if sname is None: 359 sname = self.quote("%s_%s_seq" % (table.name, column.name)) 360 column.sequence_name = sname 361 self.execute("CREATE SEQUENCE %s START %s;" % (sname, column.initial)) 362 363 def drop_sequence(self, column): 364 """Drop a SEQUENCE for the given column and remove its sequence_name.""" 365 if column.sequence_name is not None: 366 self.execute("DROP SEQUENCE %s;" % column.sequence_name) 367 column.sequence_name = None 368 369 def quote(self, name): 370 if self.quote_all: 371 name = '"' + name.replace('"', '""') + '"' 372 return name 373 374 def sql_name(self, name): 375 name = geniusql.Database.sql_name(self, name) 376 if not self.quote_all: 377 name = name.lower() 378 return name 379 380 default_isolation = "READ COMMITTED" 381 382 def _get_conn(self): 383 try: 384 c = _psycopg.connect(self.Connect) 385 c.set_isolation_level(0) 386 return c 387 except _psycopg.DatabaseError, x: 388 if x.args[0].startswith('could not connect'): 389 raise errors.OutOfConnectionsError() 390 raise 391 392 def _del_conn(self, conn): 393 conn.close() 394 395 def _template_conn(self): 396 atoms = self.Connect.split(" ") 397 tmplconn = "" 398 for atom in atoms: 399 k, v = atom.split("=", 1) 400 if k == 'dbname': v = 'template1' 401 tmplconn += "%s=%s " % (k, v) 402 c = _psycopg.connect(tmplconn) 403 # Allow statements like CREATE DATABASE to run outside a transaction. 404 c.set_isolation_level(0) 405 return c 76 def version(self): 77 c = self.connections._get_conn(master=True) 78 data, _ = self.fetch("SELECT version();", c) 79 v, = data[0] 80 c.close() 81 return "%s\npsycopg version: %s" % (v, _psycopg.__version__) 406 82 407 83 def execute(self, query, conn=None): 408 84 """execute(query, conn=None) -> result set.""" 409 85 if conn is None: 410 conn = self.connection ()86 conn = self.connections.get() 411 87 if isinstance(query, unicode): 412 88 query = query.encode(self.adaptertosql.encoding) … … 421 97 """fetch(query, conn=None) -> rowdata, columns.""" 422 98 if conn is None: 423 conn = self.connection ()99 conn = self.connections.get() 424 100 if isinstance(query, unicode): 425 101 query = query.encode(self.adaptertosql.encoding) … … 435 111 436 112 return data, coldefs 437 438 def create_database(self):439 self.lock("Creating database. Transactions not allowed.")440 try:441 # Must shut down all connections to avoid442 # "being accessed by other users" error.443 self.connection.shutdown()444 445 c = self._template_conn()446 encoding = self.encoding447 if encoding:448 encoding = " WITH ENCODING '%s'" % encoding449 self.execute("CREATE DATABASE %s%s" % (self.qname, encoding), c)450 c.close()451 del c452 self.clear()453 finally:454 self.unlock()455 456 def drop_database(self):457 self.lock("Dropping database. Transactions not allowed.")458 try:459 # Must shut down all connections to avoid460 # "being accessed by other users" error.461 self.connection.shutdown()462 463 c = self._template_conn()464 self.execute("DROP DATABASE %s;" % self.qname, c)465 c.close()466 del c467 self.clear()468 finally:469 self.unlock()470 471 def version(self):472 c = self._template_conn()473 data, _ = self.fetch("SELECT version();", c)474 v, = data[0]475 c.close()476 return "%s\npsycopg version: %s" % (v, _psycopg.__version__)477 113 trunk/geniusql/providers/pypgsql.py
r9 r10 2 2 from pyPgSQL import libpq 3 3 4 import datetime 5 try: 6 import cPickle as pickle 7 except ImportError: 8 import pickle 9 10 import re 11 seq_name = re.compile(r"nextval\('([^:]+)'.*\)") 12 escape_oct = re.compile(r"[\000-\037\177-\377]") 13 replace_oct = lambda m: r"\\%03o" % ord(m.group(0)) 14 unescape_oct = re.compile(r"\\(\d\d\d)") 15 replace_unoct = lambda m: chr(int(m.group(1), 8)) 4 import geniusql 5 from geniusql import errors, typerefs 6 from geniusql.providers import postgres 16 7 17 8 18 import geniusql 19 from geniusql import errors, typerefs 9 class PyPgConnectionManager(geniusql.ConnectionManager): 10 11 default_isolation = "READ COMMITTED" 12 13 def _get_conn(self, master=False): 14 if master: 15 # Must shut down all connections to avoid 16 # "being accessed by other users" error. 17 self.shutdown() 18 19 connstr = "" 20 for atom in self.Connect.split(" "): 21 k, v = atom.split("=", 1) 22 if k == 'dbname': 23 v = 'template1' 24 connstr += "%s=%s " % (k, v) 25 else: 26 connstr = self.Connect 27 28 try: 29 return libpq.PQconnectdb(connstr) 30 except libpq.DatabaseError, x: 31 if x.args[0].startswith('could not connect'): 32 raise errors.OutOfConnectionsError() 33 raise 34 35 def _del_conn(self, conn): 36 conn.finish() 20 37 21 38 22 class AdapterToPgSQL(geniusql.AdapterToSQL):39 class PyPgDatabase(postgres.PgDatabase): 23 40 24 like_escapes = [("%", r"\\%"), ("_", r"\\_")] 25 26 # Do these need to know if "SHOW DateStyle;" != "ISO, MDY" ? 27 def coerce_datetime_datetime_to_any(self, value): 28 return ("'%04d-%02d-%02d %02d:%02d:%02d.%06d'" % 29 (value.year, value.month, value.day, 30 value.hour, value.minute, value.second, 31 value.microsecond)) 32 33 def coerce_datetime_date_to_any(self, value): 34 return "'%04d-%02d-%02d'" % (value.year, value.month, value.day) 35 36 def coerce_datetime_time_to_any(self, value): 37 return ("'%02d:%02d:%02d.%06d'" % 38 (value.hour, value.minute, value.second, value.microsecond)) 39 40 def coerce_any_to_bytea(self, value): 41 # See http://www.postgresql.org/docs/8.1/interactive/datatype-binary.html 42 value = pickle.dumps(value, 2) 43 def repl(char): 44 o = ord(char) 45 if o <= 31 or o == 39 or o == 92 or o >= 127: 46 return r"\\%03d" % int(oct(o)) 47 return char 48 return "'%s'::bytea" % "".join(map(repl, value)) 49 50 def do_pickle(self, value): 51 value = pickle.dumps(value, 2) 52 value = self.coerce_str_to_any(value, skip_encoding=False) 53 return value 54 coerce_dict_to_any = do_pickle 55 coerce_list_to_any = do_pickle 56 coerce_tuple_to_any = do_pickle 57 58 def coerce_str_to_any(self, value, skip_encoding=False): 59 if not skip_encoding and not isinstance(value, str): 60 value = value.encode(self.encoding) 61 for pat, repl in self.escapes: 62 value = value.replace(pat, repl) 63 64 # Escape octal sequences 65 value = escape_oct.sub(replace_oct, value) 66 return "'" + value + "'" 67 68 def coerce_float_to_REAL(self, value): 69 # Use quotes to restrict the value to single precision, so that 70 # comparisons work between existing values and supplied constants. 71 # See http://archives.postgresql.org/pgsql-bugs/2004-02/msg00062.php 72 return "'%r'" % value 73 coerce_float_to_FLOAT4 = coerce_float_to_REAL 74 75 76 class AdapterFromPgSQL(geniusql.AdapterFromDB): 77 78 def coerce_any_to_str(self, value): 79 # Unescape octal sequences 80 value = unescape_oct.sub(replace_unoct, value) 81 if isinstance(value, unicode): 82 return value.encode(self.encoding) 83 else: 84 return str(value) 85 86 87 class PgSQLDecompiler(geniusql.SQLDecompiler): 88 89 def dejavu_icontainedby(self, op1, op2): 90 if isinstance(op1, geniusql.ConstWrapper): 91 # Looking for text in a field. Use ILike (reverse terms). 92 return op2 + " ILIKE '%" + self.adapter.escape_like(op1) + "%'" 93 else: 94 # Looking for field in (a, b, c). 95 # Force all args to lowercase for case-insensitive comparison. 96 atoms = [self.adapter.coerce(x).lower() for x in op2.basevalue] 97 return "LOWER(%s) IN (%s)" % (op1, ", ".join(atoms)) 98 99 def dejavu_istartswith(self, x, y): 100 return x + " ILIKE '" + self.adapter.escape_like(y) + "%'" 101 102 def dejavu_iendswith(self, x, y): 103 return x + " ILIKE '%" + self.adapter.escape_like(y) + "'" 104 105 def dejavu_ieq(self, x, y): 106 # ILIKE with no wildcards should behave like ieq. 107 return x + " ILIKE '" + self.adapter.escape_like(y) + "'" 108 109 def dejavu_year(self, x): 110 return "date_part('year', " + x + ")" 111 112 def dejavu_month(self, x): 113 return "date_part('month', " + x + ")" 114 115 def dejavu_day(self, x): 116 return "date_part('day', " + x + ")" 117 118 119 class PgIndexSet(geniusql.IndexSet): 120 121 def __delitem__(self, key): 122 """Drop the specified index.""" 123 t = self.table 124 t.db.lock("Dropping index. Transactions not allowed.") 125 try: 126 # PG doesn't use DROP INDEX .. ON .. 127 t.db.execute('DROP INDEX %s;' % self[key].qname) 128 finally: 129 t.db.unlock() 130 131 132 class PgTable(geniusql.Table): 133 134 indexsetclass = PgIndexSet 135 136 def _grab_new_ids(self, idkeys, conn): 137 newids = {} 138 for idkey in idkeys: 139 col = self[idkey] 140 seq = col.sequence_name 141 data, _ = self.db.fetch("SELECT last_value FROM %s;" % seq, conn) 142 newids[idkey] = data[0][0] 143 return newids 144 145 146 class PgDatabase(geniusql.Database): 147 148 sql_name_max_length = 63 149 quote_all = True 150 poolsize = 10 151 encoding = 'SQL_ASCII' 152 153 decompiler = PgSQLDecompiler 154 adaptertosql = AdapterToPgSQL() 155 adapterfromdb = AdapterFromPgSQL() 156 tableclass = PgTable 41 connectionmanager = PyPgConnectionManager 157 42 158 43 def _get_dbinfo(self, conn=None): … … 166 51 raise 167 52 return dbinfo 168 169 def _get_tables(self, conn=None):170 data, _ = self.fetch("SELECT tablename FROM pg_tables WHERE schemaname"171 " not in ('information_schema', 'pg_catalog')",172 conn=conn)173 return [self.tableclass(row[0], self.quote(row[0]),174 self, created=True)175 for row in data]176 177 def _get_table(self, tablename, conn=None):178 data, _ = self.fetch("SELECT tablename FROM pg_tables WHERE "179 "tablename = '%s'" % tablename,180 conn=conn)181 for name, in data:182 if name == tablename:183 return self.tableclass(name, self.quote(name),184 self, created=True)185 raise errors.MappingError(tablename)186 187 def _get_columns(self, tablename, conn=None):188 # Get the OID of the table189 data, _ = self.fetch("SELECT oid FROM pg_class WHERE relname = '%s'"190 % tablename, conn=conn)191 table_OID = data[0][0]192 193 # Get index data so we can set col.key if pg_index.indisprimary194 data, _ = self.fetch("SELECT indkey FROM pg_index WHERE indrelid "195 "= %s AND indisprimary" % table_OID, conn=conn)196 if data:197 # indkey is an "array" (we get a space-separated string of ints).198 # These will equal pg_attribute.attnum, below.199 indices = map(int, data[0][0].split(" "))200 else:201 indices = []202 203 # Get column data204 sql = ("SELECT attname, atttypid, attnum, attlen, atttypmod "205 "FROM pg_attribute WHERE attisdropped = False AND "206 "attrelid = %s" % table_OID)207 data, _ = self.fetch(sql, conn=conn)208 cols = []209 for row in data:210 name = row[0]211 if name in ('tableoid', 'cmax', 'xmax', 'cmin', 'xmin',212 'oid', 'ctid'):213 # This is a column which PostgreSQL defines automatically214 continue215 216 # Data type217 dbtype, _ = self.fetch("SELECT typname, typlen FROM pg_type "218 "WHERE oid = %s" % row[1])219 if dbtype:220 dbtype = dbtype[0][0].upper()221 else:222 dbtype = None223 c = geniusql.Column(self.python_type(dbtype), dbtype,224 None, {}, row[2] in indices,225 row[0], self.quote(row[0]))226 227 if dbtype in ('FLOAT4', 'FLOAT8'):228 c.hints['precision'] = row[3]229 elif dbtype in ('MONEY', 'NUMERIC'):230 c.hints['precision'] = (row[4] >> 16) & 65535231 c.hints['scale'] = (row[4] & 65535) - 4232 233 # Default value234 default, _ = self.fetch("SELECT adsrc FROM pg_attrdef "235 "WHERE adnum = %s AND adrelid = %s"236 % (row[2], table_OID))237 if default:238 default = default[0][0]239 if default.startswith("nextval("):240 # Grab seqname from "nextval(seqname::[text|regclass])"241 c.autoincrement = True242 c.sequence_name = seq_name.search(default).group(1)243 c.initial = self.fetch("SELECT min_value FROM %s" %244 c.sequence_name)[0][0]245 c.default = None246 else:247 # adsrc is always a string, so we must cast248 # it using our guessed type.249 c.default = self.python_type(dbtype)(default)250 else:251 c.default = None252 253 if dbtype.startswith('BPCHAR') or dbtype.startswith('VARCHAR'):254 # See http://archives.postgresql.org/pgsql-interfaces/2004-07/msg00021.php255 c.hints['bytes'] = row[4] - 4256 else:257 bytes = row[3]258 if bytes > 0:259 c.hints['bytes'] = bytes260 elif dbtype == 'TEXT':261 c.hints['bytes'] = 0262 263 cols.append(c)264 return cols265 266 def _get_indices(self, tablename, conn=None):267 # Get the OID of the parent table.268 data, _ = self.fetch("SELECT oid FROM pg_class WHERE relname = '%s'"269 % tablename, conn=conn)270 if not data:271 return []272 273 table_OID = data[0][0]274 indices = []275 data, _ = self.fetch("SELECT pg_class.relname, indkey, indisprimary, "276 "indisunique FROM pg_index LEFT JOIN pg_class "277 "ON pg_index.indexrelid = pg_class.oid WHERE "278 "pg_index.indrelid = %s" % table_OID, conn=conn)279 for row in data:280 # indkey is an "array" (we get a space-separated string of ints).281 cols = map(int, row[1].split(" "))282 for col in cols:283 d, _ = self.fetch("SELECT attname FROM pg_attribute "284 "WHERE attrelid = %s AND attnum = %s"285 % (table_OID, col), conn=conn)286 i = geniusql.Index(row[0], self.quote(row[0]), tablename,287 d[0][0], bool(row[3]))288 indices.append(i)289 290 return indices291 292 def python_type(self, dbtype):293 """Return a Python type which can store values of the given dbtype."""294 dbtype = dbtype.upper()295 if dbtype in ('INT2', 'INT4', 'INTEGER', 'SMALLINT'):296 return int297 elif dbtype in ('BOOL', 'BOOLEAN'):298 return bool299 elif dbtype in ('INT8', 'BIGINT'):300 return long301 elif dbtype in ('FLOAT4', 'FLOAT8', 'MONEY', 'DOUBLE PRECISION', 'REAL'):302 return float303 elif dbtype.startswith('NUMERIC'):304 if typerefs.decimal:305 return typerefs.decimal.Decimal306 elif typerefs.fixedpoint:307 return typerefs.fixedpoint.FixedPoint308 elif dbtype == 'DATE':309 return datetime.date310 elif dbtype in ('TIMESTAMP', 'TIMESTAMPTZ'):311 return datetime.datetime312 elif dbtype in ('TIME', 'TIMETZ'):313 return datetime.time314 elif dbtype in ('BYTEA'):315 return str316 for t in ('CHAR', 'VARCHAR', 'BPCHAR', 'TEXT'):317 if dbtype.startswith(t):318 return str319 320 raise TypeError("Database type %r could not be converted "321 "to a Python type." % dbtype)322 323 def columnclause(self, column):324 """Return a clause for the given column for CREATE or ALTER TABLE.325 326 This will be of the form "name type [DEFAULT [x | nextval('seq')]]".327 328 PostgreSQL creates the sequence in a separate statement.329 """330 if column.autoincrement:331 default = "nextval('%s')" % column.sequence_name332 else:333 default = column.default or ""334 if not isinstance(default, str):335 default = self.adaptertosql.coerce(default, column.dbtype)336 337 if default:338 default = " DEFAULT %s" % default339 340 return '%s %s%s' % (column.qname, column.dbtype, default)341 342 def create_sequence(self, table, column):343 """Create a SEQUENCE for the given column and set its sequence_name."""344 sname = column.sequence_name345 if sname is None:346 sname = self.quote("%s_%s_seq" % (table.name, column.name))347 column.sequence_name = sname348 self.execute("CREATE SEQUENCE %s START %s;" % (sname, column.initial))349 350 def drop_sequence(self, column):351 """Drop a SEQUENCE for the given column and remove its sequence_name."""352 if column.sequence_name is not None:353 self.execute("DROP SEQUENCE %s;" % column.sequence_name)354 column.sequence_name = None355 356 def quote(self, name):357 if self.quote_all:358 name = '"' + name.replace('"', '""') + '"'359 return name360 361 def sql_name(self, name):362 name = geniusql.Database.sql_name(self, name)363 if not self.quote_all:364 name = name.lower()365 return name366 367 default_isolation = "READ COMMITTED"368 369 def _get_conn(self):370 try:371 return libpq.PQconnectdb(self.Connect)372 except libpq.DatabaseError, x:373 if x.args[0].startswith('could not connect'):374 raise errors.OutOfConnectionsError()375 raise376 377 def _del_conn(self, conn):378 conn.finish()379 380 def _template_conn(self):381 atoms = self.Connect.split(" ")382 tmplconn = ""383 for atom in atoms:384 k, v = atom.split("=", 1)385 if k == 'dbname': v = 'template1'386 tmplconn += "%s=%s " % (k, v)387 return libpq.PQconnectdb(tmplconn)388 53 389 54 def fetch(self, query, conn=None): … … 402 67 return data, columns 403 68 404 def create_database(self):405 self.lock("Creating database. Transactions not allowed.")406 try:407 c = self._template_conn()408 encoding = self.encoding409 if encoding:410 encoding = " WITH ENCODING '%s'" % encoding411 self.execute("CREATE DATABASE %s%s" % (self.qname, encoding), c)412 c.finish()413 self.clear()414 finally:415 self.unlock()416 417 def drop_database(self):418 self.lock("Dropping database. Transactions not allowed.")419 try:420 # Must shut down all connections to avoid421 # "being accessed by other users" error.422 self.connection.shutdown()423 424 c = self._template_conn()425 self.execute("DROP DATABASE %s;" % self.qname, c)426 c.finish()427 self.clear()428 finally:429 self.unlock()430 431 69 def version(self): 432 c = self. _template_conn()70 c = self.connections._get_conn(master=True) 433 71 v = c.version 434 c.finish()72 self.connections._del_conn(c) 435 73 return v 436 74 trunk/geniusql/providers/sqlite.py
r9 r10 56 56 return '1' 57 57 return '0' 58 59 60 class AdapterToSQLiteTypeless(AdapterToSQLite): 61 62 def cast_any_to_object(self, colref): 63 return colref 58 64 59 65 … … 318 324 db.create_sequence(self, column) 319 325 320 db.lock("Adding property. Transactions not allowed.")321 try:322 # Make a temporary copy.323 tempkey, temptable = self._temp_copy()324 # Add the new column to the copy.325 dict.__setitem__(temptable, key, column)326 # Bind the temp table to the DB.327 db[tempkey] = temptable328 329 # Copy data from the old table to the temp table.330 selfields = []331 for k, c in temptable.iteritems():332 qname = c.qname333 if k == key:334 # This is a new column. Populate with NULL.335 qname = "NULL AS %s" % qname336 selfields.append(qname)337 db.execute("INSERT INTO %s SELECT %s FROM %s;" %338 (temptable.qname, ", ".join(selfields), self.qname))339 340 # Copy data from the temp table to a new table for self.341 self._copy_from_temp(temptable, self._parent_key(), tempkey)342 finally:343 db.unlock()344 345 def __delitem__(self, key):346 if key in self.indices:347 del self.indices[key]348 349 if not self.created:350 dict.__delitem__(self, key)351 return352 353 db = self.db354 db.lock("Dropping property. Transactions not allowed.")355 try:356 column = self[key]357 358 326 # Make a temporary copy. 359 327 tempkey, temptable = self._temp_copy() 360 # Drop the column fromthe copy.361 dict.__ delitem__(temptable, key)328 # Add the new column to the copy. 329 dict.__setitem__(temptable, key, column) 362 330 # Bind the temp table to the DB. 363 331 db[tempkey] = temptable … … 367 335 for k, c in temptable.iteritems(): 368 336 qname = c.qname 337 if k == key: 338 # This is a new column. Populate with NULL. 339 qname = "NULL AS %s" % qname 369 340 selfields.append(qname) 370 db.execute("INSERT INTO %s SELECT %s FROM %s;" % 341 db.execute_ddl("INSERT INTO %s SELECT %s FROM %s;" % 342 (temptable.qname, ", ".join(selfields), self.qname)) 343 344 # Copy data from the temp table to a new table for self. 345 self._copy_from_temp(temptable, self._parent_key(), tempkey) 346 347 def __delitem__(self, key): 348 if key in self.indices: 349 del self.indices[key] 350 351 if not self.created: 352 dict.__delitem__(self, key) 353 return 354 355 db = self.db 356 column = self[key] 357 358 # Make a temporary copy. 359 tempkey, temptable = self._temp_copy() 360 # Drop the column from the copy. 361 dict.__delitem__(temptable, key) 362 # Bind the temp table to the DB. 363 db[tempkey] = temptable 364 365 # Copy data from the old table to the temp table. 366 selfields = [] 367 for k, c in temptable.iteritems(): 368 qname = c.qname 369 selfields.append(qname) 370 db.execute_ddl("INSERT INTO %s SELECT %s FROM %s;" % 371 371 (temptable.qname, ", ".join(selfields), self.qname)) 372 373 self._copy_from_temp(temptable, self._parent_key(), tempkey) 374 375 if column.autoincrement: 376 # This may or may not be a no-op, depending on the DB. 377 db.drop_sequence(column) 378 finally: 379 db.unlock() 372 373 self._copy_from_temp(temptable, self._parent_key(), tempkey) 374 375 if column.autoincrement: 376 # This may or may not be a no-op, depending on the DB. 377 db.drop_sequence(column) 380 378 381 379 def rename(self, oldkey, newkey): … … 387 385 388 386 if oldname != newname: 389 db.lock("Renaming property. Transactions not allowed.") 390 try: 391 dict.__delitem__(self, oldkey) 392 dict.__setitem__(self, newkey, oldcol) 393 oldcol.name = newname 394 oldcol.qname = db.quote(newname) 395 396 # Make a temporary copy. 397 tempkey, temptable = self._temp_copy() 398 # Bind the temp table to the DB. 399 db[tempkey] = temptable 400 401 # Copy data from the old table to the temp table. 402 selfields = [] 403 for k, c in temptable.iteritems(): 404 qname = c.qname 405 if k == newkey: 406 qname = "%s AS %s" % (db.quote(oldname), qname) 407 selfields.append(qname) 408 db.execute("INSERT INTO %s SELECT %s FROM %s;" % 387 dict.__delitem__(self, oldkey) 388 dict.__setitem__(self, newkey, oldcol) 389 oldcol.name = newname 390 oldcol.qname = db.quote(newname) 391 392 # Make a temporary copy. 393 tempkey, temptable = self._temp_copy() 394 # Bind the temp table to the DB. 395 db[tempkey] = temptable 396 397 # Copy data from the old table to the temp table. 398 selfields = [] 399 for k, c in temptable.iteritems(): 400 qname = c.qname 401 if k == newkey: 402 qname = "%s AS %s" % (db.quote(oldname), qname) 403 selfields.append(qname) 404 db.execute_ddl("INSERT INTO %s SELECT %s FROM %s;" % 409 405 (temptable.qname, ", ".join(selfields), self.qname)) 410 411 self._copy_from_temp(temptable, self._parent_key(), tempkey) 412 finally: 413 db.unlock() 406 407 self._copy_from_temp(temptable, self._parent_key(), tempkey) 414 408 415 409 def _grab_new_ids(self, idkeys, conn): … … 479 473 480 474 475 class SQLiteConnectionManager(geniusql.ConnectionManager): 476 477 default_isolation = "SERIALIZABLE" 478 isolation_levels = ["SERIALIZABLE"] 479 480 def __init__(self, db, poolsize=10): 481 self.transactions = {} 482 self.db = db 483 self.poolsize = poolsize 484 if self.db.name == ":memory:": 485 # "Multiple connections to ":memory:" within a single process 486 # create a fresh database each time" 487 # http://www.sqlite.org/cvstrac/wiki?p=InMemoryDatabase 488 # So we need to give :memory: databases a SingleConnection. 489 self._factory = geniusql.SingleConnection(self._get_conn, self._del_conn) 490 else: 491 return geniusql.ConnectionManager.__init__(self, db, poolsize) 492 493 if _cursor_required: 494 def _get_conn(self): 495 # SQLite should create the DB if missing. 496 # valid _sqlite3 kwargs: "database", "timeout", "detect_types", 497 # "isolation_level", "check_same_thread", "factory", 498 # "cached_statements". 499 # Instead of "timeout", we re-use the old 500 # deadlock_timeout code inside execute. 501 conn = _sqlite.connect(database=self.db.name, check_same_thread=False) 502 ## # None sets autocommit mode on. 503 ## conn.isolation_level = None 504 conn.text_factory = str 505 return conn.cursor() 506 else: 507 def _get_conn(self): 508 return _sqlite.connect(self.db.name, self.db.mode) 509 510 def isolate(self, conn, isolation=None): 511 """Set the isolation level of the given connection. 512 513 If 'isolation' is None, our default_isolation will be used for new 514 connections. Valid values for the 'isolation' argument may be native 515 values for your particular database. However, it is recommended you 516 pass items from the global 'levels' list instead; these will be 517 automatically replaced with native values. 518 519 For many databases, this must be executed after START TRANSACTION. 520 """ 521 if isolation is None: 522 isolation = self.default_isolation 523 524 if isinstance(isolation, geniusql.IsolationLevel): 525 # Map the given IsolationLevel object to a native value. 526 # This base class uses the four ANSI names as native values. 527 isolation = isolation.name 528 529 if isolation not in self.isolation_levels: 530 raise ValueError("IsolationLevel %r not allowed by %s. " 531 "Try one of %r instead." 532 % (isolation, self.__class__.__name__, 533 self.isolation_levels)) 534 535 # Nothing to do here, since we only allow one level. 536 pass 537 538 def start(self, isolation=None): 539 """Start a transaction.""" 540 conn = self.get(started=True) 541 self.db.execute("BEGIN;", conn) 542 self.isolate(conn, isolation) 543 544 def rollback(self): 545 """Roll back the current transaction.""" 546 key = self.id() 547 if key in self.transactions: 548 self.db.execute("ROLLBACK;", self.transactions[key]) 549 del self.transactions[key] 550 551 def commit(self): 552 """Commit the current transaction.""" 553 key = self.id() 554 if key in self.transactions: 555 self.db.execute("COMMIT;", self.transactions[key]) 556 del self.transactions[key] 557 558 481 559 class SQLiteDatabase(geniusql.Database): 482 560 … … 488 566 adapterfromdb = AdapterFromSQLite() 489 567 typeadapter = TypeAdapterSQLite() 568 connectionmanager = SQLiteConnectionManager 490 569 491 570 tableclass = SQLiteTable … … 512 591 513 592 def _get_tables(self, conn=None): 514 data, _ = self.fetch("SELECT name FROM sqlite_master WHERE type = 'table';") 593 data, _ = self.fetch("SELECT name FROM sqlite_master " 594 "WHERE type = 'table';", conn) 515 595 # Note that we set Table.created here, since these already exist in the DB. 516 596 return [self.tableclass(row[0], self.quote(row[0]), … … 519 599 520 600 def _get_table(self, tablename, conn=None): 521 data, _ = self.fetch("SELECT name FROM sqlite_master WHERE "522 " name = '%s' AND type = 'table';" % tablename)601 data, _ = self.fetch("SELECT name FROM sqlite_master WHERE name = " 602 "'%s' AND type = 'table';" % tablename, conn) 523 603 # Note that we set Table.created here, since these already exist in the DB. 524 604 for name, in data: … … 568 648 def _get_indices(self, tablename, conn=None): 569 649 data, _ = self.fetch("SELECT name, tbl_name, sql FROM sqlite_master " 570 "WHERE type = 'index';" )650 "WHERE type = 'index';", conn) 571 651 indices = [] 572 652 for row in data: … … 670 750 pk = "" 671 751 672 self.lock("Creating storage. Transactions not allowed.") 673 try: 674 self.execute('CREATE TABLE %s (%s%s);' % 752 self.execute_ddl('CREATE TABLE %s (%s%s);' % 675 753 (table.qname, ", ".join(fields), pk)) 676 677 for index in table.indices.itervalues():678 self.execute('CREATE INDEX %s ON %s (%s);' %754 755 for index in table.indices.itervalues(): 756 self.execute_ddl('CREATE INDEX %s ON %s (%s);' % 679 757 (index.qname, table.qname, 680 758 self.quote(index.colname))) 681 682 if autoincr_col: 683 self.create_sequence(table, autoincr_col) 684 685 dict.__setitem__(self, key, table) 686 finally: 687 self.unlock() 759 760 if autoincr_col: 761 self.create_sequence(table, autoincr_col) 762 763 dict.__setitem__(self, key, table) 688 764 689 765 def _rename(self, oldtable, newtable): 690 766 if _rename_table_support: 691 self.execute ("ALTER TABLE %s RENAME TO %s" %692 (oldtable.qname, newtable.qname))767 self.execute_ddl("ALTER TABLE %s RENAME TO %s" % 768 (oldtable.qname, newtable.qname)) 693 769 else: 694 770 raise NotImplementedError … … 710 786 return "[" + name + "]" 711 787 712 def connect(self): 713 if self.name == ":memory:": 714 # "Multiple connections to ":memory:" within a single process 715 # create a fresh database each time" 716 # http://www.sqlite.org/cvstrac/wiki?p=InMemoryDatabase 717 # So we need to give :memory: databases a SingleConnection. 718 self.connection = geniusql.SingleConnection(self._get_conn, self._del_conn) 719 else: 720 return geniusql.Database.connect(self) 721 722 if _cursor_required: 723 def _get_conn(self): 724 # SQLite should create the DB if missing. 725 # valid _sqlite3 kwargs: "database", "timeout", "detect_types", 726 # "isolation_level", "check_same_thread", "factory", 727 # "cached_statements". 728 # Instead of "timeout", we re-use the old 729 # deadlock_timeout code inside execute. 730 conn = _sqlite.connect(database=self.name, check_same_thread=False) 731 ## # None sets autocommit mode on. 732 ## conn.isolation_level = None 733 conn.text_factory = str 734 return conn.cursor() 735 else: 736 def _get_conn(self): 737 return _sqlite.connect(self.name, self.mode) 738 739 create_database = _get_conn 788 def create_database(self): 789 self.connections.get() 740 790 741 791 def drop_database(self): 742 self. disconnect()792 self.connections.shutdown() 743 793 if self.name != ":memory:": 744 794 # This should accept relative or absolute paths … … 746 796 self.clear() 747 797 748 749 # Transactions # 750 751 default_isolation = "SERIALIZABLE" 752 isolation_levels = ["SERIALIZABLE"] 753 754 def is_lock_error(self, exc): 798 def is_timeout_error(self, exc): 755 799 if not isinstance(exc, _sqlite.OperationalError): 756 800 return False 757 801 return exc.args[0] == 'database is locked' 758 802 759 def isolate(self, conn, isolation=None):760 """Set the isolation level of the given connection.761 762 If 'isolation' is None, our default_isolation will be used for new763 connections. Valid values for the 'isolation' argument may be native764 values for your particular database. However, it is recommended you765 pass items from the global 'levels' list instead; these will be766 automatically replaced with native values.767 768 For many databases, this must be executed after START TRANSACTION.769 """770 if isolation is None:771 isolation = self.default_isolation772 773 if isinstance(isolation, geniusql.IsolationLevel):774 # Map the given IsolationLevel object to a native value.775 # This base class uses the four ANSI names as native values.776 isolation = isolation.name777 778 if isolation not in self.isolation_levels:779 raise ValueError("IsolationLevel %r not allowed by %s. "780 "Try one of %r instead."781 % (isolation, self.__class__.__name__,782 self.isolation_levels))783 784 # Nothing to do here, since we only allow one level.785 pass786 787 def start(self, isolation=None):788 """Start a transaction."""789 conn = self.get_transaction(new=True)790 self.execute("BEGIN;", conn)791 self.isolate(conn, isolation)792 793 def rollback(self):794 """Roll back the current transaction."""795 key = self.transaction_key()796 if key in self.transactions:797 self.execute("ROLLBACK;", self.transactions[key])798 del self.transactions[key]799 800 def commit(self):801 """Commit the current transaction."""802 key = self.transaction_key()803 if key in self.transactions:804 self.execute("COMMIT;", self.transactions[key])805 del self.transactions[key]806 807 803 deadlock_timeout = 20 808 804 … … 810 806 try: 811 807 if conn is None: 812 conn = self.connection ()808 conn = self.connections.get() 813 809 if isinstance(query, unicode): 814 810 query = query.encode(self.adaptertosql.encoding) … … 822 818 if ((msg.startswith("no such") or 823 819 msg == "database schema has changed")): 824 tx = self.transactions.get(self.transaction_key()) 825 if tx is None or isinstance(tx, errors.TransactionLock): 820 if not self.connections.in_transaction(): 826 821 # Bah. Shut down all connections and get a new one, 827 822 # since some previous connection changed the schema. 828 self.connection .shutdown()829 conn = self.connection ()823 self.connections.shutdown() 824 conn = self.connections._factory() 830 825 continue 831 if self.is_ lock_error(x) and self.deadlock_timeout:826 if self.is_timeout_error(x) and self.deadlock_timeout: 832 827 if time.time() - start < self.deadlock_timeout: 833 828 time.sleep(0.000001) trunk/geniusql/select.py
r9 r10 2 2 3 3 from geniusql import errors 4 5 4 from dejavu import logic, codewalk 6 5 … … 275 274 276 275 def _compare_constants(self, op1, op2): 277 """Coerce/cast compared types (or mark imperfect).""" 276 """Coerce/cast compared types. 277 278 If a column value is compared to a constant and no coerce or cast 279 adapter function is available, a TypeError is raised. 280 """ 278 281 col = getattr(op1, "col", None) 279 282 if col: trunk/geniusql/test/zoo_fixture.py
r9 r10 1111 1111 except (AttributeError, NotImplementedError): 1112 1112 pass 1113 db. disconnect()1113 db.connections.shutdown() 1114 1114 1115 1115 def run(DB_class, name, opts):
