5326 words
27 minutes
RL-scheduler Bug fixed——netty @sharable 导致同一个handler在新channel复用失败

Master-Worker 弹性恢复修复文档#

修复日期: 2026-04-03 / 2026-04-11 影响范围: Worker 重连机制、Master 宕机恢复、任务状态一致性 状态: 部分完成(见下方说明)

修复进度#

模块状态说明
Part 1: Worker 重连指数退避✅ 已实施1秒稳定期连接确认机制已加
Part 1: Worker 重连始终失败(Netty Handler 复用)✅ 已修复根因是 Worker 重连时复用同一个非 @SharableWorkerHandler 实例,导致第 2 次及之后的连接在 pipeline 初始化阶段直接失败
Part 2: 任务所有权立即持久化✅ 已解决僵尸任务(RUNNING 永久悬停)问题已解决
Part 3: Master 重启恢复逻辑增强✅ 已解决三段式恢复 + 心跳处理器通知丢失检测已生效
Part 4: Master 启动状态重建✅ 已解决@PostConstruct 启动时重建已生效

结论更新(2026-04-11): “1 秒稳定期仍无效”的根因并非 closeFuture 时序,而是 Worker 重连时复用同一个非 @Sharable handler 导致连接初始化失败;见本文第十节 Bug Fix Log。


一、问题描述#

1.1 故障场景#

当 Worker 正在执行训练任务时,如果 Master 突然宕机会出现以下问题:

  1. Worker 无法重连:Worker 使用固定 5 秒延迟重试,但 Master 重启后 Worker 持续重连失败,形成”惊群效应”(thundering herd)
  2. 任务永久悬停:任务在数据库中保持 RUNNING 状态,Worker 继续训练但 Master 无法感知
  3. 资源无法释放:Worker 的 worker:{workerId}:task 在 Redis 中被永久占用,导致其他任务无法分配
  4. 状态不一致:Master 重启后无法区分”任务已完成但通知丢失”和”任务仍在运行”

1.2 影响分析#

影响项严重程度说明
Worker 重连失效TCP 连接成功但应用层状态未建立
任务状态不一致DB=Running, Redis=无对应 Worker
资源泄漏Redis key 永不过期,Worker 无法接新任务
恢复时间长惊群效应导致 Master 启动压力增大

二、根因分析#

2.1 Worker 重连问题#

原代码(WorkerAgent.java):

channel.closeFuture().addListener((ChannelFutureListener) closeFuture -> {
LOG.warn(">>> 与 Master 连接断开,5秒后重连...");
closeFuture.channel().eventLoop().schedule(() -> connect(b), 5, TimeUnit.SECONDS);
});

问题:

  • 固定 5 秒延迟,多个 Worker 同时重连产生惊群效应
  • 无重连状态跟踪,可能出现并发重连
  • 无指数退避,Master 完全启动前反复失败

2.2 任务所有权时间差#

Master 调度任务的执行顺序:

1. Master 写入 task:{taskId}:workerId → Redis ✓
2. Master 发送 ExecuteTaskRequest → Netty
3. Worker 接收,设置 currentTaskId,返回确认
4. Worker 发送 ExecuteTaskResponse
5. Worker 在下次心跳(10秒间隔)时才写入 worker:{workerId}:task → Redis ✗

时间差窗口: 如果 Master 在步骤 1-4 之间宕机,task:{taskId}:workerId 已在 Redis 中,但 worker:{workerId}:task 尚未写入。恢复程序无法判断任务状态。

2.3 恢复盲点#

RunningTaskRecovery 逻辑:

// 原逻辑:只检查 taskOwnerKey 是否存在
String workerId = redisTemplate.opsForValue().get(taskWorkerKey(taskId));
if (workerId == null || workerId.isBlank()) {
continue; // 无 owner → 跳过(实际上这是调度中断,应标记 PENDING)
}
Boolean hbAlive = redisTemplate.hasKey("worker:" + workerId + ":hb");
if ((hbAlive != null && hbAlive) && (taskKeyExists != null && taskKeyExists)) {
continue; // Worker 存活且 taskKey 存在 → 跳过(但无法判断任务是否真的在运行)
}

盲点:

  • taskOwnerKey 存在但 taskKey 不存在 → 无法判断(已完成?中断?)
  • taskOwnerKey 不存在 → 原逻辑直接跳过(实际是调度中断,应重置)
  • Master 重启后不知道 Worker 的 currentTaskId 是否是重启前的残留

