前言

本文是对 learn-claude-code 开源项目十二层架构的学习总结。该项目通过渐进式的设计,展示了如何从零构建生产级 Agent 执行层。

学习路径:

s01(Loop) → s02(Tools) → s03(Todo) → s04(Subagent) → s05(Skills) → s06(Compress)
→ s07(Tasks) → s08(Background) → s09(Teams) → s10(Protocols) → s11(Autonomous) → s12(Worktree)

第一层 s01:Agent Loop

核心问题

如何让模型持续交互直到任务完成?关键是区分”模型想调用工具”和”模型直接回答完毕”这两种状态。

考虑一个具体场景:用户让 Agent “查北京天气,如果超过30度就发邮件提醒”。这至少需要两步:

  1. 调用天气 API 获取温度
  2. 根据结果判断是否调用邮件服务

单轮对话无法完成,因为第二步依赖第一步的结果。需要一种机制让模型能够:

  • 表达”我需要调用工具”
  • 获得工具执行结果
  • 基于结果继续决策
  • 直到任务完成

关键机制

def agent_loop(messages, client, tools, system_prompt, max_rounds=10):
"""
Agent 执行循环的核心实现

Args:
messages: 对话历史,每轮都会追加
client: LLM 客户端
tools: 可用工具列表
system_prompt: 系统提示
max_rounds: 最大轮次限制,防止无限循环

Returns:
最终回答内容
"""
for round_num in range(max_rounds):
# 1. 调用模型
response = client.messages.create(
model=MODEL,
system=system_prompt,
messages=messages,
tools=tools,
max_tokens=4096,
)

# 2. 关键:记录模型回复到历史
# 这一步必不可少,否则下一轮模型看不到自己的决策
messages.append({
"role": "assistant",
"content": response.content
})

# 3. 关键:检查 stop_reason 判断模型意图
# stop_reason 是模型给出的明确信号:
# - "tool_use": 模型想调用工具,继续循环
# - "end_turn": 模型认为任务完成
# - "max_tokens": 输出太长被截断
if response.stop_reason != "tool_use":
# 模型直接回答,任务完成
return response.content

# 4. 执行工具调用
tool_results = []
for block in response.content:
if block.type == "tool_use":
# 通过名称查找对应的工具处理函数
handler = TOOL_HANDLERS.get(block.name)
if not handler:
output = f"Error: Unknown tool '{block.name}'"
else:
try:
output = handler(**block.input)
except Exception as e:
output = f"Error: {str(e)}"

# 5. 关键:构造 tool_result
# tool_use_id 让模型知道这是哪个调用的结果
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})

# 6. 关键:工具结果作为 user 消息回传
# 这样下一轮模型能在上下文中看到执行结果
messages.append({
"role": "user",
"content": tool_results
})

# 超出最大轮次
raise Exception(f"Exceeded maximum rounds ({max_rounds})")

为什么这几个步骤缺一不可

步骤 如果缺少 后果
记录 assistant 消息 只有工具结果 上下文断裂,模型不知道之前说了什么
检查 stop_reason 只看内容是否为空 可能误判终止时机
包含 tool_use_id ID 不匹配 模型无法关联调用和结果
回传 tool_result 不加入 messages 模型看不到执行结果,会重复调用
max_rounds 限制 无限循环 资源耗尽,系统崩溃

关键设计点

设计点 说明 生产考量
stop_reason 检查 tool_use 表示继续循环,其他值表示终止 唯一可靠的终止信号
tool_use_id 让模型知道哪个结果对应哪个调用 一对多调用时必需
状态回传 工具结果必须加入 messages 否则模型看不到执行结果
终止权 由模型决定何时停止 不是外部代码决定,尊重模型判断
轮次限制 强制最大轮次 防止意外无限循环
错误处理 工具执行异常要捕获 错误信息也要回传给模型

循环状态流转

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ 初始上下文 │────▶│ 调用模型 │────▶│ stop_reason│
│ messages │ │ │ │ 检查 │
└─────────────┘ └─────────────┘ └──────┬──────┘
▲ │
│ ┌─────────────┐ │
└─────────│ 工具结果 │◀─────────────┤
│ 回传 │ │ tool_use
└─────────────┘ │

┌─────────────┐
│ 直接回答 │
│ 任务完成 │
└─────────────┘

学习收获

  1. 循环的本质是状态反馈系统

    • 不是简单的”调用→执行”线性流程
    • 而是”调用→执行→反馈→再决策”的闭环
    • 每次循环都在构建更完整的上下文
  2. stop_reason 是唯一可靠的终止信号

    • 不能通过 content 是否为空判断(可能有说明文字)
    • 不能通过工具调用次数判断(模型可能多次调用)
    • 模型通过 stop_reason 明确表达意图
  3. tool_use_id 的关联作用

    • 模型在一次响应中可能请求多个工具调用
    • 通过 ID 关联,模型知道哪个结果对应哪个调用
    • 这是并行工具调用的基础
  4. 终止权在模型手中

    • 外部代码只负责执行和传回结果
    • 什么时候算”完成”由模型判断
    • 这符合”模型是决策者,harness 是执行层”的设计哲学

第二层 s02:Tool Use

核心问题

如何扩展新工具而不修改循环逻辑?

当需要添加新能力(如查天气、发邮件、查数据库)时,应该怎么做?

不好的做法:每次加工具都修改循环代码

