给ssdb python 接口提速

pylist 291

ssdb 有三个python 操作库:

ssdb-py https://github.com/wrongwaycn/ssdb-py (类似redis.py 较慢,但方便,省掉后期处理)
pyssdb https://github.com/ifduyue/pyssdb (简单,支持连接池,推荐)
ssdb.py https://github.com/hit9/ssdb.py (输出和上面一样,速度很快,但不支持连接池,推荐)

主要瓶颈在于解析返回数据:ssdb-py 和 pyssdb 都是通过 readline() 逐行读取并解析响应的,速度较慢。可以引入 spp_py 来完成解析,
spp_py 是专门用来解析 ssdb 报文的库,速度很快。

https://github.com/hit9/spp_py

安装spp_py

pip install spp

可以在ssdb-py 和 pyssdb 里引入spp_py。

以 ssdb-py 为例:

打开 /usr/local/lib/python2.7/dist-packages/ssdb/connection.py

在头部import

import spp

将 PythonParser 类修改如下:

class PythonParser(BaseParser):
    """
    Plain Python parsing class that delegates reply parsing to ``spp``.

    Replies are read straight from the socket and fed to an ``spp.Parser``
    instead of being parsed line by line through ``SocketBuffer.readline()``,
    which is where the original implementation spent most of its time.
    """
    encoding = None

    def __init__(self, socket_read_size):
        """
        :param socket_read_size: number of bytes requested per socket read.
        """
        self.socket_read_size = socket_read_size
        self._sock = None
        self._buffer = None
        self.parser = None  # spp.Parser instance, created in on_connect()

    def on_connect(self, connection):
        """
        Called when the socket connects
        """
        self._sock = connection._sock
        self._buffer = SocketBuffer(self._sock, self.socket_read_size)
        if connection.decode_responses:
            self.encoding = connection.encoding
        self.parser = spp.Parser()

    def on_disconnect(self):
        "Called when the socket disconnects"
        if self._sock is not None:
            self._sock.close()
            self._sock = None
        if self._buffer is not None:
            self._buffer.close()
            self._buffer = None
        self.encoding = None
        # The parser only exists between on_connect() and on_disconnect();
        # guard so that a repeated or early disconnect does not raise.
        if self.parser is not None:
            self.parser.clear()
            self.parser = None

    def read_response(self):
        """
        Read one complete reply from the server.

        :returns: list whose first item is the status string, followed by
            the reply values (decoded with ``self.encoding`` when set).
        :raises socket.error: if the remote end closed the connection.
        """
        parser = self.parser
        # A previous recv() may already have delivered a complete reply;
        # hand it out before blocking on the socket again.
        chunk = parser.get()
        while chunk is None:
            buf = self._sock.recv(self.socket_read_size)
            # recv() returns an empty string once the peer has closed.
            if not buf:
                self.on_disconnect()
                raise socket.error('Socket closed on remote end')
            parser.feed(str(buf))
            chunk = parser.get()

        if self.encoding:
            # Decode the values but leave the status item untouched, as the
            # readline()-based implementation this replaces did.
            encoding = self.encoding
            chunk = list(chunk)
            chunk[1:] = [
                value.decode(encoding) if isinstance(value, bytes) else value
                for value in chunk[1:]
            ]
        return chunk

提速效果

提速前每秒get 个数11111,
提速后每秒get 个数14285。

提速近三分之一。由于测试中只 get 一个键,这部分提升完全来自解析环节。

对pyssdb 做改造

import spp

class Connection(object):
    """
    A single TCP connection to an SSDB server.

    Requests are serialised by :meth:`send`; replies are read from the socket
    and parsed with an ``spp.Parser`` in :meth:`recv`.
    """

    def __init__(self, host='127.0.0.1', port=8888, socket_timeout=None):
        """
        :param host: server host name or address.
        :param port: server TCP port.
        :param socket_timeout: timeout passed to ``socket.settimeout()``.
        """
        self.pid = os.getpid()
        self.host = host
        self.port = port
        self.socket_timeout = socket_timeout
        self._sock = None
        self._fp = None
        self._parser = None  # spp.Parser instance, created in connect()

    def connect(self):
        """
        Open the socket if it is not open yet.

        :raises socket.error: if the connection cannot be established.
        """
        if self._sock:
            return
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.socket_timeout)
            sock.connect((self.host, self.port))
            fp = sock.makefile('r')
        except socket.error:
            # Do not leak the half-opened socket on failure.
            sock.close()
            raise
        # Publish the state only once everything succeeded.
        self._sock = sock
        self._fp = fp
        self._parser = spp.Parser()

    def disconnect(self):
        """Close the connection; safe to call when not connected."""
        if self._parser is not None:
            self._parser.clear()
        if self._fp is not None:
            try:
                self._fp.close()
            except (socket.error, IOError):
                pass
        if self._sock is not None:
            try:
                self._sock.close()
            except socket.error:
                pass
        self._sock = self._fp = self._parser = None

    close = disconnect

    def reconnect(self):
        """Drop the current connection, if any, and open a new one."""
        self.disconnect()
        self.connect()

    def send(self, cmd, *args):
        """
        Serialise and send one command, connecting first if necessary.

        :param cmd: command name; ``'delete'`` is sent as ``'del'``.
        :param args: command arguments; integers are converted to strings.
        """
        if cmd == 'delete':
            cmd = 'del'
        self.last_cmd = cmd
        if self._sock is None:
            self.connect()
        # Each item is framed as "<length>\n<data>\n", so every item needs a
        # length: stringify integers wherever they appear, not only last.
        parts = tuple(
            str(part) if isinstance(part, int) else part
            for part in (cmd, ) + args
        )
        buf = ''.join('%d\n%s\n' % (len(part), part) for part in parts) + '\n'
        self._sock.sendall(buf)

    def recv(self):
        """
        Read the reply to the last command and convert it to a Python value.

        :returns: ``None`` for ``not_found``; for ``ok`` a list, an int, a
            string or ``True`` depending on the last command sent.
        :raises socket.error: if the remote end closed the connection.
        :raises error: for any other reply (``error`` is the module-level
            exception class defined outside this snippet).
        """
        parser = self._parser
        # A previous recv() may already have delivered a complete reply;
        # hand it out before blocking on the socket again.
        ret = parser.get()
        while ret is None:
            buf = self._sock.recv(4096)
            # recv() returns an empty string once the peer has closed.
            if not buf:
                self.close()
                raise socket.error('Socket closed on remote end')
            parser.feed(str(buf))
            ret = parser.get()

        # The spp parser replaces the former readline()-based loop on
        # self._fp that collected the reply items one by one.
        cmd = self.last_cmd
        st, ret = ret[0], ret[1:]

        if st == 'not_found':
            return None
        if st == 'ok':
            if cmd.endswith(('keys', 'list', 'scan', 'range')) or \
                    (cmd.startswith('multi_') and cmd.endswith('get')):
                return ret
            if len(ret) == 1:
                if cmd.endswith(('set', 'del', 'incr', 'decr',
                                 'size', 'rank')) or \
                        cmd in ('setx', 'zget'):
                    return int(ret[0])
                return ret[0]
            if not ret:
                return True

        if ret:
            raise error(*ret)
        raise error('error')
登录发表评论