Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

Changeset 456

Show
Ignore:
Timestamp:
06/11/07 22:37:50
Author:
fumanchu
Message:

Added logging to fs store, and replaced thread locks with file locks.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/storage/storefs.py

    r455 r456  
    77import shutil 
    88import threading 
     9import time 
    910 
    1011import dejavu 
    11 from dejavu import errors, storage 
     12from dejavu import errors, logflags, storage 
    1213 
    1314from geniusql import logic 
     
    5253                            if '.' in k]) 
    5354        self.extdefault = allOptions.get('extdefault', '.txt') 
    54          
    55         self.locks = {} 
    5655     
    5756    def shelf(self, cls): 
    58         """Return path, lock for the given cls.""" 
     57        """Return path for the given cls.""" 
    5958        clsname = cls.__name__ 
    60         if clsname not in self.locks: 
    61             lock = self.locks[clsname] = threading.Lock() 
    62         else: 
    63             lock = self.locks[clsname] 
    64         lock.acquire() 
    65          
    6659        path = os.path.join(self.root, clsname) 
    6760        if not os.path.exists(path): 
    6861            os.mkdir(path, self.mode) 
    6962         
    70         return path, lock 
     63        return path 
     64     
     65    def get_lock(self, cls): 
     66        clsname = cls.__name__ 
     67        path = os.path.join(self.root, clsname, "class.lock") 
     68        while True: 
     69            try: 
     70                lockfd = os.open(path, os.O_CREAT|os.O_WRONLY|os.O_EXCL) 
     71            except OSError: 
     72                time.sleep(0.1) 
     73            else: 
     74                os.close(lockfd)  
     75                break 
     76     
     77    def release_lock(self, cls): 
     78        clsname = cls.__name__ 
     79        path = os.path.join(self.root, clsname, "class.lock") 
     80        os.unlink(path) 
    7181     
    7282    def _pull(self, cls, root, idset): 
     
    8797                if getattr(cls, k).type is not str: 
    8898                    v = pickle.loads(v) 
    89             except IOError, exc: 
     99            except (EOFError, IOError), exc: 
    90100                v = None 
    91101            unitdict[k] = v 
     
    93103        return unitdict 
    94104     
    95     def recall(self, cls, expr=None): 
    96         clsname = cls.__name__ 
    97         units = [] 
    98          
    99         path, lock = self.shelf(cls) 
     105    def xrecall(self, cls, expr=None): 
     106        if self.logflags & logflags.RECALL: 
     107            self.log(logflags.RECALL.message(cls, expr)) 
     108         
     109        path = self.shelf(cls) 
     110        self.get_lock(cls) 
    100111        try: 
    101112            root, dirs, _ = os.walk(path).next() 
    102             for idset in dirs: 
    103                 unit = cls() 
    104                 unit._properties = self._pull(cls, root, idset) 
    105                 if expr is None or expr(unit): 
    106                     unit.cleanse() 
    107                     units.append(unit) 
    108         finally: 
    109             lock.release() 
    110          
    111         for unit in units: 
    112             yield unit 
     113        finally: 
     114            self.release_lock(cls) 
     115         
     116        for idset in dirs: 
     117            unit = cls() 
     118            unit._properties = self._pull(cls, root, idset) 
     119            if expr is None or expr(unit): 
     120                unit.cleanse() 
     121                yield unit 
    113122     
    114123    def ids_from_folder(self, cls, fname): 
    115         """Return a list of identifier (k, v) pairs from the given folder.""" 
     124        """Return a list of identifier (k, v) pairs for the named folder.""" 
    116125        if cls.identifiers: 
    117126            if fname == "__blank__": 
    118127                return [(k, "") for k in cls.identifiers] 
    119128             
     129            # cls.identifiers is ordered, and should match 
     130            # the order of atoms inside fname. 
    120131            return [(k, getattr(cls, k).coerce(None, v)) 
    121132                    for k, v in zip(cls.identifiers, 
     
    162173    def reserve(self, unit): 
    163174        """Reserve a persistent slot for unit.""" 
     175        if self.logflags & logflags.RESERVE: 
     176            self.log(logflags.RESERVE.message(unit)) 
     177         
    164178        cls = unit.__class__ 
    165         path, lock = self.shelf(unit.__class__) 
     179        path = self.shelf(cls) 
     180        self.get_lock(cls) 
    166181        try: 
    167182            if not unit.sequencer.valid_id(unit.identity()): 
     
    177192            self._push(unit, fname) 
    178193        finally: 
    179             lock.release(
     194            self.release_lock(cls
    180195            unit.cleanse() 
    181196     
    182197    def save(self, unit, forceSave=False): 
    183198        """Update storage from unit's data.""" 
    184         if unit.dirty() or forceSave: 
    185             path, lock = self.shelf(unit.__class__) 
     199        if self.logflags & logflags.SAVE: 
     200            self.log(logflags.SAVE.message(unit, forceSave)) 
     201         
     202        if forceSave or unit.dirty(): 
     203            cls = unit.__class__ 
     204            path = self.shelf(cls) 
     205            self.get_lock(cls) 
    186206            try: 
    187207                fname = self.folder_from_unit(unit) 
    188208                self._push(unit, os.path.join(path, fname)) 
    189209            finally: 
    190                 lock.release(
     210                self.release_lock(cls
    191211                unit.cleanse() 
    192212     
    193213    def destroy(self, unit): 
    194214        """Delete the unit.""" 
    195         path, lock = self.shelf(unit.__class__) 
     215        if self.logflags & logflags.DESTROY: 
     216            self.log(logflags.DESTROY.message(unit)) 
     217         
     218        cls = unit.__class__ 
     219        path = self.shelf(cls) 
     220        self.get_lock(cls) 
    196221        try: 
    197222            fname = self.folder_from_unit(unit) 
    198223            shutil.rmtree(os.path.join(path, fname)) 
    199224        finally: 
    200             lock.release(
    201      
    202     __version__ = "0.1
     225            self.release_lock(cls
     226     
     227    __version__ = "0.2
    203228     
    204229    def version(self): 
     
    209234     
    210235    def create_database(self): 
     236        if self.logflags & logflags.DDL: 
     237            self.log(logflags.DDL.message("create database")) 
     238         
    211239        if not os.path.exists(self.root): 
    212240            os.makedirs(self.root) 
    213241     
    214242    def drop_database(self): 
     243        if self.logflags & logflags.DDL: 
     244            self.log(logflags.DDL.message("drop database")) 
     245         
    215246        shutil.rmtree(self.root) 
    216247     
    217248    def create_storage(self, cls): 
    218         clsname = cls.__name__ 
    219         if clsname not in self.locks: 
    220             lock = self.locks[clsname] = threading.Lock() 
    221         else: 
    222             lock = self.locks[clsname] 
    223         lock.acquire() 
    224          
    225         try: 
    226             path = os.path.join(self.root, clsname) 
    227             if not os.path.exists(path): 
    228                 os.mkdir(path, self.mode) 
    229         finally: 
    230             lock.release() 
     249        if self.logflags & logflags.DDL: 
     250            self.log(logflags.DDL.message("create storage %s" % cls)) 
     251        self.shelf(cls) 
    231252     
    232253    def has_storage(self, cls): 
    233         path = os.path.join(self.root, cls.__name__) 
    234         return os.path.exists(path) 
     254        return os.path.exists(self.shelf(cls)) 
    235255     
    236256    def drop_storage(self, cls): 
    237         path = os.path.join(self.root, cls.__name__) 
    238         shutil.rmtree(path) 
     257        if self.logflags & logflags.DDL: 
     258            self.log(logflags.DDL.message("drop storage %s" % cls)) 
     259        shutil.rmtree(self.shelf(cls)) 
    239260     
    240261    def add_property(self, cls, name): 
    241         pass 
     262        if self.logflags & logflags.DDL: 
     263            self.log(logflags.DDL.message("add property %s %s" 
     264                                          % (cls, name))) 
    242265     
    243266    def drop_property(self, cls, name): 
    244         path, lock = self.shelf(cls) 
     267        if self.logflags & logflags.DDL: 
     268            self.log(logflags.DDL.message("drop property %s %s" 
     269                                          % (cls, name))) 
     270         
     271        extkey = "%s.%s" % (cls.__name__, name) 
     272        ext = self.extmap.get(extkey, self.extdefault) 
     273        fname = "%s%s" % (name, ext) 
     274         
     275        path = self.shelf(cls) 
     276        self.get_lock(cls) 
    245277        try: 
    246278            root, dirs, _ = os.walk(path).next() 
    247             extkey = "%s.%s" % (cls.__name__, name) 
    248             ext = self.extmap.get(extkey, self.extdefault) 
    249             fname = "%s%s" % (name, ext) 
    250              
    251279            for idset in dirs: 
    252280                os.remove(os.path.join(root, idset, fname)) 
    253281        finally: 
    254             lock.release(
     282            self.release_lock(cls
    255283     
    256284    def rename_property(self, cls, oldname, newname): 
    257         path, lock = self.shelf(cls) 
    258         try: 
    259             extkey = "%s.%s" % (cls.__name__, oldname) 
    260             ext = self.extmap.get(extkey, self.extdefault) 
    261             oname = "%s%s" % (oldname, ext) 
    262              
    263             extkey = "%s.%s" % (cls.__name__, newname) 
    264             ext = self.extmap.get(extkey, self.extdefault) 
    265             nname = "%s%s" % (newname, ext) 
    266              
     285        if self.logflags & logflags.DDL: 
     286            self.log(logflags.DDL.message( 
     287                "rename property %s from %s to %s" 
     288                % (cls, oldname, newname))) 
     289         
     290        extkey = "%s.%s" % (cls.__name__, oldname) 
     291        ext = self.extmap.get(extkey, self.extdefault) 
     292        oname = "%s%s" % (oldname, ext) 
     293         
     294        extkey = "%s.%s" % (cls.__name__, newname) 
     295        ext = self.extmap.get(extkey, self.extdefault) 
     296        nname = "%s%s" % (newname, ext) 
     297         
     298        path = self.shelf(cls) 
     299        self.get_lock(cls) 
     300        try: 
    267301            root, dirs, _ = os.walk(path).next() 
    268302            for idset in dirs: 
     
    270304                          os.path.join(root, idset, nname)) 
    271305        finally: 
    272             lock.release(
     306            self.release_lock(cls