2.4 僵尸任务问题#

当任务完成但 TASK_STATUS_REPORT 丢失时:

Worker: currentTaskId = ""(已清空)
Worker: 发送心跳(currentTaskId = "")
Master: 看到空闲 Worker,尝试分配新任务
问题: 原任务可能分配给其他 Worker,而原任务仍在 Redis 中有记录

三、诊断过程#

3.1 诊断方法#

  1. 日志追踪:分析 Worker 日志中重连行为的时序
  2. Redis 键值分析:检查宕机前后 Redis 中的 key 状态
  3. 代码路径分析:追踪 Master 宕机时各模块的执行状态
  4. 状态机分析:绘制 Worker-Master-Netty-Redis 交互时序图

3.2 Redis Key 分析#

Key 格式TTL创建时机说明
worker:{workerId}:hb30s心跳时Worker 存活标志
worker:{workerId}:task120s下次心跳时(原来)→ 任务接收时(修复后)Worker 当前任务
task:{taskId}:workerId120sMaster 调度时任务所属 Worker

3.3 关键时间点分析#

T0: Master 写入 taskOwnerKey(Redis)
T0+100ms: Netty 消息发送
T0+200ms: Worker 接收,设置 currentTaskId
T0+10s: Worker 发送心跳(写入 workerTaskKey)
Gap [T0, T0+10s]: taskOwnerKey 存在,workerTaskKey 不存在
→ 修复后 persistTaskStart() 在 T0+205ms(任务接收时)立即写入 workerTaskKey
→ Gap 缩小到 ~0ms(原子性)

四、执行方案#

4.1 方案设计原则#

  1. 消除时间差:Worker 接收任务时立即写入 Redis(不在下次心跳时)
  2. 指数退避:防止惊群效应,给予 Master 充分启动时间
  3. 单次重连:防止并发重连浪费资源
  4. 双重确认:同时检查 taskOwnerKeyworkerTaskKey 判断任务真实状态
  5. 启动即恢复:Master 启动时主动重建状态,不依赖定时扫描

4.2 方案对比#

方案优点缺点
A: 只修 Worker 重连简单任务状态不一致问题未解决
B: 只修恢复逻辑无需改 Worker无法处理 workerTaskKey 缺失的边界情况
C: 两者都修(采用)彻底解决重连和恢复问题改动量较大

五、实际修改逻辑#

5.1 Part 1: Worker 重连指数退避(WorkerAgent.java)#

修改常量:

private static final long INITIAL_RECONNECT_DELAY_SECONDS = 1; // 初始延迟 1 秒
private static final long MAX_RECONNECT_DELAY_SECONDS = 60; // 最大 60 秒
private static final double BACKOFF_MULTIPLIER = 2.0; // 2 倍退避
private static final double JITTER_FACTOR = 0.25; // ±25% 抖动

延迟序列(无抖动示例):

第1次: 1s → 2s → 4s → 8s → 16s → 32s → 60s → 60s → ...

核心逻辑:

private void connect(Bootstrap b) {
if (isReconnecting) {
return; // 防止并发重连
}
isReconnecting = true;
b.connect(masterHost, masterPort).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
currentReconnectDelay = INITIAL_RECONNECT_DELAY_SECONDS; // 成功后重置
isReconnecting = false;
// ... 连接成功处理
} else {
scheduleReconnect(b); // 指数退避重连
}
});
}
private void scheduleReconnect(Bootstrap b) {
long delayWithJitter = calculateDelayWithJitter(currentReconnectDelay);
currentReconnectDelay = Math.min(
(long) (currentReconnectDelay * BACKOFF_MULTIPLIER),
MAX_RECONNECT_DELAY_SECONDS
);
channel.eventLoop().schedule(() -> {
isReconnecting = false;
connect(b);
}, delayWithJitter, TimeUnit.SECONDS);
}

5.1.1 补充修复:连接稳定确认机制#

问题现象: isSuccess() = true 仅表示 TCP 三次握手完成,不代表应用层就绪。Master 重启时,TCP 监听已开放但 Spring/Netty 尚未完成初始化,导致 Worker 连接后立即被远程关闭,触发 closeFuture,进而以指数退避重复重连,最终稳定在 60s 延迟。

问题时序:

