1425 words
7 minutes
Netty-Redis for RPC
RL_Scheduler 项目 Netty + Redis 架构解析
一、整体架构
┌─────────────────────────────────────────────────────────────────┐ │ Master (8082) │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ MasterNettyServer (Port 9000) │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌──────────────┐ │ │ │ │ │ MessageDecoder│→│ MasterHandler│→│ MessageEncoder│ │ │ │ │ └─────────────┘ └─────────────┘ └──────────────┘ │ │ │ └──────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌───────────────────────────▼───────────────────────────────┐ │ │ │ ChannelManager │ │ │ │ (ConcurrentHashMap<workerId, Channel>) │ │ │ └───────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌───────────────────────────▼───────────────────────────────┐ │ │ │ StringRedisTemplate │ │ │ │ worker:{id}:hb (30s TTL) │ worker:{id}:task (120s) │ │ │ │ task:{id}:workerId │ worker:{id}:meta │ │ │ └───────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌───────────────────────────▼───────────────────────────────┐ │ │ │ SchedulerService / LogManager │ │ │ └───────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ ▲ │ Protobuf Binary (Netty RPC) │ ┌─────────────────────────────────────────────────────────────────┐ │ Worker (Java Agent) │ │ ┌──────────────────────────────────────────────────────────┐ │ │ │ WorkerHandler │ │ │ │ (Task Execution + Status Reporting) │ │ │ └──────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌───────────────────────────▼───────────────────────────────┐ │ │ │ RedisLeaseManager │ │ │ │ (Lettuce Client - Sync Commands) │ │ │ │ setex worker:{id}:hb │ setex task:{id}:workerId │ │ │ └───────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌───────────────────────────▼───────────────────────────────┐ │ │ │ Python Executor (train.py) │ │ │ └───────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘二、核心组件详解
- Netty 服务端 (MasterNettyServer)
// 关键配置 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 1个Accept线程 EventLoopGroup workerGroup = new NioEventLoopGroup(); // 无限大Worker线程池
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) // TCP全连接队列 .childOption(ChannelOption.SO_KEEPALIVE, true) // TCP保活性能关键点:
┌─────────────────────┬───────────┬──────────────────┐ │ 配置 │ 值 │ 作用 │ ├─────────────────────┼───────────┼──────────────────┤ │ NioEventLoopGroup() │ 默认CPU*2 │ 高效I/O多路复用 │ ├─────────────────────┼───────────┼──────────────────┤ │ SO_KEEPALIVE │ true │ 检测死连接 │ ├─────────────────────┼───────────┼──────────────────┤ │ SO_BACKLOG │ 128 │ 缓解瞬时连接洪峰 │ └─────────────────────┴───────────┴──────────────────┘- Protobuf 序列化 (MessageEncoder/Decoder)
┌─────────────────────────────────────────┐ │ 4B Magic + 1B Version │ │ + 4B Length + 1B Type │ ├─────────────────────────────────────────┤ │ Body (Protobuf) │ └─────────────────────────────────────────┘ Total: 10B HeaderMessageEncoder 关键代码:
// 直接序列化Protobuf,无中间转换 if (msg.getBody() instanceof MessageLite) { bodyBytes = ((MessageLite) msg.getBody()).toByteArray(); }对比JSON:
┌───────────────────┬──────┬──────────┬────────┐ │ 序列化方式 │ 体积 │ 解析速度 │ 跨语言 │ ├───────────────────┼──────┼──────────┼────────┤ │ Protobuf │ 最小 │ 最快 │ ✅ │ ├───────────────────┼──────┼──────────┼────────┤ │ JSON │ 大 │ 中等 │ ✅ │ ├───────────────────┼──────┼──────────┼────────┤ │ Java Serializable │ 大 │ 慢 │ ❌ │ └───────────────────┴──────┴──────────┴────────┘- ChannelManager (Worker连接管理)
@Component public class ChannelManager { // workerId -> Channel (长连接) private final Map<String, Channel> workerChannels = new ConcurrentHashMap<>();
// 高性能并发读写 public void register(String workerId, Channel channel) { workerChannels.put(workerId, channel); }
// Master向指定Worker发消息 public void sendToWorker(String workerId, NettyMessage msg) { Channel ch = workerChannels.get(workerId); if (ch != null && ch.isActive()) { ch.writeAndFlush(msg); } } }- Redis心跳租约 (MasterHandler)
// Master收到Worker心跳后: private void handleHeartbeat(ChannelHandlerContext ctx, HeartbeatRequest req) { String workerId = req.getWorkerId();
// 1. 维护TCP长连接映射 channelManager.register(workerId, ctx.channel());
// 2. Redis心跳 (30秒TTL) String hbKey = "worker:" + workerId + ":hb"; redisTemplate.opsForValue().set(hbKey, "alive", 30, TimeUnit.SECONDS);
// 3. 任务运行时续约 (120秒TTL) if (currentTaskId != null) { String taskKey = "worker:" + workerId + ":task"; redisTemplate.expire(taskKey, 120, TimeUnit.SECONDS); } }心跳续约机制:
Worker: 每10秒发送心跳 ──────────────────────────────▶ │Master: 更新Redis TTL ────────────────────────────────▶ worker:{id}:hb TTL=30s worker:{id}:task TTL=120s
如果Worker宕机: Redis TTL过期 ──▶ Master检测到"worker超时" ──▶ 重新调度任务- Worker端 RedisLeaseManager
// Worker使用Lettuce同步客户端 RedisURI uri = RedisURI.builder().withHost(host).withPort(port).build(); RedisClient client = RedisClient.create(uri); StatefulRedisConnection<String, String> connection = client.connect(); RedisCommands<String, String> commands = connection.sync();
// 关键方法 public void renew(String currentTaskId, String lastTaskId) { commands.setex(hbKey(), hbTtlSeconds, "alive"); // 心跳 commands.setex(taskKey(), taskTtlSeconds, currentTaskId); // 任务 commands.setex(taskOwnerKey(currentTaskId), taskTtlSeconds, workerId); }为什么用Lettuce而不是Spring Data Redis?
┌─────────┬──────────┬────────┬────────┐ │ 客户端 │ 线程安全 │ 连接池 │ 响应式 │ ├─────────┼──────────┼────────┼────────┤ │ Lettuce │ ✅ │ 内置 │ ✅ │ ├─────────┼──────────┼────────┼────────┤ │ Jedis │ ⚠️需池 │ 需池 │ ❌ │ └─────────┴──────────┴────────┴────────┘三、消息流分析
- 任务下发流程
Master Worker │ │ │ EXECUTE_TASK (Protobuf) │ │───────────────────────────────────▶│ channelRead0() │ │ │ WorkerHandler.runPythonTask() │ │ │ │ │ ├── persistTaskStart(taskId) │ │ │ └── Redis SETEX task key │ │ │ │ │ └── 启动Python进程 │ │ │ │ EXECUTE_TASK_RESPONSE │ │◀──────────────────────────────────│ ctx.writeAndFlush() │ │ │ (异步日志流) │ │◀──────────────────────────────────│ LOG_DATA (每行日志) │ │ │ (任务结束) │ │◀──────────────────────────────────│ TASK_STATUS_REPORT │ │ │ releaseTaskOwner(taskId) │ │ tryDispatchQueuedTaskToWorker() │- 心跳续约时序
Worker Redis Master │ │ │ │──── SETEX worker:1:hb ────────────▶│ │ │──── SETEX worker:1:task ──────────▶│ │ │ │ │ │ (10秒后) │ │ │──── SETEX worker:1:hb ────────────▶│ │ │──── SETEX worker:1:task ──────────▶│ │ │ │ │ │ (如果宕机) │ │ │ (不再发送心跳) │ │ │ │ TTL=30s expire │ │ │◀──── worker:1:hb 消失 │ │ │ │ │ Master检测到心跳丢失 │ │ 重新调度任务 │四、性能优化点
- Netty优化
// 当前配置 .childOption(ChannelOption.SO_KEEPALIVE, true) // TCP保活 .childOption(ChannelOption.TCP_NODELAY, true) // 禁用Nagle,低延迟 .childOption(ChannelOption.SO_LINGER, 0) // 关闭时立即销毁
// 可增强配置 .childOption(ChannelOption.SO_RCVBUF, 65536) // 增大接收缓冲 .childOption(ChannelOption.SO_SNDBUF, 65536) // 增大发送缓冲- Redis优化
// Pipeline批量操作(减少网络RTT) List<String> keys = Arrays.asList( "worker:" + workerId + ":hb", "worker:" + workerId + ":task" ); redisTemplate.executePipelined((RedisCallback<Object>) connection -> { for (String key : keys) { connection.stringCommands().setEx( key.getBytes(), ttlSeconds, "alive".getBytes() ); } return null; });- 可改进空间
┌─────────────────┬────────────────────────────┬────────────┐ │ 当前 │ 可优化为 │ 收益 │ ├─────────────────┼────────────────────────────┼────────────┤ │ 同步Redis命令 │ Lettuce异步/VirtualThread │ 更高并发 │ ├─────────────────┼────────────────────────────┼────────────┤ │ Thread-per-Task │ Netty EventLoop │ 减少线程数 │ ├─────────────────┼────────────────────────────┼────────────┤ │ 单Reactor │ 多Reactor (boss1, workerN) │ 多核扩展 │ ├─────────────────┼────────────────────────────┼────────────┤ │ 无连接池 │ Lettuce连接池 │ 复用连接 │ └─────────────────┴────────────────────────────┴────────────┘五、关键设计亮点
1. 心跳续约 + Redis TTL = 分布式故障检测 ├── 简单可靠,无需额外监控 └── TTL过期 = Worker宕机/失联
2. TaskIdKey 双重保险 ├── worker:{id}:task (Worker自我报告) └── task:{id}:workerId (调度所有权) └── 两者不一致 = 任务异常,需要修复
3. LogManager 异步解耦 ├── MasterHandler 接收日志后 enqueue └── 独立线程异步写入文件 └── 不阻塞EventLoop
4. ChannelManager 本地缓存 ├── 无需每次查Redis └── O(1) Channel查找 └── 但需要额外机制处理Worker迁移六、与高性能的差距
┌──────┬──────────────────────┬───────────────────┐ │ 方面 │ 当前实现 │ 生产级优化 │ ├──────┼──────────────────────┼───────────────────┤ │ 协议 │ Protobuf ✅ │ 可加压缩 (Snappy) │ ├──────┼──────────────────────┼───────────────────┤ │ 连接 │ 长连接 ✅ │ 连接池化 │ ├──────┼──────────────────────┼───────────────────┤ │ 线程 │ NioEventLoopGroup ✅ │ 可绑核 (Epoll) │ ├──────┼──────────────────────┼───────────────────┤ │ 心跳 │ Redis续约 ✅ │ 可用Netty内置 │ ├──────┼──────────────────────┼───────────────────┤ │ 调度 │ 简单队列 │ 可加优先级队列 │ ├──────┼──────────────────────┼───────────────────┤ │ 容错 │ 心跳检测 ✅ │ 可加重试+幂等 │ └──────┴──────────────────────┴───────────────────┘这是一个轻量但有效的生产级原型,关键路径设计合理。
Netty-Redis for RPC
https://sgjki547.top/posts/netty-redis/