Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/storage/sockets.py

Revision 51 (checked in by fumanchu, 8 years ago)

1. Changed arena.roster to ._registered_classes. Dropped containers.Prism and .Index
2. First attempt at arena.migrate (migrate class data from one store to another).
3. Fixed weakref bug in db pooling.
4. db.SM.reserve() now writes all data, not just ID.
5. Fixed various store bugs (adapters, mostly).

Line 
1 try:
2     import cPickle as pickle
3 except ImportError:
4     import pickle
5
6 import socket
7 import select
8
9
def _define_fixedpoint_states():
    """Give fixedpoint.FixedPoint pickling hooks if it has none yet.

    Installs ``__getstate__`` and ``__setstate__`` on the class. The state
    is the ``(n, p)`` attribute pair of the instance. When the class
    already exposes ``__getstate__`` nothing is changed, so calling this
    more than once is harmless.
    """
    # Imported here rather than at module level, so the fixedpoint package
    # is only required when this function actually runs.
    import fixedpoint

    target = fixedpoint.FixedPoint
    if hasattr(target, "__getstate__"):
        return

    def __getstate__(self):
        return (self.n, self.p)

    def __setstate__(self, state):
        self.n, self.p = state

    target.__getstate__ = __getstate__
    target.__setstate__ = __setstate__
22
23
def stream(unit):
    """Pickle the properties of a unit (or the items of a dict).

    unit: either a dict of property values, or an object (assumed to be a
        dejavu.Unit) whose class provides ``properties()`` returning the
        attribute names to read from it.

    Returns the ``pickle.dumps`` string of a new ``{name: value}`` dict.
    """
    if isinstance(unit, dict):
        data = dict(unit)
    else:
        # Assume it's a dejavu.Unit.
        data = dict((key, getattr(unit, key))
                    for key in unit.__class__.properties())

    # FixedPoint values are recognised by class name only, so the fixedpoint
    # package is never imported unless one is present. Installing the
    # pickling hooks once per call is enough.
    for value in data.values():
        if value.__class__.__name__ == "FixedPoint":
            _define_fixedpoint_states()
            break

    return pickle.dumps(data)
39
def destream(unit, data):
    """Copy pickled property values from data into unit.

    data is a pickled dictionary of property values. Each name returned by
    ``unit.__class__.properties()`` is looked up in it (a missing name
    raises KeyError); entries under other names are ignored.
    """
    # NOTE(review): pickle.loads can run arbitrary code while loading;
    # this is only safe if whoever produced `data` is trusted.
    loaded = pickle.loads(data)
    for name in unit.__class__.properties():
        # Store into the _properties dict directly to avoid __set__ overhead.
        unit._properties[name] = loaded[name]
47
def dechunk(data):
    """Extract chunks from a stream of data.

    The stream is a sequence of records, each a two-byte length (high byte
    first) followed by that many bytes of payload.

    Returns the payloads as a list, in order. Empty (or None) input gives
    an empty list. A final payload shorter than its declared length is
    returned as-is, as before.

    Raises IndexError if the stream ends with a single dangling byte where
    a two-byte length was expected.
    """
    chunks = []
    if not data:
        return chunks

    total = len(data)
    pos = 0
    # Walk the buffer by offset instead of re-slicing the remainder after
    # every chunk, which copied the rest of the data each time round.
    while pos < total:
        if total - pos < 2:
            raise IndexError("truncated chunk header at offset %d" % pos)
        # One-byte slices keep ord() working for both str and bytes input.
        size = (ord(data[pos:pos + 1]) << 8) | ord(data[pos + 1:pos + 2])
        start = pos + 2
        pos = start + size
        chunks.append(data[start:pos])
    return chunks
56
def sendall(conn, msg):
    """Send msg over conn, preceded by its length as two bytes (hi, lo).

    conn: any object with a socket-style ``sendall`` method.
    msg: the payload; at most 65535 bytes, the largest length that fits
        in the two-byte header.

    Raises ValueError if msg is too long. Nothing is sent in that case.
    """
    import struct

    size = len(msg)
    # The previous limit (256 * 256) let a 65536-byte message through,
    # whose length cannot be encoded in two bytes.
    if size > 0xFFFF:
        raise ValueError("msg to send is too large (%s)." % size)
    # ">H" is an unsigned 16-bit integer, most significant byte first.
    conn.sendall(struct.pack(">H", size))
    conn.sendall(msg)
