| 1 |
|
|---|
| 2 |
from pyPgSQL import libpq |
|---|
| 3 |
|
|---|
| 4 |
import warnings |
|---|
| 5 |
import threading |
|---|
| 6 |
import datetime |
|---|
| 7 |
|
|---|
| 8 |
import dejavu |
|---|
| 9 |
from dejavu import storage, logic |
|---|
| 10 |
from dejavu.storage import db |
|---|
| 11 |
|
|---|
| 12 |
|
|---|
| 13 |
AdapterToPgSQL = db.AdapterToSQL() |
|---|
| 14 |
AdapterFromPg = db.AdapterFromDB |
|---|
| 15 |
|
|---|
| 16 |
|
|---|
| 17 |
class StoreIteratorPgSQL(object): |
|---|
| 18 |
"""Iterator for populating Units from storage.""" |
|---|
| 19 |
|
|---|
| 20 |
def __init__(self, store, unitClass, expr): |
|---|
| 21 |
self.store = store |
|---|
| 22 |
self.unitClass = unitClass |
|---|
| 23 |
self.expr = expr |
|---|
| 24 |
self.sql, self.imperfect = store.select(unitClass, expr) |
|---|
| 25 |
|
|---|
| 26 |
def units(self): |
|---|
| 27 |
s = self.store |
|---|
| 28 |
|
|---|
| 29 |
res = s.execute(self.sql) |
|---|
| 30 |
if res.resultType != libpq.EMPTY_QUERY: |
|---|
| 31 |
columns = {} |
|---|
| 32 |
for index in xrange(res.nfields): |
|---|
| 33 |
columns[res.fname(index)] = (index, res.ftype(index)) |
|---|
| 34 |
|
|---|
| 35 |
for row in xrange(res.ntuples): |
|---|
| 36 |
unit = self.unitClass() |
|---|
| 37 |
coercer = AdapterFromPg(unit) |
|---|
| 38 |
for key in unit.__class__.properties(): |
|---|
| 39 |
index, ftype = columns[key] |
|---|
| 40 |
value = res.getvalue(row, index) |
|---|
| 41 |
try: |
|---|
| 42 |
coercer.consume(key, value, ftype) |
|---|
| 43 |
except Exception, x: |
|---|
| 44 |
x.args += (key, ftype, value) |
|---|
| 45 |
raise x |
|---|
| 46 |
|
|---|
| 47 |
|
|---|
| 48 |
if (not self.imperfect) or self.expr.evaluate(unit): |
|---|
| 49 |
yield unit |
|---|
| 50 |
res.clear() |
|---|
| 51 |
|
|---|
| 52 |
|
|---|
| 53 |
class PgSQLDecompiler(db.SQLDecompiler): |
|---|
| 54 |
|
|---|
| 55 |
def dejavu_icontainedby(self, op1, op2): |
|---|
| 56 |
if isinstance(op1, db.ConstWrapper): |
|---|
| 57 |
|
|---|
| 58 |
return op2 + " ILIKE '%" + op1.strip("'\"") + "%'" |
|---|
| 59 |
else: |
|---|
| 60 |
|
|---|
| 61 |
|
|---|
| 62 |
atoms = [self.adapter.coerce(x).lower() for x in op2.basevalue] |
|---|
| 63 |
return "LOWER(%s) IN (%s)" % (op1, ", ".join(atoms)) |
|---|
| 64 |
|
|---|
| 65 |
def dejavu_istartswith(self, x, y): |
|---|
| 66 |
return x + " ILIKE '" + y.strip("'\"") + "%'" |
|---|
| 67 |
|
|---|
| 68 |
def dejavu_iendswith(self, x, y): |
|---|
| 69 |
return x + " ILIKE '%" + y.strip("'\"") + "'" |
|---|
| 70 |
|
|---|
| 71 |
def dejavu_ieq(self, x, y): |
|---|
| 72 |
|
|---|
| 73 |
return x + " ILIKE " + y |
|---|
| 74 |
|
|---|
| 75 |
def dejavu_year(self, x): |
|---|
| 76 |
return "date_part('year', " + x + ")" |
|---|
| 77 |
|
|---|
| 78 |
|
|---|
| 79 |
|
|---|
| 80 |
class StorageManagerPgSQL(storage.StorageManager): |
|---|
| 81 |
"""StoreManager to save and retrieve Units via pyPgSQL 1.35.""" |
|---|
| 82 |
|
|---|
| 83 |
decompiler = PgSQLDecompiler |
|---|
| 84 |
createAdapter = db.FieldTypeAdapter() |
|---|
| 85 |
|
|---|
| 86 |
def __init__(self, name, arena, allOptions={}): |
|---|
| 87 |
storage.StorageManager.__init__(self, name, arena, allOptions) |
|---|
| 88 |
|
|---|
| 89 |
|
|---|
| 90 |
self.connstring = allOptions[u'Connect'] |
|---|
| 91 |
atoms = self.connstring.split(" ") |
|---|
| 92 |
for atom in atoms: |
|---|
| 93 |
k, v = atom.split("=", 1) |
|---|
| 94 |
setattr(self, k, v) |
|---|
| 95 |
self.CreateIfMissing = allOptions.get(u'Create If Missing', '') |
|---|
| 96 |
self.threaded = bool(allOptions.get(u'Threaded', '1')) |
|---|
| 97 |
self._connection = None |
|---|
| 98 |
|
|---|
| 99 |
self.prefix = allOptions.get(u'Prefix', u"djv") |
|---|
| 100 |
self.reserve_lock = threading.Lock() |
|---|
| 101 |
|
|---|
| 102 |
def identifier(self, *atoms): |
|---|
| 103 |
ident = '"' + ''.join(map(str, atoms)).replace('"', '""') + '"' |
|---|
| 104 |
if len(ident) > 63: |
|---|
| 105 |
warnings.warn("Identifier is longer than 63 characters. Most " |
|---|
| 106 |
"installations of Postgres are limited to 63. " |
|---|
| 107 |
"See NAMEDATALEN.") |
|---|
| 108 |
return ident |
|---|
| 109 |
|
|---|
| 110 |
def shutdown(self): |
|---|
| 111 |
if self.threaded: |
|---|
| 112 |
t = threading.currentThread() |
|---|
| 113 |
conn = getattr(t, "dejavu_storage_connection", None) |
|---|
| 114 |
if conn is not None: |
|---|
| 115 |
conn.finish() |
|---|
| 116 |
else: |
|---|
| 117 |
if self._connection is not None: |
|---|
| 118 |
self._connection.finish() |
|---|
| 119 |
|
|---|
| 120 |
def _get_conn(self): |
|---|
| 121 |
try: |
|---|
| 122 |
conn = libpq.PQconnectdb(self.connstring) |
|---|
| 123 |
except Exception, x: |
|---|
| 124 |
if self.CreateIfMissing: |
|---|
| 125 |
self.create_database() |
|---|
| 126 |
conn = libpq.PQconnectdb(self.connstring) |
|---|
| 127 |
else: |
|---|
| 128 |
raise |
|---|
| 129 |
return conn |
|---|
| 130 |
|
|---|
| 131 |
def connection(self): |
|---|
| 132 |
if self.threaded: |
|---|
| 133 |
t = threading.currentThread() |
|---|
| 134 |
if not hasattr(t, 'dejavu_storage_connection'): |
|---|
| 135 |
t.dejavu_storage_connection = self._get_conn() |
|---|
| 136 |
return t.dejavu_storage_connection |
|---|
| 137 |
else: |
|---|
| 138 |
if self._connection is None: |
|---|
| 139 |
self._connection = self._get_conn() |
|---|
| 140 |
return self._connection |
|---|
| 141 |
|
|---|
| 142 |
def _template_conn(self): |
|---|
| 143 |
atoms = self.connstring.split(" ") |
|---|
| 144 |
tmplconn = "" |
|---|
| 145 |
for atom in atoms: |
|---|
| 146 |
k, v = atom.split("=", 1) |
|---|
| 147 |
if k == 'dbname': v = 'template1' |
|---|
| 148 |
tmplconn += "%s=%s " % (k, v) |
|---|
| 149 |
return libpq.PQconnectdb(tmplconn) |
|---|
| 150 |
|
|---|
| 151 |
def create_database(self): |
|---|
| 152 |
self.execute('CREATE DATABASE %s' % self.identifier(self.dbname), |
|---|
| 153 |
self._template_conn()) |
|---|
| 154 |
|
|---|
| 155 |
def drop_database(self): |
|---|
| 156 |
self.execute("DROP DATABASE %s;" % self.identifier(self.dbname), |
|---|
| 157 |
self._template_conn()) |
|---|
| 158 |
|
|---|
| 159 |
def select(self, unitClass, expr, distinct_fields=None): |
|---|
| 160 |
tablename = self.identifier(self.prefix, unitClass.__name__) |
|---|
| 161 |
if distinct_fields: |
|---|
| 162 |
distinct_fields = [self.identifier(x) for x in distinct_fields] |
|---|
| 163 |
sql = (u'SELECT DISTINCT %s FROM %s' % |
|---|
| 164 |
(u', '.join(distinct_fields), tablename)) |
|---|
| 165 |
else: |
|---|
| 166 |
sql = u'SELECT * FROM %s' % tablename |
|---|
| 167 |
w, i = self.where(unitClass, expr) |
|---|
| 168 |
if len(w) > 0: |
|---|
| 169 |
w = u" WHERE " + w |
|---|
| 170 |
else: |
|---|
| 171 |
w = u"" |
|---|
| 172 |
sql += w |
|---|
| 173 |
return sql, i |
|---|
| 174 |
|
|---|
| 175 |
def where(self, cls, expr): |
|---|
| 176 |
decom = self.decompiler(self.prefix + cls.__name__, expr) |
|---|
| 177 |
return decom.code(), decom.imperfect |
|---|
| 178 |
|
|---|
| 179 |
def execute(self, query, conn=None): |
|---|
| 180 |
if conn is None: |
|---|
| 181 |
conn = self.connection() |
|---|
| 182 |
try: |
|---|
| 183 |
return conn.query(query) |
|---|
| 184 |
except Exception, x: |
|---|
| 185 |
x.args += (query,) |
|---|
| 186 |
raise x |
|---|
| 187 |
|
|---|
| 188 |
def recall(self, cls, expr=None, pairs=None): |
|---|
| 189 |
if expr is None: |
|---|
| 190 |
expr = logic.Expression(lambda x: True) |
|---|
| 191 |
return StoreIteratorPgSQL(self, cls, expr).units() |
|---|
| 192 |
|
|---|
| 193 |
def reserve(self, unit): |
|---|
| 194 |
"""reserve(unit). -> Reserve a persistent slot for unit. |
|---|
| 195 |
|
|---|
| 196 |
Notice in particular that we do not use the auto-number or |
|---|
| 197 |
sequence generation capabilities within some databases, etc. |
|---|
| 198 |
The ID should be supplied by UnitSequencers via reserve(). |
|---|
| 199 |
""" |
|---|
| 200 |
clsname = unit.__class__.__name__ |
|---|
| 201 |
tblname = self.identifier(self.prefix, clsname) |
|---|
| 202 |
id = self.identifier("ID") |
|---|
| 203 |
self.reserve_lock.acquire() |
|---|
| 204 |
try: |
|---|
| 205 |
if unit.ID is None: |
|---|
| 206 |
data = [] |
|---|
| 207 |
res = self.execute(u'SELECT %s FROM %s;' % (id, tblname)) |
|---|
| 208 |
if res.resultType != libpq.EMPTY_QUERY: |
|---|
| 209 |
data = [res.getvalue(row, 0) for row in xrange(res.ntuples)] |
|---|
| 210 |
unit.ID = unit.sequencer.next(data) |
|---|
| 211 |
|
|---|
| 212 |
self.execute('INSERT INTO %s (%s) VALUES (%s)' % |
|---|
| 213 |
(tblname, id, AdapterToPgSQL.coerce(unit.ID))) |
|---|
| 214 |
finally: |
|---|
| 215 |
self.reserve_lock.release() |
|---|
| 216 |
|
|---|
| 217 |
def save(self, unit, forceSave=False): |
|---|
| 218 |
"""save(unit, forceSave=False) -> Update storage from unit's data.""" |
|---|
| 219 |
if unit.dirty() or forceSave: |
|---|
| 220 |
cls = unit.__class__ |
|---|
| 221 |
clsname = cls.__name__ |
|---|
| 222 |
tablename = self.identifier(self.prefix, clsname) |
|---|
| 223 |
|
|---|
| 224 |
parms = [] |
|---|
| 225 |
for key in cls.properties(): |
|---|
| 226 |
if key != "ID": |
|---|
| 227 |
val = AdapterToPgSQL.coerce(getattr(unit, key)) |
|---|
| 228 |
parms.append('%s = %s' % (self.identifier(key), val)) |
|---|
| 229 |
sql = ('UPDATE %s SET %s WHERE %s = %s' % |
|---|
| 230 |
(tablename, u", ".join(parms), |
|---|
| 231 |
self.identifier("ID"), |
|---|
| 232 |
AdapterToPgSQL.coerce(unit.ID, cls.property_type("ID")))) |
|---|
| 233 |
self.execute(sql) |
|---|
| 234 |
unit.cleanse() |
|---|
| 235 |
|
|---|
| 236 |
def destroy(self, unit): |
|---|
| 237 |
"""Delete the unit.""" |
|---|
| 238 |
|
|---|
| 239 |
deleteStatement = (u'DELETE * FROM %s WHERE %s = %s' % |
|---|
| 240 |
(self.identifier(self.prefix, unit.__class__.__name__), |
|---|
| 241 |
self.identifier("ID"), |
|---|
| 242 |
AdapterToPgSQL.coerce(unit.ID))) |
|---|
| 243 |
self.execute(deleteStatement) |
|---|
| 244 |
|
|---|
| 245 |
def create_storage(self, unitClass): |
|---|
| 246 |
tblname = self.identifier(self.prefix, unitClass.__name__) |
|---|
| 247 |
|
|---|
| 248 |
coerce = self.createAdapter.coerce |
|---|
| 249 |
fields = [] |
|---|
| 250 |
for key in unitClass.properties(): |
|---|
| 251 |
fields.append(u'%s %s' % (self.identifier(key), |
|---|
| 252 |
coerce(unitClass, key))) |
|---|
| 253 |
try: |
|---|
| 254 |
self.execute(u'CREATE TABLE %s (%s)' % (tblname, ", ".join(fields))) |
|---|
| 255 |
except libpq.OperationalError, x: |
|---|
| 256 |
if not x.args[0].endswith(' already exists\n'): |
|---|
| 257 |
raise |
|---|
| 258 |
else: |
|---|
| 259 |
for index in unitClass.indices(): |
|---|
| 260 |
indexname = self.identifier(self.prefix, "i", |
|---|
| 261 |
unitClass.__name__, index) |
|---|
| 262 |
self.execute(u'CREATE INDEX %s ON %s (%s)' |
|---|
| 263 |
% (indexname, tblname, self.identifier(index))) |
|---|
| 264 |
|
|---|
| 265 |
def distinct(self, cls, fields, expr=None): |
|---|
| 266 |
"""Return distinct values for specified fields.""" |
|---|
| 267 |
if expr is None: |
|---|
| 268 |
expr = logic.Expression(lambda x: True) |
|---|
| 269 |
|
|---|
| 270 |
|
|---|
| 271 |
|
|---|
| 272 |
|
|---|
| 273 |
sql, imperfect = self.select(cls, expr, fields) |
|---|
| 274 |
|
|---|
| 275 |
|
|---|
| 276 |
|
|---|
| 277 |
|
|---|
| 278 |
|
|---|
| 279 |
|
|---|
| 280 |
res = self.execute(sql) |
|---|
| 281 |
if res.resultType == libpq.EMPTY_QUERY: |
|---|
| 282 |
return [] |
|---|
| 283 |
|
|---|
| 284 |
coerce = AdapterFromPg().coerce |
|---|
| 285 |
data = [] |
|---|
| 286 |
for row in xrange(res.ntuples): |
|---|
| 287 |
coerced_row = [] |
|---|
| 288 |
for i in xrange(len(fields)): |
|---|
| 289 |
expectedType = cls.property_type(field[i]) |
|---|
| 290 |
actualType = res.ftype(i) |
|---|
| 291 |
val = coerce(res.getvalue(row, i), actualType, expectedType) |
|---|
| 292 |
coerced_row.append(val) |
|---|
| 293 |
data.append(coerced_row) |
|---|
| 294 |
return zip(*data) |
|---|
| 295 |
|
|---|