1425 words
7 minutes
Netty-Redis for RPC

RL_Scheduler 项目 Netty + Redis 架构解析#


一、整体架构

┌─────────────────────────────────────────────────────────────────┐
│ Master (8082) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ MasterNettyServer (Port 9000) │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌──────────────┐ │ │
│ │ │ MessageDecoder│→│ MasterHandler│→│ MessageEncoder│ │ │
│ │ └─────────────┘ └─────────────┘ └──────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────▼───────────────────────────────┐ │
│ │ ChannelManager │ │
│ │ (ConcurrentHashMap<workerId, Channel>) │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────▼───────────────────────────────┐ │
│ │ StringRedisTemplate │ │
│ │ worker:{id}:hb (30s TTL) │ worker:{id}:task (120s) │ │
│ │ task:{id}:workerId │ worker:{id}:meta │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────▼───────────────────────────────┐ │
│ │ SchedulerService / LogManager │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ Protobuf Binary (Netty RPC)
┌─────────────────────────────────────────────────────────────────┐
│ Worker (Java Agent) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ WorkerHandler │ │
│ │ (Task Execution + Status Reporting) │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────▼───────────────────────────────┐ │
│ │ RedisLeaseManager │ │
│ │ (Lettuce Client - Sync Commands) │ │
│ │ setex worker:{id}:hb │ setex task:{id}:workerId │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────▼───────────────────────────────┐ │
│ │ Python Executor (train.py) │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

二、核心组件详解#

  1. Netty 服务端 (MasterNettyServer)
// 关键配置
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 1个Accept线程
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 无限大Worker线程池
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128) // TCP全连接队列
.childOption(ChannelOption.SO_KEEPALIVE, true) // TCP保活

性能关键点:

┌─────────────────────┬───────────┬──────────────────┐
│ 配置 │ 值 │ 作用 │
├─────────────────────┼───────────┼──────────────────┤
│ NioEventLoopGroup() │ 默认CPU*2 │ 高效I/O多路复用 │
├─────────────────────┼───────────┼──────────────────┤
│ SO_KEEPALIVE │ true │ 检测死连接 │
├─────────────────────┼───────────┼──────────────────┤
│ SO_BACKLOG │ 128 │ 缓解瞬时连接洪峰 │
└─────────────────────┴───────────┴──────────────────┘

  1. Protobuf 序列化 (MessageEncoder/Decoder)
┌─────────────────────────────────────────┐
│ 4B Magic + 1B Version │
│ + 4B Length + 1B Type │
├─────────────────────────────────────────┤
│ Body (Protobuf) │
└─────────────────────────────────────────┘
Total: 10B Header

MessageEncoder 关键代码:

// 直接序列化Protobuf,无中间转换
if (msg.getBody() instanceof MessageLite) {
bodyBytes = ((MessageLite) msg.getBody()).toByteArray();
}

对比JSON:

┌───────────────────┬──────┬──────────┬────────┐
│ 序列化方式 │ 体积 │ 解析速度 │ 跨语言 │
├───────────────────┼──────┼──────────┼────────┤
│ Protobuf │ 最小 │ 最快 │ ✅ │
├───────────────────┼──────┼──────────┼────────┤
│ JSON │ 大 │ 中等 │ ✅ │
├───────────────────┼──────┼──────────┼────────┤
│ Java Serializable │ 大 │ 慢 │ ❌ │
└───────────────────┴──────┴──────────┴────────┘

  1. ChannelManager (Worker连接管理)
@Component
public class ChannelManager {
// workerId -> Channel (长连接)
private final Map<String, Channel> workerChannels = new ConcurrentHashMap<>();
// 高性能并发读写
public void register(String workerId, Channel channel) {
workerChannels.put(workerId, channel);
}
// Master向指定Worker发消息
public void sendToWorker(String workerId, NettyMessage msg) {
Channel ch = workerChannels.get(workerId);
if (ch != null && ch.isActive()) {
ch.writeAndFlush(msg);
}
}
}

  1. Redis心跳租约 (MasterHandler)
// Master收到Worker心跳后:
private void handleHeartbeat(ChannelHandlerContext ctx, HeartbeatRequest req) {
String workerId = req.getWorkerId();
// 1. 维护TCP长连接映射
channelManager.register(workerId, ctx.channel());
// 2. Redis心跳 (30秒TTL)
String hbKey = "worker:" + workerId + ":hb";
redisTemplate.opsForValue().set(hbKey, "alive", 30, TimeUnit.SECONDS);
// 3. 任务运行时续约 (120秒TTL)
if (currentTaskId != null) {
String taskKey = "worker:" + workerId + ":task";
redisTemplate.expire(taskKey, 120, TimeUnit.SECONDS);
}
}

心跳续约机制:

Worker: 每10秒发送心跳 ──────────────────────────────▶
Master: 更新Redis TTL ────────────────────────────────▶
worker:{id}:hb TTL=30s
worker:{id}:task TTL=120s
如果Worker宕机:
Redis TTL过期 ──▶ Master检测到"worker超时" ──▶ 重新调度任务

  1. Worker端 RedisLeaseManager
// Worker使用Lettuce同步客户端
RedisURI uri = RedisURI.builder().withHost(host).withPort(port).build();
RedisClient client = RedisClient.create(uri);
StatefulRedisConnection<String, String> connection = client.connect();
RedisCommands<String, String> commands = connection.sync();
// 关键方法
public void renew(String currentTaskId, String lastTaskId) {
commands.setex(hbKey(), hbTtlSeconds, "alive"); // 心跳
commands.setex(taskKey(), taskTtlSeconds, currentTaskId); // 任务
commands.setex(taskOwnerKey(currentTaskId), taskTtlSeconds, workerId);
}

