Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/storage/storepypgsql.py

Revision 44 (checked in by fumanchu, 8 years ago)

1. Changed all storage tests to use common zoo_fixture.py.
2. Added SQLite Storage Manager.
3. Fixed icontains, icontainedby.
4. Added storage.Version class for comparing version strings.
5. Moved decompiler code into common db.SQLDecompiler.
6. SQLDecompiler.code() sets imperfect, but doesn't return it anymore.
7. Fixed bug in storeado expanded columns (no repr of None).
8. All db SM's now have a _get_conn method in prep for a common db.StorageManagerDB class.
9. All db SM's now have create_database and drop_database.
10. Added separate decom for mysql 4.1.1. Autodetects version.
11. Bugfix: SM.shutdown() methods weren't closing current thread's connection (if threaded).
12. Started cleaning up storeodbc.
13. Bugfix in storeshelve if Unit.ID was non-string.
14. Changed Zoo.Animal.Options to .PreviousZoos (a list).

Line 
1 # Use libpq directly to avoid all of the DB-API overhead.
2 from pyPgSQL import libpq
3
4 import warnings
5 import threading
6 import datetime
7
8 import dejavu
9 from dejavu import storage, logic
10 from dejavu.storage import db
11
12
# Shared adapter instance used by StorageManagerPgSQL to coerce Python
# values into SQL literals for its INSERT, UPDATE and DELETE statements.
AdapterToPgSQL = db.AdapterToSQL()
# Adapter class (bound without instantiating) for values read back from
# the database; instances are created at the point of use.
AdapterFromPg = db.AdapterFromDB
15
16
17 class StoreIteratorPgSQL(object):
18     """Iterator for populating Units from storage."""
19    
20     def __init__(self, store, unitClass, expr):
21         self.store  = store
22         self.unitClass = unitClass
23         self.expr = expr
24         self.sql, self.imperfect = store.select(unitClass, expr)
25    
26     def units(self):
27         s = self.store
28        
29         res = s.execute(self.sql)
30         if res.resultType != libpq.EMPTY_QUERY:
31             columns = {}
32             for index in xrange(res.nfields):
33                 columns[res.fname(index)] = (index, res.ftype(index))
34            
35             for row in xrange(res.ntuples):
36                 unit = self.unitClass()
37                 coercer = AdapterFromPg(unit)
38                 for key in unit.__class__.properties():
39                     index, ftype = columns[key]
40                     value = res.getvalue(row, index)
41                     try:
42                         coercer.consume(key, value, ftype)
43                     except Exception, x:
44                         x.args += (key, ftype, value)
45                         raise x
46                 # If our SQL is imperfect, don't yield it to the
47                 # caller unless it passes evaluate().
48                 if (not self.imperfect) or self.expr.evaluate(unit):
49                     yield unit
50         res.clear()
51
52
class PgSQLDecompiler(db.SQLDecompiler):
    """SQLDecompiler overrides producing PostgreSQL-specific SQL.

    Each dejavu_* method receives already-decompiled SQL operand strings
    and returns the SQL fragment for that operation.
    """

    def dejavu_icontainedby(self, op1, op2):
        """Return SQL for a case-insensitive 'op1 in op2' test."""
        if isinstance(op1, db.ConstWrapper):
            # Looking for text in a field. Use ILike (reverse terms).
            # NOTE(review): '%' and '_' inside op1 are not escaped and so
            # act as wildcards -- confirm whether that is acceptable.
            return op2 + " ILIKE '%" + op1.strip("'\"") + "%'"

        # Looking for field in (a, b, c).
        # Force all args to lowercase for case-insensitive comparison.
        atoms = [self.adapter.coerce(x).lower() for x in op2.basevalue]
        return "LOWER(%s) IN (%s)" % (op1, ", ".join(atoms))

    def dejavu_istartswith(self, x, y):
        """Return SQL for a case-insensitive 'x starts with y' test."""
        # NOTE(review): wildcards inside y are not escaped.
        return x + " ILIKE '" + y.strip("'\"") + "%'"

    def dejavu_iendswith(self, x, y):
        """Return SQL for a case-insensitive 'x ends with y' test."""
        # NOTE(review): wildcards inside y are not escaped.
        return x + " ILIKE '%" + y.strip("'\"") + "'"

    def dejavu_ieq(self, x, y):
        """Return SQL for a case-insensitive equality test."""
        # Compare lowercased operands rather than using ILIKE: ILIKE treats
        # '%', '_' and backslash in its right operand as pattern syntax, so
        # a value such as 'a_b' would also match 'aXb'.
        return "LOWER(%s) = LOWER(%s)" % (x, y)

    def dejavu_year(self, x):
        """Return SQL extracting the year from date expression x."""
        return "date_part('year', " + x + ")"