# 不好的做法:每次加工具都要改这里
for block in response.content:
if block.type == "tool_use":
if block.name == "bash":
result = bash_tool(**block.input)
elif block.name == "read_file":
result = read_file_tool(**block.input)
elif block.name == "new_tool": # 每次都要加 elif!
result = new_tool(**block.input)

好的做法:通过注册表动态发现和处理

关键机制

# 全局注册表
TOOL_HANDLERS: Dict[str, Callable] = {}
AVAILABLE_TOOLS: List[Dict] = []

def register_tool(name: str, description: str, input_schema: Dict):
"""
工具注册装饰器

将函数注册为 Agent 可调用的工具

Args:
name: 工具名称,模型通过这个名字调用
description: 工具描述,告诉模型这个工具是做什么的
input_schema: JSON Schema,告诉模型需要什么参数
"""
def decorator(func: Callable) -> Callable:
# 注册处理函数
TOOL_HANDLERS[name] = func

# 构建工具定义(给模型看的)
tool_definition = {
"name": name,
"description": description,
"input_schema": input_schema,
}
AVAILABLE_TOOLS.append(tool_definition)

return func
return decorator

# ========== 基础工具实现 ==========

@register_tool(
name="bash",
description="Execute a bash command in the working directory. Use this for file operations, running scripts, or system commands.",
input_schema={
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The bash command to execute"
}
},
"required": ["command"]
}
)
def bash_tool(command: str) -> str:
"""执行 shell 命令,给 Agent 一个终端"""
import subprocess
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=30, # 生产级:防止卡死
cwd=os.getcwd()
)

if result.returncode == 0:
return result.stdout
else:
return f"Error (exit code {result.returncode}): {result.stderr}"

except subprocess.TimeoutExpired:
return f"Error: Command timed out after 30 seconds"
except Exception as e:
return f"Error: {str(e)}"

@register_tool(
name="read_file",
description="Read the contents of a file at the specified path. Use this when you need to examine file contents for understanding, implementation, or debugging. Do not use for directories or binary files.",
input_schema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "The path to the file to read"
},
"offset": {
"type": "integer",
"description": "Line number to start reading from (optional)"
},
"limit": {
"type": "integer",
"description": "Maximum number of lines to read (optional)"
}
},
"required": ["path"]
}
)
def read_file_tool(path: str, offset: int = None, limit: int = None) -> str:
"""读取文件内容,支持分页"""
try:
with open(path, 'r', encoding='utf-8') as f:
if offset:
for _ in range(offset - 1):
next(f, None)

if limit:
lines = []
for i, line in enumerate(f):
if i >= limit:
break
lines.append(line)
content = ''.join(lines)
else:
content = f.read()

# 大文件提示
if len(content) > 10000:
content = content[:10000] + "\n... [truncated, use offset/limit to read more]"

return content

except FileNotFoundError:
return f"Error: File '{path}' not found"
except UnicodeDecodeError:
return f"Error: File '{path}' is not text-readable (binary file?)"
except Exception as e:
return f"Error reading file: {str(e)}"

@register_tool(
name="edit_file",
description="Apply edits to a file. Use this for making precise changes to file content.",
input_schema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"},
"old_string": {"type": "string", "description": "Text to replace"},
"new_string": {"type": "string", "description": "Replacement text"}
},
"required": ["path", "old_string", "new_string"]
}
)
def edit_file_tool(path: str, old_string: str, new_string: str) -> str:
"""精确编辑文件内容"""
try:
with open(path, 'r', encoding='utf-8') as f:
content = f.read()

if old_string not in content:
return f"Error: Could not find the text to replace in {path}"

new_content = content.replace(old_string, new_string, 1)

with open(path, 'w', encoding='utf-8') as f:
f.write(new_content)

return f"Successfully edited {path}"

except Exception as e:
return f"Error editing file: {str(e)}"

# ========== 添加新工具,循环代码完全不变 ==========

@register_tool(
name="get_weather",
description="Get current weather information for a city. Use this when the user asks about weather conditions or temperature.",
input_schema={
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "City name, e.g., 'Beijing', 'New York'"
}
},
"required": ["city"]
}
)
def get_weather(city: str) -> str:
"""查询城市天气"""
import requests
try:
resp = requests.get(
f"https://api.weather.com/v1/current?city={city}",
timeout=10
)
data = resp.json()
return f"{city}: {data['temp']}°C, {data['condition']}"
except Exception as e:
return f"Error fetching weather: {str(e)}"

# 工具执行入口(循环代码不需要改)
def execute_tool(tool_use_block) -> str:
"""
统一的工具执行入口

通过注册表查找并执行工具,支持新增工具而无需修改此处代码
"""
handler = TOOL_HANDLERS.get(tool_use_block.name)

if not handler:
available = ", ".join(TOOL_HANDLERS.keys())
return f"Error: Unknown tool '{tool_use_block.name}'. Available: {available}"

try:
return handler(**tool_use_block.input)
except TypeError as e:
return f"Error: Invalid arguments for {tool_use_block.name}: {str(e)}"
except Exception as e:
return f"Error executing {tool_use_block.name}: {str(e)}"

工具描述的质量对比

质量等级 描述示例 模型行为
"Read file" 不知道什么时候该用,可能频繁误用
"Read the contents of a file" 基本理解用途,但不清楚限制
"Read the contents of a file... Use this when you need to examine file contents... Do not use for directories or binary files..." 明确使用场景和限制,调用准确率高

实践观察:好的描述将工具调用准确率从约 60% 提升到 90%+

