Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

I think I've seen this ORM somewhere before...

root/trunk/storage/sockets.py

Revision 62 (checked in by fumanchu, 8 years ago)

1. Tests added for CachingProxy, BurnedProxy.
2. Edit tests added to zoo_fixture.py
3. Bug fixed in CachingProxy.save (should not have checked dirty).
4. CachingProxy now holds current Timer and cancels it on shutdown.

Line 
1 try:
2     import cPickle as pickle
3 except ImportError:
4     import pickle
5
6 import socket
7 import select
8
9
def _define_fixedpoint_states():
    """Add methods to fixedpoint to support pickling.

    If fixedpoint.FixedPoint already exposes a __getstate__ attribute the
    class is left untouched. Otherwise a __getstate__/__setstate__ pair is
    attached that saves and restores the instance's (n, p) attributes.
    """
    import fixedpoint

    target = fixedpoint.FixedPoint
    if hasattr(target, "__getstate__"):
        # Hooks are already present; nothing to do.
        return

    def __getstate__(self):
        return (self.n, self.p)

    def __setstate__(self, v):
        self.n, self.p = v

    setattr(target, "__getstate__", __getstate__)
    setattr(target, "__setstate__", __setstate__)
22
23
def stream(unit):
    """Pickle the properties of a unit and return the pickled string.

    unit: either a dict of {property name: value}, or any other object
        (assumed to be a dejavu.Unit) whose class provides properties(),
        the names of the attributes to read off the instance.

    Returns pickle.dumps() of a plain dict of {property name: value}.
    """
    if isinstance(unit, dict):
        # items() works on both Python 2 and Python 3 dicts;
        # iteritems() only exists on Python 2.
        data = dict(unit.items())
    else:
        # Assume it's a dejavu.Unit.
        data = dict((key, getattr(unit, key))
                    for key in unit.__class__.properties())

    # FixedPoint needs pickling hooks installed before dumping. Matching on
    # the class name avoids importing fixedpoint unless it is really in use;
    # one call is enough no matter how many FixedPoint values there are.
    for value in data.values():
        if value.__class__.__name__ == "FixedPoint":
            _define_fixedpoint_states()
            break

    return pickle.dumps(data)
39
def destream(unit, data):
    """Unpickle data and copy its values into unit's property store.

    unit: an object whose class provides properties() (the names to
        restore) and which has a _properties mapping to receive them.
    data: a pickled dictionary of properties for a unit.

    Every name listed by properties() must be present in the unpickled
    dictionary; extra entries in the dictionary are ignored.
    """
    # NOTE(review): pickle.loads can execute arbitrary code; this presumably
    # receives data straight off a socket -- confirm the peer is trusted.
    values = pickle.loads(data)
    for name in unit.__class__.properties():
        # Write straight into the backing store to avoid __set__ overhead.
        unit._properties[name] = values[name]
47
def dechunk(data):
    """Extract chunks from a stream of data.

    The stream is a sequence of frames. Each frame is a two-byte length
    (high byte, then low byte) followed by that many bytes of payload.

    Returns the payloads, in order, as a list of slices of data. If the
    stream ends before the last payload is complete, the bytes that did
    arrive are returned for it. Raises IndexError if the stream ends in
    the middle of a two-byte length header.
    """
    # Indexing a bytearray yields ints on both Python 2 (str) and
    # Python 3 (bytes), so no ord() is needed.
    raw = bytearray(data)
    total = len(raw)

    chunks = []
    pos = 0
    # Advance an offset rather than re-slicing the remainder of the stream
    # for every frame, which copied the tail each time.
    while pos < total:
        size = (raw[pos] << 8) | raw[pos + 1]
        start = pos + 2
        pos = start + size
        chunks.append(data[start:pos])
    return chunks
56
def sendall(conn, msg):
    """Send msg over conn, prefixed with its length in two bytes (hi, lo).

    conn: an object with a socket-style sendall() method.
    msg: the payload; its length must fit in two bytes (at most 65535).

    Raises ValueError if msg is too long to be described by the header.
    """
    import struct

    size = len(msg)
    # Two bytes can express at most 0xFFFF; a length of exactly 65536
    # must be rejected too, or the header cannot be built.
    if size > 0xFFFF:
        raise ValueError("msg to send is too large (%s)." % size)
    # ">H" is an unsigned 16-bit big-endian integer: high byte, low byte.
    conn.sendall(struct.pack(">H", size))
    conn.sendall(msg)
