Contact: fumanchu@aminus.org

Log in as guest/dejavu to create tickets

root/branches/crazycache/dejavu/storage/memcache.py

Revision 564 (checked in by fumanchu, 1 year ago)

New get_multi in memcache.py.

Line 
1 """Memcached client for use with storememcached.MemcachedStorageManager."""
2
3 try:
4     import cPickle as pickle
5 except ImportError:
6     import pickle
7 import socket
8 import threading
9 import time
10
11
class Flagset(object):
    """A set of coercion rules for memcache data encoding.

    ``push`` turns a Python value into a ``(flags, string)`` pair for
    storage; ``pull`` reverses it using the flags stored with the value:

        0 -- string, stored as-is
        1 -- any other object, pickled with protocol 2
        2 -- int, stored as its decimal text
        4 -- long, stored as its decimal text
    """

    # ``basestring`` and ``long`` are Python 2 builtins. Fall back to the
    # closest equivalents so the rules keep working where they are missing.
    try:
        _string_types = (basestring,)
        _long_type = long
    except NameError:
        _string_types = (str, bytes)
        _long_type = int

    def push(self, value):
        """Return (flags, coerced value) for output.

        Strings pass through untouched, ints and longs are written as
        decimal text, and everything else (including bools) is pickled.
        """
        if isinstance(value, self._string_types):
            return 0, value
        # bool is a subclass of int, but str(True) is "True", which int()
        # cannot parse back in pull(). Pickle bools so they round-trip.
        if isinstance(value, bool):
            return 1, pickle.dumps(value, 2)
        if isinstance(value, int):
            return 2, str(value)
        if isinstance(value, self._long_type):
            return 4, str(value)
        return 1, pickle.dumps(value, 2)

    def pull(self, value, flags):
        """Return value, coerced for input according to the given flags.

        Returns None when the value is flagged as a pickle but cannot be
        unpickled. Raises ValueError when none of the known flag bits
        (1, 2, 4) is set on a non-zero flags value.
        """
        if flags == 0:
            return value
        elif flags & 2:
            return int(value)
        elif flags & 4:
            return self._long_type(value)
        elif flags & 1:
            # NOTE(review): unpickling executes whatever the cache returns,
            # so this assumes the memcached servers are trusted -- confirm.
            try:
                return pickle.loads(value)
            except Exception:
                # Best effort: a corrupt entry is reported as "no value".
                return None
        raise ValueError("unknown flags %x for value %r" % (flags, value))