Worker: connect() → SYN
Master: TCP 监听就绪,SYN-ACK 返回
Worker: isSuccess() = true → 注册 closeFuture
→ channel = future.channel()
→ isReconnecting = false
Master: Spring 上下文初始化中,Netty pipeline 未就绪
→ 心跳无法处理或 NPE
→ TCP RST 发送给 Worker
Worker: closeFuture 触发 → scheduleReconnect(currentReconnectDelay=60)
→ 连接被关闭后立即以 60s 延迟重试
→ 循环...

修复后的连接确认逻辑:

private void connect(Bootstrap b) {
if (isReconnecting) {
return;
}
isReconnecting = true;
final Channel[] pendingChannel = new Channel[1]; // 数组包装避免 lambda 捕获
b.connect(masterHost, masterPort).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
scheduleReconnect(b, 0);
return;
}
// TCP 握手成功,等待 1 秒确认应用层已就绪
pendingChannel[0] = future.channel();
LOG.info(">>> TCP 握手成功,等待应用层就绪...");
pendingChannel[0].eventLoop().schedule(() -> {
if (!pendingChannel[0].isActive()) {
// 1 秒内连接失效 → 立即重连,不等待指数退避
LOG.warn(">>> 连接不稳定(1秒内失效),立即重连...");
pendingChannel[0].close();
scheduleReconnect(b, 0);
return;
}
// 连接真正稳定,注册 closeFuture
channel = pendingChannel[0];
currentReconnectDelay = INITIAL_RECONNECT_DELAY_SECONDS;
isReconnecting = false;
LOG.info(">>> 成功连接到 Master(连接已稳定)!");
channel.closeFuture().addListener((ChannelFutureListener) closeFuture -> {
if (isReconnecting) {
return; // 防止双重触发
}
scheduleReconnect(b, 0);
});
}, 1, TimeUnit.SECONDS);
});
}

关键改进:

  1. 1 秒稳定期isSuccess() 后等待 1 秒,让 Master 有充足时间完成 Spring 启动
  2. 主动检测失效:稳定期内如果 isActive() = false,立即重连而不是等到 closeFuture
  3. isReconnecting 守卫:closeFuture 中检查 isReconnecting 标志,防止与 scheduleReconnect 双重触发

5.2 Part 2: 任务所有权立即持久化(WorkerHandler + RedisLeaseManager)#

修改点 1:WorkerHandler.handleExecuteTask()

private void handleExecuteTask(ChannelHandlerContext ctx, ExecuteTaskRequest req) {
String taskId = req.getTaskId();
this.lastTaskId = taskId;
this.currentTaskId = taskId;
this.currentAttempt = req.getAttempt();
// ★ 在启动 Python 线程之前立即写入 Redis(原子性)
if (leaseManager != null) {
leaseManager.persistTaskStart(taskId);
}
// 1. 返回响应
// 2. 异步执行 Python 脚本
new Thread(() -> runPythonTask(ctx, req), "Task-Executor-" + taskId).start();
}

修改点 2:WorkerHandler.reportStatus()

private void reportStatus(ChannelHandlerContext ctx, String taskId, String status, String errorMsg) {
// ★ 立即清除 Redis 任务所有权
if (leaseManager != null) {
leaseManager.clearTask(taskId);
}
// 发送状态报告...
}

修改点 3:RedisLeaseManager 新增方法

public void persistTaskStart(String taskId) {
// 同时写入两个 key,保证双向一致性
commands.setex(taskKey(), taskTtlSeconds, taskId); // worker:{workerId}:task
commands.setex(taskOwnerKey(taskId), taskTtlSeconds, workerId); // task:{taskId}:workerId
}
public void clearTask(String taskId) {
commands.del(taskKey());
if (taskId != null && !taskId.isBlank()) {
commands.del(taskOwnerKey(taskId));
}
}

效果:时间差从 ~10 秒降低到 ~0 秒

5.3 Part 3: Master 重启恢复增强(RunningTaskRecovery + MasterHandler)#

RunningTaskRecovery 修复后的逻辑:

for (TrainingTask task : running) {
String taskId = task.getId();
String workerId = redisTemplate.opsForValue().get(taskWorkerKey(taskId));
if (workerId == null || workerId.isBlank()) {
// ★ 情况1: taskOwnerKey 不存在 → 调度中断,从未真正开始
task.setStatus("PENDING");
taskMapper.updateById(task);
schedulerService.enqueueTask(taskId);
LOG.info(">>> [Recovery] 任务 [{}] 从未分发(无 owner key),标记为 PENDING", taskId);
continue;
}
Boolean hbAlive = redisTemplate.hasKey("worker:" + workerId + ":hb");
if (hbAlive != null && hbAlive) {
// ★ 情况2: Worker 存活 → 任务在运行或刚完成(通知可能丢失)
// 保持 RUNNING,由心跳处理器判断
continue;
}
// ★ 情况3: Worker 已死 → 任务孤立
task.setStatus("PENDING");
taskMapper.updateById(task);
schedulerService.enqueueTask(taskId);
schedulerService.releaseTaskOwner(taskId);
LOG.info(">>> [Recovery] 任务 [{}] 的 Worker [{}] 已失活,标记为 PENDING", taskId, workerId);
}

MasterHandler.handleHeartbeat() 增强:

private void handleHeartbeat(ChannelHandlerContext ctx, HeartbeatRequest req) {
String currentTaskId = req.getCurrentTaskId();
if (currentTaskId != null && !currentTaskId.isEmpty()) {
// 续期 TTL
schedulerService.renewTaskOwnerTtl(currentTaskId);
// ★ 检查 taskOwnerKey 是否匹配(通知丢失检测)
String ownerKey = "task:" + currentTaskId + ":workerId";
String ownerWorkerId = redisTemplate.opsForValue().get(ownerKey);
if (ownerWorkerId == null || !ownerWorkerId.equals(workerId)) {
// taskOwnerKey 缺失 → 任务已完成但通知丢失
TrainingTask task = taskMapper.selectById(currentTaskId);
if (task != null && "RUNNING".equals(task.getStatus())) {
task.setStatus("COMPLETED");
task.setCompletedAt(LocalDateTime.now());
taskMapper.updateById(task);
LOG.info(">>> [Heartbeat] 任务 [{}] 实际已完成(通知丢失),强制标记为 COMPLETED", currentTaskId);
messagingTemplate.convertAndSend("/topic/tasks", task);
}
schedulerService.tryDispatchQueuedTaskToWorker(workerId);
}
} else {
// ★ Worker 空闲时检查并修复孤儿 RUNNING 任务
checkAndFixStaleRunningTasks(workerId);
schedulerService.tryDispatchQueuedTaskToWorker(workerId);
}
}

5.4 Part 4: Master 启动状态重建(SchedulerService)#

@PostConstruct
public void reconstructWorkerTasksFromRedis() {
LOG.info(">>> [SchedulerService] 开始从 Redis 重建 Worker 任务状态...");
Set<String> workerTaskKeys = redisTemplate.keys("worker:*:task");
for (String workerTaskKey : workerTaskKeys) {
String workerId = workerTaskKey.split(":")[1];
String taskId = redisTemplate.opsForValue().get(workerTaskKey);
// 检查 taskOwnerKey 是否匹配
String ownerKey = taskWorkerKey(taskId);
String ownerWorkerId = redisTemplate.opsForValue().get(ownerKey);
if (ownerWorkerId == null || !ownerWorkerId.equals(workerId)) {
// 调度被中断,标记为 PENDING 重新入队
TrainingTask task = taskMapper.selectById(taskId);
if (task != null && "RUNNING".equals(task.getStatus())) {
task.setStatus("PENDING");
taskMapper.updateById(task);
enqueueIfEnabled(taskId);
LOG.info(">>> [Recovery] 启动重建:任务 [{}] 调度中断,标记为 PENDING", taskId);
}
}
}
}

六、修改文件清单#

文件路径修改类型修改内容
worker/WorkerAgent.java修改指数退避重连 + isReconnecting 状态跟踪 + 1秒稳定期连接确认
worker/netty/WorkerHandler.java修改注入 LeaseManager,任务接收时立即持久化,报告时清除
worker/redis/RedisLeaseManager.java修改新增 persistTaskStart() 和 clearTask() 方法
master/service/RunningTaskRecovery.java修改三段式恢复逻辑(中断/存活/死亡)
master/netty/MasterHandler.java修改心跳中检测通知丢失,新增 checkAndFixStaleRunningTasks()
master/service/SchedulerService.java修改新增 @PostConstruct reconstructWorkerTasksFromRedis()

七、Redis 数据流(修复后)#

7.1 正常任务执行流程#