64
65
class SocketClient(object):
    """Client (dejavu) end of a socket for handling dejavu Units.

    Each call to query() opens a fresh TCP connection to (host, port),
    sends one length-prefixed message, reads the whole reply and closes
    the connection again.
    """

    # Winsock error code WSAEWOULDBLOCK ("no data available right now").
    # It is the only receive error that is retried instead of raised.
    _WSAEWOULDBLOCK = 10035

    # For some reason, the default of '' for localhost doesn't work on Win2k.
    def __init__(self, host='127.0.0.1', port=51111, timeout=5):
        self.host = host
        self.port = port
        # NOTE: stored for callers, but query() does not currently apply
        # it to the socket.
        self.timeout = timeout

    def query(self, msg):
        """Send msg to the peer and return the response(s) in a list.

        Socket responses specify their length in the first two
        bytes (hi, lo). Then, the data follows. If the socket wishes to
        return another "object", it will follow the data with another
        length+data stream.

        If the first response is 'ERROR', the second response is
        unpickled and raised as an exception instead of being returned.
        Socket errors propagate to the caller; the connection is closed
        in every case.
        """
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            conn.connect((self.host, self.port))
            sendall(conn, msg)
            # Close our sending side so the peer sees end-of-stream and
            # knows the request is complete.
            conn.shutdown(socket.SHUT_WR)
            reply = self._recv_all(conn)
        finally:
            # Release the socket even if connecting, sending or receiving
            # failed.
            conn.close()

        data = dechunk(reply)
        if data and data[0] == b'ERROR':
            # The value in data[1] is a pickled Exception.
            # NOTE(review): unpickling executes code chosen by the peer;
            # only talk to servers you trust.
            raise pickle.loads(data[1])
        return data

    def _recv_all(self, conn):
        """Read from conn until the peer closes; return everything read."""
        chunks = []
        while True:
            try:
                chunk = conn.recv(1024)
            except socket.error as exc:
                if not exc.args or exc.args[0] != self._WSAEWOULDBLOCK:
                    # Bare raise keeps the original traceback.
                    raise
                # Nothing to read yet; try again.
                continue
            if not chunk:
                # An empty read means the peer closed the connection.
                # Testing truthiness works for str and bytes alike.
                break
            chunks.append(chunk)
        return b''.join(chunks)
110
111
class SocketServer(object):
    """Server end of a socket for handling dejavu Units.

    Use this class to build wrappers for non-standard databases or
    other sources of dejavu Units.
    """

    # Winsock error code WSAEWOULDBLOCK ("no data available right now").
    # It is the only receive error that is retried instead of raised.
    _WSAEWOULDBLOCK = 10035

    def open(self, host='127.0.0.1', port=51111, backlog=5):
        """open(host='127.0.0.1', port=51111, backlog=5).

        Create the listening TCP socket, bind it to (host, port) and
        start listening with the given backlog.
        """
        self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.conn.bind((host, port))
        self.conn.listen(backlog)

    def read(self):
        """read(). Read from the socket.

        Waits up to one second for an incoming connection. If one
        arrives, it is accepted and read until the client closes its
        sending side; the result is (sockobj, first chunk of the
        request), with sockobj left open (presumably so the caller can
        send a reply and close it -- confirm against callers).

        Returns (None, '') if no client connected within the wait, or
        if the client connected but sent no chunk at all (that
        connection is closed). If reading or decoding fails, the
        accepted connection is closed and the error propagates.
        """
        # Only readability of the listening socket matters here.
        readables, _, _ = select.select([self.conn], [], [], 1)
        if self.conn not in readables:
            return None, ''

        sockobj, _address = self.conn.accept()
        try:
            chunks = dechunk(self._recv_all(sockobj))
        except Exception:
            # Don't leak the accepted connection when the read fails.
            sockobj.close()
            raise

        if not chunks:
            # The client connected and hung up without sending a request.
            sockobj.close()
            return None, ''
        return sockobj, chunks[0]

    def _recv_all(self, sockobj):
        """Read from sockobj until the peer closes; return everything read."""
        chunks = []
        while True:
            try:
                chunk = sockobj.recv(1024)
            except socket.error as exc:
                if not exc.args or exc.args[0] != self._WSAEWOULDBLOCK:
                    # Bare raise keeps the original traceback.
                    raise
                # Nothing to read yet; try again.
                continue
            if not chunk:
                # An empty read means the peer closed its sending side.
                # Testing truthiness works for str and bytes alike.
                break
            chunks.append(chunk)
        return b''.join(chunks)

    def close(self):
        """Close the listening socket created by open()."""
        self.conn.close()
149
Note: See TracBrowser for help on using the browser.