# $Id$
# An experimental streaming memcached client.  This is a work in progress.
#
# Copyright (c) 2006 by Fredrik Lundh.  All rights reserved.

try:
    import asyncore, asynchat
except ImportError:
    # Both modules were removed from the standard library in Python 3.12.
    # The producer and command classes below never touch them, so keep
    # this part of the module importable; only the connection class
    # needs the real modules.
    asyncore = asynchat = None
import os, socket, string, sys

##
# A producer that chops up large data block into a stream of smaller
# blocks.
#
# @param data The complete payload to hand out.
# @param blocksize Maximum size of each block returned by
#     {@link Producer.more}.  Defaults to 512.

class Producer:

    def __init__(self, data, blocksize=512):
        if blocksize < 1:
            # a zero-sized slice is always empty, which would make the
            # producer look exhausted before any data was handed out
            raise ValueError("blocksize must be at least 1")
        self.data = data
        self.pos = 0
        self.blocksize = blocksize

    ##
    # Returns the next block of data, or an empty string when all
    # data has been handed out.

    def more(self):
        end = self.pos + self.blocksize
        block = self.data[self.pos:end]
        if block:
            self.pos = end
            return block
        # exhausted; drop the reference to the payload
        self.data = ""
        return ""

##
# Command handler base class.  The {@link Command.send} method issues
# a command, the {@link Command.recv} method configures the client to
# deal with the command response.
#
# @param consumer Object that receives the response through its
#     feed and close methods.
# @param *command The parts of the command line.  They are joined
#     with single spaces and terminated with CRLF.

class Command:

    def __init__(self, consumer, *command):
        # Format every part with %s, so that numeric arguments (such as
        # an incr amount or a delete time) are accepted as well as
        # strings; str.join alone raises TypeError for non-strings.
        parts = ["%s" % (part,) for part in command]
        self.command = " ".join(parts) + "\r\n"
        self.consumer = consumer
        self.result = None

    ##
    # Adds a command to the client's send queue.

    def send(self, client):
        client.push(self.command)

    ##
    # Configures the client to deal with the command response.  This
    # should return the next block handler, or None if no response is
    # expected.

    def recv(self, client):
        client.set_terminator("\r\n")
        self.data = ""
        return self.line

    ##
    # Handles the first response line.  Called with a chunk of data
    # while the line is being received, and with None when the line
    # terminator has been seen; the complete line is then passed to
    # the consumer, and the consumer is closed.

    def line(self, client, data):
        if data:
            self.data += data
            return self.line
        self.consumer.feed(self.data)
        self.consumer.close()
        return None

##
# Handler for the "get" command.  For each returned value, the
# consumer's open method is called with the key, followed by feed
# calls for the value data, and a close call.
#
# @param consumer Object providing open, feed, and close methods.
# @param keys Sequence of keys to fetch.

class GetCommand(Command):

    def __init__(self, consumer, keys):
        Command.__init__(self, consumer, "get", *keys)

    ##
    # Handles a response line ("VALUE key flags bytes" or "END").
    # Returns the next block handler, or None at end of transmission.
    # Raises IOError for anything else.

    def line(self, client, data):
        if data:
            self.data += data
            return
        # terminator seen; self.data holds one complete response line.
        # keep a copy of the raw line for error reporting
        line = self.data
        self.data = ""
        value = line.split()
        if not value:
            # blank line, e.g. the CRLF that follows a data block
            return self.line
        if value[0] == "END":
            return None # end of transmission
        if value[0] == "VALUE":
            try:
                key = value[1]
                size = int(value[3])
            except (IndexError, ValueError):
                raise IOError("unknown response: %s" % repr(line))
            if size < 0:
                raise IOError("unknown response: %s" % repr(line))
            if size == 0:
                # asynchat treats a zero terminator as "no terminator"
                # and would hand the rest of the stream to the body
                # handler, so deliver the empty value right away
                self.consumer.open(key)
                self.consumer.close()
                return self.recv(client)
            client.set_terminator(size)
            self.consumer.open(key)
            return self.body
        raise IOError("unknown response: %s" % repr(line))

    ##
    # Handles the data block that follows a VALUE line.

    def body(self, client, data):
        if data:
            self.consumer.feed(data)
            return
        self.consumer.close()
        return self.recv(client) # start all over