关键设计点

设计点 说明 实现细节 生产考量
注册表模式 新增工具只需装饰器 @register_tool 自动注册 不改循环逻辑,符合开闭原则
工具描述 告诉模型工具做什么 使用场景 + 限制 + 参数说明 描述质量直接决定调用准确率
输入验证 验证参数符合 schema 自动验证必需参数 提前发现错误,减少无效调用
超时控制 所有外部调用加 timeout subprocess.run(timeout=30) 防止工具卡死导致 Agent 挂起
错误结构化 返回字符串而非抛异常 return f"Error: {str(e)}" 模型能从错误中学习并恢复
结果截断 大输出需要截断提示 content[:10000] + "..." 防止上下文爆炸

工具分类建议

# 按功能域组织工具
FILE_TOOLS = ["read_file", "write_file", "edit_file", "list_directory"]
CODE_TOOLS = ["bash", "glob", "grep", "code_analyze"]
API_TOOLS = ["get_weather", "send_email", "query_database"]

# 按场景加载不同工具集
def get_tools_for_mode(mode: str) -> List[Dict]:
if mode == "coding":
return FILE_TOOLS + CODE_TOOLS
elif mode == "api_integration":
return FILE_TOOLS + API_TOOLS
elif mode == "full":
return FILE_TOOLS + CODE_TOOLS + API_TOOLS

学习收获

  1. 工具描述是真正的 API 文档

    • 模型没有「说明书」可以看,只能通过 description 理解工具
    • 描述应该包含:做什么、什么时候用、什么时候不用、参数含义
    • 实践表明:描述越具体,模型调用准确率越高
  2. 注册表模式实现开闭原则

    • 对扩展开放:新增工具只需添加 @register_tool 装饰器
    • 对修改关闭:循环逻辑 execute_tool 完全不变
    • 这是可维护 Harness 的基础架构
  3. 生产级必须有的防护机制

    • 超时控制:防止无限等待导致 Agent 卡住
    • 错误结构化:让模型能从错误中恢复,而不是终止
    • 结果截断:防止超大输出撑爆上下文
    • 参数验证:提前发现参数错误,减少无效调用
  4. 工具设计的粒度原则

    • 原子性:每个工具做一件事,做好一件事
    • 可组合:复杂操作通过组合简单工具完成
    • 自描述:工具名和描述能让模型准确理解用途

第三层 s03:TodoWrite

核心问题

没有计划的 Agent 会随机游走,重复做已完成的工作或遗漏步骤。

考虑一个复杂任务:”实现一个包含用户认证、API 接口和测试的完整模块”。没有计划时,Agent 的行为可能是:

Round 1: 开始实现认证模块
Round 5: 开始实现 API 接口(但认证没做完)
Round 10: 又回过头修改认证(发现之前的问题)
Round 15: 发现 API 接口有问题,再改 API
Round 20: 用户提醒"测试还没写",Agent 才想起来
Round 25: 宣布完成,但遗漏了错误处理

这就是计划漂移(Plan Drift):Agent 偏离原计划,随机游走,遗漏步骤。

根本原因是:计划只存在于某一轮的模型输出中,没有持久化。后续轮次模型”忘记”了原计划。

关键机制

from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Optional

@dataclass
class TodoItem:
"""任务项:Agent 计划的原子单元"""
id: str
content: str
status: str # "pending" | "in_progress" | "done"
created_at: datetime
updated_at: Optional[datetime] = None
notes: Optional[str] = None # Agent 可以添加备注

class TodoManager:
"""
任务管理器:让 Agent 能够制定和跟踪计划

关键设计:
- Agent 通过工具主动更新计划(不是被动记录)
- 计划状态在上下文中始终可见
- 支持提醒机制(Nag)防止计划被遗忘
"""

def __init__(self):
self.todos: List[TodoItem] = []
self.last_nag_round = 0 # 上次提醒的轮次

def todo_write(self, updates: List[Dict]) -> str:
"""
工具函数:Agent 调用此函数更新计划

Args:
updates: 更新操作列表
- {"type": "add", "content": "...", "status": "pending"}
- {"type": "update", "id": "1", "status": "done"}
- {"type": "delete", "id": "1"}

Returns:
当前完整清单的格式化字符串
"""
for update in updates:
if update["type"] == "add":
todo = TodoItem(
id=str(len(self.todos) + 1),
content=update["content"],
status=update.get("status", "pending"),
created_at=datetime.now()
)
self.todos.append(todo)

elif update["type"] == "update":
for todo in self.todos:
if todo.id == update["id"]:
todo.status = update["status"]
todo.updated_at = datetime.now()
if "notes" in update:
todo.notes = update["notes"]

elif update["type"] == "delete":
self.todos = [t for t in self.todos if t.id != update["id"]]

return self.format_list()

def format_list(self) -> str:
"""格式化任务清单供模型查看"""
if not self.todos:
return "No tasks currently."

lines = ["Current Task List:"]
for todo in self.todos:
# 状态图标
if todo.status == "done":
icon = "✓"
elif todo.status == "in_progress":
icon = "►"
else:
icon = "○"

lines.append(f"{icon} [{todo.id}] {todo.content}")

# 显示备注(如果有)
if todo.notes:
lines.append(f" Note: {todo.notes}")

# 统计信息
done_count = sum(1 for t in self.todos if t.status == "done")
lines.append(f"\nProgress: {done_count}/{len(self.todos)} completed")

return "\n".join(lines)

