Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/storage/sockets.py

Revision 12 (checked in by fumanchu, 9 years ago)

1. Rewrote UnitProperty? to enable declaration within class body (instead of after with set_property).
2. Modified storage.Adapter to deny default coercers.
3. Fixed scope problems when unpickling logic.Expressions.

Line 
1 import fixedpoint
2 try:
3     import cPickle as pickle
4 except ImportError:
5     import pickle
6 import socket
7 import select
8
9
10 def stream(unit):
11     data = {}
12     if isinstance(unit, dict):
13         for key, value in unit.iteritems():
14             if isinstance(value, fixedpoint.FixedPoint):
15                 # Unpicklable
16                 data[key] = (value.n, value.p)
17             else:
18                 data[key] = value
19     else:
20         # Assume it's a dejavu.Unit.
21         for key in unit.__class__.properties():
22             value = getattr(unit, key)
23             if isinstance(value, fixedpoint.FixedPoint):
24                 # Unpicklable
25                 data[key] = (value.n, value.p)
26             else:
27                 data[key] = value
28     return pickle.dumps(data)
29
30 def destream(unit, data):
31     # data will be a pickled dictionary of properties for a unit.
32     attrdict = pickle.loads(data)
33     cls = unit.__class__
34     for key in cls.properties():
35         value = attrdict[key]
36         if cls.property_type(key) == fixedpoint.FixedPoint:
37             if value is not None:
38                 n, p = value
39                 value = fixedpoint.FixedPoint(0, 2)
40                 value.n, value.p = n, p
41         # Set the attribute directly to avoid __set__ overhead.
42         unit._properties[key] = value
43
44 def dechunk(data):
45     """Extract chunks from a stream of data."""
46     chunks = []
47     while data:
48         size = (256 * int(ord(data[0]))) + int(ord(data[1]))
49         chunks.append(data[2:size + 2])
50         data = data[size + 2:]
51     return chunks
52
53 def sendall(conn, msg):
54     size = len(msg)
55     if size > (256 * 256):
56         raise ValueError("msg to send is too large (%s)." % size)
57     conn.sendall(chr(size >> 8))
58     conn.sendall(chr(size & 0xFF))
59     conn.sendall(msg)
60
61
62 class SocketClient(object):
63     """Client (dejavu) end of a socket for handling dejavu Units."""
64    
65     # For some reason, the default of '' for localhost doesn't work on Win2k.
66     def __init__(self, host='127.0.0.1', port=51111, timeout=5):
67         self.host = host
68         self.port = port
69         self.timeout = timeout
70    
71     def query(self, msg):
72         """Send msg to the peer and return the response(s) in a list.
73         
74         Socket responses specify their length in the first two
75         bytes (hi, lo). Then, the data follows. If the socket wishes to
76         return another "object", it will follow the data with another
77         length+data stream.
78         """
79         conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
80 ##        conn.settimeout(self.timeout)
81         conn.connect((self.host, self.port))
82         sendall(conn, msg)
83         conn.shutdown(1)
84        
85         # Receive reply.
86         data = []
87         while True:
88             try:
89                 chunk = conn.recv(1024)
90             except Exception, x:
91                 if x.args[0] != 10035:
92                     raise x
93             else:
94                 if chunk == '':
95                     break
96                 data.append(chunk)
97        
98         conn.close()
99        
100         data = dechunk(''.join(data))
101         if data and data[0] == 'ERROR':
102             raise pickle.loads(data[1])
103         return data
104
105
106 class SocketServer(object):
107     """Server end of a socket for handling dejavu Units.
108     
109     Use this class to build wrappers for non-standard databases or
110     other sources of dejavu Units.
111     """
112    
113     def open(self, host='127.0.0.1', port=51111, backlog=5):
114         """open(host='127.0.0.1', port=51111, backlog=5)."""
115         self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
116 ##        self.conn.setblocking(0)
117         self.conn.bind((host, port))
118         self.conn.listen(backlog)
119    
120     def read(self):
121         """read(). Read from the socket."""
122         readables, writables, e = select.select([self.conn], [self.conn], [], 1)
123         if self.conn in readables:
124             (sockobj, address) = self.conn.accept()
125            
126             data = []
127             while True:
128                 try:
129                     chunk = sockobj.recv(1024)
130                 except Exception, x:
131                     if x.args[0] != 10035:
132                         raise x
133                 else:
134                     if chunk == '':
135                         break
136                     data.append(chunk)
137             return sockobj, dechunk(''.join(data))[0]
138         else:
139             return None, ''
140    
141     def close(self):
142         self.conn.close()
143
Note: See TracBrowser for help on using the browser.