为什么用Lettuce而不是Spring Data Redis?

┌─────────┬──────────┬────────┬────────┐
│ 客户端 │ 线程安全 │ 连接池 │ 响应式 │
├─────────┼──────────┼────────┼────────┤
│ Lettuce │ ✅ │ 内置 │ ✅ │
├─────────┼──────────┼────────┼────────┤
│ Jedis │ ⚠️需池 │ 需池 │ ❌ │
└─────────┴──────────┴────────┴────────┘

三、消息流分析#

  1. 任务下发流程
Master Worker
│ │
│ EXECUTE_TASK (Protobuf) │
│───────────────────────────────────▶│ channelRead0()
│ │
│ WorkerHandler.runPythonTask() │
│ │ │
│ ├── persistTaskStart(taskId) │
│ │ └── Redis SETEX task key │
│ │ │
│ └── 启动Python进程 │
│ │
│ EXECUTE_TASK_RESPONSE │
│◀──────────────────────────────────│ ctx.writeAndFlush()
│ │
│ (异步日志流) │
│◀──────────────────────────────────│ LOG_DATA (每行日志)
│ │
│ (任务结束) │
│◀──────────────────────────────────│ TASK_STATUS_REPORT
│ │
│ releaseTaskOwner(taskId) │
│ tryDispatchQueuedTaskToWorker() │
  1. 心跳续约时序
Worker Redis Master
│ │ │
│──── SETEX worker:1:hb ────────────▶│ │
│──── SETEX worker:1:task ──────────▶│ │
│ │ │
│ (10秒后) │ │
│──── SETEX worker:1:hb ────────────▶│ │
│──── SETEX worker:1:task ──────────▶│ │
│ │ │
│ (如果宕机) │ │
│ (不再发送心跳) │ │
│ │ TTL=30s expire │
│ │◀──── worker:1:hb 消失 │
│ │ │
│ Master检测到心跳丢失 │
│ 重新调度任务 │

四、性能优化点#

  1. Netty优化
// 当前配置
.childOption(ChannelOption.SO_KEEPALIVE, true) // TCP保活
.childOption(ChannelOption.TCP_NODELAY, true) // 禁用Nagle,低延迟
.childOption(ChannelOption.SO_LINGER, 0) // 关闭时立即销毁
// 可增强配置
.childOption(ChannelOption.SO_RCVBUF, 65536) // 增大接收缓冲
.childOption(ChannelOption.SO_SNDBUF, 65536) // 增大发送缓冲
  1. Redis优化
// Pipeline批量操作(减少网络RTT)
List<String> keys = Arrays.asList(
"worker:" + workerId + ":hb",
"worker:" + workerId + ":task"
);
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
for (String key : keys) {
connection.stringCommands().setEx(
key.getBytes(),
ttlSeconds,
"alive".getBytes()
);
}
return null;
});
  1. 可改进空间
┌─────────────────┬────────────────────────────┬────────────┐
│ 当前 │ 可优化为 │ 收益 │
├─────────────────┼────────────────────────────┼────────────┤
│ 同步Redis命令 │ Lettuce异步/VirtualThread │ 更高并发 │
├─────────────────┼────────────────────────────┼────────────┤
│ Thread-per-Task │ Netty EventLoop │ 减少线程数 │
├─────────────────┼────────────────────────────┼────────────┤
│ 单Reactor │ 多Reactor (boss1, workerN) │ 多核扩展 │
├─────────────────┼────────────────────────────┼────────────┤
│ 无连接池 │ Lettuce连接池 │ 复用连接 │
└─────────────────┴────────────────────────────┴────────────┘

五、关键设计亮点

1. 心跳续约 + Redis TTL = 分布式故障检测
├── 简单可靠,无需额外监控
└── TTL过期 = Worker宕机/失联
2. TaskIdKey 双重保险
├── worker:{id}:task (Worker自我报告)
└── task:{id}:workerId (调度所有权)
└── 两者不一致 = 任务异常,需要修复
3. LogManager 异步解耦
├── MasterHandler 接收日志后 enqueue
└── 独立线程异步写入文件
└── 不阻塞EventLoop
4. ChannelManager 本地缓存
├── 无需每次查Redis
└── O(1) Channel查找
└── 但需要额外机制处理Worker迁移

六、与高性能的差距

┌──────┬──────────────────────┬───────────────────┐
│ 方面 │ 当前实现 │ 生产级优化 │
├──────┼──────────────────────┼───────────────────┤
│ 协议 │ Protobuf ✅ │ 可加压缩 (Snappy) │
├──────┼──────────────────────┼───────────────────┤
│ 连接 │ 长连接 ✅ │ 连接池化 │
├──────┼──────────────────────┼───────────────────┤
│ 线程 │ NioEventLoopGroup ✅ │ 可绑核 (Epoll) │
├──────┼──────────────────────┼───────────────────┤
│ 心跳 │ Redis续约 ✅ │ 可用Netty内置 │
├──────┼──────────────────────┼───────────────────┤
│ 调度 │ 简单队列 │ 可加优先级队列 │
├──────┼──────────────────────┼───────────────────┤
│ 容错 │ 心跳检测 ✅ │ 可加重试+幂等 │
└──────┴──────────────────────┴───────────────────┘

这是一个轻量但有效的生产级原型,关键路径设计合理。

Netty-Redis for RPC
https://sgjki547.top/posts/netty-redis/
Author
SGJki
Published at
2026-04-06
License
CC BY-NC-SA 4.0