Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/storage/sockets.py

Revision 191 (checked in by fumanchu, 7 years ago)

Fix for #49 (Unit._properties should be a list, not a dict). cls._properties no longer exists; use cls.properties (a list now, not a function) instead.

  • Property svn:eol-style set to native
Line 
1 try:
2     import cPickle as pickle
3 except ImportError:
4     import pickle
5
6 import socket
7 import select
8
9
def _define_fixedpoint_states():
    """Give fixedpoint.FixedPoint the hooks pickle needs.

    When the class has no ``__getstate__`` attribute, install a
    ``__getstate__``/``__setstate__`` pair that round-trips an instance
    through the 2-tuple ``(n, p)``. When the attribute is already there,
    nothing is changed.
    """
    from fixedpoint import FixedPoint

    if hasattr(FixedPoint, "__getstate__"):
        # The class already has a __getstate__; leave both hooks alone.
        return

    def __getstate__(self):
        return (self.n, self.p)

    def __setstate__(self, state):
        self.n, self.p = state

    FixedPoint.__getstate__ = __getstate__
    FixedPoint.__setstate__ = __setstate__
22
23
def stream(unit):
    """Return the property values of ``unit`` as a pickled dict.

    ``unit`` is either a plain dict, whose items are pickled as they
    are, or an object exposing a ``properties`` sequence of attribute
    names (assumed to be a dejavu.Unit), whose named attributes are
    collected into a dict first.

    If any value's class is named "FixedPoint",
    _define_fixedpoint_states() is called before pickling so that the
    value can be dumped.
    """
    if isinstance(unit, dict):
        # dict(unit) makes the same shallow copy the item-by-item loop
        # did, without relying on the Python-2-only iteritems().
        data = dict(unit)
    else:
        # Assume it's a dejavu.Unit.
        data = dict((key, getattr(unit, key)) for key in unit.properties)

    # One FixedPoint value is enough to know the class needs patching;
    # there is no point calling the patch function once per value.
    for value in data.values():
        if value.__class__.__name__ == "FixedPoint":
            _define_fixedpoint_states()
            break

    return pickle.dumps(data)
39
def destream(unit, data):
    """Load pickled property values from ``data`` into ``unit``.

    ``data`` must be a pickled dictionary holding a value for every name
    in ``unit.properties``. Each value is stored straight into the
    ``unit._properties`` mapping; keys in the dictionary that are not
    listed in ``unit.properties`` are ignored.

    Raises:
        TypeError: if ``data`` cannot be unpickled (this includes empty
            or truncated input) or does not unpickle to a dict.
        KeyError: if the unpickled dict lacks one of ``unit.properties``.
    """
    # SECURITY: pickle.loads() can execute arbitrary code while loading.
    # Only feed this function data that comes from a trusted peer.
    try:
        attrdict = pickle.loads(data)
    except (pickle.UnpicklingError, EOFError):
        # Empty or cut-off input surfaces as EOFError rather than
        # UnpicklingError; report both the same way.
        raise TypeError(data)
    if not isinstance(attrdict, dict):
        raise TypeError(attrdict)

    for key in unit.properties:
        # Set the attribute directly to avoid __set__ overhead.
        unit._properties[key] = attrdict[key]
51
def dechunk(data):
    """Extract chunks from a stream of data.

    The stream is a run of frames, each made of a two-byte big-endian
    length (hi, lo) followed by that many bytes of payload. ``data`` may
    be a str or, on Python 3, a bytes object.

    Returns the payloads in order, as slices of ``data`` (so they have
    the same type as ``data``). If the stream ends in the middle of a
    payload, the last chunk is simply shorter than its declared length.

    Raises:
        IndexError: if the stream ends after a single header byte.
    """
    chunks = []
    pos = 0
    end = len(data)
    # Walk an offset through the buffer instead of re-slicing the
    # remainder after every frame, which copied the tail each time.
    while pos < end:
        hi, lo = data[pos], data[pos + 1]
        if not isinstance(hi, int):
            # Indexing a str yields 1-character strings; indexing
            # Python 3 bytes already yields ints.
            hi, lo = ord(hi), ord(lo)
        start = pos + 2
        pos = start + ((hi << 8) | lo)
        chunks.append(data[start:pos])
    return chunks
60
def sendall(conn, msg):
    """Send ``msg`` over ``conn`` as a single length-prefixed frame.

    The frame is a two-byte big-endian length (hi, lo) followed by the
    message itself, which is the layout dechunk() reads back.

    ``conn`` is any object with a socket-style ``sendall`` method.

    Raises:
        ValueError: if ``msg`` is longer than 65535 bytes, the largest
            length the two-byte header can express.
    """
    import struct

    size = len(msg)
    # 65536 would need a third header byte, so it must be rejected here
    # as well instead of failing later while the header is being built.
    if size > 0xFFFF:
        raise ValueError("msg to send is too large (%s)." % size)
    conn.sendall(struct.pack(">H", size))
    conn.sendall(msg)
