Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/storage/sockets.py

Revision 16 (checked in by fumanchu, 9 years ago)

Doc, docstring, and comment updates

Line 
1 import fixedpoint
2 try:
3     import cPickle as pickle
4 except ImportError:
5     import pickle
6 import socket
7 import select
8
9
10 def stream(unit):
11     data = {}
12     if isinstance(unit, dict):
13         for key, value in unit.iteritems():
14             if isinstance(value, fixedpoint.FixedPoint):
15                 # Unpicklable
16                 data[key] = (value.n, value.p)
17             else:
18                 data[key] = value
19     else:
20         # Assume it's a dejavu.Unit.
21         for key in unit.__class__.properties():
22             value = getattr(unit, key)
23             if isinstance(value, fixedpoint.FixedPoint):
24                 # Unpicklable
25                 data[key] = (value.n, value.p)
26             else:
27                 data[key] = value
28     return pickle.dumps(data)
29
30 def destream(unit, data):
31     # data will be a pickled dictionary of properties for a unit.
32     attrdict = pickle.loads(data)
33     cls = unit.__class__
34     for key in cls.properties():
35         value = attrdict[key]
36         if cls.property_type(key) == fixedpoint.FixedPoint:
37             if value is not None:
38                 n, p = value
39                 value = fixedpoint.FixedPoint(0, 2)
40                 value.n, value.p = n, p
41         # Set the attribute directly to avoid __set__ overhead.
42         unit._properties[key] = value
43
44 def dechunk(data):
45     """Extract chunks from a stream of data."""
46     chunks = []
47     while data:
48         size = (256 * int(ord(data[0]))) + int(ord(data[1]))
49         chunks.append(data[2:size + 2])
50         data = data[size + 2:]
51     return chunks
52
53 def sendall(conn, msg):
54     size = len(msg)
55     if size > (256 * 256):
56         raise ValueError("msg to send is too large (%s)." % size)
57     conn.sendall(chr(size >> 8))
58     conn.sendall(chr(size & 0xFF))
59     conn.sendall(msg)
60
61
62 class SocketClient(object):
63     """Client (dejavu) end of a socket for handling dejavu Units."""
64    
65     # For some reason, the default of '' for localhost doesn't work on Win2k.
66     def __init__(self, host='127.0.0.1', port=51111, timeout=5):
67         self.host = host
68         self.port = port
69         self.timeout = timeout
70    
71     def query(self, msg):
72         """Send msg to the peer and return the response(s) in a list.
73         
74         Socket responses specify their length in the first two
75         bytes (hi, lo). Then, the data follows. If the socket wishes to
76         return another "object", it will follow the data with another
77         length+data stream.
78         """
79         conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
80 ##        conn.settimeout(self.timeout)
81         conn.connect((self.host, self.port))
82         sendall(conn, msg)
83         conn.shutdown(1)
84        
85         # Receive reply.
86         data = []
87         while True:
88             try:
89                 chunk = conn.recv(1024)
90             except Exception, x:
91                 if x.args[0] != 10035:
92                     raise x
93             else:
94                 if chunk == '':
95                     break
96                 data.append(chunk)
97        
98         conn.close()
99        
100         data = dechunk(''.join(data))
101         if data and data[0] == 'ERROR':
102             # The value in data[1] is a pickled Exception.
103             raise pickle.loads(data[1])
104         return data
105
106
107 class SocketServer(object):
108     """Server end of a socket for handling dejavu Units.
109     
110     Use this class to build wrappers for non-standard databases or
111     other sources of dejavu Units.
112     """
113    
114     def open(self, host='127.0.0.1', port=51111, backlog=5):
115         """open(host='127.0.0.1', port=51111, backlog=5)."""
116         self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
117 ##        self.conn.setblocking(0)
118         self.conn.bind((host, port))
119         self.conn.listen(backlog)
120    
121     def read(self):
122         """read(). Read from the socket."""
123         readables, writables, e = select.select([self.conn], [self.conn], [], 1)
124         if self.conn in readables:
125             (sockobj, address) = self.conn.accept()
126            
127             data = []
128             while True:
129                 try:
130                     chunk = sockobj.recv(1024)
131                 except Exception, x:
132                     if x.args[0] != 10035:
133                         raise x
134                 else:
135                     if chunk == '':
136                         break
137                     data.append(chunk)
138             return sockobj, dechunk(''.join(data))[0]
139         else:
140             return None, ''
141    
142     def close(self):
143         self.conn.close()
144
Note: See TracBrowser for help on using the browser.