Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

Changeset 201

Show
Ignore:
Timestamp:
03/20/06 06:48:36
Author:
fumanchu
Message:

Separated connection factories (pools, etc) out of the StorageManager? class. SM's now have a "_del_conn" method instead of a "close_connection_method" attribute. Also, the LOGCONN logflag has been removed.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/arenas.py

    r200 r201  
    88 
    99__all__ = ['Arena', 'Sandbox', 
    10            'LOGCONN', 'LOGFORGET', 'LOGMEMORIZE', 'LOGRECALL', 
     10           'LOGFORGET', 'LOGMEMORIZE', 'LOGRECALL', 
    1111           'LOGREPRESS', 'LOGSANDBOX', 'LOGSQL', 'LOGVIEW', 
    1212           ] 
     
    1515# logging flags (see Arena.logflags) 
    1616LOGSQL = 4 
    17 LOGCONN = 8 
    1817 
    1918LOGMEMORIZE = 128 
  • trunk/storage/db.py

    r200 r201  
    6060 
    6161import dejavu 
    62 from dejavu import codewalk, logic, storage, LOGCONN, LOGSQL, xray 
     62from dejavu import codewalk, logic, storage, LOGSQL, xray 
    6363 
    6464 
     
    733733 
    734734 
     735# ------------------------- Connection Factories ------------------------- # 
     736 
     737 
    735738class ConnectionWrapper(object): 
    736739    """Connection object wrapper, so it can be used as a weak reference.""" 
     
    741744    def __getattr__(self, attr): 
    742745        return getattr(self.conn, attr) 
     746 
     747 
     748class OutOfConnectionsError(dejavu.DejavuError): 
     749    """Exception raised when a database store has run out of connections.""" 
     750    pass 
     751 
     752 
     753class ConnectionFactory(object): 
     754    """A connection factory which creates a new connection for each request.""" 
     755     
     756    def __init__(self, open, close, retry=5): 
     757        self.open = open 
     758        self.close = close 
     759        self.retry = retry 
     760        self.refs = {} 
     761     
     762    def __call__(self): 
     763        """Return a connection.""" 
     764        for i in xrange(self.retry): 
     765            try: 
     766                conn = self.open() 
     767                w = ConnectionWrapper(conn) 
     768                self.refs[weakref.ref(w, self._release)] = w.conn 
     769                return w 
     770            except OutOfConnectionsError: 
     771                time.sleep(i + 1) 
     772                conn = None 
     773        raise OutOfConnectionsError() 
     774     
     775    def _release(self, ref): 
     776        """Release a connection.""" 
     777        self.close(self.refs.pop(ref)) 
     778     
     779    def shutdown(self): 
     780        """Release all database connections.""" 
     781        # Empty self.refs. 
     782        while self.refs: 
     783            ref, conn = self.refs.popitem() 
     784            self.close(conn) 
     785 
     786 
     787class ConnectionPool(object): 
     788    """A database connection factory which keeps a pool of connections.""" 
     789     
     790    def __init__(self, open, close, size=10, retry=5): 
     791        self.open = open 
     792        self.close = close 
     793        self.refs = {} 
     794        self.pool = Queue.Queue(size) 
     795        self.retry = retry 
     796     
     797    def __call__(self): 
     798        """Return a connection from the pool.""" 
     799        for i in xrange(self.retry): 
     800            try: 
     801                conn = self.pool.get_nowait() 
     802                # Okay, this is freaky. If we wrap here, all goes well. 
     803                # If we wrap on Queue.put(), mysql crashes after 1700 
     804                # or so inserts (when migrating Access tables to MySQL). 
     805                # Go figure. 
     806                w = ConnectionWrapper(conn) 
     807                self.refs[weakref.ref(w, self._release)] = w.conn 
     808                return w 
     809            except Queue.Empty: 
     810                pass 
     811             
     812            try: 
     813                conn = self.open() 
     814                w = ConnectionWrapper(conn) 
     815                self.refs[weakref.ref(w, self._release)] = w.conn 
     816                return w 
     817            except OutOfConnectionsError: 
     818                time.sleep(i + 1) 
     819                conn = None 
     820        raise OutOfConnectionsError() 
     821     
     822    def _release(self, ref): 
     823        """Release a connection.""" 
     824        conn = self.refs.pop(ref) 
     825        try: 
     826            self.pool.put_nowait(conn) 
     827            return 
     828        except Queue.Full: 
     829            pass 
     830        self.close(conn) 
     831     
     832    def shutdown(self): 
     833        """Release all database connections.""" 
     834        # Empty the pool. 
     835        while True: 
     836            try: 
     837                self.pool.get(True, 0.5) 
     838            except Queue.Empty: 
     839                break 
     840         
     841        # Empty self.refs. 
     842        while self.refs: 
     843            ref, conn = self.refs.popitem() 
     844            self.close(conn) 
     845 
     846 
     847class SingleConnection(object): 
     848    """A single database connection for all consumers. 
     849     
     850    Use this when your database cannot handle multiple connections at once, 
     851    but can handle multiple threads using the same connection. 
     852    """ 
     853     
     854    def __init__(self, open, close): 
     855        self.open = open 
     856        self.close = close 
     857        # Delay opening the connection, because the 
     858        # SM may need to create the database first. 
     859        self._conn = None 
     860     
     861    def __call__(self): 
     862        """Return our only connection.""" 
     863        if self._conn is None: 
     864            self._conn = self.open() 
     865        return self._conn 
     866     
     867    def shutdown(self): 
     868        """Release all database connections.""" 
     869        if self._conn is not None: 
     870            self.close(self._conn) 
     871 
     872 
     873# --------------------------- Storage Manager --------------------------- # 
    743874 
    744875 
     
    795926    sql_name_max_length = 64 
    796927    sql_name_caseless = False 
    797     close_connection_method = 'close' 
    798928    use_asterisk_to_get_all = False 
    799929     
     
    802932    toAdapter = AdapterToSQL() 
    803933    fromAdapter = AdapterFromDB() 
    804     debug_connections = False 
    805934     
    806935    def __init__(self, name, arena, allOptions={}): 
     
    821950        if adapter: self.fromAdapter = adapter 
    822951         
    823         self.pool_size = int(allOptions.get('Pool Size', '10')) 
    824          
    825         self.refs = {} 
    826         if self.pool_size > 0: 
    827             self.pool = Queue.Queue(self.pool_size) 
    828         else: 
    829             self.pool = None 
    830         self.retry = 5 
    831         self.threaded = True 
     952        size = int(allOptions.get('Pool Size', '10')) 
     953        if size > 0: 
     954            self.connection = ConnectionPool(self._get_conn, self._del_conn, size) 
     955        else: 
     956            self.connection = ConnectionFactory(self._get_conn, self._del_conn) 
    832957         
    833958        self.prefix = allOptions.get('Prefix', "djv") 
     
    8801005        raise NotImplementedError 
    8811006     
    882     def connection(self): 
    883         """Return a connection from the pool.""" 
    884         if not self.threaded: 
    885             # Place a single 'conn' entry in self.refs. 
    886             try: 
    887                 return self.refs['conn'] 
    888             except KeyError: 
    889                 self.refs['conn'] = conn = self._get_conn() 
    890                 return conn 
    891          
    892         retry = 0 
    893         while True: 
    894             if self.pool is not None: 
    895                 try: 
    896                     conn = self.pool.get_nowait() 
    897                     # Okay, this is freaky. If we wrap here, all goes well. 
    898                     # If we wrap on Queue.put(), mysql crashes after 1700 
    899                     # or so inserts (when migrating Access tables to MySQL). 
    900                     # Go figure. 
    901                     w = ConnectionWrapper(conn) 
    902                     self.refs[weakref.ref(w, self.release)] = w.conn 
    903                     self.arena.log("-->get %s" % self.__class__.__name__, 
    904                                    LOGCONN) 
    905                     return w 
    906                 except Queue.Empty: 
    907                     pass 
    908              
    909             try: 
    910                 conn = self._get_conn() 
    911                 w = ConnectionWrapper(conn) 
    912                 self.refs[weakref.ref(w, self.release)] = w.conn 
    913                 self.arena.log("create %s" % self.__class__.__name__, LOGCONN) 
    914                 return w 
    915             except OutOfConnectionsError: 
    916                 retry += 1 
    917                 if retry < self.retry: 
    918                     time.sleep(retry * 1) 
    919                     conn = None 
    920                     continue 
    921                 raise OutOfConnectionsError() 
    922      
    923     def release(self, ref): 
    924         # This method should only be called if self.threaded is True 
    925         conn = self.refs.pop(ref) 
    926          
    927         if self.pool is not None: 
    928             try: 
    929                 self.pool.put_nowait(conn) 
    930                 self.arena.log("<--put %s" % self.__class__.__name__, LOGCONN) 
    931                 return 
    932             except Queue.Full: 
    933                 pass 
    934          
    935         getattr(conn, self.close_connection_method)() 
    936         self.arena.log("___close___ %s" % self.__class__.__name__, LOGCONN) 
     1007    def _del_conn(self, conn): 
     1008        # Override this with the close call (if any) for your DB. 
     1009        conn.close() 
    9371010     
    9381011    def shutdown(self): 
    9391012        """Release all database connections.""" 
    940         # Empty the pool. 
    941         if self.pool: 
    942             while True: 
    943                 try: 
    944                     self.pool.get(True, 0.5) 
    945                 except Queue.Empty: 
    946                     break 
    947          
    948         # Empty self.refs. 
    949         while self.refs: 
    950             ref, conn = self.refs.popitem() 
    951             getattr(conn, self.close_connection_method)() 
     1013        self.connection.shutdown() 
    9521014     
    9531015    def select(self, cls, expr, fields=None, distinct=False): 
     
    14561518                 repr(self.pk), repr(self.unique))) 
    14571519 
    1458  
    1459 class OutOfConnectionsError(dejavu.DejavuError): 
    1460     """Exception raised when a database store has run out of connections.""" 
    1461     pass 
    1462  
  • trunk/storage/storeado.py

    r200 r201  
    353353    """ 
    354354     
    355     close_connection_method = 'Close' 
    356355    decompiler = ADOSQLDecompiler 
    357356    fromAdapter = AdapterFromADO() 
     
    374373        conn.Open(self.connstring) 
    375374        return conn 
     375     
     376    def _del_conn(self, conn): 
     377        conn.Close() 
    376378     
    377379    def execute(self, query, conn=None): 
     
    399401                self.arena.log(query, dejavu.LOGSQL) 
    400402                res = win32com.client.Dispatch(r'ADODB.Recordset') 
    401                 if self.threaded
    402                     # 'conn' will be a ConnectionWrapper object, which .Open 
     403                if hasattr(conn, "conn")
     404                    # 'conn' is be a ConnectionWrapper object, which .Open 
    403405                    # won't accept. Pass the unwrapped connection instead. 
    404406                    res.Open(query, conn.conn, adOpenForwardOnly, adLockReadOnly) 
     
    661663     
    662664    def __init__(self, name, arena, allOptions={}): 
    663         db.StorageManagerDB.__init__(self, name, arena, allOptions) 
    664          
    665665        self.connstring = allOptions['Connect'] 
    666666        atoms = self.connatoms() 
    667667        self.dbname = atoms['INITIAL CATALOG'] 
     668        db.StorageManagerDB.__init__(self, name, arena, allOptions) 
    668669     
    669670    def _seq_UnitSequencerInteger(self, unit): 
     
    884885     
    885886    def __init__(self, name, arena, allOptions={}): 
    886         db.StorageManagerDB.__init__(self, name, arena, allOptions) 
    887          
    888887        self.connstring = allOptions['Connect'] 
    889888        atoms = self.connatoms() 
     
    891890                       atoms.get('DATA SOURCE NAME') or 
    892891                       atoms.get('DBQ')) 
     892        db.StorageManagerDB.__init__(self, name, arena, allOptions) 
     893         
    893894        # MS Access can't use a pool, because there doesn't seem 
    894895        # to be a commit timeout. 
    895         self.pool = None 
    896         self.threaded = False 
    897         self.debug_connections = True 
     896        self.connection = db.SingleConnection(self._get_conn, self._del_conn) 
    898897     
    899898    def _seq_UnitSequencerInteger(self, unit): 
  • trunk/storage/storefirebird.py

    r200 r201  
    4848     
    4949    def __init__(self, name, arena, allOptions={}): 
    50         db.StorageManagerDB.__init__(self, name, arena, allOptions) 
    51          
    5250        # DSN = "host:database" 
    5351        self.DSN = dsn = allOptions['DSN'] 
     
    5654        self.password = allOptions['password'] 
    5755        self.encoding = allOptions.get('encoding', 'utf8') 
     56        db.StorageManagerDB.__init__(self, name, arena, allOptions) 
    5857     
    5958    def sql_name(self, name, quoted=True): 
     
    6463     
    6564    def _get_conn(self): 
    66         c = kinterbasdb.connect(dsn=self.DSN, 
    67                                 user=self.user, 
    68                                 password=self.password, 
    69                                 charset=self.encoding, 
    70                                 ) 
    71         return c 
     65        return kinterbasdb.connect(dsn=self.DSN, 
     66                                   user=self.user, 
     67                                   password=self.password, 
     68                                   charset=self.encoding, 
     69                                   ) 
    7270     
    7371    def execute(self, query, conn=None): 
  • trunk/storage/storemysql.py

    r200 r201  
    157157     
    158158    def __init__(self, name, arena, allOptions={}): 
    159         db.StorageManagerDB.__init__(self, name, arena, allOptions) 
    160          
    161159        connargs = ["host", "user", "passwd", "db", "port", "unix_socket", 
    162160                    "conv", "connect_time", "compress", "named_pipe", 
     
    167165                              if k in connargs]) 
    168166        self.dbname = self.connargs['db'] 
     167         
     168        db.StorageManagerDB.__init__(self, name, arena, allOptions) 
    169169         
    170170        self.decompiler = MySQLDecompiler 
  • trunk/storage/storeodbc.py

    r200 r201  
    9494     
    9595    def __init__(self, name, arena, allOptions={}): 
     96        self.connstring = allOptions['Connect'] 
    9697        db.StorageManagerDB.__init__(self, name, arena, allOptions) 
    97         self.connstring = allOptions['Connect'] 
    9898     
    9999    def _get_conn(self): 
  • trunk/storage/storepypgsql.py

    r200 r201  
    7171     
    7272    sql_name_max_length = 63 
    73     close_connection_method = 'finish' 
    7473    decompiler = PgSQLDecompiler 
    7574    toAdapter = AdapterToPgSQL() 
     
    7776     
    7877    def __init__(self, name, arena, allOptions={}): 
    79         db.StorageManagerDB.__init__(self, name, arena, allOptions) 
    80          
    8178        # connstring = (host=h port=p dbname=d user=u password=p options=o tty=t) 
    8279        self.connstring = allOptions['Connect'] 
     
    8582            k, v = atom.split("=", 1) 
    8683            setattr(self, k, v) 
     84        db.StorageManagerDB.__init__(self, name, arena, allOptions) 
    8785     
    8886    def sql_name(self, name, quoted=True): 
     
    9795        except libpq.DatabaseError, x: 
    9896            if x.args[0].startswith('could not connect'): 
    99                 raise db.OutOfConnectionsError 
     97                raise db.OutOfConnectionsError() 
    10098            raise 
     99     
     100    def _del_conn(self, conn): 
     101        conn.finish() 
    101102     
    102103    def _template_conn(self): 
  • trunk/storage/storesqlite.py

    r200 r201  
    160160     
    161161    def __init__(self, name, arena, allOptions={}): 
    162         db.StorageManagerDB.__init__(self, name, arena, allOptions) 
    163          
    164162        dbfile = allOptions.get('Database', '') 
    165163        if not os.path.isabs(dbfile): 
    166164            dbfile = os.path.join(os.getcwd(), dbfile) 
    167165        self.database = dbfile 
    168          
    169166        self.mode = int(allOptions.get('Mode', '0755'), 8) 
     167        db.StorageManagerDB.__init__(self, name, arena, allOptions) 
    170168     
    171169    def sql_name(self, name, quoted=True): 
     
    257255     
    258256    def join(self, unitjoin): 
    259         # SQLite doesn't do nested JOINs, but instead applies them 
    260         # in order. Therefore, we need a single ON-clause at th
    261         # end of the list of tables. For example: 
    262         # "From a LEFT JOIN b LEFT JOIN c ON a.ID = b.ID AND b.Name = c.Name 
     257        # SQLite doesn't do nested JOINs, but instead applies 
     258        # them in order. Therefore, we need a single ON-claus
     259        # at the end of the list of tables. For example: 
     260        # "From a LEFT JOIN b LEFT JOIN c ON a.ID = b.ID AND b.Name = c.Name" 
    263261        joins, on_clauses = self._join(unitjoin) 
    264262        return joins + " ON " + " AND ".join(on_clauses)