Contact: fumanchu@aminus.org

Log in as guest/geniusql to create tickets

root/trunk/geniusql/providers/psycopg.py

Revision 326 (checked in by lakin, 2 months ago)

fixing the postgresql connection tests and the sqlite join sql writer

  • Property svn:eol-style set to native
Line 
1 # Use _psycopg directly to avoid overhead.
2 try:
3     # If possible, you should copy the _psycopg.pyd file into a top-level
4     # directory so this SM can avoid importing the entire package.
5     import _psycopg
6 except ImportError:
7     from psycopg2 import _psycopg
8
9 from geniusql import conns, errors
10 from geniusql.providers import postgres
11
12 #-------------------------------------------------------------------------------
13 def safe_connection_close(conn):
14     """If you call close on an already closed connection, psycopg raises an error.
15
16     Normally - this is fine - but we try to close our connections/cursors in the case
17     of connection errors so that we can properly re-connect - so we catch and ignore
18     ONLY those errors right now.
19     """
20     try:
21         conn.close()
22     except _psycopg.InterfaceError, exc:
23         msg = exc.args[0]
24         if msg.startswith('connection already closed'):
25             return
26         else:
27             raise
28
29 #-------------------------------------------------------------------------------
30 def safe_cursor_close(cursor):
31     """If you call close on an already closed cursor, psycopg raises an error.
32
33     Normally - this is fine - but we try to close our connections/cursors in the case
34     of connection errors so that we can properly re-connect - so we catch and ignore
35     ONLY those errors right now.
36     """
37     try:
38         cursor.close()
39     except _psycopg.InterfaceError, exc:
40         msg = exc.args[0]
41         if msg.startswith('cursor already closed'):
42             return
43         else:
44             raise
45
46 class PsycoPgConnectionManager(conns.ConnectionManager):
47
48     default_isolation = "READ COMMITTED"
49
50     def _get_conn(self, master=False):
51         if master:
52             # Commit any pending transaction in the current thread.
53             if self.in_transaction():
54                 self.commit()
55
56             # Must shut down all connections to avoid
57             # "being accessed by other users" error.
58             self.shutdown()
59
60             connstr = ""
61             for atom in self.Connect.split(" "):
62                 k, v = atom.split("=", 1)
63                 if k == 'dbname':
64                     v = 'template1'
65                 connstr += "%s=%s " % (k, v)
66         else:
67             connstr = self.Connect
68
69         try:
70             c = _psycopg.connect(connstr)
71             # Allow statements like CREATE DATABASE to run outside a transaction.
72             c.set_isolation_level(0)
73             if self.initial_sql:
74                 cursor.execute(self.initial_sql)
75             return c
76         except _psycopg.DatabaseError, x:
77             if x.args[0].startswith('could not connect'):
78                 raise errors.OutOfConnectionsError(*x.args)
79             raise
80
81     def _del_conn(self, conn):
82         safe_connection_close(conn)
83
84
85 class PsycoPgSchema(postgres.PgSchema):
86
87     def _get_dbinfo(self, conn=None):
88         dbinfo = {}
89         try:
90             data, _ = self.db.fetch("SELECT pg_encoding_to_char(encoding) "
91                                     "FROM pg_database;", conn=conn)
92             dbinfo['encoding'] = data[0][0]
93         except _psycopg.DatabaseError, x:
94             if "does not exist" not in x.args[0]:
95                 raise
96         return dbinfo
97
98
99 class PsycoPgDatabase(postgres.PgDatabase):
100
101     connectionmanager = PsycoPgConnectionManager
102     schemaclass = PsycoPgSchema
103
104     def version(self):
105         c = self.connections._get_conn(master=True)
106         data, _ = self.fetch("SELECT version();", c)
107         v, = data[0]
108         c.close()
109         return "%s\npsycopg version: %s" % (v, _psycopg.__version__)
110
111     def is_connection_error(self, exc):
112         """If the given exception instance is a connection error, return True.
113
114         This should return True for errors which arise from broken connections;
115         for example, if the database server has dropped the connection socket,
116         or is unreachable.
117         """
118         if isinstance(exc, _psycopg.OperationalError):
119             # OperationalError: connection not open
120             msg = exc.args[0]
121             return msg.startswith('connection not open') or msg.startswith("terminating connection due")
122         if isinstance(exc, _psycopg.InterfaceError):
123             # InterfaceError: connection already closed
124             msg = exc.args[0]
125             return msg.startswith('connection already closed')
126         return False
127
128     def execute(self, sql, conn=None):
129         """Return a native response for the given SQL."""
130         if conn is None:
131             conn = self.connections.get()
132         if isinstance(sql, unicode):
133             sql = sql.encode(self.encoding)
134         self.log(sql)
135
136         def _execute_implementation(sql):
137             cursor = conn.cursor()
138             try:
139                 return cursor.execute(sql)
140             finally:
141                 safe_cursor_close(cursor)
142
143         try:
144             return _execute_implementation(sql)
145         except Exception, x:
146             if self.is_connection_error(x):
147                 self.connections.reset(conn)
148                 return _execute_implementation(sql)
149             raise
150
151     def fetch(self, sql, conn=None):
152         """Return rowdata, columns(name, type) for the given sql.
153
154         sql should be a SQL string.
155
156         rowdata will be an iterable of iterables containing the result values.
157         columns will be an iterable of (column name, data type) pairs.
158         """
159         if conn is None:
160             conn = self.connections.get()
161         if isinstance(sql, unicode):
162             sql = sql.encode(self.encoding)
163         self.log(sql)
164
165         def _fetch_implementation(sql):
166             cursor = conn.cursor()
167             try:
168                 cursor.execute(sql)
169                 data = cursor.fetchall()
170                 coldefs = cursor.description
171             finally:
172                 safe_cursor_close(cursor)
173             return data, coldefs
174
175         try:
176             return _fetch_implementation(sql)
177         except Exception, x:
178             if self.is_connection_error(x):
179                 self.connections.reset(conn)
180                 return _fetch_implementation(sql)
181             else:
182                 raise
183
184         return data, coldefs
185
Note: See TracBrowser for help on using the browser.