Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/storage/sockets.py

Revision 88 (checked in by fumanchu, 8 years ago)

Set eol-style:native on all .py files.

  • Property svn:eol-style set to native
Line 
1 try:
2     import cPickle as pickle
3 except ImportError:
4     import pickle
5
6 import socket
7 import select
8
9
def _define_fixedpoint_states():
    """Teach fixedpoint.FixedPoint how to be pickled.

    Installs __getstate__ and __setstate__ on the class, using the tuple
    (n, p) as the pickled state. Nothing is changed when the class already
    exposes a __getstate__ attribute.

    The import is local, so the fixedpoint module is only needed when this
    function is actually called.
    """
    import fixedpoint

    target = fixedpoint.FixedPoint
    if hasattr(target, "__getstate__"):
        # Pickling support is already present; leave it alone.
        return

    def __getstate__(self):
        return (self.n, self.p)

    def __setstate__(self, v):
        self.n, self.p = v

    target.__getstate__ = __getstate__
    target.__setstate__ = __setstate__
22
23
def stream(unit):
    """Serialize a unit's property values into a pickled dictionary.

    unit may be a plain dict, whose items are used as they are, or any
    object whose class provides a properties() method returning attribute
    names (assumed to be a dejavu.Unit); each named attribute is read with
    getattr().

    If any value is an instance of a class named "FixedPoint", pickling
    support for it is installed first via _define_fixedpoint_states().

    Returns the result of pickle.dumps() on the collected dict.
    """
    if isinstance(unit, dict):
        # items() is available on every Python version; iteritems() is not.
        data = dict(unit.items())
    else:
        # Assume it's a dejavu.Unit.
        data = {}
        for key in unit.__class__.properties():
            data[key] = getattr(unit, key)

    # The class is matched by name, so fixedpoint itself is only imported
    # when a FixedPoint value is really present. One call is enough no
    # matter how many such values there are.
    for value in data.values():
        if value.__class__.__name__ == "FixedPoint":
            _define_fixedpoint_states()
            break

    return pickle.dumps(data)
39
def destream(unit, data):
    """Load pickled property values into unit.

    data must be a pickled dict holding a value for every name returned by
    the properties() method of unit's class. Each value is stored straight
    into unit._properties under that name.

    Raises TypeError if unpickling fails with pickle.UnpicklingError, or
    if data unpickles to something other than a dict.
    """
    # NOTE(review): pickle.loads can run arbitrary code chosen by whoever
    # produced the payload -- confirm data only ever comes from a trusted
    # peer.
    try:
        loaded = pickle.loads(data)
    except pickle.UnpicklingError:
        raise TypeError(data)

    if not isinstance(loaded, dict):
        raise TypeError(loaded)

    for name in unit.__class__.properties():
        # Write to the backing dict directly to avoid __set__ overhead.
        unit._properties[name] = loaded[name]
52
def dechunk(data):
    """Extract chunks from a stream of data.

    data is a sequence of length-prefixed chunks: two bytes giving the
    payload size (high byte first), followed by that many bytes of
    payload. Both one-character-string items (str) and integer items
    (Python 3 bytes) are understood.

    Returns the payloads as a list, in order. Empty input gives [].

    If the input stops partway through a payload, the final chunk is
    simply shorter than announced. An input that stops between the two
    size bytes raises IndexError.
    """
    if not data:
        return []

    def byte_value(item):
        # Indexing a Python 3 bytes object already yields an int;
        # one-character strings still need ord().
        if isinstance(item, int):
            return item
        return ord(item)

    chunks = []
    pos = 0
    total = len(data)
    # Walk the buffer with an offset rather than re-slicing the remainder
    # after every chunk, which would copy the tail each time.
    while pos < total:
        size = (256 * byte_value(data[pos])) + byte_value(data[pos + 1])
        start = pos + 2
        chunks.append(data[start:start + size])
        pos = start + size
    return chunks
61
def sendall(conn, msg):
    """Send msg over conn, preceded by its length in two bytes (hi, lo).

    conn is any object with a socket-style sendall() method. msg is the
    payload to transmit; it may be at most 65535 bytes long, the largest
    size that fits in the two-byte header.

    Raises ValueError if msg is longer than that. Nothing is sent in that
    case.
    """
    size = len(msg)
    # 0xFFFF is the largest value two bytes can carry; a size of 65536
    # would need a high byte of 256, which cannot be encoded.
    if size > 0xFFFF:
        raise ValueError("msg to send is too large (%s)." % size)

    # Build the header from integers so that it is a byte string on both
    # Python 2 and Python 3.
    header = bytes(bytearray((size >> 8, size & 0xFF)))
    conn.sendall(header)
    conn.sendall(msg)