39
40
class Client(object):
    """Memcache client interface.

    servers: a sequence of "host[:port]" strings.

    Each key is mapped to exactly one server. Operations on a server that
    cannot be acquired (dead or unreachable) are skipped: writes do
    nothing and reads report a miss.
    """

    flagset = Flagset()

    def __init__(self, servers):
        self.servers = [Server(s) for s in servers]

    def _key_to_server(self, key):
        """Return the server responsible for key, without locking it."""
        # NOTE(review): this relies on hash(key) being the same in every
        # process that shares the cache; that may not hold across
        # interpreter builds or with hash randomization -- confirm.
        return self.servers[hash(key) % len(self.servers)]

    def _get_server(self, key):
        """Return the locked server for key, or None if unavailable."""
        server = self._key_to_server(key)
        if server.acquire():
            return server
        return None

    def delete(self, key, time=0):
        """Delete the cached value for the given key.

        A non-zero time is passed along as the second argument of the
        memcached "delete" command. Raises IOError if the server answers
        with anything other than DELETED or NOT_FOUND.
        """
        if time:
            cmd = "delete %s %d" % (key, time)
        else:
            cmd = "delete %s" % key
        server = self._get_server(key)
        if server:
            try:
                r = server.store(cmd)
                if r not in ('DELETED', 'NOT_FOUND'):
                    raise IOError(r)
            finally:
                server.release()

    def _set(self, cmd, key, val, time):
        """Send a storage command ("set", "add" or "replace") for key.

        Raises IOError with the server's reply if it is not "STORED".
        """
        server = self._get_server(key)
        if server:
            try:
                flags, val = self.flagset.push(val)
                # Header line, CRLF, then the payload; Server.store()
                # appends the final CRLF.
                msg = ("%s %s %d %d %d\r\n%s" %
                       (cmd, key, flags, time, len(val), val))
                r = server.store(msg)
                if r != "STORED":
                    raise IOError(r)
            finally:
                server.release()

    def add(self, key, val, time=0):
        """Add a new key/value pair to the cache (unless it exists)."""
        self._set("add", key, val, time)

    def replace(self, key, val, time=0):
        """Replace an existing key/value pair, only if it exists."""
        self._set("replace", key, val, time)

    def set(self, key, val, time=0):
        """Set the given key/value pair."""
        self._set("set", key, val, time)

    def get(self, key):
        """Retrieve the cached value for the given key.

        Returns None if the server is unavailable, if the reply does not
        start with a VALUE line, or if the value is not followed by END.
        """
        server = self._get_server(key)
        if server:
            try:
                server.retrieve("get %s" % key)
                line = server.readline()
                if not line.startswith('VALUE'):
                    return None

                _, _, flags, length = line.split()
                # The data block is followed by CRLF; read it and strip it.
                buf = server.recv(int(length) + 2)[:-2]

                if server.readline() != "END":
                    return None
                return self.flagset.pull(buf, int(flags))
            finally:
                server.release()

    def get_multi(self, keys):
        """Return a dict of cached values for the given keys.

        The returned dict will contain only those entries which
        were found in the cache. Its keys are the key strings as the
        servers sent them back. Servers that cannot be acquired, or that
        fail with a socket error, simply contribute no entries.
        """
        # Group the requested keys by the server that owns them.
        keys_by_server = {}
        for key in keys:
            owner = self._key_to_server(key)
            keys_by_server.setdefault(owner, []).append(str(key))

        results = {}
        # Only servers listed here hold their lock, so the finally clause
        # never releases a lock that was not taken.
        held = []
        try:
            # Walk self.servers rather than the dict so that every call
            # takes the server locks in the same order.
            for server in self.servers:
                if server in keys_by_server and server.acquire():
                    held.append(server)

            # Send all requests first, then read the replies.
            pending = []
            for server in held:
                try:
                    server.retrieve(
                        "get %s" % " ".join(keys_by_server[server]))
                except socket.error:
                    # Dead server: skip reading; released in finally.
                    continue
                pending.append(server)

            for server in pending:
                try:
                    line = server.readline()
                    # Stop at END, at a closed connection (empty line),
                    # or at any unexpected reply, mirroring get().
                    while line.startswith('VALUE'):
                        _, key, flags, length = line.split()
                        buf = server.recv(int(length) + 2)[:-2]
                        results[key] = self.flagset.pull(buf, int(flags))
                        line = server.readline()
                except socket.error:
                    # Keep what was already read from this server.
                    pass
        finally:
            for server in held:
                server.release()

        return results

    def flush_all(self):
        """Expire all data currently in the memcache servers."""
        for s in self.servers:
            if s.acquire():
                try:
                    s.flush_all()
                finally:
                    s.release()

    def disconnect_all(self):
        """Disconnect from all servers."""
        for s in self.servers:
            s.close()
