Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/storage/storepypgsql.py

Revision 47 (checked in by fumanchu, 8 years ago)

1. AdapterToSQL now separates bool constants from bool expressions.
2. storage.db now has automatic connection pooling.
3. Fixed storeado against SQL Server (MSDE).
4. Added multithreading tests to zoo_fixture.

Line 
1 # Use libpq directly to avoid all of the DB-API overhead.
2 from pyPgSQL import libpq
3 import time
4 import datetime
5 import dejavu
6 from dejavu.storage import db
7
8
class PgSQLDecompiler(db.SQLDecompiler):
    """SQLDecompiler that emits PostgreSQL-flavoured SQL fragments.

    Each dejavu_* method is handed operands that are concatenated directly
    into the returned SQL string, so they are expected to be SQL text
    already (column expressions or quoted constants).
    """

    def dejavu_icontainedby(self, op1, op2):
        """Return SQL for a case-insensitive "op1 in op2" test.

        If op1 is a db.ConstWrapper, the test is "constant text occurs in
        field op2"; otherwise it is "field op1 is one of the values held
        in op2.basevalue".
        """
        if isinstance(op1, db.ConstWrapper):
            # Looking for text in a field. Use ILIKE (reverse terms).
            # The surrounding quotes are stripped from the constant so it
            # can be re-wrapped between the '%' wildcards.
            return op2 + " ILIKE '%" + op1.strip("'\"") + "%'"
        else:
            # Looking for field in (a, b, c).
            # Force all args to lowercase for case-insensitive comparison.
            atoms = [self.adapter.coerce(x).lower() for x in op2.basevalue]
            return "LOWER(%s) IN (%s)" % (op1, ", ".join(atoms))

    def dejavu_istartswith(self, x, y):
        """Return SQL testing whether x starts with y, ignoring case."""
        return x + " ILIKE '" + y.strip("'\"") + "%'"

    def dejavu_iendswith(self, x, y):
        """Return SQL testing whether x ends with y, ignoring case."""
        return x + " ILIKE '%" + y.strip("'\"") + "'"

    def dejavu_ieq(self, x, y):
        """Return SQL testing whether x equals y, ignoring case."""
        # A bare ILIKE would treat '%', '_' and backslash inside y as
        # pattern syntax and so match more than equality. Comparing the
        # lowercased operands gives a true case-insensitive equality test
        # and mirrors the LOWER(...) IN (...) form used above.
        return "LOWER(" + x + ") = LOWER(" + y + ")"

    def dejavu_year(self, x):
        """Return SQL extracting the year part of the date expression x."""
        return "date_part('year', " + x + ")"
33
34
35
36 class StorageManagerPgSQL(db.StorageManagerDB):
37     """StoreManager to save and retrieve Units via pyPgSQL 1.35."""
38    
39     identifier_length = 63
40     close_connection_method = 'finish'
41     decompiler = PgSQLDecompiler
42    
43     def __init__(self, name, arena, allOptions={}):
44         db.StorageManagerDB.__init__(self, name, arena, allOptions)
45        
46         # connstring = (host=h port=p dbname=d user=u password=p options=o tty=t)
47         self.connstring = allOptions[u'Connect']
48         atoms = self.connstring.split(" ")
49         for atom in atoms:
50             k, v = atom.split("=", 1)
51             setattr(self, k, v)
52        
53         self.retry = 5
54    
55     def _get_conn(self):
56         retry = 0
57         while True:
58             try:
59                 return libpq.PQconnectdb(self.connstring)
60             except libpq.DatabaseError, x:
61                 msg = x.args[0]
62                 if msg.endswith('does not exist\n'):
63                     if self.CreateIfMissing:
64                         self.create_database()
65                         return libpq.PQconnectdb(self.connstring)
66                 elif msg.startswith('could not connect'):
67                     retry += 1
68                     if retry < self.retry:
69                         time.sleep(retry * 0.1)
70                         continue
71                     x.args += ("Try increasing your Postgres server's max_connections",)
72                 raise x
73    
74     def _template_conn(self):
75         atoms = self.connstring.split(" ")
76         tmplconn = ""
77         for atom in atoms:
78             k, v = atom.split("=", 1)
79             if k == 'dbname': v = 'template1'
80             tmplconn += "%s=%s " % (k, v)
81         return libpq.PQconnectdb(tmplconn)
82    
83     def create_database(self):
84         c = self._template_conn()
85         self.execute('CREATE DATABASE %s' % self.identifier(self.dbname), c)
86         c.finish()
87    
88     def drop_database(self):
89         c = self._template_conn()
90         self.execute("DROP DATABASE %s;" % self.identifier(self.dbname), c)
91         c.finish()
92    
93     def fetch(self, query, conn=None):
94         """fetch(query, conn=None) -> rowdata, columns."""
95         res = self.execute(query, conn)
96        
97         columns = []
98         if res.resultType != libpq.EMPTY_QUERY:
99             for index in xrange(res.nfields):
100                 columns.append((res.fname(index), res.ftype(index)))
101        
102         def iterator():
103             for row in xrange(res.ntuples):
104                 yield [res.getvalue(row, col) for col in xrange(res.nfields)]
105             # This should be more robust--needs a class with a cleanup call.
106             res.clear()
107        
108         return iterator(), columns
109
Note: See TracBrowser for help on using the browser.