Worker 接收任务
→ persistTaskStart(taskId)
→ SETEX worker:{workerId}:task TTL=120s
→ SETEX task:{taskId}:workerId TTL=120s
→ 启动 Python 训练线程
→ (训练中) heartbeat 每 10s 续期 TTL
→ (完成) reportStatus()
→ clearTask(taskId)
→ DEL worker:{workerId}:task
→ DEL task:{taskId}:workerId
→ 发送 TASK_STATUS_REPORT

7.2 Master 宕机恢复流程#

Master 重启
→ @PostConstruct reconstructWorkerTasksFromRedis()
→ 扫描 worker:*:task
→ 检查 taskOwnerKey 是否匹配
→ 不匹配 → 标记 PENDING,重新入队
同时 RunningTaskRecovery 每 5s 扫描
→ taskOwnerKey 不存在 → 标记 PENDING
→ Worker hb 不存在 → 标记 PENDING,释放 owner
Worker 重连
→ 指数退避(1s → 2s → 4s → ...)
→ 发送心跳(携带 currentTaskId)
→ Master 检查 taskOwnerKey 匹配性
→ 匹配 → 任务正常运行
→ 不匹配 → 标记 COMPLETED(通知丢失)

7.3 Worker-Master 交互图(正常执行)#

sequenceDiagram
    autonumber
    participant M as Master(SchedulerService/MasterHandler)
    participant R as Redis
    participant W as Worker(WorkerAgent/WorkerHandler)
    participant N as Netty Channel

    Note over W: 定时心跳(10s)\n带 currentTaskId(来自 WorkerState)
    W->>N: HEARTBEAT(currentTaskId)
    N->>M: HEARTBEAT
    M->>R: SET worker:{workerId}:hb = alive (TTL 30s)
    alt currentTaskId != ""
        M->>R: EXPIRE worker:{workerId}:task (TTL 120s)
        M->>R: EXPIRE task:{taskId}:workerId (TTL 120s)
    end

    Note over M: 调度开始:选择在线 worker
    M->>R: keys worker:*:hb
    M->>R: Lua tryPreemptWorker\nif hb==alive && !exists(worker:{id}:task)\nthen SET worker:{id}:task=taskId EX 120
    alt 抢占成功
        M->>R: SET task:{taskId}:workerId = workerId (TTL 120s)
        M->>N: EXECUTE_TASK(taskId, attempt, traceId)
        N->>W: EXECUTE_TASK
        W->>R: persistTaskStart(taskId)\nSETEX worker:{id}:task=taskId (TTL 120s)\nSETEX task:{taskId}:workerId=workerId (TTL 120s)
        W->>N: EXECUTE_TASK_RESPONSE(accepted=true)
        N->>M: EXECUTE_TASK_RESPONSE
        Note over W: 启动训练线程\n持续推送 LOG_DATA / TASK_STATUS_REPORT
        W->>N: TASK_STATUS_REPORT(COMPLETED/FAILED)
        N->>M: TASK_STATUS_REPORT
        W->>R: clearTask(taskId)\nDEL worker:{id}:task\nDEL task:{taskId}:workerId
        M->>R: DEL worker:{id}:task (兜底清理)
        M->>R: DEL task:{taskId}:workerId (releaseTaskOwner)
    else 抢占失败
        Note over M: worker 忙或不在线\n任务入队/等待
    end

7.4 Worker-Master 交互图(Master 宕机 + 恢复 + Worker 重连)#

sequenceDiagram
    autonumber
    participant M as Master
    participant R as Redis
    participant W as Worker

    Note over W: 任务执行中 / 或准备执行中
    Note over M: Master 宕机

    alt Master 宕机前已写入部分 Redis key
        Note over R: 可能存在:\n1) worker:{id}:task=taskId\n2) task:{taskId}:workerId=workerId\n3) worker:{id}:hb(会过期)
    end

    Note over M: Master 重启
    M->>R: 扫描 worker:*:task
    M->>R: 对每个 workerTaskKey 读取 taskId
    M->>R: 读取 task:{taskId}:workerId
    alt ownerKey 缺失或不匹配
        Note over M: 判定为调度中断 / 不一致\n任务回滚到 PENDING 并重新入队
    else ownerKey 匹配
        Note over M: 保持 RUNNING\n等待心跳继续续期
    end

    Note over W: Worker 按指数退避重连
    W->>M: 重连成功后发送 HEARTBEAT(currentTaskId)
    M->>R: SET worker:{id}:hb alive (TTL 30s)
    alt currentTaskId != "" 且 ownerKey 匹配
        M->>R: EXPIRE worker:{id}:task (TTL 120s)
        M->>R: EXPIRE task:{taskId}:workerId (TTL 120s)
        Note over M: 任务继续追踪为 RUNNING
    else currentTaskId != "" 但 ownerKey 缺失/不匹配
        Note over M: 判定通知丢失或所有权丢失\n按策略标记 COMPLETED 或重新调度
    else currentTaskId == ""
        Note over M: worker 空闲\n尝试派发队列任务
    end

