个人 AI 助手架构:从 Telegram Bot 到事件驱动 Daemon
mini-agent 是一个 7×24 运行的个人 AI 助手,以 systemd 用户服务的形式跑在 Arch Linux 上。它能通过 Telegram 和 Web API 交互,支持多轮对话、代码任务执行、定时提醒、博客自动生成等功能。
这篇文章记录它的架构设计——如何从一个简单的 Telegram 消息转发器,一步步演进成现在的样子。
全局架构
单进程事件驱动,agent.py 是唯一的入口文件,把所有组件串在一起,跑一个 asyncio 事件循环直到收到 SIGTERM/SIGINT。
┌─────────────────────────────┐ │ EventBus │ │ (core/event_bus.py) │ └───┬──────┬──────┬───────────┘ │ │ │ message_received task_command cron_tick ... │ │ │ ▼ ▼ ▼ MessageHandler CodeTask... CronDispatcherEventBus 是中枢神经系统。所有组件间通信通过 Event(type, data) 对象流动。Handler 是同步函数,异步工作通过 asyncio.ensure_future 派发。Handler 错误隔离——一个挂了不影响其他。
组件清单
agent.py — 入口,接线,信号处理├── core/│ ├── event_bus.py — 发布/订阅事件系统│ ├── llm.py — LLM 客户端 (LangChain init_chat_model)│ ├── conversation.py — 对话管理 (LangGraph StateGraph + checkpoint + 记忆压缩)│ ├── chat_log.py — LLM 请求/响应日志 (按天轮转)│ ├── trace.py — 请求级 TraceID (contextvars)│ ├── notifier.py — 终端 + Telegram 双通道通知│ ├── rate_limiter.py — 滑动窗口令牌桶限流│ ├── scheduler.py — APScheduler 封装 (cron/interval)│ └── health.py — HTTP 健康检查├── channels/│ ├── telegram.py — Telegram Bot 通道 (长轮询)│ └── web.py — Web API 通道 (aiohttp)├── handlers/│ ├── intent_router.py — 意图分类 (规则 + LLM 两阶段)│ └── message_handler.py — 消息路由分发├── prompts/│ └── intent_classification.md — 意图分类 LLM 系统提示词└── tasks/ ├── blog_sync.py — 文件监控 + git 自动推送 ├── todo.py — SQLite 待办管理 ├── scheduled_message.py — 定时消息 (一次性 + cron) ├── code_task.py — Claude CLI 任务执行器 ├── cron_dispatcher.py — Actor 风格定时任务分发 └── result_blog.py — 查询结果 → 博客文章Telegram 通道
使用 python-telegram-bot v22+ 的异步 API,长轮询模式(不是 Webhook)。选长轮询是因为不需要公网暴露端口,更适合单机部署。
启动重试
系统从挂起恢复后网络可能还没就绪。agent.py 包了一层重试循环——默认 10 次尝试,每次间隔 30 秒。systemd service 也配了 After=network-online.target 确保网络就绪后再启动。
Polling Watchdog
长轮询会静默死亡(无错误、无异常)。内置看门狗每 120 秒检查一次,发现死亡后自动重启 polling 并通过 Telegram 通知用户。
命令注册
/start, /help → 使用说明/todo → 待办管理 (list/add/done)/task → 代码任务执行/query → 单次查询/query_start → 进入话题模式/query_end → 退出话题模式/schedule → 定时消息管理所有 handler 都有 chat_id 验证——只响应配置的白名单用户。
Web API 通道
嵌入式 aiohttp 服务器,跑在同一个 asyncio 循环里。通过 Cloudflare Tunnel 暴露到 api.sgjki547.top。
认证机制
密码 → Bearer Token(24 小时过期,内存 dict)。每个 token 绑定独立的会话线程,不同登录用户之间完全隔离——不会串话。
16 个端点
| 端点 | 方法 | 认证 | 说明 |
|---|---|---|---|
/auth | POST | 无 | 密码换 Token |
/auth/logout | POST | 需要 | 销毁 Token |
/auth/status | GET | 需要 | 查看会话状态 |
/chat | POST | 需要 | LLM 多轮对话 |
/query | POST | 需要 | Claude CLI 单次查询 |
/task | POST | 需要 | Claude CLI 任务执行 |
/topic/start | POST | 需要 | 进入话题模式 |
/topic/end | POST | 需要 | 退出话题模式 |
/history | GET | 需要 | 获取历史消息 |
/todo | GET/POST | 需要 | 待办管理 |
/schedule | GET/POST | 需要 | 定时消息管理 |
/health | GET | 无 | 健康检查 |
消息路由:一条 Telegram 文本消息的旅程
这是系统最核心的决策树。一条普通文本消息进来,经过三层判断决定走哪条路:
用户发文本到 Telegram Bot │ ├─ [限流检查] 20条/分钟,超了直接拒绝 │ └─ publish Event("message_received") │ ▼ MessageHandler._handle_chat() │ ├─ 话题模式?(thread_id != "default") │ │ │ ├─ 说了"不聊了"?→ 退出话题,回默认模式 │ │ │ └─ 其他 → 直接走 query (带上下文,绕过意图分类) │ └─ 默认模式 │ ├─ IntentRouter 两阶段分类 │ │ │ ├─ Stage 1: 正则规则 (零延迟) │ │ 提醒/定时 → schedule │ │ 待办列表 → todo list │ │ 帮我记/记住 → todo add │ │ 做完了/搞定了 → todo done │ │ 执行/创建/写一个/帮我写 → task │ │ 查询/什么是/解释一下 → query │ │ 聊聊/开始聊 → start_topic │ │ 不聊了/结束话题 → end_topic │ │ │ ├─ 命中了但需要提取时间 → 只调 LLM 提时间 │ ├─ 命中了其他意图 → 直接返回,不调 LLM │ └─ 没命中 → LLM 全分类 (confidence < 0.7 降级为 chat) │ ├─ intent != "chat" → 分发执行 │ └─ intent == "chat" → ConversationManager.chat()话题模式是特殊的:进入后每条消息都自动走 query 路径,带完整上下文发给 Claude CLI,结果写回同一个 thread。相当于一个专注的深度对话模式。
上下文管理
ConversationManager
基于 LangGraph StateGraph + AsyncSqliteSaver,每个对话线程有独立的 checkpoint 持久化。
核心问题是:什么时候读上下文,什么时候写?
| 路径 | 读上下文 | 写回上下文 |
|---|---|---|
| chat(默认模式) | 自动(LangGraph checkpoint) | 自动 |
| query(默认模式) | 不读 | 不写,纯无状态 |
| query(话题模式) | 读 get_context_string() | 写 record_turn() |
| task(任何模式) | 读 | 不写 |
| todo/schedule | 无 | 无 |
| 话题模式中所有消息 | 读 | 写 |
记忆压缩
对话长了之后,旧的 70% token 通过 LLM 压缩成摘要,存到 data/memories.db。最新的 30% 保持原样。压缩前先算 MD5 内容哈希——内容没变就跳过,避免浪费 LLM 调用。
每 4 小时定时压缩一次,也可以在对话中自动触发(达到 80% token 阈值时)。
任务执行
CodeTaskExecutor 通过子进程调用 claude CLI,注入对话上下文作为 prompt 的一部分。
Task vs Query 的区别:
- Task(
/task):执行编码任务,结果通过 Telegram 直接发送。只读上下文,不写回。 - Query(
/query):单次独立提问。默认模式无状态,话题模式下有上下文读写。
两者都有 20 分钟超时保护——子进程 20 分钟没有 stdout 输出就 kill 掉。
结果日志
每次执行的结果追加到按天轮转的日志文件 data/logs/result-YYYY-MM-DD.log,用五种标签分类:
[task] Fix the login bug — 任务结果[query] What is FastAPI? — 单次查询[query_start] FastAPI — 话题开始[query_topic] How does DI work? — 话题中的查询[query_end] FastAPI — 话题结束博客生成
查询和话题结束后,ResultBlogActor 用 LLM 判断内容是否值得写成博客。值得的话,生成 fuwari 格式的 Markdown 文件到博客目录。单次查询用 LLM 直接评估,话题模式用 Claude CLI 调 /blog-update skill 生成。后台异步执行,不阻塞主流程。
可观测性
TraceID
每个请求入口(Web CORS 中间件、Telegram handler、cron 定时任务)注入一个 12 位十六进制的 TraceID,通过 contextvars.ContextVar 传播到所有日志行。
2026-06-09 14:30:00 [INFO] channels.telegram [trace:a1b2c3d4e5f6]: Received message2026-06-09 14:30:01 [INFO] handlers.message_handler [trace:a1b2c3d4e5f6]: Routed to query同一请求的所有日志共享同一个 trace ID,方便在 journalctl 中筛选关联条目。
LLM 调用日志
每次 LLM 调用记录到 data/logs/chat-YYYY-MM-DD.log,包含模型名、消息内容、响应、延迟毫秒数和 token 用量。
健康检查
HTTP 端点 127.0.0.1:9876/health 返回各组件状态(Telegram polling、scheduler、web、db),供外部监控使用。
基础设施
密钥管理
API Key 和 Bot Token 不存在 .env 里。用 systemd-creds 加密存储在 ~/.config/mini-agent/creds/,服务启动时 systemd 解密到 $CREDENTIALS_DIRECTORY/,agent.py 加载为环境变量。
~/.config/mini-agent/creds/├── deepseek-api-key.cred├── claude-api-key.cred├── telegram-bot-token.cred└── web-api-password.cred定时任务
通过 CronDispatcher 以 Actor 模式分发,每个定时任务是一个独立的 TaskActor,互不影响:
| Actor | 周期 | 功能 |
|---|---|---|
todo_reminder | 每 2 小时 | 到期待办提醒 |
scheduled_message | 每分钟 | 定时消息触发 |
memory_consolidation | 每 4 小时 | 对话记忆压缩 |
daily_digest | 每天 8:00 | AI 日报推送 |
日志轮转
三类日志都是按天轮转:
| 日志 | 文件 | 内容 |
|---|---|---|
| 运行日志 | agent.log | 标准输出,journald 也能看 |
| LLM 调用 | chat-YYYY-MM-DD.log | 每次调用的请求/响应/延迟/token |
| 任务结果 | result-YYYY-MM-DD.log | task/query 的输出和话题标记 |
设计原则
这个项目遵循几个核心原则:
事件驱动,不用线程。 所有 I/O 都是 asyncio。EventBus handler 必须是同步的,异步工作通过 ensure_future 派发。错误隔离在 publish 层——一个 handler 崩溃不影响其他。
这是个人工具,不是框架。 不加抽象层,不搞插件系统,只有一处调用的代码不搞 class。没人要求的错误处理不加,不为未来需求预先设计。
Surgical Changes。 只改和任务直接相关的代码。改 EventBus handler 时确保不影响订阅同一事件的其他 handler。发现死代码提一句就好,不删。
最小权限。 Telegram handler 只响应白名单 chat_id。Web API 密码认证 + Bearer Token + 24h 过期。API Key 用 systemd 加密存储,不在代码或 .env 明文出现。
技术栈
| 依赖 | 用途 |
|---|---|
python-telegram-bot v22+ | Telegram Bot 异步 API |
langchain / langchain-openai | 统一 LLM 访问(DeepSeek, ZhiPu AI) |
langgraph + langgraph-checkpoint-sqlite | 对话状态管理和持久化 |
aiosqlite | 异步 SQLite |
aiohttp | Web API 服务器 |
apscheduler + croniter | 定时任务和 cron 表达式 |
watchdog | 文件系统监控(inotify) |
httpx | 异步 HTTP 客户端 |
uv | 包管理器 |
LLM 后端通过 config.yaml 的 provider: openai 字段统一配置,当前使用 DeepSeek(对话)和 ZhiPu AI(GLM,任务执行用 claude CLI 直接调用)。
回顾
从最初一个简单的 webhook 消息转发器,到现在拥有双通道、意图路由、上下文管理、任务执行、博客生成、日志追踪的完整系统。每一步都是因为实际需求驱动——不是预先设计,而是遇到问题再解决。
这个系统的核心价值是作为一个始终在线的个人 AI 助手——Telegram 随时能对话,Web API 给博客页面提供了交互入口,定时任务让它能主动推送信息。所有功能都在一个进程里,零外部依赖(除了 Cloudflare Tunnel)。