##
# Handler for the storage commands (set, add, replace).  The command
# line is sent with flags and expiry time both set to 0, followed by
# the data block.
#
# @param consumer Object that receives the server's reply line.
# @param cmd Command name.
# @param key Key to store under.
# @param body Data to store.

class SetCommand(Command):

    def __init__(self, consumer, cmd, key, body):
        Command.__init__(self, consumer, cmd, key, "0", "0", str(len(body)))
        self.body = body

    ##
    # Queues the command line, the data (in small blocks), and the
    # trailing CRLF.

    def send(self, client):
        client.push(self.command)
        client.push_with_producer(Producer(self.body))
        client.push("\r\n")

##
# Handler for the "stats" command.  The consumer is stored by the
# base class, but the response lines are delivered to this object's
# own open, feed, and close methods; feed prints each line.

class StatsCommand(Command):

    def __init__(self, consumer):
        Command.__init__(self, consumer, "stats")
        self.open()

    ##
    # Handles a response line.  Each line is passed to feed, until
    # an "END" line arrives.

    def line(self, client, data):
        if data:
            self.data += data
        else:
            if self.data == "END":
                self.close()
                return None
            self.feed(self.data)
            self.data = ""
        return self.line

    def open(self):
        pass

    def feed(self, data):
        # single-argument form prints the same text on Python 2 and 3
        print(data)

    def close(self):
        pass

##
# Asynchronous memcache communication manager.

class asyncMemCache(asynchat.async_chat):

    ##
    # Creates the client and connects it to the given server.
    #
    # @param host Server host name.
    # @param port Server port.

    def __init__(self, host="localhost", port=11211):
        asynchat.async_chat.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((host, port))
        self.set_terminator(1)
        self.feed = None # block handler of the active command, if any
        self.commands = [] # commands already sent, waiting to receive

    def handle_connect(self):
        pass

    def handle_expt(self):
        # NOTE(review): confirm that close() belongs under the win32
        # guard together with the message.
        if sys.platform == "win32":
            print("NO SERVER")
            self.close()

    ##
    # Passes incoming data to the active block handler.  Raises
    # IOError if data arrives while no command is active.

    def collect_incoming_data(self, data):
        if not self.feed:
            # FIXME: report error to consumer?
            print("IGNORE %s %d" % (repr(data), len(self.commands)))
            raise IOError
        self.feed(self, data)

    ##
    # Tells the active block handler that the terminator was seen,
    # and installs the handler it returns.  When the active command
    # is done, the next queued command takes over.  Raises IOError
    # if no command is active.

    def found_terminator(self):
        if not self.feed:
            # FIXME: report error to consumer?
            print("IGNORE %d" % len(self.commands))
            raise IOError
        self.feed = self.feed(self, None)
        # A handler returning None means the active command is done.
        # Keep going through the queue, since recv may itself return
        # None for a command that expects no response; stopping after
        # one such command would leave the rest of the queue stuck.
        while self.feed is None and self.commands:
            command = self.commands.pop(0)
            self.feed = command.recv(self)

    ##
    # Sends a command, and arranges for its response to be handled:
    # right away if the client is idle, or after the commands that
    # were issued before it.

    def do(self, command):
        command.send(self)
        if self.feed:
            self.commands.append(command)
        else:
            self.feed = command.recv(self)

    # ----------------------------------------------------------------
    # memcache commands

    def get(self, consumer, *keys):
        self.do(GetCommand(consumer, keys))

    def set(self, consumer, key, data):
        self.do(SetCommand(consumer, "set", key, data))

    def add(self, consumer, key, data):
        self.do(SetCommand(consumer, "add", key, data))

    def replace(self, consumer, key, data):
        self.do(SetCommand(consumer, "replace", key, data))

    # The numeric arguments below are converted here, since the command
    # line is built by joining strings.

    def delete(self, consumer, key, time=0):
        self.do(Command(consumer, "delete", key, str(time)))

    def incr(self, consumer, key, value=1):
        self.do(Command(consumer, "incr", key, str(value)))

    def decr(self, consumer, key, value=1):
        self.do(Command(consumer, "decr", key, str(value)))

    def stats(self, consumer):
        self.do(StatsCommand(consumer))

    def version(self, consumer):
        self.do(Command(consumer, "version"))