2517 words
13 minutes
个人 AI 助手架构:从 Telegram Bot 到事件驱动 Daemon

个人 AI 助手架构:从 Telegram Bot 到事件驱动 Daemon#

mini-agent 是一个 7×24 运行的个人 AI 助手,以 systemd 用户服务的形式跑在 Arch Linux 上。它能通过 Telegram 和 Web API 交互,支持多轮对话、代码任务执行、定时提醒、博客自动生成等功能。

这篇文章记录它的架构设计——如何从一个简单的 Telegram 消息转发器,一步步演进成现在的样子。

全局架构#

单进程事件驱动,agent.py 是唯一的入口文件,把所有组件串在一起,跑一个 asyncio 事件循环直到收到 SIGTERM/SIGINT。

┌─────────────────────────────┐
│ EventBus │
│ (core/event_bus.py) │
└───┬──────┬──────┬───────────┘
│ │ │
message_received task_command cron_tick ...
│ │ │
▼ ▼ ▼
MessageHandler CodeTask... CronDispatcher

EventBus 是中枢神经系统。所有组件间通信通过 Event(type, data) 对象流动。Handler 是同步函数,异步工作通过 asyncio.ensure_future 派发。Handler 错误隔离——一个挂了不影响其他。

组件清单#

agent.py — 入口,接线,信号处理
├── core/
│ ├── event_bus.py — 发布/订阅事件系统
│ ├── llm.py — LLM 客户端 (LangChain init_chat_model)
│ ├── conversation.py — 对话管理 (LangGraph StateGraph + checkpoint + 记忆压缩)
│ ├── chat_log.py — LLM 请求/响应日志 (按天轮转)
│ ├── trace.py — 请求级 TraceID (contextvars)
│ ├── notifier.py — 终端 + Telegram 双通道通知
│ ├── rate_limiter.py — 滑动窗口令牌桶限流
│ ├── scheduler.py — APScheduler 封装 (cron/interval)
│ └── health.py — HTTP 健康检查
├── channels/
│ ├── telegram.py — Telegram Bot 通道 (长轮询)
│ └── web.py — Web API 通道 (aiohttp)
├── handlers/
│ ├── intent_router.py — 意图分类 (规则 + LLM 两阶段)
│ └── message_handler.py — 消息路由分发
├── prompts/
│ └── intent_classification.md — 意图分类 LLM 系统提示词
└── tasks/
├── blog_sync.py — 文件监控 + git 自动推送
├── todo.py — SQLite 待办管理
├── scheduled_message.py — 定时消息 (一次性 + cron)
├── code_task.py — Claude CLI 任务执行器
├── cron_dispatcher.py — Actor 风格定时任务分发
└── result_blog.py — 查询结果 → 博客文章

Telegram 通道#

使用 python-telegram-bot v22+ 的异步 API,长轮询模式(不是 Webhook)。选长轮询是因为不需要公网暴露端口,更适合单机部署。

启动重试#

系统从挂起恢复后网络可能还没就绪。agent.py 包了一层重试循环——默认 10 次尝试,每次间隔 30 秒。systemd service 也配了 After=network-online.target 确保网络就绪后再启动。

Polling Watchdog#

长轮询会静默死亡(无错误、无异常)。内置看门狗每 120 秒检查一次,发现死亡后自动重启 polling 并通过 Telegram 通知用户。

命令注册#

/start, /help → 使用说明
/todo → 待办管理 (list/add/done)
/task → 代码任务执行
/query → 单次查询
/query_start → 进入话题模式
/query_end → 退出话题模式
/schedule → 定时消息管理

所有 handler 都有 chat_id 验证——只响应配置的白名单用户。

Web API 通道#

嵌入式 aiohttp 服务器,跑在同一个 asyncio 循环里。通过 Cloudflare Tunnel 暴露到 api.sgjki547.top

认证机制#

密码 → Bearer Token(24 小时过期,内存 dict)。每个 token 绑定独立的会话线程,不同登录用户之间完全隔离——不会串话。

16 个端点#