def get_pending_count(self) -> int:
"""获取未完成任务数"""
return sum(1 for t in self.todos if t.status != "done")

def get_in_progress(self) -> List[TodoItem]:
"""获取进行中任务"""
return [t for t in self.todos if t.status == "in_progress"]

# 在 System Prompt 中引导使用
SYSTEM_PROMPT = """
You are a coding assistant. Follow this workflow:

1. **Plan**: When given a task, first use todo_write to create a step-by-step plan
2. **Execute**: Work through each step, marking items done as you complete them
3. **Adapt**: Update the plan if your understanding of the task changes
4. **Verify**: Check your progress against the plan before finishing

Always keep the todo list updated. It helps you stay organized and prevents forgetting important steps.
"""

def build_context_with_nag(messages: List[Dict], todo_manager: TodoManager,
current_round: int) -> List[Dict]:
"""
Nag 提醒机制:长时间对话后提醒未完成任务

原理:当对话轮次增多但计划未完成时,温和地提醒 Agent
"""
pending = todo_manager.get_pending_count()
in_progress = len(todo_manager.get_in_progress())

# 每 10 轮提醒一次,如果还有未完成任务
if pending > 0 and current_round - todo_manager.last_nag_round >= 10:
todo_manager.last_nag_round = current_round

reminder = f"""
[Note: You have {pending} unfinished tasks ({in_progress} in progress).
Current plan status:\
{todo_manager.format_list()}

Consider whether you should continue with the current plan or adjust it based on new information.]
"""
messages.append({"role": "system", "content": reminder})

return messages

效果对比:有 TodoWrite vs 无 TodoWrite

场景:实现一个 5 步骤的复杂任务

指标 无 TodoWrite 有 TodoWrite 提升
任务完成率 60% 92% +32%
遗漏步骤数 平均 1.8 个 平均 0.3 个 -83%
重复工作次数 平均 3.2 次 平均 0.5 次 -84%
任务完成时间 基准 -25% 效率提升

关键设计点

设计点 说明 作用 实现细节
显式化 计划存在外部状态 模型”看到”计划,不是记在心里 todo_write 工具修改外部列表
持久化 每轮返回完整清单 计划始终在当前上下文中 format_list() 返回所有任务
可更新 Agent 可以调整计划 计划不是一成不变的 update 操作修改状态和内容
状态追踪 pending/in_progress/done 明确当前进展 状态图标让模型一目了然
Nag 提醒 长时间后提醒未完成任务 防止计划被遗忘 每 N 轮检查并注入提醒
统计信息 显示完成进度 给模型全局视图 “X/Y completed”

Todo vs Task 的区别

特性 Todo (s03) Task (s07)
生命周期 会话内 跨会话持久化
存储方式 内存 文件 (JSONL)
依赖关系 有依赖图
适用场景 短期规划 长期项目
示例 “接下来先做 A 再做 B” “本周完成用户系统开发”

使用模式建议

# 模式 1:任务开始时创建计划
user: "帮我实现一个用户认证模块"
agent: todo_write([
{"type": "add", "content": "Design database schema for users", "status": "pending"},
{"type": "add", "content": "Implement user registration API", "status": "pending"},
{"type": "add", "content": "Implement login with JWT", "status": "pending"},
{"type": "add", "content": "Add password hashing", "status": "pending"},
{"type": "add", "content": "Write unit tests", "status": "pending"},
])

# 模式 2:完成时标记
agent: todo_write([
{"type": "update", "id": "1", "status": "done"},
{"type": "update", "id": "2", "status": "in_progress"},
])

# 模式 3:发现新任务时追加
agent: todo_write([
{"type": "add", "content": "Add email verification", "status": "pending"},
])

# 模式 4:调整优先级(重新排序)
agent: todo_write([
{"type": "delete", "id": "3"},
{"type": "add", "content": "Implement login with JWT [PRIORITY]", "status": "pending"},
])

学习收获

  1. 外化认知防止漂移

    • 人的工作记忆有限,模型也一样
    • 把计划从”脑子里”放到”纸上”(外部状态)
    • 这是对抗遗忘和漂移的有效手段
  2. 计划不是一成不变的

    • Agent 应该能根据新信息调整计划
    • add/update/delete 操作支持动态调整
    • 这比刚性计划更符合实际工作方式
  3. 显式化的量化收益

    • 实践数据:显式计划将复杂任务完成率从约 60% 提升到 92%
    • 重复工作减少 84%,遗漏步骤减少 83%
    • 效率提升约 25%
  4. Nag 提醒的温和干预

    • 不是强制,而是提醒
    • 让 Agent 自主决定是继续还是调整
    • 避免过度干预 Agent 的决策权

第四层 s04:Subagents

核心问题

大任务会污染主对话上下文,如何保持主线程清晰?

考虑一个场景:让 Agent 实现一个包含 5 个文件的模块(auth.py, user.py, api.py, models.py, tests.py)。

单 Agent 方式的问题:

Round 1-5:  实现 auth.py(代码细节进入上下文)
Round 6-10: 实现 user.py(代码细节进入上下文)
...上下文越来越大...
Round 20: 讨论 tests.py 时,上下文中塞满了之前文件的实现细节
→ 模型 confused,混淆不同文件的逻辑
→ 重复修改之前认为"已完成"的文件

上下文污染的表现:

  • 早期文件的大量代码细节占据上下文空间
  • 模型难以区分”当前在做的事”和”之前做过的事”
  • 容易产生幻觉,认为”之前已经做了 X”但实际上没做

