1432 words
7 minutes
Akka框架思想在mini-agent项目中的实践

Akka框架思想在mini-agent项目中的实践#

问题:if/elif链在定时任务分发中的困境#

在重构前,agent.py 中的 _on_cron_tick 函数使用 if/elif 链来分发定时任务:

async def _on_cron_tick(event: Event) -> None:
task = event.data.get("task")
if task == "todo_reminder":
reminders = todo_manager.get_due_reminders(minutes_ahead=15)
for r in reminders:
await notifier.send(f"Reminder: #{r['id']} {r['content']} (due: {r['due_at']})", level="warning")
elif task == "scheduled_message":
await schedule_manager.check_and_fire(notifier, code_task_executor, telegram_bot, tg_chat_id)
elif task == "memory_consolidation":
await conversation.compress_all_active()

问题: 每次添加新的定时任务都需要修改这个函数,违反了开放封闭原则(OCP)。所有任务耦合在一个函数里,一个任务的 bug 可能影响其他任务的执行。

Akka Actor 模型核心思想#

从 Akka 框架借鉴的核心概念:

  • 每个 Actor 是一个独立的计算单元,拥有自己的状态和行为
  • Actor 之间通过消息(Message)进行异步通信
  • 每个 Actor 独立处理收到的消息,互不干扰
  • Actor 的失败被隔离在自身范围内,不会级联传播

TaskActor 协议设计#

在 mini-agent 项目中,我们将 Actor 模型应用到定时任务分发系统:

TaskActor 协议 — 类似 Akka 的 Actor trait:

tasks/cron_dispatcher.py
from __future__ import annotations
import logging
from typing import Any, Protocol, runtime_checkable
@runtime_checkable
class TaskActor(Protocol):
async def handle(self, data: dict[str, Any]) -> None: ...

CronDispatcher 调度器实现#

CronDispatcher — 类似 Akka 的 ActorSystem:

class CronDispatcher:
def __init__(self) -> None:
self._actors: dict[str, TaskActor] = {}
def register(self, name: str, actor: TaskActor) -> None:
self._actors[name] = actor
logger.info("Registered cron actor: %s", name)
async def dispatch(self, data: dict[str, Any]) -> None:
task_name = data.get("task", "")
actor = self._actors.get(task_name)
if actor is None:
logger.warning("No actor registered for cron task: %s", task_name)
return
try:
await actor.handle(data)
except Exception:
logger.exception("Actor %s failed", task_name)

三个包装 Actor 实现#

TodoReminderActor — 处理 todo 提醒任务:

class TodoReminderActor:
def __init__(self, todo_manager: Any, notifier: Any) -> None:
self._todo = todo_manager
self._notifier = notifier
async def handle(self, data: dict[str, Any]) -> None:
reminders = self._todo.get_due_reminders(minutes_ahead=15)
for r in reminders:
await self._notifier.send(
f"Reminder: #{r['id']} {r['content']} (due: {r['due_at']})",
level="warning",
)

ScheduledMessageActor — 处理定时消息任务:

class ScheduledMessageActor:
def __init__(self, manager: Any, notifier: Any, executor: Any, bot: Any, chat_id: str) -> None:
self._manager = manager
self._notifier = notifier
self._executor = executor
self._bot = bot
self._chat_id = chat_id
async def handle(self, data: dict[str, Any]) -> None:
await self._manager.check_and_fire(self._notifier, self._executor, self._bot, self._chat_id)

MemoryConsolidationActor — 处理记忆压缩任务:

class MemoryConsolidationActor:
def __init__(self, conversation: Any) -> None:
self._conversation = conversation
async def handle(self, data: dict[str, Any]) -> None:
await self._conversation.compress_all_active()

在 agent.py 中的注册与连接#

重构后,agent.py 中的连接变得简洁声明式:

dispatcher = CronDispatcher()
dispatcher.register("todo_reminder", TodoReminderActor(todo_manager, notifier))
dispatcher.register("scheduled_message", ScheduledMessageActor(schedule_manager, notifier, code_task_executor, telegram_bot, tg_chat_id))
dispatcher.register("memory_consolidation", MemoryConsolidationActor(conversation))
async def _on_cron_tick(event: Event) -> None:
await dispatcher.dispatch(event.data)

_on_cron_tick 从原来十多行的 if/elif 变成了一行 dispatch 调用。

错误隔离设计#

关键设计决策:错误隔离 — 一个 Actor 失败不影响其他 Actor 的执行。