端点方法认证说明
/authPOST密码换 Token
/auth/logoutPOST需要销毁 Token
/auth/statusGET需要查看会话状态
/chatPOST需要LLM 多轮对话
/queryPOST需要Claude CLI 单次查询
/taskPOST需要Claude CLI 任务执行
/topic/startPOST需要进入话题模式
/topic/endPOST需要退出话题模式
/historyGET需要获取历史消息
/todoGET/POST需要待办管理
/scheduleGET/POST需要定时消息管理
/healthGET健康检查

消息路由:一条 Telegram 文本消息的旅程#

这是系统最核心的决策树。一条普通文本消息进来,经过三层判断决定走哪条路:

用户发文本到 Telegram Bot
├─ [限流检查] 20条/分钟,超了直接拒绝
└─ publish Event("message_received")
MessageHandler._handle_chat()
├─ 话题模式?(thread_id != "default")
│ │
│ ├─ 说了"不聊了"?→ 退出话题,回默认模式
│ │
│ └─ 其他 → 直接走 query (带上下文,绕过意图分类)
└─ 默认模式
├─ IntentRouter 两阶段分类
│ │
│ ├─ Stage 1: 正则规则 (零延迟)
│ │ 提醒/定时 → schedule
│ │ 待办列表 → todo list
│ │ 帮我记/记住 → todo add
│ │ 做完了/搞定了 → todo done
│ │ 执行/创建/写一个/帮我写 → task
│ │ 查询/什么是/解释一下 → query
│ │ 聊聊/开始聊 → start_topic
│ │ 不聊了/结束话题 → end_topic
│ │
│ ├─ 命中了但需要提取时间 → 只调 LLM 提时间
│ ├─ 命中了其他意图 → 直接返回,不调 LLM
│ └─ 没命中 → LLM 全分类 (confidence < 0.7 降级为 chat)
├─ intent != "chat" → 分发执行
└─ intent == "chat" → ConversationManager.chat()

话题模式是特殊的:进入后每条消息都自动走 query 路径,带完整上下文发给 Claude CLI,结果写回同一个 thread。相当于一个专注的深度对话模式。

上下文管理#

ConversationManager#

基于 LangGraph StateGraph + AsyncSqliteSaver,每个对话线程有独立的 checkpoint 持久化。

核心问题是:什么时候读上下文,什么时候写?

路径读上下文写回上下文
chat(默认模式)自动(LangGraph checkpoint)自动
query(默认模式)不读不写,纯无状态
query(话题模式)get_context_string()record_turn()
task(任何模式)不写
todo/schedule
话题模式中所有消息

记忆压缩#

对话长了之后,旧的 70% token 通过 LLM 压缩成摘要,存到 data/memories.db。最新的 30% 保持原样。压缩前先算 MD5 内容哈希——内容没变就跳过,避免浪费 LLM 调用。

每 4 小时定时压缩一次,也可以在对话中自动触发(达到 80% token 阈值时)。

任务执行#

CodeTaskExecutor 通过子进程调用 claude CLI,注入对话上下文作为 prompt 的一部分。

Task vs Query 的区别:

  • Task/task):执行编码任务,结果通过 Telegram 直接发送。只读上下文,不写回。
  • Query/query):单次独立提问。默认模式无状态,话题模式下有上下文读写。

两者都有 20 分钟超时保护——子进程 20 分钟没有 stdout 输出就 kill 掉。

结果日志#

每次执行的结果追加到按天轮转的日志文件 data/logs/result-YYYY-MM-DD.log,用五种标签分类:

[task] Fix the login bug — 任务结果
[query] What is FastAPI? — 单次查询
[query_start] FastAPI — 话题开始
[query_topic] How does DI work? — 话题中的查询
[query_end] FastAPI — 话题结束

博客生成#

查询和话题结束后,ResultBlogActor 用 LLM 判断内容是否值得写成博客。值得的话,生成 fuwari 格式的 Markdown 文件到博客目录。单次查询用 LLM 直接评估,话题模式用 Claude CLI 调 /blog-update skill 生成。后台异步执行,不阻塞主流程。

可观测性#

TraceID#

