异步消息队列
Redis 的list数据结构常用来作为异步消息队列使用,用rpush和lpush操作入队,用lpop和rpop操作出队列
队列空了怎么办
客户端通过队列的pop操作获取的消息,进行处理。处理完之后接着获取,进行处理。如此循环。
如果队列空了,客户端陷入pop的死循环,不停的pop,浪费生命的空轮询。
通常使用sleep来解决这个问题
阻塞读
睡眠有个小问题,导致消息的延迟增大。如果只有1个消费者,那么这个延迟就是1s。多个消费者,这个延迟有所下降。
使用blpop 或 brpop。b代表blocking,阻塞读
空闲连接自动断开
如果线程一直阻塞在那里,redis客户端连接就成了闲置连接,闲置过久,服务器一般会主动断开连接,客户端消费者要小心,捕获异常,还要重试
延时队列的实现
通过redis的zset来实现。将消息序列化成一个字符串作为zset的value,这个消息到期处理时间作为score,然后用多个线程轮询zset获取到期的任务进行处理。多个线程是为了保障可用性,万一挂了一个线程还有其他线程可以继续处理。因为有多个线程,需要考虑并发争抢任务,确保任务不会被多次执行
def loop():
while True:
values = r.zrangebyscore("delay_queue",0,time.time(),start = 0,num = 1)
if not values:
time.sleep(1)
continue
value = values[0]
success = r.zrem("delay_queue",value)
if success:
handle_msg(value)