这模仿了 Akka 的 Supervisor Strategy:每个 Actor 的异常被捕获并记录,不会传播到调度器层面。这意味着:

  • TodoReminderActor 抛异常 → ScheduledMessageActor 仍然正常执行
  • 同一个 Actor 连续失败两次,每次都被独立捕获

事件驱动的博客生成#

除了定时任务分发,还将 Actor 模式应用到事件驱动的博客生成ResultBlogActor 不是由 cron 触发,而是由查询/主题完成事件触发。

两种执行路径:

  • handle_single(question, output) — 使用 self._llm.chat() 直接对单个 /query 结果进行博客价值评估
  • handle_topic(topic_name, log_offset) — 委托给 CodeTaskExecutor.execute() 运行 Claude CLI 的 /blog-update skill 进行主题会话聚合

Fire-and-forget 模式:

# 在 MessageHandler 中,当单个查询成功时触发:
if tag == TAG_QUERY and self._blog_actor:
asyncio.ensure_future(
self._blog_actor.handle_single(text, output),
loop=self._loop,
)
# 在 MessageHandler._end_topic 中,当主题会话结束时触发:
if self._blog_actor and thread_id != "default":
log_offset = self._topic_log_offsets.pop(chat_id, 0)
asyncio.ensure_future(
self._blog_actor.handle_topic(thread_id, log_offset),
loop=self._loop,
)

结果日志标签系统#

博客生成依赖于 data/logs/result.log 中的结构化日志标签系统。

tasks/code_task.py 中定义了五个标签常量:

标签触发时机示例
[task]/task 完成[task] Fix the login bug
[query]单个 /query 成功[query] What is FastAPI?
[query_start]/query_start <topic> 发出[query_start] FastAPI
[query_topic]主题模式下每次查询成功[query_topic] How does DI work?
[query_end]/query_end 或结束主题关键词[query_end] FastAPI

主题的标签生命周期:[query_start] <topic> → 多个 [query_topic] <question>[query_end] <topic>

log_result() 是模块级函数:

TAG_TASK = "[task]"
TAG_QUERY = "[query]"
TAG_QUERY_TOPIC = "[query_topic]"
TAG_QUERY_START = "[query_start]"
TAG_QUERY_END = "[query_end]"
def log_result(label: str, content: str) -> None:
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
with RESULT_LOG.open("a") as f:
f.write(f"\n{'='*60}\n[{ts}] {label}\n{'='*60}\n{content}\n")

基于偏移量的部分日志读取#

主题博客生成使用基于偏移量的部分日志读取。MessageHandler._topic_log_offsets 跟踪 [query_start] 之后的文件位置:

# 当主题开始时:
log_result(f"{TAG_QUERY_START} {topic}", "")
try:
self._topic_log_offsets[chat_id] = RESULT_LOG.stat().st_size
except OSError:
self._topic_log_offsets[chat_id] = 0

当主题结束时,偏移量被传递给 handle_topic 以仅读取日志文件的相关部分,避免读取整个文件。

Akka 与 mini-agent 实现对比#

概念Akka (Scala/Java)mini-agent (Python)
Actor 定义继承 AbstractActor实现 TaskActor Protocol
消息传递tell/ask 模式dispatch(data) 字典传递
错误处理Supervisor Strategytry/except + logger
Actor 生命周期preStart/postStop无(轻量级)
邮箱(Mailbox)FIFO/优先级无(直接 await)
路由(Routing)RoundRobin/ScatterGather字典查表
远程通信Akka Remote/Cluster不需要(单进程)

我们刻意保持实现的简洁性 — 个人守护进程不需要 actor 生命周期管理、邮箱或集群。基于 Protocol 的方法为我们提供了类型检查,而没有额外的开销。

重构后添加新的定时任务#

在重构后添加新的定时任务:

  1. 编写一个具有 async handle(data) 的 actor 类
  2. agent.py 中使用 dispatcher.register() 注册它

就这么简单。无需修改 _on_cron_tickCronDispatcher

总结#

通过借鉴 Akka Actor 模型的核心思想,mini-agent 项目成功解决了定时任务分发中的 if/elif 链问题。关键收获:

  • 开放封闭原则:添加新任务无需修改调度器代码
  • 错误隔离:单个 Actor 失败不影响其他任务执行
  • 简洁性优先:仅采用必要的 Actor 概念,避免过度工程
  • 事件驱动扩展:不仅用于定时任务,还扩展到博客生成等场景

这种设计使得系统易于维护和扩展,同时保持了个人守护进程所需的轻量级特性。

Akka框架思想在mini-agent项目中的实践
https://sgjki547.top/posts/akka-actor-pattern-in-mini-agent/
Author
SGJki
Published at
2026-05-27
License
CC BY-NC-SA 4.0