186
187
class Server(object):
    """One memcached server, reached through a single locked socket.

    acquire() takes the server's lock and returns the connected socket;
    the caller then uses store/retrieve/readline/recv and must call
    release() afterwards. When acquire() returns None the lock is not
    held and release() must not be called.
    """

    # time.time() value before which the server is treated as dead
    # (0 means it is not marked dead).
    deaduntil = 0
    # The connected socket object, or None while disconnected.
    socket = None
    # Seconds to wait after a failure before trying to connect again.
    retry_delay = 5

    def __init__(self, host):
        """Parse a "host[:port]" string; the port defaults to 11211."""
        if ":" in host:
            self.ip, port = host.split(":")
            self.port = int(port)
        else:
            self.ip, self.port = host, 11211
        self._socket_lock = threading.Lock()

    def mark_dead(self):
        """Close the socket and refuse new use for retry_delay seconds."""
        self.deaduntil = time.time() + self.retry_delay
        self.close()

    def _is_dead(self):
        """Return True while the retry delay has not yet elapsed."""
        return bool(self.deaduntil and self.deaduntil > time.time())

    def acquire(self):
        """Lock the server and return its socket, connecting if needed.

        Returns None, without holding the lock, if the server is marked
        dead or the connection attempt fails.
        """
        # Cheap early exit so callers don't queue on a known-dead server.
        if self._is_dead():
            return None

        self._socket_lock.acquire()
        locked_for_caller = False
        try:
            # The previous holder may have marked the server dead while
            # this thread was waiting for the lock.
            if self._is_dead():
                return None
            self.deaduntil = 0

            if not self.socket:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.settimeout(1.0)
                try:
                    s.connect((self.ip, self.port))
                except (socket.error, socket.timeout):
                    s.close()
                    self.mark_dead()
                    return None
                self.socket = s

            locked_for_caller = True
            return self.socket
        finally:
            # Keep the lock only when a usable socket is handed out.
            if not locked_for_caller:
                self._socket_lock.release()

    def release(self):
        """Release a connection."""
        self._socket_lock.release()

    def close(self):
        """Close the socket, if there is one, and forget it."""
        if self.socket:
            self.socket.close()
            self.socket = None

    def store(self, command):
        """Send the given command string and return its single-line response."""
        self.retrieve(command)
        return self.readline()

    def retrieve(self, command):
        """Send the given command string and let the caller recv the response.

        On a socket error the server is marked dead and the error re-raised.
        """
        try:
            self.socket.sendall(command + '\r\n')
        except socket.error:
            self.mark_dead()
            raise

    def readline(self):
        """Read one CRLF-terminated line and return it without the CRLF.

        If the peer closes the connection first, the server is marked
        dead and whatever was read so far is returned. On a socket error
        the server is marked dead and the error re-raised.
        """
        try:
            chars = []
            recv = self.socket.recv
            while True:
                # One byte at a time, so nothing past the line is consumed.
                data = recv(1)
                if not data:
                    self.mark_dead()
                    break
                if data == '\n' and chars and chars[-1] == '\r':
                    return ''.join(chars[:-1])
                chars.append(data)
            return ''.join(chars)
        except socket.error:
            self.mark_dead()
            raise

    def recv(self, size):
        """Read exactly size bytes, which must end with CRLF, and return them.

        Raises IOError if the connection closes before size bytes arrive
        (the server is then marked dead) or if the data does not end with
        CRLF. On a socket error the server is marked dead and the error
        re-raised.
        """
        chunks = []
        remaining = size
        try:
            recv = self.socket.recv
            while remaining > 0:
                chunk = recv(remaining)
                if not chunk:
                    # An empty read means the peer closed the connection;
                    # without this check the loop would never end.
                    self.mark_dead()
                    break
                chunks.append(chunk)
                remaining -= len(chunk)
        except socket.error:
            self.mark_dead()
            raise

        buf = ''.join(chunks)
        if len(buf) != size:
            raise IOError("%d bytes received (expected %d)." %
                          (len(buf), size), buf)
        if not buf.endswith('\r\n'):
            raise IOError("Response did not end with CRLF.", buf)
        return buf

    def flush_all(self):
        """Expire all data currently in the server."""
        r = self.store('flush_all')
        if r != "OK":
            raise IOError(r)

    def __repr__(self):
        d = ''
        if self.deaduntil:
            d = " (dead until %d)" % self.deaduntil
        return "<memcached at %s port: %d%s>" % (self.ip, self.port, d)
305
306
Note: See TracBrowser for help on using the browser.