64
65
class SocketClient(object):
    """Client (dejavu) end of a socket for handling dejavu Units."""

    # Winsock error code WSAEWOULDBLOCK: no data available right now on a
    # non-blocking socket. recv() is simply retried when it occurs.
    _WOULD_BLOCK = 10035

    # For some reason, the default of '' for localhost doesn't work on Win2k.
    def __init__(self, host='127.0.0.1', port=51111, timeout=5):
        self.host = host
        self.port = port
        # NOTE(review): stored but never applied; the settimeout() call in
        # query() has always been disabled.
        self.timeout = timeout

    def query(self, msg):
        """Send msg to the peer and return the response(s) in a list.

        Socket responses specify their length in the first two
        bytes (hi, lo). Then, the data follows. If the socket wishes to
        return another "object", it will follow the data with another
        length+data stream.

        A new connection is opened for every query and is always closed
        again, even when connecting, sending or receiving fails.

        If the first response chunk is 'ERROR', the second chunk is
        unpickled and raised instead of being returned.
        """
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # conn.settimeout(self.timeout) was disabled in the original
            # and is left disabled here.
            conn.connect((self.host, self.port))
            sendall(conn, msg)
            # Half-close: we are done writing, so the peer's read loop
            # sees end-of-stream while we can still receive its reply.
            conn.shutdown(socket.SHUT_WR)

            # Receive reply.
            received = []
            while True:
                try:
                    chunk = conn.recv(1024)
                except socket.error as exc:
                    if not exc.args or exc.args[0] != self._WOULD_BLOCK:
                        # Bare raise keeps the original traceback.
                        raise
                    continue
                if not chunk:
                    # Peer closed its end; the reply is complete.
                    break
                received.append(chunk)
        finally:
            conn.close()

        chunks = dechunk(b''.join(received))
        if chunks and chunks[0] == b'ERROR':
            # The value in chunks[1] is a pickled Exception.
            # NOTE(review): unpickling runs code chosen by the peer; only
            # connect to servers you trust.
            raise pickle.loads(chunks[1])
        return chunks
109
110
class SocketServer(object):
    """Server end of a socket for handling dejavu Units.

    Use this class to build wrappers for non-standard databases or
    other sources of dejavu Units.
    """

    # Winsock error code WSAEWOULDBLOCK: no data available right now on a
    # non-blocking socket. recv() is simply retried when it occurs.
    _WOULD_BLOCK = 10035

    def open(self, host='127.0.0.1', port=51111, backlog=5):
        """open(host='127.0.0.1', port=51111, backlog=5).

        Create the listening socket as self.conn, bind it to (host, port)
        and start listening. If binding or listening fails the socket is
        closed before the error propagates.
        """
        self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # self.conn.setblocking(0) was disabled in the original and is
        # left disabled here.
        try:
            self.conn.bind((host, port))
            self.conn.listen(backlog)
        except socket.error:
            self.conn.close()
            raise

    def read(self):
        """read(). Read from the socket.

        Waits up to one second for an incoming connection.

        Returns (None, '') if no peer connected in that time. Otherwise
        accepts the connection, reads until the peer stops sending, and
        returns (sockobj, msg): the accepted socket, still open so that
        the caller can reply on it, and the first chunk of the request
        (any further chunks are discarded). msg is '' if the peer sent
        nothing at all.

        If receiving or parsing fails, the accepted socket is closed and
        the error propagates.
        """
        # Only readability matters: a listening socket becomes readable
        # when a connection is waiting to be accepted.
        readables, _, _ = select.select([self.conn], [], [], 1)
        if self.conn not in readables:
            return None, ''

        sockobj, address = self.conn.accept()
        try:
            received = []
            while True:
                try:
                    chunk = sockobj.recv(1024)
                except socket.error as exc:
                    if not exc.args or exc.args[0] != self._WOULD_BLOCK:
                        # Bare raise keeps the original traceback.
                        raise
                    continue
                if not chunk:
                    # Peer finished sending.
                    break
                received.append(chunk)
            chunks = dechunk(b''.join(received))
        except BaseException:
            # Nobody else holds a reference to sockobj yet; don't leak it.
            sockobj.close()
            raise

        # A peer that connects and sends nothing yields no chunks at all.
        return sockobj, (chunks[0] if chunks else '')

    def close(self):
        """Close the listening socket; a no-op if open() was never called."""
        conn = getattr(self, 'conn', None)
        if conn is not None:
            conn.close()
148
Note: See TracBrowser for help on using the browser.