本文完整代码下载:github链接
目前在做的工作有一部门是搭建一个可供公司内部使用的推送平台,用的中间件是redis,于是就自然的想用redis5.0版本的新特性来实现这个功能,网上的demo比较少,且大多是终端操作的命令行,写了一个Python的类和大家分享。
在介绍具体实现之前,先大致介绍一下背景。
在Redis5.0版本发布之前,redis也有一个发布、订阅功能,但功能非常简单,只能单纯的发布和订阅,适合在即时通信里使用。缺点非常多:
-
消息没有持久化的机制。在Pub/Sub模型中,消费者是和连接(Connection)绑定的,当消费者的连接断掉(网络原因或者消费者进程crash)后,再次重连,那么Channel中的消息将永久消失(对于该消费者而言),也就是说Pub/Sub模型缺少消息回溯的机制
-
消费消息的速度和消费者的数量成反比。在Redis的实现中,Redis会把Channel中的消息逐个(Linear)推送给每个消费者,因此当消费者的数量达到一定规模时,服务器的性能将线性下降,因此每个消费者获取到消息的延迟也线性增长
-
当生产者产生消息的速度远大于消费者的消费能力的时候(此时可以简单地理解为消息积压),消费者会被强制断开连接,因此会造成消息的丢失,这个特性可以详见redis的配置
-
对频道的消费者信息没有展现接口。 在我们的项目里需要管理每一个频道的订阅者,虽然redis本身有记录,但是并没有提供API可以访问。
Redis5.0最大的新特性就是多出了一个数据结构Stream,它是一个新的强大的支持多播的可持久化的消息队列,设计和Kafka非常相似。
- Redis Stream有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的ID和对应的内容。消息是持久化的,Redis重启后,内容还在。
- 每个消费组都有一个Stream内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从Stream的某个消息ID开始消费,这个ID用来初始化last_delivered_id变量。
- 每个Stream都可以挂多个消费组,每个消费组会有个游标last_delivered_id在Stream数组之上往前移动,表示当前消费组已经消费到哪条消息了。
- 同一个消费组(Consumer Group)可以挂接多个消费者(Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。每个消费者者有一个组内唯一名称。【但是每个消费者并没有消费到哪条 消息的单独记录,所以后续我队列的消费者就是一个只含有一个消费者的消费组,这样可以方便记录更多信息】
- 消费者(Consumer)内部会有个状态变量pending_ids,它记录了当前已经被客户端读取的消息,但是还没有ack。如果客户端没有ack,这个变量里面的消息ID会越来越多,一旦某个消息被ack,它就开始减少。这个pending_ids变量用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。
具体实现:
class SubRedis(object):
def __init__(self):
if not hasattr(SubRedis, 'pool'):
SubRedis.getRedisCoon() #创建redis连接池
self._coon = redis.Redis(connection_pool=SubRedis.pool)
@staticmethod
def getRedisCoon():
SubRedis.pool = redis.ConnectionPool(host=redisInfo['SubRedisAddress'],password=redisInfo['SubRedisPassword'],port=redisInfo['SubRedisPort'],db=redisInfo['db'])
#返回一个channel的具体信息: 订阅者数量,最后送达的msg的ID...
def channel_info(self,channel):
return self._coon.xinfo_stream(channel)
#返回一个channel的具体订阅群组的信息(这里是返回订阅者,因为每一个群组里只有一个消费者)
def channel_consumers_info(self,channel):
InfoList = self._coon.xinfo_groups(channel)
for GroupDict in InfoList:
GroupDict.pop("consumers")
return InfoList
#创建消费者
def create_consumer_group(self,name,channel):
ret = self._coon.xgroup_create(channel,name,id="$")
if ret == True:
print self.channel_consumers_info(channel)
else:
logging.error("create consumer %s fill,ret %s" %(name,ret))
#往某一个channel发送消息
def publish(self,channel,msg):
msgid = self._coon.xadd(channel,msg)
return msgid
def consumer_already_subscribed(self,channel,consumer):
channel_consumers_infolist = self.channel_consumers_info(channel)
for consumer_dict in channel_consumers_infolist:
if consumer in consumer_dict.values():
logging.warning("consumer %s has already subscribed %s" % (consumer, channel))
return True
return False
#已经存在的订阅者订阅新频道
def subscribe(self,name,channel):
if(self.consumer_already_subscribed(channel,consumer)):
return False
self.create_consumer_group(name,channel)
print "%s subscribe %s success,channel %s info:"%(name,channel,channel),self.channel_consumers_info(channel)
return
#监听并写入新消息到文件
def listen_channel(self,channel,consumer,file):
if not (self.consumer_already_subscribed(channel,consumer)):
return False
mess = self._coon.xreadgroup(consumer,consumer,{channel:">"})
if mess != []:
msg_list = mess[0][1]
for msg in msg_list:
id, content = msg[0], msg[1]
content["msgid"] = id
json_content = json.dumps(content)
json_content += ","
print ("new message: ",content)
with open(file, "a") as f:
f.write(json_content)
self._coon.xack(channel, consumer, id)