7.5 Redis Key 如何保证“稳定分发”(一致性约束)#

flowchart TB
    A["选择 worker:从 worker:*:hb 中筛选"] --> B{"Lua tryPreemptWorker<br/>原子检查与占位"}
    B -->|"hb 存活 且 worker 空闲"| C["SET worker:{id}:task=taskId EX 120<br/>占用 worker 资源"]
    B -->|"否则"| Z["抢占失败<br/>任务入队/等待"]

    C --> D["SET task:{taskId}:workerId=workerId EX 120<br/>记录任务 owner"]
    D --> E["Netty 下发 EXECUTE_TASK"]
    E --> F["Worker 收到任务<br/>persistTaskStart(taskId)"]
    F --> G["SETEX worker:{id}:task=taskId EX 120<br/>SETEX task:{taskId}:workerId=workerId EX 120<br/>双向写入(缩短时间差)"]
    G --> H["训练中:心跳续期<br/>续期 hbKey / taskKey / ownerKey"]
    H --> I{"任务结束?"}
    I -->|"是"| J["Worker clearTask(taskId)<br/>DEL worker:{id}:task<br/>DEL task:{taskId}:workerId<br/>并上报 TASK_STATUS_REPORT"]
    J --> K["Master 兜底清理并 releaseTaskOwner<br/>避免残留 key 阻塞调度"]
    I -->|"否"| H

核心约束(可以用来理解恢复与一致性判断)

  1. worker:{workerId}:hb(30s)表示 Worker 存活;调度只考虑存活 Worker。
  2. worker:{workerId}:task(120s)是 Worker “资源占用位”,Lua 抢占保证同一时刻只会被一个任务占用。
  3. task:{taskId}:workerId(120s)是任务 owner;用于 Master 恢复时判断任务是否仍有明确归属。
  4. 理想一致性:worker:{workerId}:task = taskIdtask:{taskId}:workerId = workerId。若出现缺失/不匹配,恢复逻辑将其判定为“调度中断/通知丢失”等异常分支并做纠正(回滚 PENDING/强制 COMPLETED/重新入队)。

八、测试用例#

8.1 单元测试:指数退避计算#

@Test
void testExponentialBackoffWithJitter() {
WorkerAgent agent = new WorkerAgent("localhost", 9000, "test-worker");
long baseDelay = 1;
// 计算 5 次延迟,验证指数增长
long delay1 = calculateDelayWithJitter(1);
long delay2 = calculateDelayWithJitter(2);
long delay3 = calculateDelayWithJitter(4);
long delay4 = calculateDelayWithJitter(8);
// 抖动范围:delay * (1 - JITTER_FACTOR) <= actual <= delay * (1 + JITTER_FACTOR)
assertTrue(delay1 >= 0.75 && delay1 <= 1.25);
assertTrue(delay2 >= 1.5 && delay2 <= 2.5);
assertTrue(delay3 >= 3.0 && delay3 <= 5.0);
// 最大延迟不超过 MAX_RECONNECT_DELAY_SECONDS
long delay60 = calculateDelayWithJitter(60);
assertTrue(delay60 <= 75); // 60 * 1.25
}

8.2 单元测试:persistTaskStart 原子性#

@Test
void testPersistTaskStartWritesBothKeys() {
RedisLeaseManager manager = new RedisLeaseManager("worker-1");
manager.persistTaskStart("task-123");
// 验证两个 key 都存在
assertEquals("task-123", redis.get("worker:worker-1:task"));
assertEquals("worker-1", redis.get("task:task-123:workerId"));
manager.clearTask("task-123");
// 验证两个 key 都被删除
assertNull(redis.get("worker:worker-1:task"));
assertNull(redis.get("task:task-123:workerId"));
}

8.3 集成测试:Master 宕机后任务恢复#