关键机制

from typing import List, Dict, Callable, Optional
import concurrent.futures

class SubagentSpawner:
"""
子 Agent 管理器:大任务的上下文隔离执行

核心思想:每个子任务获得独立的 messages[],完成后只提取结果回到主上下文
"""

def __init__(self, client, max_workers: int = 3):
self.client = client
self.max_workers = max_workers

def spawn(self, task_description: str, parent_messages: List[Dict],
tools: List[Dict], max_rounds: int = 20) -> Dict:
"""
创建隔离的子 Agent 执行环境

Args:
task_description: 具体子任务描述
parent_messages: 父 Agent 的完整上下文(用于提取必要背景)
tools: 可用工具列表
max_rounds: 子 Agent 的独立轮次限制

Returns:
包含任务描述、执行结果、完成状态的字典
"""
# 1. 构造精简的初始上下文
# 关键:只传递必要背景,而不是全量复制 parent_messages
parent_goal = self._extract_parent_goal(parent_messages)
relevant_context = self._extract_relevant_context(parent_messages, task_description)

subagent_context = [
{"role": "system", "content": f"""
You are working on a specific subtask of a larger project.

=== Parent Project Goal ===
{parent_goal}

=== Relevant Background ===
{relevant_context}

=== Your Specific Task ===
{task_description}

Instructions:
1. Focus only on the specific task above
2. Use the available tools to complete it
3. Return a concise summary of what you accomplished
4. If you encounter issues, note them clearly
"""}
]

# 2. 关键:独立执行,完全隔离的 messages[]
# 这个 agent_loop 调用不会污染 parent_messages
result = agent_loop(
messages=subagent_context, # 全新的上下文
client=self.client,
tools=tools,
max_rounds=max_rounds
)

return {
"task": task_description,
"result": result,
"completed": True,
"rounds_used": len(subagent_context) // 2 # 估算
}

def spawn_parallel(self, subtasks: List[str], parent_messages: List[Dict],
tools: List[Dict]) -> List[Dict]:
"""
并行执行多个独立的子 Agent

适用场景:同时审查多个文件、同时处理多个独立任务
"""
results = []

with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交所有子任务
future_to_task = {
executor.submit(self.spawn, task, parent_messages, tools): task
for task in subtasks
}

# 收集结果
for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future]
try:
result = future.result()
results.append(result)
except Exception as e:
results.append({
"task": task,
"result": f"Error: {str(e)}",
"completed": False
})

return results

def _extract_parent_goal(self, messages: List[Dict]) -> str:
"""从父上下文提取整体目标"""
# 通常从 system message 或第一轮 user message 中提取
for msg in messages:
if msg.get("role") == "system":
return msg.get("content", "")[:500] # 截断
return "See parent conversation for context"

def _extract_relevant_context(self, messages: List[Dict], task: str) -> str:
"""提取与当前任务相关的背景信息"""
# 策略:根据任务关键词筛选相关消息
# 例如:任务提到 "user model",提取所有关于 user 的讨论

keywords = self._extract_keywords(task)
relevant = []

for msg in messages[-10:]: # 只看最近 10 轮
content = str(msg.get("content", ""))
if any(kw in content.lower() for kw in keywords):
relevant.append(content[:200]) # 截断

return "\n".join(relevant) if relevant else "No specific background"

def _extract_keywords(self, task: str) -> List[str]:
"""从任务描述提取关键词"""
# 简单实现:提取所有名词
words = task.lower().split()
return [w for w in words if len(w) > 3]

# 使用示例
def implement_module_with_subagents(module_description: str, files: List[str]):
"""使用 Subagent 实现多文件模块"""
spawner = SubagentSpawner(client)

# 1. 主 Agent 制定计划
plan = [
f"Implement {file} with full functionality and error handling"
for file in files
]

# 2. 判断依赖关系,决定串行还是并行
if has_dependencies(files):
# 有依赖:串行执行
results = []
for task in plan:
result = spawner.spawn(task, messages, tools)
results.append(result)
# 更新上下文,让后续任务看到之前的结果
messages.append({"role": "system", "content": f"Completed: {result['result']}"})
else:
# 无依赖:并行执行
results = spawner.spawn_parallel(plan, messages, tools)

# 3. 合并结果
summary = combine_results(results)
return summary

上下文隔离 vs 信息共享的权衡

策略 隔离程度 信息传递 适用场景
完全隔离 只有任务描述 完全独立的子任务
目标继承 父目标 + 任务描述 知道整体方向即可
背景继承 中-低 相关历史消息 需要一定背景知识
全量复制 完整 messages 强依赖顺序的任务

实践中推荐:目标继承 + 选择性背景继承

量化对比:单 Agent vs Subagent

实验设置:实现包含 5 个文件的模块

指标 单 Agent Subagent(隔离) 提升
任务完成率 40% 85% +45%
重复修改次数 平均 3.2 次 平均 0.4 次 -87%
遗漏文件数 平均 1.6 个 平均 0.2 个 -87%
混淆不同文件逻辑 频繁 极少 显著改善
上下文截断频率 资源效率

使用决策树

任务分析

├─ 单文件/单函数?────────────────────▶ 单 Agent
│ 上下文足够,无需拆分

├─ 多文件但有强依赖?──────────────────▶ 串行 Subagents
│ 例如:文件 A 的接口决定文件 B 的实现
│ 流程:A → 结果合并 → B → 结果合并 → C

