十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
今天在写zabbix storm job监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。

来安ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:18982081108(备注:SSL证书合作)期待与您的合作!
在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:
- redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an
- implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server
使用的方法:
| 1 2 | r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx) r.xxxx() | 
有了ConnectionPool这个类之后,可以使用如下方法
- r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx)
- r.xxxx()
这里Redis是StrictRedis的子类
简单分析如下:
在StrictRedis类的__init__方法中,可以初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | classStrictRedis(object):........    def__init__(self, host='localhost', port=6379,                 db=0, password=None, socket_timeout=None,                 socket_connect_timeout=None,                 socket_keepalive=None, socket_keepalive_options=None,                 connection_pool=None, unix_socket_path=None,                 encoding='utf-8', encoding_errors='strict',                 charset=None, errors=None,                 decode_responses=False, retry_on_timeout=False,                 ssl=False, ssl_keyfile=None, ssl_certfile=None,                 ssl_cert_reqs=None, ssl_ca_certs=None):         ifnotconnection_pool:             ..........              connection_pool =ConnectionPool(**kwargs)         self.connection_pool =connection_pool | 
在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里可以看到具体实现是从连接池中获取一个具体的连接,然后执行命令,完成后释放连接:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |    # COMMAND EXECUTION AND PROTOCOL PARSING    defexecute_command(self, *args, **options):        "Execute a command and return a parsed response"        pool =self.connection_pool        command_name =args[0]        connection =pool.get_connection(command_name, **options)  #调用ConnectionPool.get_connection方法获取一个连接        try:            connection.send_command(*args)  #命令执行,这里为Connection.send_command            returnself.parse_response(connection, command_name, **options)        except(ConnectionError, TimeoutError) as e:            connection.disconnect()            ifnotconnection.retry_on_timeout andisinstance(e, TimeoutError):                raise            connection.send_command(*args)              returnself.parse_response(connection, command_name, **options)        finally:            pool.release(connection)  #调用ConnectionPool.release释放连接 | 
在来看看ConnectionPool类:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |      classConnectionPool(object):         ...........    def__init__(self, connection_class=Connection, max_connections=None,                 **connection_kwargs):   #类初始化时调用构造函数        max_connections =max_connections or2**31        ifnotisinstance(max_connections, (int, long)) ormax_connections < 0:  #判断输入的max_connections是否合法            raiseValueError('"max_connections" must be a positive integer')        self.connection_class =connection_class  #设置对应的参数        self.connection_kwargs =connection_kwargs        self.max_connections =max_connections        self.reset()  #初始化ConnectionPool 时的reset操作    defreset(self):        self.pid =os.getpid()        self._created_connections =0#已经创建的连接的计数器        self._available_connections =[]   #声明一个空的数组,用来存放可用的连接        self._in_use_connections =set()  #声明一个空的集合,用来存放已经在用的连接        self._check_lock =threading.Lock().......    defget_connection(self, command_name, *keys, **options):  #在连接池中获取连接的方法        "Get a connection from the pool"        self._checkpid()        try:            connection =self._available_connections.pop()  #获取并删除代表连接的元素,在***次获取connectiong时,因为_available_connections是一个空的数组,            会直接调用make_connection方法        exceptIndexError:            connection =self.make_connection()        self._in_use_connections.add(connection)   #向代表正在使用的连接的集合中添加元素        returnconnection       defmake_connection(self): #在_available_connections数组为空时获取连接调用的方法        "Create a new connection"        ifself._created_connections >=self.max_connections:   #判断创建的连接是否已经达到***限制,max_connections可以通过参数初始化            raiseConnectionError("Too many connections")        self._created_connections +=1#把代表已经创建的连接的数值+1        returnself.connection_class(**self.connection_kwargs)     #返回有效的连接,默认为Connection(**self.connection_kwargs)    defrelease(self, connection):  #释放连接,链接并没有断开,只是存在链接池中        "Releases the connection back to the pool"        self._checkpid()        ifconnection.pid !=self.pid:            return        self._in_use_connections.remove(connection)   #从集合中删除元素        self._available_connections.append(connection) #并添加到_available_connections 的数组中    defdisconnect(self): #断开所有连接池中的链接        "Disconnects all connections in the pool"        all_conns =chain(self._available_connections,                          self._in_use_connections)        forconnection inall_conns:            connection.disconnect() | 
execute_command最终调用的是Connection.send_command方法,关闭链接为 Connection.disconnect方法,而Connection类的实现:
| 1 2 3 4 5 6 7 | classConnection(object):    "Manages TCP communication to and from a Redis server"    def__del__(self):   #对象删除时的操作,调用disconnect释放连接        try:            self.disconnect()        exceptException:            pass | 
核心的链接建立方法是通过socket模块实现:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |     def_connect(self):        err =None        forres insocket.getaddrinfo(self.host, self.port, 0,                                      socket.SOCK_STREAM):            family, socktype, proto, canonname, socket_address =res            sock =None            try:                sock =socket.socket(family, socktype, proto)                # TCP_NODELAY                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)                # TCP_KEEPALIVE                ifself.socket_keepalive:   #构造函数中默认 socket_keepalive=False,因此这里默认为短连接                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)                    fork, v initeritems(self.socket_keepalive_options):                        sock.setsockopt(socket.SOL_TCP, k, v)                # set the socket_connect_timeout before we connect                sock.settimeout(self.socket_connect_timeout)  #构造函数中默认socket_connect_timeout=None,即连接为blocking的模式                # connect                sock.connect(socket_address)                # set the socket_timeout now that we're connected                sock.settimeout(self.socket_timeout)  #构造函数中默认socket_timeout=None                returnsock            exceptsocket.error as _:                err =_                ifsock isnotNone:                    sock.close()..... | 
关闭链接的方法:
| 1 2 3 4 5 6 7 8 9 10 11 |     defdisconnect(self):        "Disconnects from the Redis server"        self._parser.on_disconnect()        ifself._sock isNone:            return        try:            self._sock.shutdown(socket.SHUT_RDWR)  #先shutdown再close            self._sock.close()        exceptsocket.error:            pass        self._sock =None | 
可以小结如下
1)默认情况下每创建一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个连接池得到一个连接,操作完成后会把该连接放回连接池(连接并没有释放),可以构造一个统一的ConnectionPool,在创建Redis实例时,可以将该ConnectionPool传入,那么后续的操作会从给定的ConnectionPool获得连接,不会再重复创建ConnectionPool。
2)默认情况下没有设置keepalive和timeout,建立的连接是blocking模式的短连接。
3)不考虑底层tcp的情况下,连接池中的连接会在ConnectionPool.disconnect中统一销毁。
本文出自 “菜光光的博客” 博客,请务必保留此出处http://caiguangguang.blog./1652935/1583541