2703 words
14 minutes
AgentForge Cog 引擎详解

AgentForge Cog 引擎详解#

Cog 是什么#

一句话定位:Cog 是一个 YAML 驱动的状态机执行器。 它读取一份 YAML(定义节点 + 转移表),然后在一个 while 循环里反复”取当前节点 → 跑 → 查表找下一节点 → 直到撞到 end”。引擎本身没有任何智能,智能全在被调用的 Agent(LLM)里,Cog 只负责”按图走”。

之所以强调这点,是因为 AgentForge 官方把自己宣传为 “AGI framework”,听起来很神秘。但读完真实源码后,真相朴素得多——而正因为朴素,它才是一个你能看懂、能借鉴、甚至能用几十行代码复刻的东西。本文基于克隆的真实仓库 DataBassGit/AgentForge 的源码(cog.py / transition_resolver.py / agent_runner.py / memory_manager.py / cog_config_structs.py + 示例 YAML),把 Cog 引擎从原理讲到执行细节。


真实的 AgentForge:四个声明式概念#

先厘清 AgentForge 的真实面目,避免被名字误导:

维度真实情况
仓库DataBassGit/AgentForge(PyPI 包名 agentforge
核心机制Cog(声明式 YAML 工作流引擎),不是 persona 切换
Tools/Actions已废弃,README 明确说会被 MCP 标准取代
硬依赖chromadb + sentence-transformers + torch(包很重)
Python 版本要求 3.12

它把”agent 系统”拆成 4 个声明式 YAML 概念,全部住在 .agentforge/ 目录下:

.agentforge/
├── personas/ # 身份(Persona)
├── prompts/ # 提示词模板(Agent)
├── cogs/ # 工作流编排(Cog) ← 真正的"大脑"
└── settings/ # 模型/存储配置
  • Agent:不是”会思考的实体”,而是一个 YAML 提示词模板 + 一次模型调用。它绑定配置、渲染 prompt、调 LLM、解析输出。可单独跑,也可被 Cog 编排。
  • Cog:多 Agent 工作流引擎,本文的主角。一份 YAML 声明式 DAG/状态机,定义”哪些 agent 按什么顺序跑、在哪分支、在哪终止、何时读写记忆”,全程零 Python
  • Memory:声明在 Cog 里的共享节点,不是 agent 的私有属性,由 MemoryManager 统管,底层默认 ChromaDB。
  • Persona:YAML 身份配置,分 static(注入 prompt 的核心身份)和 retrieval(可存入向量库按相关性召回)两段;同时作为记忆的存储命名空间,隔离不同人格的数据。

运行时组件#

Cog 引擎由 5 个组件协作,职责切分得极干净:

组件文件职责
Cogcog.py主类。持有配置 + 各组件,跑主循环,管状态
AgentRegistrycore/agent_registry.py静态工厂。把 YAML agents: 段一次性实例化成 {id: Agent}
TransitionResolvercore/transition_resolver.py纯查表器。给定”当前节点 + 各节点输出”,返回下一节点(或 None
AgentRunnercore/agent_runner.py跑单个 Agent,带 3 次重试(输出为空就重试)
MemoryManagercore/memory_manager.py管记忆节点,提供 query_before / update_after 钩子

分工是理解 Cog 的关键:Cog 知道”流程”,Resolver 知道”怎么查表”,Runner 知道”怎么跑一个 agent”,MemoryManager 知道”记忆何时读写”。四者互不耦合、各自可测——这就是为什么 cog.py 里的单步循环只有寥寥几行。


一次 run 的执行流程#

Context:cog.run(user_input=...) 入口跟随真实代码走一遍。

Why this way: 主循环刻意保持极简,把记忆载入/记录、流程执行、结果提取拆成独立方法,方便单独测试和替换。

# cog.py 主循环(简化)
def run(self, **kwargs):
self.mem_mgr.load_chat(_ctx=kwargs, _state={}) # 先把对话历史载入
self._execute_workflow(**kwargs) # 跑流程
result = self._process_execution_result() # 按 end 规则提取返回值
self.mem_mgr.record_chat(self.context, result) # 把本轮记进 chat_history
return result
def _execute_agent_flow(self):
current = self.cog_config.cog.flow.start # 从 start 节点开始
self.transition_resolver.reset_visit_counts()
while current: # None 就停
current = self._execute_single_agent_cycle(current)

Key params:

  • flow.start — YAML 里声明的起始节点 id
  • reset_visit_counts() — 每次运行清零决策节点的访问计数,用于防死循环
  • while currentNone 表示撞到 end 或无路可走,循环退出

每个节点的单步循环 _execute_single_agent_cycle 只做 4 件事:

1. _prepare_agent_execution(agent_id)
├─ pre_agent_execution(agent_id) # 空钩子,子类可覆盖
└─ mem_mgr.query_before(agent_id, ...) # 记忆"读"钩子(在 agent 跑之前)
2. _execute_agent(agent_id)
└─ agent_runner.run_agent(...) # 带 3 次重试地调 LLM
输出存入 self.state[agent_id]
3. _finalize_agent_execution(agent_id, output)
├─ process_agent_output(...) # 空钩子,子类可覆盖
├─ trail_recorder.record_agent_output() # 记执行轨迹
└─ mem_mgr.update_after(agent_id, ...) # 记忆"写"钩子(在 agent 跑之后)
4. _determine_next_agent(agent_id)
└─ transition_resolver.get_next_agent(agent_id, self.state) # 查表

Why this way: 记忆钩子是夹在 agent 执行前后的,而不是 agent 自己调的。这是 Cog 最关键的工程决策——agent 永远只是个”纯函数”(输入 prompt,输出文本),记忆读写是流程层的横切关注点(cross-cutting concern)。


三种转移类型:状态机的边#

转移定义在 cog_config_structs.py:CogFlowTransition,Resolver 用 type 字段分发到三种边。

直接转移#

transitions:
analysis: decision # 写成字符串 → 解析成 direct

跑完 analysis 无条件去 decision,这是最简单的线性流转。

决策转移(带分支 / 回环 / 防死循环)#

Context: 让 LLM 决定下一步去哪。

Why this way: 分支的本质是让 LLM 在输出里塞一个结构化字段(通常 JSON),引擎读这个字段当路由 key。

decision:
choice: # decision_key:读 agent 输出里的这个字段
"approve": response # decision_map:值 → 下一节点
"reject": analysis # 回环!回到 analysis 重做
fallback: response # 没匹配上 / 超限时去这
max_visits: 3 # 这个节点最多重入 3 次,超了强制走 fallback

Key params:

  • choice(decision_key)— 从 state[current_agent_id] 取的字段名
  • 决策值会小写归一化后查表,所以 agent 输出 "Approve" 也能匹配 "approve"
  • max_visits唯一的防死循环机制,只对决策节点生效:访问次数 +1,超过即直接返回 fallback
  • fallback — 兜底;没有则返回 None(流程结束,但可能非预期)

结束转移#

response:
end: true # 或 end: "final.summary"(点号取值)

撞到 end 节点,Resolver 返回 None,主循环退出。

一个完整的真实示例#

仓库自带的 setup_files/cogs/example_cog.yaml,是带分支 + 回环 + 终止的三节点流程:

cog:
name: "ExampleCog"
chat_memory_enabled: false
agents:
- { id: analysis, template_file: cog_analyze_agent }
- { id: decision, template_file: cog_decide_agent }
- { id: response, template_file: cog_response_agent }
flow:
start: analysis
transitions:
analysis: decision # 直接转移
decision: # 决策转移:可回环
choice: { "approve": response, "reject": analysis }
fallback: response
max_visits: 3
response: { end: true } # 结束转移

执行图:

analysis ──→ decision ──approve──→ response(end) ✓
↑ ↓ reject
└──────┘ (最多 3 次,超了强制走 fallback→response)

这就是 Cog 用 YAML 表达的”分析 → 质检 → 回复,质检不过打回重做,最多重做 3 次”——零 Python,全部由引擎按转移表推进。

Python 入口#

Context: 用真实代码看怎么启动一个 Cog。

from agentforge import Cog # 实际是 agentforge.cog.Cog
cog = Cog(cog_file="ExampleCog") # 加载 .agentforge/cogs/ExampleCog.yaml
result = cog.run(user_input="你好") # 返回 end 节点输出 / 整个 state

Key params:

  • cog_file.agentforge/cogs/ 下的 YAML 文件名(不含扩展名)
  • cog.run(**ctx) — 传入的键值会成为 {_ctx.xxx} 占位符的数据源
  • 调试可用 cog.get_track_flow_trail() 看执行轨迹(由 TrailRecorder 记录)

记忆钩子:反向索引的两张表#

Context: AgentForge 最值得借鉴的部分是记忆读写时机的声明化。

Why this way: 不在每次执行时遍历所有记忆节点判断”现在该不该读写”,而是反转为 {agent_step: [mem_ids]} 的映射,运行时 O(1) 查表。

def _initialize_agent_memory_maps(self):
# 遍历所有记忆节点,按它们的 query_before / update_after 字段建两张表
self.query_before_map = self._map_agents_to_memory_nodes("query_before")
self.update_after_map = self._map_agents_to_memory_nodes("update_after")
# 结果形如: { "understanding": ["persona_memory", "scratchpad"] }

YAML 里这样声明记忆节点(example_cog_all_memories.yaml 真实片段):

memory:
- id: persona_memory
type: agentforge.storage.persona_memory.PersonaMemory
query_before: understanding # 在 understanding 节点跑之前检索
query_keys: [user_input] # 用 ctx/state 里的 user_input 当查询串
update_after: response # 在 response 节点跑之后写入
update_keys: [understanding.insights, user_input] # 用这些当写入内容

运行时,主循环里那两步钩子就变成:

进入 understanding 节点前 → query_before("understanding")
→ 查 query_before_map["understanding"] = ["persona_memory"]
→ 对 persona_memory 调 .query_memory(query_keys=["user_input"], _ctx, _state)
→ 结果写进 mem_obj.store
跑完 response 节点后 → update_after("response")
→ 查 update_after_map["response"] = ["persona_memory"]
→ 用 update_keys 指定的字段(支持 understanding.insights 这种点号取值)调 .update_memory(...)

Why this way: 这套设计有三点精妙:① 记忆声明和流程声明在同一份 YAML里,一眼可见”哪步前读、哪步后写”;② query_keys / update_keys 直接复用已有的 ctx/state 字段名,不写胶水代码;③ 默认自动加一个 chat_history 记忆节点(除非 chat_memory_enabled: false),不用声明就有对话历史。

内置记忆类型包括:Memory(基类,ChromaDB 语义检索)、PersonaMemory(按 persona 命名空间隔离)、ScratchPad(草稿本)、ChatHistoryMemory(对话历史,默认开启)。build_mem() 在每步执行前把所有记忆节点的 store 汇总成 {mem_id: store},作为 _mem 传给 agent,prompt 里就能写 {_mem.persona_memory.readable} 把检索结果塞进提示词。


黑板模式:三个占位符#

Agent 跑的时候,agent.run(_ctx=context, _state=state, _mem=memory) 三个 dict 全传进去,prompt 模板用花括号占位符引用。PromptProcessor 在调 LLM 前做字符串替换——所以”节点间传参”不是函数调用,而是通过共享的 state dict + 模板占位符完成,这是典型的黑板模式(blackboard pattern):任何一个节点写入 state,后续节点都能读。

占位符来源典型用法
{_ctx.xxx}cog.run(...) 传进来的外部输入{_ctx.user_input}
{_state.xxx}其他 agent 的输出,xxx 是 agent id{_state.analysis}
{_mem.xxx.readable}记忆节点检索结果{_mem.general_memory.readable}
{persona.static.name}persona 配置身份相关字段

返回值:end 的三种写法#

_process_execution_result 决定 cog.run() 最终返回什么:

end 写法返回什么
end: true最后一个执行节点的输出(state[last_agent]
end: "response"指定某个 agent 的输出(state["response"]
end: "final.summary"点号取值:state["final"]["summary"](支持嵌套)
没命中 end(异常退出)返回整个 state dict

点号取值用的是 ParsingProcessor.get_dot_notated,所以 agent 输出 JSON、用 end: "x.y.z" 取嵌套字段是官方支持的用法——这让”让哪个 agent 的哪个字段作为最终输出”完全声明化。


取舍:能借鉴什么,不该抄什么#

Cog 的核心价值是把”流程”从 Python 里抽出来变成数据。值得借鉴的三点:

  1. 声明式转移表。 如果要加”先分类意图 → 决定走哪条路径 → 必要时回环修正”,不必改 if/else 代码,写成一份 YAML 转移表(start / transitions / 分支 / 终止),引擎不变。它解决的是”顺序流程的声明化”。
  2. query_before / update_after 反向索引。 把”何时检索/写入记忆”从硬编码改成配置驱动,实现就是两张 dict[step] → [mem_ids],几十行。
  3. max_visits + fallback 防死循环。 比单纯”失败重试 N 次”更通用,因为它同时管”回环”和”重试”——只对决策节点生效,超限即兜底。

不该抄的:

  • chromadb + sentence-transformers + torch 全家桶——对轻量应用太重,SQLite + LangGraph checkpointer 足够。
  • Tools/Actions——已废弃,方向应转向 MCP 标准。
  • Python 3.12 强依赖——会限制部署灵活性。
  • 并发能力——Cog 主循环是纯串行状态机,不原生支持扇出并行;要并行得自己在某个 agent 里 asyncio.gather

一句话总结:Cog 是”用 YAML 写状态机 + 把记忆读写时机配置化”的最小可行实现,朴素、可读、可复刻。

AgentForge Cog 引擎详解
https://sgjki547.top/posts/agentforge-cog-引擎详解/
Author
SGJki
Published at
2026-06-15
License
CC BY-NC-SA 4.0