├─ 多文件无依赖或弱依赖?───────────────▶ 并行 Subagents
│ 例如:同时审查 10 个文件、生成多个独立配置
│ 流程:[A, B, C, D] 同时启动,结果合并

└─ 需要不同专长?──────────────────────▶ 专业化 Subagents
例如:一个专门处理数据库,一个专门处理 API
可以为不同子 Agent 加载不同的工具集

关键设计点

设计点 说明 作用 实践考量
上下文隔离 每个子任务独立的 messages[] 防止污染主线程 子 Agent 看不到父 Agent 的完整历史
信息继承 传递必要背景 子 Agent 不迷失方向 平衡:给够背景但不给太多
独立轮次限制 子 Agent 有自己的 max_rounds 防止单个子任务拖垮整体 通常比父 Agent 更严格
并行执行 无依赖任务同时跑 提升效率 需要控制并发数(max_workers)
结果合并 提取结果回到主上下文 父 Agent 了解子任务产出 保留摘要,丢弃过程细节
依赖检测 判断任务间依赖关系 决定串行还是并行 可以手动指定或自动分析

学习收获

  1. 隔离是管理复杂度的手段

    • 大任务拆分后各自隔离,减少相互干扰
    • 每个子 Agent 只关注自己的任务,不会混淆
    • 类比:团队分工,每个人专注自己的模块
  2. 上下文污染的量化影响

    • 单 Agent 多文件任务完成率约 40%
    • Subagent 隔离后提升到 85%
    • 重复工作减少 87%,遗漏减少 87%
  3. 信息继承的策略选择

    • 完全隔离:子 Agent 不知道背景,容易偏离
    • 全量复制:污染问题没解决
    • 推荐:目标继承 + 选择性背景继承
  4. 并行与串行的决策

    • 并行效率高,但要求任务真独立
    • 串行效率低,但确保依赖满足
    • 混合策略:能并行的并行,有依赖的串行
  5. Subagent 不是万能药

    • 简单任务不需要拆分
    • 拆分带来协调成本(合并结果)
    • 只在复杂度确实高时才使用

第五层 s05:Skills

核心问题

System Prompt 太长会浪费上下文窗口,如何让 Agent 按需获取知识?

关键机制

class SkillLoader:
def __init__(self, skills_dir):
self.skills_dir = Path(skills_dir)
# 只加载目录,不加载内容
self.catalog = {
"python-testing": "Python testing best practices",
"api-design": "REST API design guidelines",
}

def skill_read(self, skill_name: str) -> str:
"""工具:Agent 主动调用以加载知识"""
skill_path = self.skills_dir / f"{skill_name}.md"
if not skill_path.exists():
return f"Skill not found. Available: {list(self.catalog.keys())}"
return skill_path.read_text()

# System Prompt 只暴露目录
SYSTEM_PROMPT = """
Available skills: python-testing, api-design
Use skill_read("skill-name") to load knowledge when needed.
"""

关键设计点

设计点 说明 作用 实践细节
懒加载 需要时才通过工具调用加载 节省上下文空间 skill_read 工具触发加载
目录暴露 System Prompt 只显示有什么技能 让 Agent 知道可选范围 名称 + 简短描述
混合策略 核心技能预加载 平衡效率和空间 常用 3-5 个技能常驻

学习收获

  1. 懒加载的量化收益:假设 20 个技能各 5K tokens,预加载占用 100K,按需加载只占用 2K 目录空间
  2. 知识管理的 trade-off:按需加载多 1-2 轮调用,但大幅减少上下文压力,整体效率更高
  3. 与人类工作方式一致:遇到不懂的才去查文档,而不是预加载所有可能用到的知识

第六层 s06:Context Compression

核心问题

长会话必然填满上下文窗口,如何在有限空间内保持有效信息?

关键机制

class ContextCompressor:
def compress(self, messages, current_tokens, max_tokens=150000):
if current_tokens < max_tokens * 0.7:
return messages

# 第一层:删除可恢复的大段数据
if current_tokens < max_tokens * 0.9:
return self._drop_raw_outputs(messages)

# 第二层:摘要工具调用链
if current_tokens < max_tokens * 1.1:
return self._summarize_chains(messages)

# 第三层:早期对话整体摘要
return self._summarize_early(messages)

def _drop_raw_outputs(self, messages):
"""优先删除可恢复的数据"""
for msg in messages:
if msg.get("role") == "tool_result" and len(msg.get("content", "")) > 5000:
msg["content"] = f"[Large output: {len(msg['content'])} chars, can re-read]"
return messages

三层压缩策略

层级 策略 信息损失 适用场景
Layer 1 删除原始数据(文件内容、日志) 低(可恢复) mild 压力
Layer 2 摘要工具调用链 中(保留做了什么) moderate 压力
Layer 3 摘要早期对话 高(保留决策,丢失细节) severe 压力

信息保留优先级

  1. 决策理由(thought) - 最高优先级
  2. 错误及恢复记录 - 高优先级
  3. 工具调用记录 - 中优先级(做了什么,而非结果)
  4. 原始数据 - 低优先级(可重新获取)

学习收获

  1. 压缩是资源管理的必要手段:关键不是不丢信息,而是有策略地丢
  2. 保留决策痕迹,丢弃可恢复数据:让 Agent 知道”做过什么”和”为什么”,而不是”具体结果是什么”
  3. 渐进式压缩:从轻到重,根据压力程度选择策略