68
69
class SocketClient(object):
    """Client (dejavu) end of a socket for handling dejavu Units."""

    # For some reason, the default of '' for localhost doesn't work on Win2k.
    def __init__(self, host='127.0.0.1', port=51111, timeout=5):
        """Remember where to connect; no connection is made here.

        host, port: address passed to connect() on every query.
        timeout: value passed to settimeout() on every new connection.
        """
        self.host = host
        self.port = port
        self.timeout = timeout

    def query(self, msg):
        """Send msg to the peer and return the response(s) in a list.

        Socket responses specify their length in the first two
        bytes (hi, lo). Then, the data follows. If the socket wishes to
        return another "object", it will follow the data with another
        length+data stream.

        A fresh connection is opened for each call and is always closed
        before this method returns or raises.

        If the first response chunk is 'ERROR', the second chunk is
        unpickled and raised instead of being returned.
        """
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Not sure if we should have a timeout or not--
            # some users getting "Address already in use" errors..?
            conn.settimeout(self.timeout)
            # conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            conn.connect((self.host, self.port))
            sendall(conn, msg)
            # Close only the sending side so the peer sees end-of-request
            # while we can still read its reply.
            conn.shutdown(socket.SHUT_WR)

            # Receive reply.
            received = []
            while True:
                try:
                    chunk = conn.recv(1024)
                except socket.error as x:
                    # 10035 is the Windows "would block" code
                    # (WSAEWOULDBLOCK): nothing to read yet, so retry.
                    # Anything else is a real failure; a bare raise
                    # keeps the original traceback.
                    if not x.args or x.args[0] != 10035:
                        raise
                else:
                    if not chunk:
                        # Empty read: the peer closed its side.
                        break
                    received.append(chunk)
        finally:
            # Release the socket even when connect/send/recv failed.
            conn.close()

        chunks = dechunk(b''.join(received))
        if chunks and chunks[0] == b'ERROR':
            # The value in chunks[1] is a pickled Exception.
            # SECURITY: pickle.loads() can execute arbitrary code; only
            # talk to servers you trust.
            raise pickle.loads(chunks[1])
        return chunks
116
117
class SocketServer(object):
    """Server end of a socket for handling dejavu Units.

    Use this class to build wrappers for non-standard databases or
    other sources of dejavu Units.
    """

    def open(self, host='127.0.0.1', port=51111, backlog=5):
        """open(host='127.0.0.1', port=51111, backlog=5).

        Create the listening TCP socket, bind it to (host, port) and
        start listening with the given backlog. The socket is kept in
        self.conn.
        """
        self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # self.conn.setblocking(0)
        self.conn.bind((host, port))
        self.conn.listen(backlog)

    def read(self):
        """read(). Read from the socket.

        Waits up to one second for an incoming connection, accepts it
        and reads until the peer closes its sending side.

        Returns a (sockobj, msg) pair. sockobj is the accepted
        connection, which this method leaves open, and msg is the first
        chunk the peer sent. When no connection arrived in time, or the
        peer connected but sent no chunk at all, the pair is (None, '').

        If receiving fails, the accepted connection is closed before
        the exception propagates.
        """
        readables, writables, e = select.select([self.conn], [self.conn], [], 1)
        if self.conn not in readables:
            return None, ''

        (sockobj, address) = self.conn.accept()
        try:
            received = []
            while True:
                try:
                    chunk = sockobj.recv(1024)
                except socket.error as x:
                    # 10035 is the Windows "would block" code
                    # (WSAEWOULDBLOCK): nothing to read yet, so retry.
                    # Anything else is a real failure; a bare raise
                    # keeps the original traceback.
                    if not x.args or x.args[0] != 10035:
                        raise
                else:
                    if not chunk:
                        # Empty read: the peer closed its side.
                        break
                    received.append(chunk)
            chunks = dechunk(b''.join(received))
        except BaseException:
            # Nobody else holds the accepted socket yet; don't leak it.
            sockobj.close()
            raise

        if not chunks:
            # The peer connected and hung up without sending a frame:
            # there is no request to hand back.
            sockobj.close()
            return None, ''
        return sockobj, chunks[0]

    def close(self):
        """Close the listening socket created by open()."""
        self.conn.close()
155
Note: See TracBrowser for help on using the browser.