# 不好的做法:每次加工具都要改这里 for block in response.content: if block.type == "tool_use": if block.name == "bash": result = bash_tool(**block.input) elif block.name == "read_file": result = read_file_tool(**block.input) elif block.name == "new_tool": # 每次都要加 elif! result = new_tool(**block.input)
@register_tool( name="bash", description="Execute a bash command in the working directory. Use this for file operations, running scripts, or system commands.", input_schema={ "type": "object", "properties": { "command": { "type": "string", "description": "The bash command to execute" } }, "required": ["command"] } ) defbash_tool(command: str) -> str: """执行 shell 命令,给 Agent 一个终端""" import subprocess try: result = subprocess.run( command, shell=True, capture_output=True, text=True, timeout=30, # 生产级:防止卡死 cwd=os.getcwd() ) if result.returncode == 0: return result.stdout else: returnf"Error (exit code {result.returncode}): {result.stderr}" except subprocess.TimeoutExpired: returnf"Error: Command timed out after 30 seconds" except Exception as e: returnf"Error: {str(e)}"
@register_tool( name="read_file", description="Read the contents of a file at the specified path. Use this when you need to examine file contents for understanding, implementation, or debugging. Do not use for directories or binary files.", input_schema={ "type": "object", "properties": { "path": { "type": "string", "description": "The path to the file to read" }, "offset": { "type": "integer", "description": "Line number to start reading from (optional)" }, "limit": { "type": "integer", "description": "Maximum number of lines to read (optional)" } }, "required": ["path"] } ) defread_file_tool(path: str, offset: int = None, limit: int = None) -> str: """读取文件内容,支持分页""" try: withopen(path, 'r', encoding='utf-8') as f: if offset: for _ inrange(offset - 1): next(f, None) if limit: lines = [] for i, line inenumerate(f): if i >= limit: break lines.append(line) content = ''.join(lines) else: content = f.read() # 大文件提示 iflen(content) > 10000: content = content[:10000] + "\n... [truncated, use offset/limit to read more]" return content except FileNotFoundError: returnf"Error: File '{path}' not found" except UnicodeDecodeError: returnf"Error: File '{path}' is not text-readable (binary file?)" except Exception as e: returnf"Error reading file: {str(e)}"
@register_tool( name="edit_file", description="Apply edits to a file. Use this for making precise changes to file content.", input_schema={ "type": "object", "properties": { "path": {"type": "string", "description": "File path"}, "old_string": {"type": "string", "description": "Text to replace"}, "new_string": {"type": "string", "description": "Replacement text"} }, "required": ["path", "old_string", "new_string"] } ) defedit_file_tool(path: str, old_string: str, new_string: str) -> str: """精确编辑文件内容""" try: withopen(path, 'r', encoding='utf-8') as f: content = f.read() if old_string notin content: returnf"Error: Could not find the text to replace in {path}" new_content = content.replace(old_string, new_string, 1) withopen(path, 'w', encoding='utf-8') as f: f.write(new_content) returnf"Successfully edited {path}" except Exception as e: returnf"Error editing file: {str(e)}"
# ========== 添加新工具,循环代码完全不变 ==========
@register_tool( name="get_weather", description="Get current weather information for a city. Use this when the user asks about weather conditions or temperature.", input_schema={ "type": "object", "properties": { "city": { "type": "string", "description": "City name, e.g., 'Beijing', 'New York'" } }, "required": ["city"] } ) defget_weather(city: str) -> str: """查询城市天气""" import requests try: resp = requests.get( f"https://api.weather.com/v1/current?city={city}", timeout=10 ) data = resp.json() returnf"{city}: {data['temp']}°C, {data['condition']}" except Exception as e: returnf"Error fetching weather: {str(e)}"
classTodoManager: """ 任务管理器:让 Agent 能够制定和跟踪计划 关键设计: - Agent 通过工具主动更新计划(不是被动记录) - 计划状态在上下文中始终可见 - 支持提醒机制(Nag)防止计划被遗忘 """ def__init__(self): self.todos: List[TodoItem] = [] self.last_nag_round = 0# 上次提醒的轮次 deftodo_write(self, updates: List[Dict]) -> str: """ 工具函数:Agent 调用此函数更新计划 Args: updates: 更新操作列表 - {"type": "add", "content": "...", "status": "pending"} - {"type": "update", "id": "1", "status": "done"} - {"type": "delete", "id": "1"} Returns: 当前完整清单的格式化字符串 """ for update in updates: if update["type"] == "add": todo = TodoItem( id=str(len(self.todos) + 1), content=update["content"], status=update.get("status", "pending"), created_at=datetime.now() ) self.todos.append(todo) elif update["type"] == "update": for todo inself.todos: if todo.id == update["id"]: todo.status = update["status"] todo.updated_at = datetime.now() if"notes"in update: todo.notes = update["notes"] elif update["type"] == "delete": self.todos = [t for t inself.todos if t.id != update["id"]] returnself.format_list() defformat_list(self) -> str: """格式化任务清单供模型查看""" ifnotself.todos: return"No tasks currently." lines = ["Current Task List:"] for todo inself.todos: # 状态图标 if todo.status == "done": icon = "✓" elif todo.status == "in_progress": icon = "►" else: icon = "○" lines.append(f"{icon} [{todo.id}] {todo.content}") # 显示备注(如果有) if todo.notes: lines.append(f" Note: {todo.notes}") # 统计信息 done_count = sum(1for t inself.todos if t.status == "done") lines.append(f"\nProgress: {done_count}/{len(self.todos)} completed") return"\n".join(lines) defget_pending_count(self) -> int: """获取未完成任务数""" returnsum(1for t inself.todos if t.status != "done") defget_in_progress(self) -> List[TodoItem]: """获取进行中任务""" return [t for t inself.todos if t.status == "in_progress"]
# 在 System Prompt 中引导使用 SYSTEM_PROMPT = """ You are a coding assistant. Follow this workflow: 1. **Plan**: When given a task, first use todo_write to create a step-by-step plan 2. **Execute**: Work through each step, marking items done as you complete them 3. **Adapt**: Update the plan if your understanding of the task changes 4. **Verify**: Check your progress against the plan before finishing Always keep the todo list updated. It helps you stay organized and prevents forgetting important steps. """
defbuild_context_with_nag(messages: List[Dict], todo_manager: TodoManager, current_round: int) -> List[Dict]: """ Nag 提醒机制:长时间对话后提醒未完成任务 原理:当对话轮次增多但计划未完成时,温和地提醒 Agent """ pending = todo_manager.get_pending_count() in_progress = len(todo_manager.get_in_progress()) # 每 10 轮提醒一次,如果还有未完成任务 if pending > 0and current_round - todo_manager.last_nag_round >= 10: todo_manager.last_nag_round = current_round reminder = f""" [Note: You have {pending} unfinished tasks ({in_progress} in progress). Current plan status:\ {todo_manager.format_list()} Consider whether you should continue with the current plan or adjust it based on new information.] """ messages.append({"role": "system", "content": reminder}) return messages
Lead: shutdown_request "Finish in-flight tasks, then stop" Worker: shutdown_ack "Will stop after current task" [Worker completes task] Worker: stops gracefully
计划审批流程(高风险操作):
Worker: plan_for_approval "Plan to delete production DB" Lead: request_changes "Verify table is not in use" Worker: plan_for_approval "Verified: table empty, last accessed 2023" Lead: approve "Proceed with caution" Worker: executes