第七层 s07:Task System

核心问题

单个会话无法承载长期任务,如何跨会话保持任务状态?

关键机制

@dataclass
class Task:
id: str
description: str
status: str # pending | in_progress | completed | failed
dependencies: List[str] # 依赖的其他任务 ID
created_at: datetime
result_summary: Optional[str]

class FileBasedTaskManager:
def create_task(self, description, dependencies=None) -> Task:
task = Task(
id=str(uuid.uuid4())[:8],
description=description,
status="pending",
dependencies=dependencies or [],
created_at=datetime.now()
)
# 追加写入 JSONL
with open(self.tasks_dir / "tasks.jsonl", "a") as f:
f.write(json.dumps(task.__dict__, default=str) + "\n")
return task

def get_ready_tasks(self) -> List[Task]:
"""获取可执行任务:pending + 所有依赖已完成"""
all_tasks = self._load_all()
ready = []
for task in all_tasks.values():
if task.status == "pending":
deps_done = all(
all_tasks[dep].status == "completed"
for dep in task.dependencies
)
if deps_done:
ready.append(task)
return ready

关键设计点

设计点 说明
文件存储 JSONL 格式,便于调试和版本控制
依赖图 自动计算可执行任务, Agent 只需关注”现在能做什么”
原子认领 防止多个 Agent 同时认领同一任务

学习收获

  1. 持久化让 Agent 有记忆:会话重启后能从任务板继续,不是从头开始
  2. 依赖图自动排序:Agent 不需要理解全局顺序,只需认领 ready 的任务
  3. Task vs Todo:Todo 是会话内临时计划,Task 是跨会话持久化目标

第八层 s08:Background Tasks

核心问题

慢操作(编译、测试)阻塞 Agent 思考,如何让 Agent 并行工作?

关键机制

class BackgroundTaskManager:
def background_run(self, command: str, description: str) -> str:
"""启动后台任务,返回任务 ID,Agent 可继续其他工作"""
task_id = f"bg_{uuid.uuid4().hex[:8]}"

def execute():
result = subprocess.run(command, shell=True,
capture_output=True, timeout=300)
self.completed.put({
"id": task_id,
"status": "completed" if result.returncode == 0 else "failed",
"output": result.stdout
})

threading.Thread(target=execute).start()
return task_id

def inject_notifications(self, messages):
"""每轮循环前注入已完成的后台任务"""
notifications = []
while not self.completed.empty():
task = self.completed.get()
notifications.append(
f"[Background task completed] {task['id']}: {task['output'][:500]}"
)

if notifications:
messages.append({"role": "system", "content": "\n".join(notifications)})
return messages

关键设计点

设计点 说明
异步执行 慢操作在后台线程,不阻塞主循环
通知机制 完成后通过 system message 注入上下文
状态跟踪 Agent 可查询后台任务状态

学习收获

  1. 并行效率提升:后台任务将串行 60 分钟任务减少到约 35 分钟
  2. 通知必须注入上下文:Agent 需要知道后台任务已完成及结果
  3. 适用场景:编译、测试、大数据处理等慢操作

第九层 s09:Agent Teams

核心问题

单个 Agent 能力有限,如何让多个 Agent 协作?

关键机制

class TeamMailboxSystem:
def send_message(self, from_agent: str, to_agent: str,
content: str, msg_type: str = "task"):
"""异步消息通信"""
message = {
"id": str(uuid.uuid4())[:8],
"from": from_agent,
"to": to_agent,
"type": msg_type, # task | question | response | approval_request
"content": content,
"timestamp": datetime.now().isoformat(),
"read": False
}

# 追加写入接收者的 inbox (JSONL)
inbox_path = self.team_dir / f"{to_agent}_inbox.jsonl"
with open(inbox_path, "a") as f:
f.write(json.dumps(message) + "\n")

return message["id"]

def read_inbox(self, agent_id: str, unread_only=True):
"""读取 Agent 的邮箱"""
inbox_path = self.team_dir / f"{agent_id}_inbox.jsonl"
messages = []
with open(inbox_path) as f:
for line in f:
msg = json.loads(line)
if not unread_only or not msg.get("read"):
messages.append(msg)
return messages

关键设计点

设计点 说明
异步消息 解耦发送方和接收方,支持跨进程
文件存储 JSONL 格式,持久化且可恢复
消息类型 区分 task/question/approval,不同处理流程
邮箱模式 每个 Agent 独立的 inbox,便于隔离

学习收获

  1. 异步消息解耦:发送方不需要知道接收方是否在线,消息持久化在邮箱
  2. 文件存储的优势:便于调试、版本控制、故障恢复
  3. 从直接调用到消息传递:这是单 Agent 到多 Agent 的关键架构转变

第十层 s10:Team Protocols

核心问题

多 Agent 协作需要显式规则,避免混乱。

关键协议模式

优雅关机流程:

Lead: shutdown_request "Finish in-flight tasks, then stop"
Worker: shutdown_ack "Will stop after current task"
[Worker completes task]
Worker: stops gracefully

计划审批流程(高风险操作):

Worker: plan_for_approval "Plan to delete production DB"
Lead: request_changes "Verify table is not in use"
Worker: plan_for_approval "Verified: table empty, last accessed 2023"
Lead: approve "Proceed with caution"
Worker: executes

状态机定义

VALID_FLOWS = {
("lead", "shutdown_request"): [
("worker", "shutdown_ack"),
("worker", "shutdown_reject")
],
("worker", "plan_for_approval"): [
("lead", "approve"),
("lead", "reject"),
("lead", "request_changes")
]
}

