Netty EventExecutorGroup 隔离:让 MasterHandler 远离 EventLoop 阻塞
一、背景:EventLoop 阻塞的隐患
在 RL-Job-Scheduler 架构中,Master 通过 Netty 与 Worker 进行 RPC 通信。MasterHandler 作为 ChannelHandler,直接挂在 Netty 的 EventLoop 线程上处理所有入站消息:
Worker A ──┐Worker B ──┼──▶ Netty EventLoop ──▶ MasterHandler ──▶ Redis / MySQLWorker C ──┘ ▲ (同一个线程处理所有请求)问题在于,MasterHandler 中的部分操作是同步阻塞 IO:
| 操作 | 耗时 | 风险 |
|---|---|---|
| Redis TTL 更新 (HEARTBEAT) | ~1-5ms | 低,但频繁 |
| MySQL taskMapper.updateById() | ~5-20ms | 高,串行化 |
| Redis delete() | ~1-3ms | 中 |
| WebSocket 推送 | ~5-10ms | 中 |
当 taskMapper.updateById() 执行时,整个 EventLoop 线程被阻塞,同一 WorkerGroup 上的所有 Channel 全部被拖累,新消息无法及时接收,甚至可能触发 Worker 端的超时。
这正是经典的 EventLoop 阻塞 问题。
二、解决方案:EventExecutorGroup 隔离
架构设计
Worker A ──┐Worker B ──┼──▶ Netty EventLoop ──▶ Decoder ──▶ Encoder ──┐Worker C ──┘ ▲ │ (只做接收/发送) │ 异步分发 ▼ DefaultEventExecutorGroup (16 线程) │ ▼ MasterHandler (业务逻辑) │ ▼ Redis / MySQL / WebSocket核心思路:让 EventLoop 只做它擅长的事——接收数据、编解码、快速分发。耗时的业务逻辑(Redis/MySQL 操作)交给独立的业务线程池执行。
为什么是 DefaultEventExecutorGroup
| 特性 | 说明 |
|---|---|
| 生命周期绑定 | shutdownGracefully() 自动清理,无需手动管理 |
| 与 EventLoop 协作 | Netty 官方推荐,与 BossGroup/WorkerGroup 无缝配合 |
| 线程安全 | 内部线程池与 Netty 线程解耦 |
| 无需 ThreadLocal | 事务问题不存在(见下文) |
三、核心代码解析
Before(改造前)
public class MasterNettyServer { private MasterHandler masterHandler; // Spring 注入
public void start() { // ... serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MessageDecoder()); ch.pipeline().addLast(new MessageEncoder()); ch.pipeline().addLast(masterHandler); // ❌ 挂在 EventLoop 上 } }); }}After(改造后)
import io.netty.util.concurrent.DefaultEventExecutorGroup;import io.netty.util.concurrent.EventExecutorGroup;
public class MasterNettyServer { private MasterHandler masterHandler; private EventExecutorGroup bizGroup; // 业务线程池
public void start() { bizGroup = new DefaultEventExecutorGroup(16,null,1000, RejectedExecutionHandlers.reject()); // 16 个业务线程,队列大小为1000,拒绝策略,默认线程工厂
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MessageDecoder()); ch.pipeline().addLast(new MessageEncoder()); ch.pipeline().addLast(bizGroup, masterHandler); // ✅ 交给线程池 } }); }
public void stop() { // ... bizGroup.shutdownGracefully(); // ✅ 优雅关闭 }}Pipeline 对比
Before:┌──────────────┐ ┌──────────────┐ ┌──────────────────┐│ MessageDecoder│ → │ MessageEncoder│ → │ MasterHandler │└──────────────┘ └──────────────┘ └──────────────────┘ ▲ EventLoop 线程 (同步执行所有业务)
After:┌──────────────┐ ┌──────────────┐ ┌──────────────────┐│ MessageDecoder│ → │ MessageEncoder│ → │ MasterHandler │└──────────────┘ └──────────────┘ └──────────────────┘ ▲ DefaultEventExecutorGroup-1-N (16 个线程,并行处理业务)四、线程池配置说明
bizGroup = new DefaultEventExecutorGroup(16);重要说明:DefaultEventExecutorGroup 不支持直接传 core/max/queue/keepAlive 等参数。它的构造函数只有:
DefaultEventExecutorGroup(int numThreads)DefaultEventExecutorGroup(int numThreads, ThreadFactory threadFactory)
如果你需要更精细的配置(如队列大小、超时时间),需要自定义 ThreadFactory:
bizGroup = new DefaultEventExecutorGroup(16, new ThreadFactory() { private AtomicInteger idx = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, "BizExecutor-" + idx.incrementAndGet()); t.setDaemon(false); return t; }});默认配置下的行为
| 参数 | 默认值 |
|---|---|
| 线程数 | 16(固定) |
| 队列 | LinkedBlockingQueue(无界) |
| 拒绝策略 | 抛异常(不推荐无界队列) |
五、Spring 上下文安全性说明
masterHandler 是 Spring Bean
@Componentpublic class MasterHandler extends ChannelInboundHandlerAdapter { @Autowired private RedisTemplate<String, Object> redisTemplate;
@Autowired private TaskMapper taskMapper;}masterHandler 是单例 Bean,而 Bean 实例本身是线程安全的。Bean 实例的字段(redisTemplate、taskMapper)都是无状态的,因此无论哪个线程调用都是安全的。
为什么不用 @Transactional
Spring 的 @Transactional 依赖 ThreadLocal 存储事务上下文。跨线程池后,ThreadLocal 数据会丢失。
但这不是问题,因为当前代码中:
handleStatusReport的taskMapper.updateById()是独立 AutoCommithandleHeartbeat的 Redis 操作本身就是原子命令- 各操作之间没有跨方法的事务需求
六、验证方式
1. 启动后观察日志
应用启动后,在日志中搜索 DefaultEventExecutorGroup:
# 应该看到类似输出:DefaultEventExecutorGroup-1-1 [nioEventLoop-3-1] INFO ...DefaultEventExecutorGroup-1-2 [nioEventLoop-3-2] INFO ......DefaultEventExecutorGroup-1-16 [nioEventLoop-3-16] INFO ...如果看到这些线程名,说明业务逻辑已经成功分发到线程池。
2. 添加断点验证
在 MasterHandler 的关键方法(如 channelRead0)中加断点,观察执行线程名。正常情况下,channelRead0 会在 EventLoop 线程执行,而实际业务处理(如 handleStatusReport)应该在 DefaultEventExecutorGroup-N-N 线程执行。
七、改动总结
改动文件(仅 1 个)
MasterNettyServer.java
| 改动点 | 说明 |
|---|---|
| 新增 import | DefaultEventExecutorGroup, EventExecutorGroup |
| 新增字段 | private EventExecutorGroup bizGroup |
| start() 初始化 | bizGroup = new DefaultEventExecutorGroup(16) |
| Pipeline 改造 | ch.pipeline().addLast(bizGroup, masterHandler) |
| stop() 清理 | bizGroup.shutdownGracefully() |
Git 提交
git add src/main/java/org/sgj/rljobscheduler/master/netty/MasterNettyServer.javagit commit -m "refactor(master): isolate MasterHandler to DefaultEventExecutorGroup"改造效果
改造前:Worker heartbeat → EventLoop 阻塞 10ms → 其他 Worker 等待
改造后:Worker heartbeat → EventLoop 快速分发 → DefaultEventExecutorGroup-1 处理(不阻塞) ↓ 其他 Worker 消息正常处理八、相关上下文
MasterHandler 处理的消息类型
| 消息类型 | 处理逻辑 | 阻塞风险 |
|---|---|---|
HEARTBEAT | Worker 注册 + Redis TTL | 低 |
EXECUTE_TASK_RESPONSE | 日志分发(已解耦) | 无 |
LOG_DATA | 入队(queue.offer) | 无 |
TASK_STATUS_REPORT | MySQL 更新 + Redis 删除 + WebSocket 推送 | 高 |
LogManager 已经解耦
LogManager 早已做了异步处理:
// MasterHandler 中的日志处理logQueue.offer(new LogEntry(traceId, content)); // ✅ 非阻塞入队
// Master-LogManager-Thread 异步消费while (true) { LogEntry entry = logQueue.take(); // 阻塞式消费 // 写入文件 / 推送 WebSocket}真正的阻塞点在于 handleStatusReport 中的 taskMapper.updateById(),这正是本次改造要解决的核心问题。