@Test
void testTaskRecoversAfterMasterRestart() {
// 1. 提交任务,Worker 接收并开始训练
String taskId = submitTask();
worker.receiveTask(taskId);
assertEquals("RUNNING", db.getTaskStatus(taskId));
// 2. Master 宕机(模拟)
master.stop();
// 3. Worker 继续训练(currentTaskId 不变)
assertEquals(taskId, worker.getCurrentTaskId());
assertTrue(redis.exists("task:" + taskId + ":workerId"));
assertTrue(redis.exists("worker:" + worker.getId() + ":task"));
// 4. Master 重启
master.restart();
// 5. Worker 重连(指数退避后成功)
worker.reconnectEventually();
// 6. 验证任务仍被追踪为 RUNNING
assertEquals("RUNNING", db.getTaskStatus(taskId));
// taskOwnerKey 存在且匹配,任务保持 RUNNING
}

8.4 集成测试:调度中断场景#

@Test
void testTaskRequeuedWhenDispatchInterrupted() {
// 1. Master 写入 taskOwnerKey(Redis)
redis.set("task:task-X:workerId", "worker-1");
// 2. Master 宕机(尚未发送 Netty 消息)
master.stop();
// 3. RunningTaskRecovery 扫描
recovery.recoverOrphanRunningTasks();
// 4. 验证:taskOwnerKey 存在但 taskKey 不存在
// → 任务从未分发,标记为 PENDING
assertEquals("PENDING", db.getTaskStatus("task-X"));
assertTrue(queue.contains("task-X"));
}

8.5 集成测试:通知丢失场景#

@Test
void testTaskMarkedCompletedWhenNotificationLost() {
// 1. Worker 完成任务,清除 Redis key
worker.clearTask("task-Y");
// 此时:DB=RUNNING, Redis taskOwnerKey=不存在
// 2. Master 宕机(未收到 TASK_STATUS_REPORT)
master.stop();
// 3. Worker 发送心跳(currentTaskId = "",已清空)
worker.sendHeartbeat(currentTaskId: "");
// 4. Master 重启,Worker 重连
master.restart();
worker.reconnect();
// 5. RunningTaskRecovery 扫描
// → taskOwnerKey 不存在 → 标记 PENDING
// 6. Worker 空闲,Master 分发新任务
// (旧任务标记 PENDING 后可被重新调度)
}

8.6 集成测试:惊群效应防止#

@Test
void testNoThunderingHerdOnMasterRestart() {
// 1. 启动 10 个 Worker 连接 Master
List<WorkerAgent> workers = startWorkers(10);
workers.forEach(w -> assertTrue(w.isConnected()));
// 2. Master 宕机
master.stop();
// 3. 记录每个 Worker 首次重连时间
Map<WorkerAgent, Long> firstReconnectTimes = new HashMap<>();
workers.forEach(w -> {
w.onReconnectAttempt(() -> firstReconnectTimes.put(w, System.currentTimeMillis()));
});
// 4. Master 重启
master.restart();
// 5. 验证:10 个 Worker 的首次重连时间分散在 1s~4s 范围内
// (指数退避 + 抖动,而非全部同时 1s)
long minTime = firstReconnectTimes.values().stream().min(Long::compare).get();
long maxTime = firstReconnectTimes.values().stream().max(Long::compare).get();
assertTrue(maxTime - minTime > 500); // 至少有 500ms 分散
}

