Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

root/tags/1.4.0/storage/storepypgsql.py

Revision 151 (checked in by fumanchu, 3 years ago)

Removed the CreateIfMissing option; this should now be performed with a schema instead.

  • Property svn:eol-style set to native
Line 
1 # Use libpq directly to avoid all of the DB-API overhead.
2 from pyPgSQL import libpq
3 import datetime
4 import dejavu
5 from dejavu.storage import db
6
7
class AdapterToPgSQL(db.AdapterToSQL):
    """Adapter to SQL values, carrying the LIKE escapes used for PostgreSQL."""

    # (wildcard, replacement) pairs for the two LIKE/ILIKE wildcard
    # characters. Presumably consumed by the base adapter's escape_like();
    # verify against dejavu.storage.db.
    like_escapes = [
        ("%", r"\\%"),
        ("_", r"\\_"),
    ]
11
12
class PgSQLDecompiler(db.SQLDecompiler):
    """SQL decompiler with PostgreSQL renderings of the dejavu_* functions."""

    def _ilike(self, field, value, prefix="", suffix=""):
        """Return "<field> ILIKE '<prefix><escaped value><suffix>'".

        The value is passed through the adapter's escape_like(); prefix
        and suffix are inserted verbatim (used for the '%' wildcards).
        """
        pattern = prefix + self.adapter.escape_like(value) + suffix
        return field + " ILIKE '" + pattern + "'"

    def dejavu_icontainedby(self, op1, op2):
        """Render a case-insensitive 'op1 in op2' test."""
        if isinstance(op1, db.ConstWrapper):
            # Looking for text in a field. Use ILike (reverse terms).
            return self._ilike(op2, op1, prefix="%", suffix="%")

        # Looking for field in (a, b, c).
        # Force all args to lowercase for case-insensitive comparison.
        lowered = []
        for item in op2.basevalue:
            lowered.append(self.adapter.coerce(item).lower())
        return "LOWER(%s) IN (%s)" % (op1, ", ".join(lowered))

    def dejavu_istartswith(self, x, y):
        """Render a case-insensitive 'x starts with y' test."""
        return self._ilike(x, y, suffix="%")

    def dejavu_iendswith(self, x, y):
        """Render a case-insensitive 'x ends with y' test."""
        return self._ilike(x, y, prefix="%")

    def dejavu_ieq(self, x, y):
        """Render a case-insensitive equality test."""
        # ILIKE with no wildcards should behave like ieq.
        return self._ilike(x, y)

    def dejavu_year(self, x):
        """Render extraction of the year part of x via date_part()."""
        opening = "date_part('year', "
        return opening + x + ")"
37
38
39
class StorageManagerPgSQL(db.StorageManagerDB):
    """StoreManager to save and retrieve Units via pyPgSQL 1.35."""

    sql_name_max_length = 63
    close_connection_method = 'finish'
    decompiler = PgSQLDecompiler
    toAdapter = AdapterToPgSQL()

    def __init__(self, name, arena, allOptions={}):
        """Initialize the store and unpack its libpq connection string.

        allOptions must contain a u'Connect' entry (KeyError otherwise)
        holding a connection string of the form:

            host=h port=p dbname=d user=u password=p options=o tty=t

        Every key=value pair found in it is also set as an attribute on
        this instance (create_database and drop_database read
        self.dbname, for example).
        """
        db.StorageManagerDB.__init__(self, name, arena, allOptions)

        self.connstring = allOptions[u'Connect']
        for k, v in self._connstring_pairs():
            setattr(self, k, v)

    def _connstring_pairs(self):
        """Return self.connstring as a list of (key, value) tuples.

        Raises ValueError if an atom does not contain '='.
        """
        # split() with no argument collapses runs of whitespace, so a
        # doubled or trailing space no longer yields an empty atom that
        # fails to unpack.
        # NOTE(review): libpq also accepts single-quoted values containing
        # spaces and spaces around '='; this simple parser handles neither.
        # Confirm whether any caller relies on them.
        pairs = []
        for atom in self.connstring.split():
            k, v = atom.split("=", 1)
            pairs.append((k, v))
        return pairs

    def sql_name(self, name, quoted=True):
        """Return the SQL name for 'name' as produced by the base class.

        When quoted is true, the result is wrapped in double quotes and
        any embedded double quote is doubled.
        """
        name = db.StorageManagerDB.sql_name(self, name, quoted)
        if quoted:
            name = '"' + name.replace('"', '""') + '"'
        return name

    def _get_conn(self):
        """Open and return a new libpq connection using self.connstring.

        Raises db.OutOfConnectionsError when the libpq error message
        starts with 'could not connect'; any other libpq.DatabaseError
        propagates unchanged.
        """
        import sys
        try:
            return libpq.PQconnectdb(self.connstring)
        except libpq.DatabaseError:
            # sys.exc_info() fetches the exception without relying on the
            # version-specific "except E, x" / "except E as x" spellings.
            x = sys.exc_info()[1]
            # Guard against an argument-less exception so the original
            # error is re-raised rather than masked by an IndexError.
            if x.args and x.args[0].startswith('could not connect'):
                raise db.OutOfConnectionsError
            raise

    def _template_conn(self):
        """Return a new connection that targets the 'template1' database.

        All other settings are copied from self.connstring; only a
        'dbname' entry, if present, is replaced. The caller is
        responsible for calling finish() on the returned connection.
        """
        atoms = []
        for k, v in self._connstring_pairs():
            if k == 'dbname':
                v = 'template1'
            atoms.append("%s=%s" % (k, v))
        return libpq.PQconnectdb(" ".join(atoms))

    def version(self):
        """Return the 'version' attribute of a template1 connection."""
        c = self._template_conn()
        try:
            return c.version
        finally:
            # Always release the connection, even if the lookup fails.
            c.finish()

    def fetch(self, query, conn=None):
        """fetch(query, conn=None) -> rowdata, columns.

        rowdata is a list of rows, each a list of values; columns is a
        list of (field name, field type) tuples, left empty when the
        result type is libpq.EMPTY_QUERY.
        """
        res = self.execute(query, conn)
        try:
            columns = []
            if res.resultType != libpq.EMPTY_QUERY:
                for index in xrange(res.nfields):
                    columns.append((res.fname(index), res.ftype(index)))

            data = [[res.getvalue(row, col) for col in xrange(res.nfields)]
                    for row in xrange(res.ntuples)]
        finally:
            # Free the libpq result even if reading a value raised.
            res.clear()

        return data, columns

    def create_database(self):
        """Create the database named by self.dbname via template1."""
        c = self._template_conn()
        try:
            self.execute('CREATE DATABASE %s' % self.sql_name(self.dbname), c)
        finally:
            # Do not leak the template connection if CREATE fails.
            c.finish()

    def drop_database(self):
        """Drop the database named by self.dbname via template1."""
        c = self._template_conn()
        try:
            self.execute("DROP DATABASE %s;" % self.sql_name(self.dbname), c)
        finally:
            # Do not leak the template connection if DROP fails.
            c.finish()

    def has_storage(self, cls):
        """Return True if the table for cls is listed in pg_tables."""
        # For some odd reason, libpq errors if you try to filter by tablename.
        sql = u"SELECT tablename FROM pg_tables"
        data, cols = self.fetch(sql)
        # Each fetched row is a one-element list, hence the list wrapper.
        return [self.table_name(cls.__name__, quoted=False)] in data
117
Note: See TracBrowser for help on using the browser.