69
70
class SocketClient(object):
    """Client (dejavu) end of a socket for handling dejavu Units.

    Every call to query() opens a fresh TCP connection to (host, port),
    sends one message, reads the whole reply and closes the connection.
    """

    # Winsock error code WSAEWOULDBLOCK: nothing to read yet, try again.
    _WOULD_BLOCK = 10035

    # For some reason, the default of '' for localhost doesn't work on Win2k.
    def __init__(self, host='127.0.0.1', port=51111, timeout=5):
        """Remember where to connect; no socket is opened until query().

        host and port identify the peer. timeout is handed to the socket's
        settimeout() for each query.
        """
        self.host = host
        self.port = port
        self.timeout = timeout

    def query(self, msg):
        """Send msg to the peer and return the response(s) in a list.

        Socket responses specify their length in the first two
        bytes (hi, lo). Then, the data follows. If the socket wishes to
        return another "object", it will follow the data with another
        length+data stream.

        If the first response chunk is 'ERROR', the second chunk is
        unpickled and raised as an exception instead of being returned.
        Errors from connecting, sending or receiving propagate to the
        caller; the connection is closed in every case.
        """
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Not sure if we should have a timeout or not--
            # some users getting "Address already in use" errors..?
            # (Setting SO_REUSEADDR here was tried and is left disabled.)
            conn.settimeout(self.timeout)
            conn.connect((self.host, self.port))
            sendall(conn, msg)
            # Close our sending side so the peer sees end-of-message.
            conn.shutdown(socket.SHUT_WR)

            # Receive reply: read until the peer closes its side.
            received = []
            while True:
                try:
                    chunk = conn.recv(1024)
                except Exception as x:
                    # Only "would block" is retried. Guard against
                    # exceptions that carry no args at all, and re-raise
                    # with the original traceback intact.
                    if not x.args or x.args[0] != self._WOULD_BLOCK:
                        raise
                else:
                    if not chunk:
                        break
                    received.append(chunk)
        finally:
            # Release the socket even when connect/send/recv failed.
            conn.close()

        data = dechunk(b''.join(received))
        if data and data[0] == b'ERROR':
            # The value in data[1] is a pickled Exception.
            # NOTE(review): unpickling runs code chosen by the peer --
            # confirm the server is always trusted.
            raise pickle.loads(data[1])
        return data
117
118
class SocketServer(object):
    """Server end of a socket for handling dejavu Units.

    Use this class to build wrappers for non-standard databases or
    other sources of dejavu Units.
    """

    # Winsock error code WSAEWOULDBLOCK: nothing to read yet, try again.
    _WOULD_BLOCK = 10035

    def open(self, host='127.0.0.1', port=51111, backlog=5):
        """open(host='127.0.0.1', port=51111, backlog=5).

        Create the listening TCP socket, bind it to (host, port) and start
        listening with the given backlog. The socket is kept in self.conn.
        If binding or listening fails, the socket is closed and the error
        propagates.
        """
        self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # (Non-blocking mode via setblocking(0) was tried and is left
        # disabled.)
        try:
            self.conn.bind((host, port))
            self.conn.listen(backlog)
        except BaseException:
            # Don't leave a half-configured socket open.
            self.conn.close()
            raise

    def read(self):
        """read(). Read from the socket.

        Waits up to one second for an incoming connection. If none
        arrives, returns (None, ''). Otherwise accepts it, reads until the
        peer closes its sending side, and returns (sockobj, message):
        sockobj is the accepted connection, still open (closing it is the
        caller's job), and message is the first chunk the peer sent. Any
        further chunks are discarded.

        If receiving fails, or the peer sent no chunk at all (IndexError),
        the accepted connection is closed before the error propagates.
        """
        readables = select.select([self.conn], [self.conn], [], 1)[0]
        if self.conn not in readables:
            return None, ''

        sockobj, address = self.conn.accept()
        try:
            received = []
            while True:
                try:
                    chunk = sockobj.recv(1024)
                except Exception as x:
                    # Only "would block" is retried. Guard against
                    # exceptions that carry no args at all, and re-raise
                    # with the original traceback intact.
                    if not x.args or x.args[0] != self._WOULD_BLOCK:
                        raise
                else:
                    if not chunk:
                        break
                    received.append(chunk)
            message = dechunk(b''.join(received))[0]
        except BaseException:
            # The caller never gets sockobj on failure, so close it here.
            sockobj.close()
            raise
        return sockobj, message

    def close(self):
        """Close the listening socket created by open()."""
        self.conn.close()
156
Note: See TracBrowser for help on using the browser.