九、验证检查清单#

  • Worker 重连使用指数退避(1s → 2s → 4s → … → 60s)
  • 重连包含 ±25% 随机抖动
  • isReconnecting 标志防止并发重连
  • TCP 握手成功后等待 1 秒稳定期才注册 closeFuture
  • 稳定期内连接失效时立即重连(不等 closeFuture
  • persistTaskStart() 在 Python 线程启动前调用
  • clearTask()reportStatus() 中调用
  • taskOwnerKeyworkerTaskKey 同时写入/删除
  • RunningTaskRecovery 区分三种状态(中断/存活/死亡)
  • checkAndFixStaleRunningTasks() 修复孤儿 RUNNING 任务
  • @PostConstruct 在 Master 启动时执行状态重建
  • 心跳处理器检测 taskOwnerKey 匹配性
  • 编译通过(./mvnw compile
  • 测试通过(./mvnw test

十、Bug Fix Log(2026-04-11:Worker 重连始终失败)#

10.1 问题现象#

  1. master 必须先启动:若 worker 先启动,后续即便 master 启动,worker 仍无法连接成功。
  2. master 宕机恢复后 worker 重连无效:worker 侧日志显示持续重试,但始终无法稳定建立连接。

10.2 已做过的尝试(为何一开始误判)#

  1. 假设 1:Master 启动期应用层未就绪
    • 现象上表现为:TCP 握手成功,但随后立刻被关闭/重连,容易联想到 “端口已监听但业务未 ready”。
    • 对应尝试:在 Worker connect() 成功后增加“1 秒稳定期确认”,避免过早认定连接可用。
  2. 假设 2:惊群效应导致 Master 压力过大
    • 对应尝试:指数退避 + 抖动(已实施)。

上述两类尝试都合理,但这个 bug 的真实失败点发生得更早:连接 pipeline 初始化阶段,因此“稳定期确认”不会真正生效。

10.3 定位过程与关键证据#

为了快速验证“worker 先启动 / master 后启动”的场景,采用方式 A:

  1. 不启动 master,直接启动 worker(触发重连循环)。
  2. 观察第 2 次及之后的重连日志,出现关键异常:
io.netty.channel.ChannelPipelineException:
org.sgj.rljobscheduler.worker.netty.WorkerHandler is not a @Sharable handler,
so can't be added or removed multiple times.

该异常由 Netty 在初始化新 Channel 的 pipeline 时抛出(checkMultiplicity),并会导致连接被关闭,外层表现为 “connect 失败不断重试”。

10.4 根因分析(为什么会有 bug)#

Worker 端在 WorkerAgent.start() 时只创建了 一个 WorkerHandler 实例,然后每次 connect()(包括重连)都会创建 新的 Channel,并把同一个 WorkerHandler 再次 addLast 到新 Channel 的 pipeline。

  • Netty 规则:同一个 handler 实例如果不是 @Sharable,不能被添加到多个 pipeline
  • WorkerHandler 不是 @Sharable,且包含任务状态字段(currentTaskId / lastTaskId / attempt),因此复用不仅会触发 Netty 的 multiplicity 保护,也会带来潜在的状态串扰风险。

结果就是:第 1 次连接可能成功(或失败),但第 2 次及之后的连接在 pipeline 初始化时必然失败,从而表现为“所有重连尝试都无效”。

10.5 修复方案(推荐方案落地)#

修复目标:每次重连都能创建一条“干净的 pipeline”,同时保留 worker 必要的业务状态。

  1. 每个 Channel 新建一个 WorkerHandler
    • ChannelInitializer.initChannelnew WorkerHandler(...),避免 handler 实例复用。
  2. 抽离跨连接共享状态 WorkerState
    • currentTaskId / lastTaskId / currentAttempt 从 handler 内移到 WorkerState,由:
      • WorkerAgent 的心跳/续租定时器读取
      • 每次新建的 WorkerHandler 写入
    • 这样即使发生重连/换 Channel,worker 的任务状态仍能持续保存并参与续租与心跳上报。
  3. 补齐连接失败根因日志
    • future.isSuccess() == false 时输出 future.cause(),避免以后再次误判为“纯 TCP 失败”。

10.6 回归测试(锁住 Netty 复用坑)#

新增测试覆盖两条关键约束:

  1. 复用同一个非 @Sharable handler 实例到两个 Channel 必然抛异常
  2. 每个 Channel 新建 handler(即便共享同一个 WorkerState)不会抛异常

这确保未来重构重连逻辑时,不会再次把 handler 复用引入回来。

10.7 修复后对业务逻辑的影响评估#

  1. Worker 状态能否保存
    • 能保存:任务状态从 handler 内移到 WorkerState,与连接生命周期解耦;重连换 Channel 不会丢状态。
  2. 任务执行是否受影响
    • 不受影响:任务接收/启动 Python/日志推送/状态上报的业务逻辑未改变,仍由 WorkerHandler 处理。
  3. 心跳与 Redis 续租是否受影响
    • 更稳定:心跳/续租读取的是 WorkerState,不再依赖某一个 handler 实例是否还在使用中。
  4. master 宕机恢复后 worker 是否能重连
    • 能:消除了 pipeline 初始化阶段的硬失败点,重连会真正进入“连接成功→稳定期→心跳恢复”的路径。
RL-scheduler Bug fixed——netty @sharable 导致同一个handler在新channel复用失败
https://sgjki547.top/posts/master-worker-resilience-fix/
Author
SGJki
Published at
2026-04-11
License
CC BY-NC-SA 4.0