Changeset 201
- Timestamp:
- 03/20/06 06:48:36
- Files:
-
- trunk/arenas.py (modified) (2 diffs)
- trunk/storage/db.py (modified) (8 diffs)
- trunk/storage/storeado.py (modified) (6 diffs)
- trunk/storage/storefirebird.py (modified) (3 diffs)
- trunk/storage/storemysql.py (modified) (2 diffs)
- trunk/storage/storeodbc.py (modified) (1 diff)
- trunk/storage/storepypgsql.py (modified) (4 diffs)
- trunk/storage/storesqlite.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/arenas.py
r200 r201 8 8 9 9 __all__ = ['Arena', 'Sandbox', 10 'LOG CONN', 'LOGFORGET', 'LOGMEMORIZE', 'LOGRECALL',10 'LOGFORGET', 'LOGMEMORIZE', 'LOGRECALL', 11 11 'LOGREPRESS', 'LOGSANDBOX', 'LOGSQL', 'LOGVIEW', 12 12 ] … … 15 15 # logging flags (see Arena.logflags) 16 16 LOGSQL = 4 17 LOGCONN = 818 17 19 18 LOGMEMORIZE = 128 trunk/storage/db.py
r200 r201 60 60 61 61 import dejavu 62 from dejavu import codewalk, logic, storage, LOG CONN, LOGSQL, xray62 from dejavu import codewalk, logic, storage, LOGSQL, xray 63 63 64 64 … … 733 733 734 734 735 # ------------------------- Connection Factories ------------------------- # 736 737 735 738 class ConnectionWrapper(object): 736 739 """Connection object wrapper, so it can be used as a weak reference.""" … … 741 744 def __getattr__(self, attr): 742 745 return getattr(self.conn, attr) 746 747 748 class OutOfConnectionsError(dejavu.DejavuError): 749 """Exception raised when a database store has run out of connections.""" 750 pass 751 752 753 class ConnectionFactory(object): 754 """A connection factory which creates a new connection for each request.""" 755 756 def __init__(self, open, close, retry=5): 757 self.open = open 758 self.close = close 759 self.retry = retry 760 self.refs = {} 761 762 def __call__(self): 763 """Return a connection.""" 764 for i in xrange(self.retry): 765 try: 766 conn = self.open() 767 w = ConnectionWrapper(conn) 768 self.refs[weakref.ref(w, self._release)] = w.conn 769 return w 770 except OutOfConnectionsError: 771 time.sleep(i + 1) 772 conn = None 773 raise OutOfConnectionsError() 774 775 def _release(self, ref): 776 """Release a connection.""" 777 self.close(self.refs.pop(ref)) 778 779 def shutdown(self): 780 """Release all database connections.""" 781 # Empty self.refs. 782 while self.refs: 783 ref, conn = self.refs.popitem() 784 self.close(conn) 785 786 787 class ConnectionPool(object): 788 """A database connection factory which keeps a pool of connections.""" 789 790 def __init__(self, open, close, size=10, retry=5): 791 self.open = open 792 self.close = close 793 self.refs = {} 794 self.pool = Queue.Queue(size) 795 self.retry = retry 796 797 def __call__(self): 798 """Return a connection from the pool.""" 799 for i in xrange(self.retry): 800 try: 801 conn = self.pool.get_nowait() 802 # Okay, this is freaky. If we wrap here, all goes well. 803 # If we wrap on Queue.put(), mysql crashes after 1700 804 # or so inserts (when migrating Access tables to MySQL). 805 # Go figure. 806 w = ConnectionWrapper(conn) 807 self.refs[weakref.ref(w, self._release)] = w.conn 808 return w 809 except Queue.Empty: 810 pass 811 812 try: 813 conn = self.open() 814 w = ConnectionWrapper(conn) 815 self.refs[weakref.ref(w, self._release)] = w.conn 816 return w 817 except OutOfConnectionsError: 818 time.sleep(i + 1) 819 conn = None 820 raise OutOfConnectionsError() 821 822 def _release(self, ref): 823 """Release a connection.""" 824 conn = self.refs.pop(ref) 825 try: 826 self.pool.put_nowait(conn) 827 return 828 except Queue.Full: 829 pass 830 self.close(conn) 831 832 def shutdown(self): 833 """Release all database connections.""" 834 # Empty the pool. 835 while True: 836 try: 837 self.pool.get(True, 0.5) 838 except Queue.Empty: 839 break 840 841 # Empty self.refs. 842 while self.refs: 843 ref, conn = self.refs.popitem() 844 self.close(conn) 845 846 847 class SingleConnection(object): 848 """A single database connection for all consumers. 849 850 Use this when your database cannot handle multiple connections at once, 851 but can handle multiple threads using the same connection. 852 """ 853 854 def __init__(self, open, close): 855 self.open = open 856 self.close = close 857 # Delay opening the connection, because the 858 # SM may need to create the database first. 859 self._conn = None 860 861 def __call__(self): 862 """Return our only connection.""" 863 if self._conn is None: 864 self._conn = self.open() 865 return self._conn 866 867 def shutdown(self): 868 """Release all database connections.""" 869 if self._conn is not None: 870 self.close(self._conn) 871 872 873 # --------------------------- Storage Manager --------------------------- # 743 874 744 875 … … 795 926 sql_name_max_length = 64 796 927 sql_name_caseless = False 797 close_connection_method = 'close'798 928 use_asterisk_to_get_all = False 799 929 … … 802 932 toAdapter = AdapterToSQL() 803 933 fromAdapter = AdapterFromDB() 804 debug_connections = False805 934 806 935 def __init__(self, name, arena, allOptions={}): … … 821 950 if adapter: self.fromAdapter = adapter 822 951 823 self.pool_size = int(allOptions.get('Pool Size', '10')) 824 825 self.refs = {} 826 if self.pool_size > 0: 827 self.pool = Queue.Queue(self.pool_size) 828 else: 829 self.pool = None 830 self.retry = 5 831 self.threaded = True 952 size = int(allOptions.get('Pool Size', '10')) 953 if size > 0: 954 self.connection = ConnectionPool(self._get_conn, self._del_conn, size) 955 else: 956 self.connection = ConnectionFactory(self._get_conn, self._del_conn) 832 957 833 958 self.prefix = allOptions.get('Prefix', "djv") … … 880 1005 raise NotImplementedError 881 1006 882 def connection(self): 883 """Return a connection from the pool.""" 884 if not self.threaded: 885 # Place a single 'conn' entry in self.refs. 886 try: 887 return self.refs['conn'] 888 except KeyError: 889 self.refs['conn'] = conn = self._get_conn() 890 return conn 891 892 retry = 0 893 while True: 894 if self.pool is not None: 895 try: 896 conn = self.pool.get_nowait() 897 # Okay, this is freaky. If we wrap here, all goes well. 898 # If we wrap on Queue.put(), mysql crashes after 1700 899 # or so inserts (when migrating Access tables to MySQL). 900 # Go figure. 901 w = ConnectionWrapper(conn) 902 self.refs[weakref.ref(w, self.release)] = w.conn 903 self.arena.log("-->get %s" % self.__class__.__name__, 904 LOGCONN) 905 return w 906 except Queue.Empty: 907 pass 908 909 try: 910 conn = self._get_conn() 911 w = ConnectionWrapper(conn) 912 self.refs[weakref.ref(w, self.release)] = w.conn 913 self.arena.log("create %s" % self.__class__.__name__, LOGCONN) 914 return w 915 except OutOfConnectionsError: 916 retry += 1 917 if retry < self.retry: 918 time.sleep(retry * 1) 919 conn = None 920 continue 921 raise OutOfConnectionsError() 922 923 def release(self, ref): 924 # This method should only be called if self.threaded is True 925 conn = self.refs.pop(ref) 926 927 if self.pool is not None: 928 try: 929 self.pool.put_nowait(conn) 930 self.arena.log("<--put %s" % self.__class__.__name__, LOGCONN) 931 return 932 except Queue.Full: 933 pass 934 935 getattr(conn, self.close_connection_method)() 936 self.arena.log("___close___ %s" % self.__class__.__name__, LOGCONN) 1007 def _del_conn(self, conn): 1008 # Override this with the close call (if any) for your DB. 1009 conn.close() 937 1010 938 1011 def shutdown(self): 939 1012 """Release all database connections.""" 940 # Empty the pool. 941 if self.pool: 942 while True: 943 try: 944 self.pool.get(True, 0.5) 945 except Queue.Empty: 946 break 947 948 # Empty self.refs. 949 while self.refs: 950 ref, conn = self.refs.popitem() 951 getattr(conn, self.close_connection_method)() 1013 self.connection.shutdown() 952 1014 953 1015 def select(self, cls, expr, fields=None, distinct=False): … … 1456 1518 repr(self.pk), repr(self.unique))) 1457 1519 1458 1459 class OutOfConnectionsError(dejavu.DejavuError):1460 """Exception raised when a database store has run out of connections."""1461 pass1462 trunk/storage/storeado.py
r200 r201 353 353 """ 354 354 355 close_connection_method = 'Close'356 355 decompiler = ADOSQLDecompiler 357 356 fromAdapter = AdapterFromADO() … … 374 373 conn.Open(self.connstring) 375 374 return conn 375 376 def _del_conn(self, conn): 377 conn.Close() 376 378 377 379 def execute(self, query, conn=None): … … 399 401 self.arena.log(query, dejavu.LOGSQL) 400 402 res = win32com.client.Dispatch(r'ADODB.Recordset') 401 if self.threaded:402 # 'conn' willbe a ConnectionWrapper object, which .Open403 if hasattr(conn, "conn"): 404 # 'conn' is be a ConnectionWrapper object, which .Open 403 405 # won't accept. Pass the unwrapped connection instead. 404 406 res.Open(query, conn.conn, adOpenForwardOnly, adLockReadOnly) … … 661 663 662 664 def __init__(self, name, arena, allOptions={}): 663 db.StorageManagerDB.__init__(self, name, arena, allOptions)664 665 665 self.connstring = allOptions['Connect'] 666 666 atoms = self.connatoms() 667 667 self.dbname = atoms['INITIAL CATALOG'] 668 db.StorageManagerDB.__init__(self, name, arena, allOptions) 668 669 669 670 def _seq_UnitSequencerInteger(self, unit): … … 884 885 885 886 def __init__(self, name, arena, allOptions={}): 886 db.StorageManagerDB.__init__(self, name, arena, allOptions)887 888 887 self.connstring = allOptions['Connect'] 889 888 atoms = self.connatoms() … … 891 890 atoms.get('DATA SOURCE NAME') or 892 891 atoms.get('DBQ')) 892 db.StorageManagerDB.__init__(self, name, arena, allOptions) 893 893 894 # MS Access can't use a pool, because there doesn't seem 894 895 # to be a commit timeout. 895 self.pool = None 896 self.threaded = False 897 self.debug_connections = True 896 self.connection = db.SingleConnection(self._get_conn, self._del_conn) 898 897 899 898 def _seq_UnitSequencerInteger(self, unit): trunk/storage/storefirebird.py
r200 r201 48 48 49 49 def __init__(self, name, arena, allOptions={}): 50 db.StorageManagerDB.__init__(self, name, arena, allOptions)51 52 50 # DSN = "host:database" 53 51 self.DSN = dsn = allOptions['DSN'] … … 56 54 self.password = allOptions['password'] 57 55 self.encoding = allOptions.get('encoding', 'utf8') 56 db.StorageManagerDB.__init__(self, name, arena, allOptions) 58 57 59 58 def sql_name(self, name, quoted=True): … … 64 63 65 64 def _get_conn(self): 66 c = kinterbasdb.connect(dsn=self.DSN, 67 user=self.user, 68 password=self.password, 69 charset=self.encoding, 70 ) 71 return c 65 return kinterbasdb.connect(dsn=self.DSN, 66 user=self.user, 67 password=self.password, 68 charset=self.encoding, 69 ) 72 70 73 71 def execute(self, query, conn=None): trunk/storage/storemysql.py
r200 r201 157 157 158 158 def __init__(self, name, arena, allOptions={}): 159 db.StorageManagerDB.__init__(self, name, arena, allOptions)160 161 159 connargs = ["host", "user", "passwd", "db", "port", "unix_socket", 162 160 "conv", "connect_time", "compress", "named_pipe", … … 167 165 if k in connargs]) 168 166 self.dbname = self.connargs['db'] 167 168 db.StorageManagerDB.__init__(self, name, arena, allOptions) 169 169 170 170 self.decompiler = MySQLDecompiler trunk/storage/storeodbc.py
r200 r201 94 94 95 95 def __init__(self, name, arena, allOptions={}): 96 self.connstring = allOptions['Connect'] 96 97 db.StorageManagerDB.__init__(self, name, arena, allOptions) 97 self.connstring = allOptions['Connect']98 98 99 99 def _get_conn(self): trunk/storage/storepypgsql.py
r200 r201 71 71 72 72 sql_name_max_length = 63 73 close_connection_method = 'finish'74 73 decompiler = PgSQLDecompiler 75 74 toAdapter = AdapterToPgSQL() … … 77 76 78 77 def __init__(self, name, arena, allOptions={}): 79 db.StorageManagerDB.__init__(self, name, arena, allOptions)80 81 78 # connstring = (host=h port=p dbname=d user=u password=p options=o tty=t) 82 79 self.connstring = allOptions['Connect'] … … 85 82 k, v = atom.split("=", 1) 86 83 setattr(self, k, v) 84 db.StorageManagerDB.__init__(self, name, arena, allOptions) 87 85 88 86 def sql_name(self, name, quoted=True): … … 97 95 except libpq.DatabaseError, x: 98 96 if x.args[0].startswith('could not connect'): 99 raise db.OutOfConnectionsError 97 raise db.OutOfConnectionsError() 100 98 raise 99 100 def _del_conn(self, conn): 101 conn.finish() 101 102 102 103 def _template_conn(self): trunk/storage/storesqlite.py
r200 r201 160 160 161 161 def __init__(self, name, arena, allOptions={}): 162 db.StorageManagerDB.__init__(self, name, arena, allOptions)163 164 162 dbfile = allOptions.get('Database', '') 165 163 if not os.path.isabs(dbfile): 166 164 dbfile = os.path.join(os.getcwd(), dbfile) 167 165 self.database = dbfile 168 169 166 self.mode = int(allOptions.get('Mode', '0755'), 8) 167 db.StorageManagerDB.__init__(self, name, arena, allOptions) 170 168 171 169 def sql_name(self, name, quoted=True): … … 257 255 258 256 def join(self, unitjoin): 259 # SQLite doesn't do nested JOINs, but instead applies them260 # in order. Therefore, we need a single ON-clause at the261 # end of the list of tables. For example:262 # "From a LEFT JOIN b LEFT JOIN c ON a.ID = b.ID AND b.Name = c.Name 257 # SQLite doesn't do nested JOINs, but instead applies 258 # them in order. Therefore, we need a single ON-clause 259 # at the end of the list of tables. For example: 260 # "From a LEFT JOIN b LEFT JOIN c ON a.ID = b.ID AND b.Name = c.Name" 263 261 joins, on_clauses = self._join(unitjoin) 264 262 return joins + " ON " + " AND ".join(on_clauses)
