Akka框架思想在mini-agent项目中的实践
问题:if/elif链在定时任务分发中的困境
在重构前,agent.py 中的 _on_cron_tick 函数使用 if/elif 链来分发定时任务:
async def _on_cron_tick(event: Event) -> None: task = event.data.get("task") if task == "todo_reminder": reminders = todo_manager.get_due_reminders(minutes_ahead=15) for r in reminders: await notifier.send(f"Reminder: #{r['id']} {r['content']} (due: {r['due_at']})", level="warning") elif task == "scheduled_message": await schedule_manager.check_and_fire(notifier, code_task_executor, telegram_bot, tg_chat_id) elif task == "memory_consolidation": await conversation.compress_all_active()问题: 每次添加新的定时任务都需要修改这个函数,违反了开放封闭原则(OCP)。所有任务耦合在一个函数里,一个任务的 bug 可能影响其他任务的执行。
Akka Actor 模型核心思想
从 Akka 框架借鉴的核心概念:
- 每个 Actor 是一个独立的计算单元,拥有自己的状态和行为
- Actor 之间通过消息(Message)进行异步通信
- 每个 Actor 独立处理收到的消息,互不干扰
- Actor 的失败被隔离在自身范围内,不会级联传播
TaskActor 协议设计
在 mini-agent 项目中,我们将 Actor 模型应用到定时任务分发系统:
TaskActor 协议 — 类似 Akka 的 Actor trait:
from __future__ import annotationsimport loggingfrom typing import Any, Protocol, runtime_checkable
@runtime_checkableclass TaskActor(Protocol): async def handle(self, data: dict[str, Any]) -> None: ...CronDispatcher 调度器实现
CronDispatcher — 类似 Akka 的 ActorSystem:
class CronDispatcher: def __init__(self) -> None: self._actors: dict[str, TaskActor] = {}
def register(self, name: str, actor: TaskActor) -> None: self._actors[name] = actor logger.info("Registered cron actor: %s", name)
async def dispatch(self, data: dict[str, Any]) -> None: task_name = data.get("task", "") actor = self._actors.get(task_name) if actor is None: logger.warning("No actor registered for cron task: %s", task_name) return try: await actor.handle(data) except Exception: logger.exception("Actor %s failed", task_name)三个包装 Actor 实现
TodoReminderActor — 处理 todo 提醒任务:
class TodoReminderActor: def __init__(self, todo_manager: Any, notifier: Any) -> None: self._todo = todo_manager self._notifier = notifier
async def handle(self, data: dict[str, Any]) -> None: reminders = self._todo.get_due_reminders(minutes_ahead=15) for r in reminders: await self._notifier.send( f"Reminder: #{r['id']} {r['content']} (due: {r['due_at']})", level="warning", )ScheduledMessageActor — 处理定时消息任务:
class ScheduledMessageActor: def __init__(self, manager: Any, notifier: Any, executor: Any, bot: Any, chat_id: str) -> None: self._manager = manager self._notifier = notifier self._executor = executor self._bot = bot self._chat_id = chat_id
async def handle(self, data: dict[str, Any]) -> None: await self._manager.check_and_fire(self._notifier, self._executor, self._bot, self._chat_id)MemoryConsolidationActor — 处理记忆压缩任务:
class MemoryConsolidationActor: def __init__(self, conversation: Any) -> None: self._conversation = conversation
async def handle(self, data: dict[str, Any]) -> None: await self._conversation.compress_all_active()在 agent.py 中的注册与连接
重构后,agent.py 中的连接变得简洁声明式:
dispatcher = CronDispatcher()dispatcher.register("todo_reminder", TodoReminderActor(todo_manager, notifier))dispatcher.register("scheduled_message", ScheduledMessageActor(schedule_manager, notifier, code_task_executor, telegram_bot, tg_chat_id))dispatcher.register("memory_consolidation", MemoryConsolidationActor(conversation))
async def _on_cron_tick(event: Event) -> None: await dispatcher.dispatch(event.data)_on_cron_tick 从原来十多行的 if/elif 变成了一行 dispatch 调用。
错误隔离设计
关键设计决策:错误隔离 — 一个 Actor 失败不影响其他 Actor 的执行。
这模仿了 Akka 的 Supervisor Strategy:每个 Actor 的异常被捕获并记录,不会传播到调度器层面。这意味着:
TodoReminderActor抛异常 →ScheduledMessageActor仍然正常执行- 同一个 Actor 连续失败两次,每次都被独立捕获
事件驱动的博客生成
除了定时任务分发,还将 Actor 模式应用到事件驱动的博客生成。ResultBlogActor 不是由 cron 触发,而是由查询/主题完成事件触发。
两种执行路径:
handle_single(question, output)— 使用self._llm.chat()直接对单个/query结果进行博客价值评估handle_topic(topic_name, log_offset)— 委托给CodeTaskExecutor.execute()运行 Claude CLI 的/blog-updateskill 进行主题会话聚合
Fire-and-forget 模式:
# 在 MessageHandler 中,当单个查询成功时触发:if tag == TAG_QUERY and self._blog_actor: asyncio.ensure_future( self._blog_actor.handle_single(text, output), loop=self._loop, )
# 在 MessageHandler._end_topic 中,当主题会话结束时触发:if self._blog_actor and thread_id != "default": log_offset = self._topic_log_offsets.pop(chat_id, 0) asyncio.ensure_future( self._blog_actor.handle_topic(thread_id, log_offset), loop=self._loop, )结果日志标签系统
博客生成依赖于 data/logs/result.log 中的结构化日志标签系统。
在 tasks/code_task.py 中定义了五个标签常量:
| 标签 | 触发时机 | 示例 |
|---|---|---|
[task] | /task 完成 | [task] Fix the login bug |
[query] | 单个 /query 成功 | [query] What is FastAPI? |
[query_start] | /query_start <topic> 发出 | [query_start] FastAPI |
[query_topic] | 主题模式下每次查询成功 | [query_topic] How does DI work? |
[query_end] | /query_end 或结束主题关键词 | [query_end] FastAPI |
主题的标签生命周期:[query_start] <topic> → 多个 [query_topic] <question> → [query_end] <topic>
log_result() 是模块级函数:
TAG_TASK = "[task]"TAG_QUERY = "[query]"TAG_QUERY_TOPIC = "[query_topic]"TAG_QUERY_START = "[query_start]"TAG_QUERY_END = "[query_end]"
def log_result(label: str, content: str) -> None: ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") with RESULT_LOG.open("a") as f: f.write(f"\n{'='*60}\n[{ts}] {label}\n{'='*60}\n{content}\n")基于偏移量的部分日志读取
主题博客生成使用基于偏移量的部分日志读取。MessageHandler._topic_log_offsets 跟踪 [query_start] 之后的文件位置:
# 当主题开始时:log_result(f"{TAG_QUERY_START} {topic}", "")try: self._topic_log_offsets[chat_id] = RESULT_LOG.stat().st_sizeexcept OSError: self._topic_log_offsets[chat_id] = 0当主题结束时,偏移量被传递给 handle_topic 以仅读取日志文件的相关部分,避免读取整个文件。
Akka 与 mini-agent 实现对比
| 概念 | Akka (Scala/Java) | mini-agent (Python) |
|---|---|---|
| Actor 定义 | 继承 AbstractActor | 实现 TaskActor Protocol |
| 消息传递 | tell/ask 模式 | dispatch(data) 字典传递 |
| 错误处理 | Supervisor Strategy | try/except + logger |
| Actor 生命周期 | preStart/postStop | 无(轻量级) |
| 邮箱(Mailbox) | FIFO/优先级 | 无(直接 await) |
| 路由(Routing) | RoundRobin/ScatterGather | 字典查表 |
| 远程通信 | Akka Remote/Cluster | 不需要(单进程) |
我们刻意保持实现的简洁性 — 个人守护进程不需要 actor 生命周期管理、邮箱或集群。基于 Protocol 的方法为我们提供了类型检查,而没有额外的开销。
重构后添加新的定时任务
在重构后添加新的定时任务:
- 编写一个具有
async handle(data)的 actor 类 - 在
agent.py中使用dispatcher.register()注册它
就这么简单。无需修改 _on_cron_tick 或 CronDispatcher。
总结
通过借鉴 Akka Actor 模型的核心思想,mini-agent 项目成功解决了定时任务分发中的 if/elif 链问题。关键收获:
- 开放封闭原则:添加新任务无需修改调度器代码
- 错误隔离:单个 Actor 失败不影响其他任务执行
- 简洁性优先:仅采用必要的 Actor 概念,避免过度工程
- 事件驱动扩展:不仅用于定时任务,还扩展到博客生成等场景
这种设计使得系统易于维护和扩展,同时保持了个人守护进程所需的轻量级特性。