77
78
79
80 class StorageManagerPgSQL(storage.StorageManager):
81     """StoreManager to save and retrieve Units via pyPgSQL 1.35."""
82    
83     decompiler = PgSQLDecompiler
84     createAdapter = db.FieldTypeAdapter()
85    
86     def __init__(self, name, arena, allOptions={}):
87         storage.StorageManager.__init__(self, name, arena, allOptions)
88        
89         # connstring = (host=h port=p dbname=d user=u password=p options=o tty=t)
90         self.connstring = allOptions[u'Connect']
91         atoms = self.connstring.split(" ")
92         for atom in atoms:
93             k, v = atom.split("=", 1)
94             setattr(self, k, v)
95         self.CreateIfMissing = allOptions.get(u'Create If Missing', '')
96         self.threaded = bool(allOptions.get(u'Threaded', '1'))
97         self._connection = None
98        
99         self.prefix = allOptions.get(u'Prefix', u"djv")
100         self.reserve_lock = threading.Lock()
101    
102     def identifier(self, *atoms):
103         ident = '"' + ''.join(map(str, atoms)).replace('"', '""') + '"'
104         if len(ident) > 63:
105             warnings.warn("Identifier is longer than 63 characters. Most "
106                           "installations of Postgres are limited to 63. "
107                           "See NAMEDATALEN.")
108         return ident
109    
110     def shutdown(self):
111         if self.threaded:
112             t = threading.currentThread()
113             conn = getattr(t, "dejavu_storage_connection", None)
114             if conn is not None:
115                 conn.finish()
116         else:
117             if self._connection is not None:
118                 self._connection.finish()
119    
120     def _get_conn(self):
121         try:
122             conn = libpq.PQconnectdb(self.connstring)
123         except Exception, x:
124             if self.CreateIfMissing:
125                 self.create_database()
126                 conn = libpq.PQconnectdb(self.connstring)
127             else:
128                 raise
129         return conn
130    
131     def connection(self):
132         if self.threaded:
133             t = threading.currentThread()
134             if not hasattr(t, 'dejavu_storage_connection'):
135                 t.dejavu_storage_connection = self._get_conn()
136             return t.dejavu_storage_connection
137         else:
138             if self._connection is None:
139                 self._connection = self._get_conn()
140             return self._connection
141    
142     def _template_conn(self):
143         atoms = self.connstring.split(" ")
144         tmplconn = ""
145         for atom in atoms:
146             k, v = atom.split("=", 1)
147             if k == 'dbname': v = 'template1'
148             tmplconn += "%s=%s " % (k, v)
149         return libpq.PQconnectdb(tmplconn)
150    
151     def create_database(self):
152         self.execute('CREATE DATABASE %s' % self.identifier(self.dbname),
153                      self._template_conn())
154    
155     def drop_database(self):
156         self.execute("DROP DATABASE %s;" % self.identifier(self.dbname),
157                      self._template_conn())
158    
159     def select(self, unitClass, expr, distinct_fields=None):
160         tablename = self.identifier(self.prefix, unitClass.__name__)
161         if distinct_fields:
162             distinct_fields = [self.identifier(x) for x in distinct_fields]
163             sql = (u'SELECT DISTINCT %s FROM %s' %
164                    (u', '.join(distinct_fields), tablename))
165         else:
166             sql = u'SELECT * FROM %s' % tablename
167         w, i = self.where(unitClass, expr)
168         if len(w) > 0:
169             w = u" WHERE " + w
170         else:
171             w = u""
172         sql += w
173         return sql, i
174    
175     def where(self, cls, expr):
176         decom = self.decompiler(self.prefix + cls.__name__, expr)
177         return decom.code(), decom.imperfect
178    
179     def execute(self, query, conn=None):
180         if conn is None:
181             conn = self.connection()
182         try:
183             return conn.query(query)
184         except Exception, x:
185             x.args += (query,)
186             raise x
187    
188     def recall(self, cls, expr=None, pairs=None):
189         if expr is None:
190             expr = logic.Expression(lambda x: True)
191         return StoreIteratorPgSQL(self, cls, expr).units()
192    
193     def reserve(self, unit):
194         """reserve(unit). -> Reserve a persistent slot for unit.
195         
196         Notice in particular that we do not use the auto-number or
197         sequence generation capabilities within some databases, etc.
198         The ID should be supplied by UnitSequencers via reserve().
199         """
200         clsname = unit.__class__.__name__
201         tblname = self.identifier(self.prefix, clsname)
202         id = self.identifier("ID")
203         self.reserve_lock.acquire()
204         try:
205             if unit.ID is None:
206                 data = []
207                 res = self.execute(u'SELECT %s FROM %s;' % (id, tblname))
208                 if res.resultType != libpq.EMPTY_QUERY:
209                     data = [res.getvalue(row, 0) for row in xrange(res.ntuples)]
210                 unit.ID = unit.sequencer.next(data)
211            
212             self.execute('INSERT INTO %s (%s) VALUES (%s)' %
213                          (tblname, id, AdapterToPgSQL.coerce(unit.ID)))
214         finally:
215             self.reserve_lock.release()
216    
217     def save(self, unit, forceSave=False):
218         """save(unit, forceSave=False) -> Update storage from unit's data."""
219         if unit.dirty() or forceSave:
220             cls = unit.__class__
221             clsname = cls.__name__
222             tablename = self.identifier(self.prefix, clsname)
223            
224             parms = []
225             for key in cls.properties():
226                 if key != "ID":
227                     val = AdapterToPgSQL.coerce(getattr(unit, key))
228                     parms.append('%s = %s' % (self.identifier(key), val))
229             sql = ('UPDATE %s SET %s WHERE %s = %s' %
230                    (tablename, u", ".join(parms),
231                     self.identifier("ID"),
232                     AdapterToPgSQL.coerce(unit.ID, cls.property_type("ID"))))
233             self.execute(sql)
234             unit.cleanse()
235    
236     def destroy(self, unit):
237         """Delete the unit."""
238         # Use a DELETE command instead of a cursor for better performance.
239         deleteStatement = (u'DELETE * FROM %s WHERE %s = %s' %
240                            (self.identifier(self.prefix, unit.__class__.__name__),
241                             self.identifier("ID"),
242                             AdapterToPgSQL.coerce(unit.ID)))
243         self.execute(deleteStatement)
244    
245     def create_storage(self, unitClass):
246         tblname = self.identifier(self.prefix, unitClass.__name__)
247        
248         coerce = self.createAdapter.coerce
249         fields = []
250         for key in unitClass.properties():
251             fields.append(u'%s %s' % (self.identifier(key),
252                                       coerce(unitClass, key)))
253         try:
254             self.execute(u'CREATE TABLE %s (%s)' % (tblname, ", ".join(fields)))
255         except libpq.OperationalError, x:
256             if not x.args[0].endswith(' already exists\n'):
257                 raise
258         else:
259             for index in unitClass.indices():
260                 indexname = self.identifier(self.prefix, "i",
261                                             unitClass.__name__, index)
262                 self.execute(u'CREATE INDEX %s ON %s (%s)'
263                              % (indexname, tblname, self.identifier(index)))
264    
265     def distinct(self, cls, fields, expr=None):
266         """Return distinct values for specified fields."""
267         if expr is None:
268             expr = logic.Expression(lambda x: True)
269        
270         # ^%$#@! There's no way to handle imperfect queries without
271         # creating all involved Units, which defeats the purpose of
272         # distinct, which was a speed issue more than anything. Grr.
273         sql, imperfect = self.select(cls, expr, fields)
274         # Ignore for now.
275 ##        if imperfect:
276 ##            raise ValueError(u"The following query cannot be reliably "
277 ##                             u"returned from a Postgres data source.",
278 ##                             u"distinct()", cls, fields, expr)
279        
280         res = self.execute(sql)
281         if res.resultType == libpq.EMPTY_QUERY:
282             return []
283        
284         coerce = AdapterFromPg().coerce
285         data = []
286         for row in xrange(res.ntuples):
287             coerced_row = []
288             for i in xrange(len(fields)):
289                 expectedType = cls.property_type(field[i])
290                 actualType = res.ftype(i)
291                 val = coerce(res.getvalue(row, i), actualType, expectedType)
292                 coerced_row.append(val)
293             data.append(coerced_row)
294         return zip(*data)
295
Note: See TracBrowser for help on using the browser.