Java进阶指南
表达式解析
UEL 统一表达式语言
Ognl 对象图导航语言
Spel Spring表达式语言
Java 进阶
SPI的高级用法
SLF4J的绑定原理
H2 JDBC驱动类注册与数据库引擎初始化原理
Java SPI与Dubbo SPI区别
Java 秒懂对象 PO、VO、BO、DTO、POJO!
Java POJO/DO/DTO/BO/VO概念及应用案例分析
一个线程oom,进程里其他线程还能运行吗?
jps命令详解
Java的BigDecimal也会存在丢失精度的问题
java中的枚举类和常量类区别在哪儿?
Java 打包 FatJar 方法小结
"too many open files"的原理和解决方案
GraalVM 专栏
GraalVM入门以及环境搭建
Maven 专栏
maven 跳过单元测试-maven.test.skip和skipTests的区别
maven 配置代码检查插件,生成检查报告
Maven 执行生命周期
maven 删除本地仓库当前项目的依赖包
Gradle 专栏
自己动手应用Groovy实现Gradle的DSL—Task定义
看懂Gradle脚本(1)- Groovy语言的Map语法糖
看懂Gradle脚本(2)- Groovy语言的闭包语法
看懂Gradle脚本(3)- Groovy AST转换
看懂Gradle脚本(4)- Groovy语法之运算符重载
看懂Gradle脚本(5)- 跟Gradle学领域驱动设计
看懂Gradle脚本(6)- Hello Groovy, Goodbye Getters&Setters
看懂Gradle脚本(7)- ext {}函数是如何实现的
Gradle 常见问题集锦
Spring 专栏
Spring AOP 使用介绍,从前世到今生
Spring IOC 容器源码分析
Spring AOP 源码解析
Spring @PropertySource 注解实现读取 yml 文件
Spring 好用的工具类
Spring @Async失效情况
Spring I/O 2023 干货视频精选!
Spring 动态刷新bean
Spring Cache缓存技术
Spring @Transactional注解失效情况
Spring Event 事件订阅踩坑
循环依赖
Spring 解析@Async引起的循环依赖
Spring 中的循环依赖
从源码层面深度剖析 Spring 循环依赖 | 京东云技术团队
Spring 不同平台构建出现循环依赖错误问题原因分析
SpringBoot 专栏
SpringBoot 构建FarJAR Maven配置
SpringBoot 项目启动慢原因分析
SpringBoot 资源文件问题总结(Spring Boot的静态资源访问,配置文件外置)
SpringBoot 读取Jar包中静态资源原理
SpringBoot 配置Undertow处理高并发
SpringBoot Maven Profile配合Spring Profile进行多环境配置和打包
SpringBoot 使用profile结合maven实现多环境配置
SpringBoot @ComponentScan注解过滤排除不加载某个类的3种方法
Mybatis 专栏
Mybatis 一级、二级缓存机制
Mybatis 关闭一级、二级缓存机制
MybatisPlus
MybatisPlus LambdaQueryWrapper类的实现原理
MybatisPlus 在不修改全局策略和字段注解的情况下将字段更新为null
并发与多线程
Java 从单核到多核的多线程并发
并发和并行的区别
Redisson 专栏
一次生产redisson 延时队列不消费问题排查
redisson 阻塞队列不消费问题排查
Spring Batch 专栏
批处理框架spring batch基础知识介绍
Shiro 专栏
一篇适合小白的Shiro教程
SpringMVC 专栏
SpringMVC 后端处理多文件上传如何保持最大的灵活性
@RequestParam的加与不加的作用
SpringCloud 专栏
Gateway 一文彻底解决跨域问题
ruoyi-vue-pro 开发指南
萌新必读
简介
交流群
视频教程
功能列表
快速启动(后端项目)
快速启动(前端项目)
接口文档
技术选型
项目结构
代码热加载
一键改包
删除功能
内网穿透
达梦数据库专属
后端手册
新建模块
代码生成【单表】(新增功能)
代码生成【主子表】
代码生成【树表】
功能权限
数据权限
用户体系
三方登录
OAuth 2.0(SSO 单点登录)
SaaS多租户【字段隔离】
SaaS 多租户【数据库隔离】
WebSocket 实时通讯
异常处理(错误码)
参数校验、时间传参
分页实现
VO 对象转换、数据翻译
文件存储(上传下载)
Excel 导入导出
操作日志、访问日志、异常日志
MyBatis 数据库
MyBatis 联表&分页查询
多数据源(读写分离)、事务
Redis 缓存
本地缓存
异步任务
分布式锁
幂等性(防重复提交)
请求限流(RateLimiter)
单元测试
验证码
工具类
配置管理
数据库文档
中间件手册
定时任务
消息队列(内存)
消息队列(Redis)
消息队列(RocketMQ)
消息队列(RabbitMQ)
消息队列(Kafka)
限流熔断
工作流手册
工作流演示
功能开启
工作流(达梦适配)
审批接入(流程表单)
审批接入(业务表单)
流程设计器(BPMN)
流程设计器(钉钉、飞书)
选择审批人、发起人自选
会签、或签、依次审批
流程发起、取消、重新发起
审批通过、不通过、驳回
审批加签、减签
审批转办、委派、抄送
执行监听器、任务监听器
流程表达式
流程审批通知
大屏手册
报表设计器
大屏设计器
支付手册
功能开启
支付宝支付接入
微信公众号支付接入
微信小程序支付接入
支付宝、微信退款接入
会员手册
功能开启
微信公众号登录
微信小程序登录
会员用户、标签、分组
会员等级、积分、签到
商城手册
商城演示
功能开启
商城装修
【商品】商品分类
【商品】商品属性
【商品】商品 SPU 与 SKU
【商品】商品评价
【交易】购物车
【交易】交易订单
【交易】售后退款
【交易】快递发货
【交易】门店自提
【交易】分销返佣
【营销】优惠劵
【营销】拼团活动
【营销】秒杀活动
【营销】砍价活动
【营销】满减送
【营销】限时折扣
【营销】内容管理
【统计】会员、商品、交易统计
ERP手册
ERP 演示
功能开启
【产品】产品信息、分类、单位
【库存】产品库存、库存明细
【库存】其它入库、其它出库
【库存】库存调拨、库存盘点
【采购】采购订单、入库、退货
【销售】销售订单、出库、退货
【财务】采购付款、销售收款
CRM 手册
CRM 演示
功能开启
【线索】线索管理
【客户】客户管理、公海客户
【商机】商机管理、商机状态
【合同】合同管理、合同提醒
【回款】回款管理、回款计划
【产品】产品管理、产品分类
【通用】数据权限
【通用】跟进记录、待办事项
公众号手册
功能开启
公众号接入
公众号粉丝
公众号标签
公众号消息
自动回复
公众号菜单
公众号素材
公众号图文
公众号统计
系统手册
短信配置
邮件配置
站内信配置
数据脱敏
敏感词
地区 & IP 库
运维手册
开发环境
Linux 部署
Docker 部署
Jenkins 部署
HTTPS 证书
服务监控
前端手册 Vue 3.x
开发规范
菜单路由
Icon 图标
字典数据
系统组件
通用方法
配置读取
CRUD 组件
国际化
IDE 调试
代码格式化
前端手册 Vue 2.x
开发规范
菜单路由
Icon 图标
字典数据
系统组件
通用方法
配置读取
更新日志
【v2.1.0】开发中
【v2.0.1】2024-03-01
【v2.0.0】2024-01-26
【v1.9.0】2023-12-01
【v1.8.3】2023-10-24
yudao-cloud 开发指南
萌新必读
简介
交流群
视频教程
功能列表
快速启动(后端项目)
快速启动(前端项目)
接口文档
技术选型
项目结构
代码热加载
一键改包
删除功能
内网穿透
达梦数据库专属
微服务手册
微服务调试(必读)
注册中心 Nacos
配置中心 Nacos
服务网关 Spring Cloud Gateway
服务调用 Feign
定时任务 XXL Job
消息队列(内存)
消息队列(Redis)
消息队列(RocketMQ)
消息队列(RabbitMQ)
消息队列(Kafka)
消息队列(Cloud)
分布式事务 Seata
服务保障 Sentinel
Spring Security 专栏
Spring Security 入门
Spring Security OAuth2 入门
Spring Security OAuth2 存储器
Spring Security OAuth2 单点登录
Spring Security 常见问题
Guava 专栏
Guava 常用API汇总
本文档使用 MrDoc 发布
-
+
首页
一次生产redisson 延时队列不消费问题排查
# 问题描述 项目使用 redisson 延时队列功能,实现直播的开播提醒,突然有一天业务爆出问题,未触发开播提醒。 # 初步排查 首先通过查询生产日志,发送端日志存在,没有消费日志,猜测消费端没有消费到延时消息,,在 dba 的协助下查询 redis 队列,消息也确实存在,但已经过了过期时间,由此证明 redisson 消费者出现问题。通过服务日志发现在最后一次设置自定义推送任务是在一次服务发布之前,服务发布后,之前设置的自定义推送消息均没有被客户端消费,由此猜想是由发布服务导致消费端失效。 # 排查过程 发送端代码 ```java public void produce(String delayQueue, T t, long delay, TimeUnit timeUnit) { try { log.info("delay msg,delayQueue:{},key:{},delay:{}", delayQueue, t, delay); if (delay < 0) { delay = 0; } RBlockingQueue blockingFairQueue = redissonClient.getBlockingQueue(delayQueue); RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); delayedQueue.offer(t, delay, timeUnit); }catch (Exception e){ log.error("添加延时任务队列失败",e); } } ```` 消费端代码 ```java public class DelayTaskHandler implements Runnable { @Override public void run() { RBlockingQueue blockingFairQueue = redissonClient.getBlockingQueue(delayQueue); while (true) { try { T value = blockingFairQueue.take(); log.info("delay queue {},延时任务开始执行,value - {} , timeStamp - {} , threadName - {}", delayQueue, value, System.currentTimeMillis(), Thread.currentThread().getName()); consumer.accept(value); } catch (Exception e) { log.error("延时任务执行失败,", e); } } } } ``` **因为 redisson 延时队列是基于 redis 实现的,所以从 redis 执行命令开始入手排查** > 1.打开 redis 监控,启动服务,发现 redis 首先执行了 blpop 命令,阻塞等待{cl-live-admin:notice_delay_queue} 队列消息 >  > 2.提交一个延时任务后,观察 redis 命令 >  > 此时发现 redis 首先执行了一个 SUBSCRIBE 命令,订阅了一个队列,然后执行了一段 lua 脚本,主要包括以下命令: - zrangebyscore:获取 zset 中 score 在 0 至当前时间戳范围内的前一百条数据 如果获取到数据则循环执行 rpush,lrem,zrem 命令 - zrange:取 zset 中第一条数据 - zadd:向 zset 中添加一条数据,score 为时间戳 - rpush:向 list 右边 push 一条数据 - publish:如果添加的消息在顶部,则发布一条订阅消息 > 3.消费一条消息 >  > 同样消费的时候也是提交了一条 lua 脚本,主要执行了以下命令 可以看到和发送端命令相似 - zrangebyscore:获取 zset 中 score 在 0 至当前时间戳范围内的前一百条数据 - rpush:向 list 右边 push 一条数据 - lrem:删除一条数据 - zrem:删除 zeset 中的数据 - zrange:获取第一条数据 - BLPOP:阻塞等待队列消息 通过以上 redis 命令的执行可以发现一个命令 SUBCRIBE 用于订阅 redis 的一个队列,而这个命令只在发送消息的时候执行了,在消费的时候没有执行。从而验证了当服务重启后如果没有新的消息发送,那么客户端就不会发送 SUBCRIBE 命令,订阅延时队列,这就导致在服务重启前发送的消息到时间后无法消费。 # 解决方案 在消费端启动的时候添加一行代码用于订阅延时队列 ```java //订阅 redis 队列 redissonClient.getDelayedQueue(blockingFairQueue); ``` **那么为什么没有订阅就消费不到消息了呢?带着疑问继续深入理解 redisson 的实现** # redisson 延时队列原理  > 首先回到消费端代码  > 在我们没有发送订阅命令的时候,客户端只是在阻塞等待一个指定队列的消息,那么这个队列的消息是谁放进去的呢? > 带着疑问我们再看发送端代码  > 直接进入 delayedQueue.offer()方法内部  > 可以看到发送端是提交了一个 lua 脚本主要执行了 zadd,rpush,publish 命令,这里我们需要注意 publish 命令,在 redis 中 pub/sub 是对应的,当有 publish 的时候,那么 subcribe 端会收到该订阅消息。 > 那么是谁收到了订阅的消息,收到消息后又做了什么呢,回到 redissonClient.getDelayedQueue(blockingFairQueue)代码中  > 继续进入 new RedissonDelayedQueue()  > 可以看到这里创建了一个 QueueTransferTask,实现了 pushTaskAsync()方法,具体内容是一个 lua 脚本,首先执行 zrangebyscore 获取过期的前一百条数据,循环调用 rpush,lrem,zrem,注意这里 rpush 的队列为我们指定的延时队列,也就是 consumer 端 take 的队列。至此明白了消费端的消息是方法 pushTaskAsync()执行后放入的。那么什么时候执行这个方法呢。 > 进入 queueTransferService.schedule(queueName, task)方法  > 这里会执行 start 方法,继续跟进  这里可以看到添加了两个 listener,onSubcribe,onMessage,当订阅到消息时执行 onSubcribe 中的 pushTash,当 redis 有新的消息通知,就会触发 scheduleTask(...)方法,startTime 为上述中 publish 通知的元素过期时间 > 继续进入 pushTask 方法  > 这里可以看到一个熟悉的方法 pushTaskAsync(),也就是前边的一段 lua 脚本,用于将过期的消息放入阻塞队列,并返回排在第一个的消息执行 scheduleTask() > 继续进入 scheduleTask()方法  > 如果时间差小于 10 毫秒则执行 pushTask 方法,如果大于 10 毫秒则启动一个延时任务,到时间后执行 pushTask 方法。pushTask 与 scheduleTask 互相调用循环往复 # 至此源码分析完毕,整个流程总结如下: > 1. 发送端只是往 zset,list,添加数据,并且发布一条订阅消息 > 2. 消费端收到订阅消息后会查询 zset 中的过期消息,并放入阻塞队列供消费端 take 消息,并且获取 zset 第一个消息,启动一个延时任务,到期后继续从 zset 中获取过期消息如此循环。 此时就回答了上边的问题 **那么为什么没有订阅就消费不到消息了呢?** 如果没有订阅的话消费端就收不到订阅消息,也就不会去获取过期时间放入阻塞队列进行循环。
LazzMan
2023年7月6日 16:07
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
PDF文档(打印)
分享
链接
类型
密码
更新密码