关键设计点

设计点 说明
请求-响应模式 每个请求有明确的合法响应集合
状态机约束 防止任意消息导致的混乱
高风险审批 敏感操作需要显式审批流程

学习收获

  1. 协议是协作的契约:没有协议的多 Agent 系统会陷入混乱
  2. 状态机约束行为:明确什么状态下可以有什么响应
  3. 审批流程保障安全:高风险操作必须有确认机制

第十一层 s11:Autonomous Agents

核心问题

主 Agent 逐个分配效率低,如何让 Agent 主动发现和认领工作?

关键机制

class AutonomousAgent:
def run_idle_cycle(self):
# 1. 检查是否有分配的消息
messages = self.mailbox.read_inbox(unread_only=True)
if messages:
return self._handle_assigned(messages[0])

# 2. 扫描任务板,寻找匹配的任务
ready_tasks = self.task_manager.get_ready_tasks()
for task in ready_tasks:
if self._can_handle(task): # 能力匹配
if self.task_manager.claim_task(task.id, self.agent_id):
return self._execute(task)

# 3. 无工作,休眠后重试
time.sleep(5)
return None

def _can_handle(self, task) -> bool:
"""判断任务是否与 Agent 能力匹配"""
return any(cap in task.tags for cap in self.capabilities)

关键设计点

设计点 说明
能力匹配 Agent 只认领自己能处理的任务
原子认领 防止多个 Agent 同时认领同一任务
空闲循环 没有任务时休眠,定期重试

学习收获

  1. 从分配到自主认领:效率提升来自减少协调成本
  2. 能力标签机制:Agent 根据标签匹配任务,类似技能匹配
  3. 空闲循环设计:Agent 不是被动等待,而是主动扫描工作

第十二层 s12:Worktree Isolation

核心问题

多个 Agent 并行工作时,文件系统冲突如何解决?

关键机制

class GitWorktreeManager:
def create_worktree(self, task_id: str) -> str:
"""每个任务一个独立分支 + 目录"""
worktree_path = f"/tmp/worktrees/task_{task_id}"
branch_name = f"agent-task-{task_id}"

subprocess.run([
"git", "worktree", "add", "-b", branch_name, worktree_path
])
return worktree_path

def execute_in_worktree(self, task_id, work_fn):
"""在隔离目录中执行"""
worktree_path = self.get_path(task_id)
original = os.getcwd()
try:
os.chdir(worktree_path)
return work_fn()
finally:
os.chdir(original)

def merge_and_cleanup(self, task_id):
"""合并结果并清理"""
subprocess.run(["git", "checkout", "main"])
subprocess.run(["git", "merge", f"agent-task-{task_id}"])
subprocess.run(["git", "worktree", "remove", self.get_path(task_id)])

关键设计点

设计点 说明
物理隔离 每个任务独立的 Git worktree
版本控制 自动分支管理,可追溯
合并清理 任务完成后合并到主线
并发安全 文件系统级别的隔离

学习收获

  1. 软件隔离不够:Subagent 隔离上下文,但文件操作仍可能冲突
  2. 物理隔离解决并发安全:Git worktree 提供独立的文件系统空间
  3. 工作树与任务绑定:每个任务有独立的 workspace,互不影响

十二层架构总结

分层演进逻辑

Phase 1 (基础):     s01(Loop) → s02(Tools)        → 最简可运行
Phase 2 (规划): s03(Todo) → s04(Subagent) → 防止漂移,管理复杂度
Phase 3 (资源): s05(Skills) → s06(Compress) → 知识管理,资源限制
Phase 4 (持久): s07(Tasks) → s08(Background) → 跨会话,并行执行
Phase 5 (协作): s09(Teams) → s10(Protocols) → 多 Agent,协作规则
Phase 6 (自治): s11(Autonomous) → s12(Worktree) → 主动认领,物理隔离

每层核心收获

核心问题 关键机制 学习收获
s01 如何让模型持续交互 stop_reason + 状态回传 循环的本质是状态反馈
s02 如何扩展工具 注册表模式 工具描述质量决定调用准确率
s03 如何防止漂移 TodoWrite 外化计划 外化认知减少遗忘
s04 如何管理复杂度 Subagent 上下文隔离 隔离是管理复杂度的手段
s05 如何管理知识 Skills 懒加载 知识应该按需获取
s06 如何处理长会话 三层渐进压缩 有策略地丢弃信息
s07 如何跨会话保持 Task 持久化 + 依赖图 持久化让 Agent 有记忆
s08 如何并行工作 Background + 通知 并行提升效率
s09 如何多 Agent 协作 Mailbox 异步消息 消息解耦发送和接收
s10 如何规范协作 状态机协议 协议是协作的契约
s11 如何提高效率 自主认领 减少协调成本
s12 如何解决并发冲突 Worktree 物理隔离 物理隔离解决并发安全

可迁移的最小子集

单 Agent 系统(3 层):

  1. s01 Agent Loop - 所有 Harness 的基础
  2. s03 TodoWrite - 防止计划漂移
  3. s07 Task System - 跨会话持久化

多 Agent 系统(+2 层):
4. s09 Teams - 异步通信机制
5. s12 Worktree - 物理隔离


延伸阅读

本文是对 learn-claude-code 项目十二层架构的学习总结,每层聚焦核心问题、关键机制和学习收获。