每个请求入口(Web CORS 中间件、Telegram handler、cron 定时任务)注入一个 12 位十六进制的 TraceID,通过 contextvars.ContextVar 传播到所有日志行。

2026-06-09 14:30:00 [INFO] channels.telegram [trace:a1b2c3d4e5f6]: Received message
2026-06-09 14:30:01 [INFO] handlers.message_handler [trace:a1b2c3d4e5f6]: Routed to query

同一请求的所有日志共享同一个 trace ID,方便在 journalctl 中筛选关联条目。

LLM 调用日志#

每次 LLM 调用记录到 data/logs/chat-YYYY-MM-DD.log,包含模型名、消息内容、响应、延迟毫秒数和 token 用量。

健康检查#

HTTP 端点 127.0.0.1:9876/health 返回各组件状态(Telegram polling、scheduler、web、db),供外部监控使用。

基础设施#

密钥管理#

API Key 和 Bot Token 不存在 .env 里。用 systemd-creds 加密存储在 ~/.config/mini-agent/creds/,服务启动时 systemd 解密到 $CREDENTIALS_DIRECTORY/agent.py 加载为环境变量。

~/.config/mini-agent/creds/
├── deepseek-api-key.cred
├── claude-api-key.cred
├── telegram-bot-token.cred
└── web-api-password.cred

定时任务#

通过 CronDispatcher 以 Actor 模式分发,每个定时任务是一个独立的 TaskActor,互不影响:

Actor周期功能
todo_reminder每 2 小时到期待办提醒
scheduled_message每分钟定时消息触发
memory_consolidation每 4 小时对话记忆压缩
daily_digest每天 8:00AI 日报推送

日志轮转#

三类日志都是按天轮转:

日志文件内容
运行日志agent.log标准输出,journald 也能看
LLM 调用chat-YYYY-MM-DD.log每次调用的请求/响应/延迟/token
任务结果result-YYYY-MM-DD.logtask/query 的输出和话题标记

设计原则#

这个项目遵循几个核心原则:

事件驱动,不用线程。 所有 I/O 都是 asyncio。EventBus handler 必须是同步的,异步工作通过 ensure_future 派发。错误隔离在 publish 层——一个 handler 崩溃不影响其他。

这是个人工具,不是框架。 不加抽象层,不搞插件系统,只有一处调用的代码不搞 class。没人要求的错误处理不加,不为未来需求预先设计。

Surgical Changes。 只改和任务直接相关的代码。改 EventBus handler 时确保不影响订阅同一事件的其他 handler。发现死代码提一句就好,不删。

最小权限。 Telegram handler 只响应白名单 chat_id。Web API 密码认证 + Bearer Token + 24h 过期。API Key 用 systemd 加密存储,不在代码或 .env 明文出现。

技术栈#

依赖用途
python-telegram-bot v22+Telegram Bot 异步 API
langchain / langchain-openai统一 LLM 访问(DeepSeek, ZhiPu AI)
langgraph + langgraph-checkpoint-sqlite对话状态管理和持久化
aiosqlite异步 SQLite
aiohttpWeb API 服务器
apscheduler + croniter定时任务和 cron 表达式
watchdog文件系统监控(inotify)
httpx异步 HTTP 客户端
uv包管理器

LLM 后端通过 config.yamlprovider: openai 字段统一配置,当前使用 DeepSeek(对话)和 ZhiPu AI(GLM,任务执行用 claude CLI 直接调用)。

回顾#

从最初一个简单的 webhook 消息转发器,到现在拥有双通道、意图路由、上下文管理、任务执行、博客生成、日志追踪的完整系统。每一步都是因为实际需求驱动——不是预先设计,而是遇到问题再解决。

这个系统的核心价值是作为一个始终在线的个人 AI 助手——Telegram 随时能对话,Web API 给博客页面提供了交互入口,定时任务让它能主动推送信息。所有功能都在一个进程里,零外部依赖(除了 Cloudflare Tunnel)。

个人 AI 助手架构:从 Telegram Bot 到事件驱动 Daemon
https://sgjki547.top/posts/telegram-bot-design/
Author
SGJki
Published at
2026-05-16
License
CC BY-NC-SA 4.0