AgentForge Cog 引擎详解
Cog 是什么
一句话定位:Cog 是一个 YAML 驱动的状态机执行器。 它读取一份 YAML(定义节点 + 转移表),然后在一个 while 循环里反复”取当前节点 → 跑 → 查表找下一节点 → 直到撞到 end”。引擎本身没有任何智能,智能全在被调用的 Agent(LLM)里,Cog 只负责”按图走”。
之所以强调这点,是因为 AgentForge 官方把自己宣传为 “AGI framework”,听起来很神秘。但读完真实源码后,真相朴素得多——而正因为朴素,它才是一个你能看懂、能借鉴、甚至能用几十行代码复刻的东西。本文基于克隆的真实仓库 DataBassGit/AgentForge 的源码(cog.py / transition_resolver.py / agent_runner.py / memory_manager.py / cog_config_structs.py + 示例 YAML),把 Cog 引擎从原理讲到执行细节。
真实的 AgentForge:四个声明式概念
先厘清 AgentForge 的真实面目,避免被名字误导:
| 维度 | 真实情况 |
|---|---|
| 仓库 | DataBassGit/AgentForge(PyPI 包名 agentforge) |
| 核心机制 | Cog(声明式 YAML 工作流引擎),不是 persona 切换 |
| Tools/Actions | 已废弃,README 明确说会被 MCP 标准取代 |
| 硬依赖 | chromadb + sentence-transformers + torch(包很重) |
| Python 版本 | 要求 3.12 |
它把”agent 系统”拆成 4 个声明式 YAML 概念,全部住在 .agentforge/ 目录下:
.agentforge/├── personas/ # 身份(Persona)├── prompts/ # 提示词模板(Agent)├── cogs/ # 工作流编排(Cog) ← 真正的"大脑"└── settings/ # 模型/存储配置- Agent:不是”会思考的实体”,而是一个 YAML 提示词模板 + 一次模型调用。它绑定配置、渲染 prompt、调 LLM、解析输出。可单独跑,也可被 Cog 编排。
- Cog:多 Agent 工作流引擎,本文的主角。一份 YAML 声明式 DAG/状态机,定义”哪些 agent 按什么顺序跑、在哪分支、在哪终止、何时读写记忆”,全程零 Python。
- Memory:声明在 Cog 里的共享节点,不是 agent 的私有属性,由
MemoryManager统管,底层默认 ChromaDB。 - Persona:YAML 身份配置,分
static(注入 prompt 的核心身份)和retrieval(可存入向量库按相关性召回)两段;同时作为记忆的存储命名空间,隔离不同人格的数据。
运行时组件
Cog 引擎由 5 个组件协作,职责切分得极干净:
| 组件 | 文件 | 职责 |
|---|---|---|
Cog | cog.py | 主类。持有配置 + 各组件,跑主循环,管状态 |
AgentRegistry | core/agent_registry.py | 静态工厂。把 YAML agents: 段一次性实例化成 {id: Agent} |
TransitionResolver | core/transition_resolver.py | 纯查表器。给定”当前节点 + 各节点输出”,返回下一节点(或 None) |
AgentRunner | core/agent_runner.py | 跑单个 Agent,带 3 次重试(输出为空就重试) |
MemoryManager | core/memory_manager.py | 管记忆节点,提供 query_before / update_after 钩子 |
分工是理解 Cog 的关键:Cog 知道”流程”,Resolver 知道”怎么查表”,Runner 知道”怎么跑一个 agent”,MemoryManager 知道”记忆何时读写”。四者互不耦合、各自可测——这就是为什么 cog.py 里的单步循环只有寥寥几行。
一次 run 的执行流程
Context: 从 cog.run(user_input=...) 入口跟随真实代码走一遍。
Why this way: 主循环刻意保持极简,把记忆载入/记录、流程执行、结果提取拆成独立方法,方便单独测试和替换。
# cog.py 主循环(简化)def run(self, **kwargs): self.mem_mgr.load_chat(_ctx=kwargs, _state={}) # 先把对话历史载入 self._execute_workflow(**kwargs) # 跑流程 result = self._process_execution_result() # 按 end 规则提取返回值 self.mem_mgr.record_chat(self.context, result) # 把本轮记进 chat_history return result
def _execute_agent_flow(self): current = self.cog_config.cog.flow.start # 从 start 节点开始 self.transition_resolver.reset_visit_counts() while current: # None 就停 current = self._execute_single_agent_cycle(current)Key params:
flow.start— YAML 里声明的起始节点 idreset_visit_counts()— 每次运行清零决策节点的访问计数,用于防死循环while current—None表示撞到 end 或无路可走,循环退出
每个节点的单步循环 _execute_single_agent_cycle 只做 4 件事:
1. _prepare_agent_execution(agent_id) ├─ pre_agent_execution(agent_id) # 空钩子,子类可覆盖 └─ mem_mgr.query_before(agent_id, ...) # 记忆"读"钩子(在 agent 跑之前)2. _execute_agent(agent_id) └─ agent_runner.run_agent(...) # 带 3 次重试地调 LLM 输出存入 self.state[agent_id]3. _finalize_agent_execution(agent_id, output) ├─ process_agent_output(...) # 空钩子,子类可覆盖 ├─ trail_recorder.record_agent_output() # 记执行轨迹 └─ mem_mgr.update_after(agent_id, ...) # 记忆"写"钩子(在 agent 跑之后)4. _determine_next_agent(agent_id) └─ transition_resolver.get_next_agent(agent_id, self.state) # 查表Why this way: 记忆钩子是夹在 agent 执行前后的,而不是 agent 自己调的。这是 Cog 最关键的工程决策——agent 永远只是个”纯函数”(输入 prompt,输出文本),记忆读写是流程层的横切关注点(cross-cutting concern)。
三种转移类型:状态机的边
转移定义在 cog_config_structs.py:CogFlowTransition,Resolver 用 type 字段分发到三种边。
直接转移
transitions: analysis: decision # 写成字符串 → 解析成 direct跑完 analysis 无条件去 decision,这是最简单的线性流转。
决策转移(带分支 / 回环 / 防死循环)
Context: 让 LLM 决定下一步去哪。
Why this way: 分支的本质是让 LLM 在输出里塞一个结构化字段(通常 JSON),引擎读这个字段当路由 key。
decision: choice: # decision_key:读 agent 输出里的这个字段 "approve": response # decision_map:值 → 下一节点 "reject": analysis # 回环!回到 analysis 重做 fallback: response # 没匹配上 / 超限时去这 max_visits: 3 # 这个节点最多重入 3 次,超了强制走 fallbackKey params:
choice(decision_key)— 从state[current_agent_id]取的字段名- 决策值会小写归一化后查表,所以 agent 输出
"Approve"也能匹配"approve" max_visits— 唯一的防死循环机制,只对决策节点生效:访问次数 +1,超过即直接返回fallbackfallback— 兜底;没有则返回None(流程结束,但可能非预期)
结束转移
response: end: true # 或 end: "final.summary"(点号取值)撞到 end 节点,Resolver 返回 None,主循环退出。
一个完整的真实示例
仓库自带的 setup_files/cogs/example_cog.yaml,是带分支 + 回环 + 终止的三节点流程:
cog: name: "ExampleCog" chat_memory_enabled: false agents: - { id: analysis, template_file: cog_analyze_agent } - { id: decision, template_file: cog_decide_agent } - { id: response, template_file: cog_response_agent }
flow: start: analysis transitions: analysis: decision # 直接转移 decision: # 决策转移:可回环 choice: { "approve": response, "reject": analysis } fallback: response max_visits: 3 response: { end: true } # 结束转移执行图:
analysis ──→ decision ──approve──→ response(end) ✓ ↑ ↓ reject └──────┘ (最多 3 次,超了强制走 fallback→response)这就是 Cog 用 YAML 表达的”分析 → 质检 → 回复,质检不过打回重做,最多重做 3 次”——零 Python,全部由引擎按转移表推进。
Python 入口
Context: 用真实代码看怎么启动一个 Cog。
from agentforge import Cog # 实际是 agentforge.cog.Cog
cog = Cog(cog_file="ExampleCog") # 加载 .agentforge/cogs/ExampleCog.yamlresult = cog.run(user_input="你好") # 返回 end 节点输出 / 整个 stateKey params:
cog_file—.agentforge/cogs/下的 YAML 文件名(不含扩展名)cog.run(**ctx)— 传入的键值会成为{_ctx.xxx}占位符的数据源- 调试可用
cog.get_track_flow_trail()看执行轨迹(由TrailRecorder记录)
记忆钩子:反向索引的两张表
Context: AgentForge 最值得借鉴的部分是记忆读写时机的声明化。
Why this way: 不在每次执行时遍历所有记忆节点判断”现在该不该读写”,而是反转为 {agent_step: [mem_ids]} 的映射,运行时 O(1) 查表。
def _initialize_agent_memory_maps(self): # 遍历所有记忆节点,按它们的 query_before / update_after 字段建两张表 self.query_before_map = self._map_agents_to_memory_nodes("query_before") self.update_after_map = self._map_agents_to_memory_nodes("update_after") # 结果形如: { "understanding": ["persona_memory", "scratchpad"] }YAML 里这样声明记忆节点(example_cog_all_memories.yaml 真实片段):
memory: - id: persona_memory type: agentforge.storage.persona_memory.PersonaMemory query_before: understanding # 在 understanding 节点跑之前检索 query_keys: [user_input] # 用 ctx/state 里的 user_input 当查询串 update_after: response # 在 response 节点跑之后写入 update_keys: [understanding.insights, user_input] # 用这些当写入内容运行时,主循环里那两步钩子就变成:
进入 understanding 节点前 → query_before("understanding") → 查 query_before_map["understanding"] = ["persona_memory"] → 对 persona_memory 调 .query_memory(query_keys=["user_input"], _ctx, _state) → 结果写进 mem_obj.store跑完 response 节点后 → update_after("response") → 查 update_after_map["response"] = ["persona_memory"] → 用 update_keys 指定的字段(支持 understanding.insights 这种点号取值)调 .update_memory(...)Why this way: 这套设计有三点精妙:① 记忆声明和流程声明在同一份 YAML里,一眼可见”哪步前读、哪步后写”;② query_keys / update_keys 直接复用已有的 ctx/state 字段名,不写胶水代码;③ 默认自动加一个 chat_history 记忆节点(除非 chat_memory_enabled: false),不用声明就有对话历史。
内置记忆类型包括:Memory(基类,ChromaDB 语义检索)、PersonaMemory(按 persona 命名空间隔离)、ScratchPad(草稿本)、ChatHistoryMemory(对话历史,默认开启)。build_mem() 在每步执行前把所有记忆节点的 store 汇总成 {mem_id: store},作为 _mem 传给 agent,prompt 里就能写 {_mem.persona_memory.readable} 把检索结果塞进提示词。
黑板模式:三个占位符
Agent 跑的时候,agent.run(_ctx=context, _state=state, _mem=memory) 三个 dict 全传进去,prompt 模板用花括号占位符引用。PromptProcessor 在调 LLM 前做字符串替换——所以”节点间传参”不是函数调用,而是通过共享的 state dict + 模板占位符完成,这是典型的黑板模式(blackboard pattern):任何一个节点写入 state,后续节点都能读。
| 占位符 | 来源 | 典型用法 |
|---|---|---|
{_ctx.xxx} | cog.run(...) 传进来的外部输入 | {_ctx.user_input} |
{_state.xxx} | 其他 agent 的输出,xxx 是 agent id | {_state.analysis} |
{_mem.xxx.readable} | 记忆节点检索结果 | {_mem.general_memory.readable} |
{persona.static.name} | persona 配置 | 身份相关字段 |
返回值:end 的三种写法
_process_execution_result 决定 cog.run() 最终返回什么:
end 写法 | 返回什么 |
|---|---|
end: true | 最后一个执行节点的输出(state[last_agent]) |
end: "response" | 指定某个 agent 的输出(state["response"]) |
end: "final.summary" | 点号取值:state["final"]["summary"](支持嵌套) |
| 没命中 end(异常退出) | 返回整个 state dict |
点号取值用的是 ParsingProcessor.get_dot_notated,所以 agent 输出 JSON、用 end: "x.y.z" 取嵌套字段是官方支持的用法——这让”让哪个 agent 的哪个字段作为最终输出”完全声明化。
取舍:能借鉴什么,不该抄什么
Cog 的核心价值是把”流程”从 Python 里抽出来变成数据。值得借鉴的三点:
- 声明式转移表。 如果要加”先分类意图 → 决定走哪条路径 → 必要时回环修正”,不必改
if/else代码,写成一份 YAML 转移表(start / transitions / 分支 / 终止),引擎不变。它解决的是”顺序流程的声明化”。 query_before/update_after反向索引。 把”何时检索/写入记忆”从硬编码改成配置驱动,实现就是两张dict[step] → [mem_ids],几十行。max_visits+fallback防死循环。 比单纯”失败重试 N 次”更通用,因为它同时管”回环”和”重试”——只对决策节点生效,超限即兜底。
不该抄的:
- chromadb + sentence-transformers + torch 全家桶——对轻量应用太重,SQLite + LangGraph checkpointer 足够。
- Tools/Actions——已废弃,方向应转向 MCP 标准。
- Python 3.12 强依赖——会限制部署灵活性。
- 并发能力——Cog 主循环是纯串行状态机,不原生支持扇出并行;要并行得自己在某个 agent 里
asyncio.gather。
一句话总结:Cog 是”用 YAML 写状态机 + 把记忆读写时机配置化”的最小可行实现,朴素、可读、可复刻。