Changeset 456
- Timestamp:
- 06/11/07 22:37:50
- Files:
-
- trunk/storage/storefs.py (modified) (8 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/storage/storefs.py
r455 r456 7 7 import shutil 8 8 import threading 9 import time 9 10 10 11 import dejavu 11 from dejavu import errors, storage12 from dejavu import errors, logflags, storage 12 13 13 14 from geniusql import logic … … 52 53 if '.' in k]) 53 54 self.extdefault = allOptions.get('extdefault', '.txt') 54 55 self.locks = {}56 55 57 56 def shelf(self, cls): 58 """Return path , lockfor the given cls."""57 """Return path for the given cls.""" 59 58 clsname = cls.__name__ 60 if clsname not in self.locks:61 lock = self.locks[clsname] = threading.Lock()62 else:63 lock = self.locks[clsname]64 lock.acquire()65 66 59 path = os.path.join(self.root, clsname) 67 60 if not os.path.exists(path): 68 61 os.mkdir(path, self.mode) 69 62 70 return path, lock 63 return path 64 65 def get_lock(self, cls): 66 clsname = cls.__name__ 67 path = os.path.join(self.root, clsname, "class.lock") 68 while True: 69 try: 70 lockfd = os.open(path, os.O_CREAT|os.O_WRONLY|os.O_EXCL) 71 except OSError: 72 time.sleep(0.1) 73 else: 74 os.close(lockfd) 75 break 76 77 def release_lock(self, cls): 78 clsname = cls.__name__ 79 path = os.path.join(self.root, clsname, "class.lock") 80 os.unlink(path) 71 81 72 82 def _pull(self, cls, root, idset): … … 87 97 if getattr(cls, k).type is not str: 88 98 v = pickle.loads(v) 89 except IOError, exc:99 except (EOFError, IOError), exc: 90 100 v = None 91 101 unitdict[k] = v … … 93 103 return unitdict 94 104 95 def recall(self, cls, expr=None): 96 clsname = cls.__name__ 97 units = [] 98 99 path, lock = self.shelf(cls) 105 def xrecall(self, cls, expr=None): 106 if self.logflags & logflags.RECALL: 107 self.log(logflags.RECALL.message(cls, expr)) 108 109 path = self.shelf(cls) 110 self.get_lock(cls) 100 111 try: 101 112 root, dirs, _ = os.walk(path).next() 102 for idset in dirs: 103 unit = cls() 104 unit._properties = self._pull(cls, root, idset) 105 if expr is None or expr(unit): 106 unit.cleanse() 107 units.append(unit) 108 finally: 109 lock.release() 110 111 for unit in units: 112 yield unit 113 finally: 114 self.release_lock(cls) 115 116 for idset in dirs: 117 unit = cls() 118 unit._properties = self._pull(cls, root, idset) 119 if expr is None or expr(unit): 120 unit.cleanse() 121 yield unit 113 122 114 123 def ids_from_folder(self, cls, fname): 115 """Return a list of identifier (k, v) pairs f rom the givenfolder."""124 """Return a list of identifier (k, v) pairs for the named folder.""" 116 125 if cls.identifiers: 117 126 if fname == "__blank__": 118 127 return [(k, "") for k in cls.identifiers] 119 128 129 # cls.identifiers is ordered, and should match 130 # the order of atoms inside fname. 120 131 return [(k, getattr(cls, k).coerce(None, v)) 121 132 for k, v in zip(cls.identifiers, … … 162 173 def reserve(self, unit): 163 174 """Reserve a persistent slot for unit.""" 175 if self.logflags & logflags.RESERVE: 176 self.log(logflags.RESERVE.message(unit)) 177 164 178 cls = unit.__class__ 165 path, lock = self.shelf(unit.__class__) 179 path = self.shelf(cls) 180 self.get_lock(cls) 166 181 try: 167 182 if not unit.sequencer.valid_id(unit.identity()): … … 177 192 self._push(unit, fname) 178 193 finally: 179 lock.release()194 self.release_lock(cls) 180 195 unit.cleanse() 181 196 182 197 def save(self, unit, forceSave=False): 183 198 """Update storage from unit's data.""" 184 if unit.dirty() or forceSave: 185 path, lock = self.shelf(unit.__class__) 199 if self.logflags & logflags.SAVE: 200 self.log(logflags.SAVE.message(unit, forceSave)) 201 202 if forceSave or unit.dirty(): 203 cls = unit.__class__ 204 path = self.shelf(cls) 205 self.get_lock(cls) 186 206 try: 187 207 fname = self.folder_from_unit(unit) 188 208 self._push(unit, os.path.join(path, fname)) 189 209 finally: 190 lock.release()210 self.release_lock(cls) 191 211 unit.cleanse() 192 212 193 213 def destroy(self, unit): 194 214 """Delete the unit.""" 195 path, lock = self.shelf(unit.__class__) 215 if self.logflags & logflags.DESTROY: 216 self.log(logflags.DESTROY.message(unit)) 217 218 cls = unit.__class__ 219 path = self.shelf(cls) 220 self.get_lock(cls) 196 221 try: 197 222 fname = self.folder_from_unit(unit) 198 223 shutil.rmtree(os.path.join(path, fname)) 199 224 finally: 200 lock.release()201 202 __version__ = "0. 1"225 self.release_lock(cls) 226 227 __version__ = "0.2" 203 228 204 229 def version(self): … … 209 234 210 235 def create_database(self): 236 if self.logflags & logflags.DDL: 237 self.log(logflags.DDL.message("create database")) 238 211 239 if not os.path.exists(self.root): 212 240 os.makedirs(self.root) 213 241 214 242 def drop_database(self): 243 if self.logflags & logflags.DDL: 244 self.log(logflags.DDL.message("drop database")) 245 215 246 shutil.rmtree(self.root) 216 247 217 248 def create_storage(self, cls): 218 clsname = cls.__name__ 219 if clsname not in self.locks: 220 lock = self.locks[clsname] = threading.Lock() 221 else: 222 lock = self.locks[clsname] 223 lock.acquire() 224 225 try: 226 path = os.path.join(self.root, clsname) 227 if not os.path.exists(path): 228 os.mkdir(path, self.mode) 229 finally: 230 lock.release() 249 if self.logflags & logflags.DDL: 250 self.log(logflags.DDL.message("create storage %s" % cls)) 251 self.shelf(cls) 231 252 232 253 def has_storage(self, cls): 233 path = os.path.join(self.root, cls.__name__) 234 return os.path.exists(path) 254 return os.path.exists(self.shelf(cls)) 235 255 236 256 def drop_storage(self, cls): 237 path = os.path.join(self.root, cls.__name__) 238 shutil.rmtree(path) 257 if self.logflags & logflags.DDL: 258 self.log(logflags.DDL.message("drop storage %s" % cls)) 259 shutil.rmtree(self.shelf(cls)) 239 260 240 261 def add_property(self, cls, name): 241 pass 262 if self.logflags & logflags.DDL: 263 self.log(logflags.DDL.message("add property %s %s" 264 % (cls, name))) 242 265 243 266 def drop_property(self, cls, name): 244 path, lock = self.shelf(cls) 267 if self.logflags & logflags.DDL: 268 self.log(logflags.DDL.message("drop property %s %s" 269 % (cls, name))) 270 271 extkey = "%s.%s" % (cls.__name__, name) 272 ext = self.extmap.get(extkey, self.extdefault) 273 fname = "%s%s" % (name, ext) 274 275 path = self.shelf(cls) 276 self.get_lock(cls) 245 277 try: 246 278 root, dirs, _ = os.walk(path).next() 247 extkey = "%s.%s" % (cls.__name__, name)248 ext = self.extmap.get(extkey, self.extdefault)249 fname = "%s%s" % (name, ext)250 251 279 for idset in dirs: 252 280 os.remove(os.path.join(root, idset, fname)) 253 281 finally: 254 lock.release()282 self.release_lock(cls) 255 283 256 284 def rename_property(self, cls, oldname, newname): 257 path, lock = self.shelf(cls) 258 try: 259 extkey = "%s.%s" % (cls.__name__, oldname) 260 ext = self.extmap.get(extkey, self.extdefault) 261 oname = "%s%s" % (oldname, ext) 262 263 extkey = "%s.%s" % (cls.__name__, newname) 264 ext = self.extmap.get(extkey, self.extdefault) 265 nname = "%s%s" % (newname, ext) 266 285 if self.logflags & logflags.DDL: 286 self.log(logflags.DDL.message( 287 "rename property %s from %s to %s" 288 % (cls, oldname, newname))) 289 290 extkey = "%s.%s" % (cls.__name__, oldname) 291 ext = self.extmap.get(extkey, self.extdefault) 292 oname = "%s%s" % (oldname, ext) 293 294 extkey = "%s.%s" % (cls.__name__, newname) 295 ext = self.extmap.get(extkey, self.extdefault) 296 nname = "%s%s" % (newname, ext) 297 298 path = self.shelf(cls) 299 self.get_lock(cls) 300 try: 267 301 root, dirs, _ = os.walk(path).next() 268 302 for idset in dirs: … … 270 304 os.path.join(root, idset, nname)) 271 305 finally: 272 lock.release()306 self.release_lock(cls)
