Agents设计模式

Agents设计模式

一、提示词链

提示词链有时候也称作管道模式,是利用大模型LLM处理复杂任务的强大方式。这种方式不再要求LLM在单一的整体化步骤中解决复杂问题,而是采用分而治之的策略:将原来复杂的问题进一步拆分为一系列更小、更容易处理的子问题,每个子问题通过专门设计的提示词单独处理,一个提示词的输出会作为输入传递给下一个提示词。

提示词链一般模式:

  1. 初始化提示词(总结)
  2. 第二个提示词(趋势识别)
  3. 第三个提示词(极化步骤)
    .......

二、路由

路由是将条件引入了智能体框架之中

常见的路由:

  1. 基于LLM的路由:由LLM模型选择下一步该执行哪个或者哪些agents
  2. 基于嵌入的路由:输入查询向量嵌入,然后将此向量与不同路由的描述进行比较,获取相似度更高的路由
  3. 基于规则的路由:硬编码
  4. 基于机器学习模型的路由:使用传统机器学习的分类器、决策树等方式进行路由判定

示例代码:

  1. 基于LLM的路由选择
from __future__ import annotations

import logging
from typing import Any

import requests

from router.core import Agent, RoutingStrategy

logger = logging.getLogger(__name__)


class LLMRouter:
    """Route a user query to one of several agents by asking an LLM to pick.

    The agent catalog (name + description) is sent as a system prompt and
    the model is asked to answer with a single agent name.
    """

    def __init__(self, api_key: str, base_url: str, model: str):
        self.api_key = api_key
        # Normalize so "/chat/completions" can be appended safely later.
        self.base_url = base_url.rstrip("/")
        self.model = model

    def _build_prompt(self, agents: list[Agent]) -> str:
        """Build the system prompt listing every candidate agent."""
        lines = [
            "You are a routing assistant. Given a user query, select the most appropriate agent from the list below.",
            "Respond ONLY with the agent name, nothing else.",
            "",
            "Available agents:",
        ]
        for agent in agents:
            lines.append(f"- {agent.name}: {agent.description}")
        lines.append("")
        lines.append("Agent name:")
        return "\n".join(lines)

    def route(self, query: str, agents: list[Agent]) -> Agent | None:
        """Return the agent chosen by the LLM, or None on failure / no match.

        FIX: the raw model answer is normalized (surrounding whitespace,
        quotes and backticks stripped) and matched case-insensitively as a
        fallback, since models frequently quote the name or change casing.
        """
        system_prompt = self._build_prompt(agents)
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload: dict[str, Any] = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": query},
            ],
            "temperature": 0.0,  # deterministic routing choice
            "max_tokens": 50,
        }
        try:
            resp = requests.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=payload,
                timeout=60,
            )
            resp.raise_for_status()
            result = resp.json()
            raw_name = result["choices"][0]["message"]["content"].strip()
            # Models often wrap the answer in quotes or backticks; strip them.
            agent_name = raw_name.strip("\"'` ").strip()
            by_name = {agent.name: agent for agent in agents}
            chosen = by_name.get(agent_name)
            if chosen is None:
                # Case-insensitive fallback for models that change casing.
                by_lower = {agent.name.lower(): agent for agent in agents}
                chosen = by_lower.get(agent_name.lower())
            if chosen is not None:
                logger.info("LLMRouter matched: %s", chosen.name)
                return chosen
            logger.warning("LLMRouter returned unknown agent name: %s", agent_name)
        except Exception as exc:
            logger.error("LLMRouter API call failed: %s", exc)
        return None
  1. 基于Embedding的路由

from __future__ import annotations

import logging
import math
from typing import Any

import requests

from router.core import Agent, RoutingStrategy

logger = logging.getLogger(__name__)


class EmbeddingRouter:
    """Route a query to the agent whose description embedding is most similar.

    Agent-description embeddings are computed lazily (or via warmup()) and
    cached in _agent_embeddings; routing picks the highest cosine similarity
    at or above ``threshold``.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model: str,
        threshold: float = 0.6,
    ):
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.model = model
        # Minimum cosine similarity required to accept a match.
        self.threshold = threshold
        # Cache: agent name -> embedding vector of its description.
        self._agent_embeddings: dict[str, list[float]] = {}

    def _get_embedding(self, text: str) -> list[float]:
        """Fetch the embedding vector for ``text`` from the embeddings endpoint."""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload: dict[str, Any] = {"model": self.model, "input": text}
        resp = requests.post(
            f"{self.base_url}/embeddings",
            headers=headers,
            json=payload,
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
        return data["data"][0]["embedding"]

    @staticmethod
    def _cosine_similarity(a: list[float], b: list[float]) -> float:
        """Cosine similarity of two vectors; 0.0 when either has zero norm."""
        dot = sum(x * y for x, y in zip(a, b))
        norm_a = math.sqrt(sum(x * x for x in a))
        norm_b = math.sqrt(sum(x * x for x in b))
        if norm_a == 0.0 or norm_b == 0.0:
            return 0.0
        return dot / (norm_a * norm_b)

    def warmup(self, agents: list[Agent]) -> None:
        """Precompute and cache an embedding for each agent description.

        FIX: failures are now logged and skipped instead of re-raised.
        Previously the warning was logged and the exception re-raised, which
        aborted route()'s lazy warmup entirely; route() already skips agents
        without a cached embedding, so degrading gracefully is safe.
        """
        for agent in agents:
            try:
                self._agent_embeddings[agent.name] = self._get_embedding(agent.description)
                logger.info("Precomputed embedding for %s", agent.name)
            except Exception as exc:
                logger.warning("Failed to precompute embedding for %s: %s", agent.name, exc)

    def route(self, query: str, agents: list[Agent]) -> Agent | None:
        """Return the best-matching agent at/above threshold, or None."""
        if not self._agent_embeddings:
            self.warmup(agents)

        try:
            query_embedding = self._get_embedding(query)
        except Exception as exc:
            logger.warning("EmbeddingRouter failed to get query embedding: %s", exc)
            return None

        best_agent: Agent | None = None
        best_score = -1.0
        for agent in agents:
            agent_emb = self._agent_embeddings.get(agent.name)
            if agent_emb is None:
                # No cached embedding (e.g. warmup failed for this agent).
                continue
            score = self._cosine_similarity(query_embedding, agent_emb)
            if score > best_score:
                best_score = score
                best_agent = agent

        if best_agent is not None and best_score >= self.threshold:
            logger.info(
                "EmbeddingRouter matched: %s (score=%.3f)",
                best_agent.name,
                best_score,
            )
            return best_agent
        logger.info("EmbeddingRouter found no match above threshold (best=%.3f)", best_score)
        return None

  1. 基于规则的路由

from __future__ import annotations

import logging
import re

from router.core import Agent, RoutingStrategy

logger = logging.getLogger(__name__)


class RuleRouter:
    """Route by matching each agent's regex rules against the query text."""

    def __init__(self, case_sensitive: bool = False):
        # When False (default), patterns are applied with re.IGNORECASE.
        self.case_sensitive = case_sensitive

    def route(self, query: str, agents: list[Agent]) -> Agent | None:
        """Return the first agent with a rule matching the query, else None."""
        regex_flags = re.IGNORECASE if not self.case_sensitive else 0
        for candidate in agents:
            if not candidate.rules:
                continue
            if any(re.search(pattern, query, regex_flags) for pattern in candidate.rules):
                logger.info("RuleRouter matched: %s", candidate.name)
                return candidate
        logger.info("RuleRouter found no match")
        return None

三、并行化

在我们常见的工作流编排中,或者常见的任务处理过程中,经常会遇到多个子任务同时进行的情况,如:长文本分割读取合并概要提取,A/B同时审核。这时候就需要引入并行处理的概念,Agents编排中对并行任务也是比较基础的一个要求

代码示例(长文本,如要多agent并行,请换为agent):


from __future__ import annotations

import logging
from typing import Any

import requests

from parallel.core import Aggregator, ParallelWorkflow, TaskResult
## 也可以直接使用langchain提供的并行agents处理
### from langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough
#### summarize_chain: Runnable = (
####    ChatPromptTemplate.from_messages([
####        ("system", "请总结一下摘要"),
####        ("user", "{topic}")
####    ])
####    | llm | StrOutputParser()
#### )
#### question_chain: Runnable = ()
####  ........
#### terms_chain: Runnable = ()
#### ..........
#### map_chain = RunnableParallel(
####    {
####        "summary": summarize_chain,
####        "question": question_chain,
####        "terms": terms_chain,
####        "topic": RunnablePassthrough()
####    }
#### )
###### 总结chain
#### prompt_ended = ChatPromptTemplate.from_messages([....])
#### ....
#### full_chain = map_chain | llm | StrOutputParser()
####

logger = logging.getLogger(__name__)


class ConcatAggregator(Aggregator):
    """Join the outputs of all successful task results with blank lines."""

    def aggregate(self, results: list[TaskResult]) -> str:
        successful_outputs = [item.output for item in results if item.success]
        return "\n\n".join(successful_outputs)


class LLMReduceAggregator(Aggregator):
    """Reduce per-chunk summaries into one final summary via an LLM call.

    Falls back to the concatenated chunk text when the API call fails.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model: str,
        prompt_template: str = (
            "请根据以下各片段的摘要,生成一份连贯的最终摘要:\n\n{chunks}\n\n最终摘要:"
        ),
    ):
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.model = model
        # Template must contain a "{chunks}" placeholder.
        self.prompt_template = prompt_template

    def aggregate(self, results: list[TaskResult]) -> str:
        """Combine successful results into a single LLM-written summary."""
        numbered_parts: list[str] = []
        # Numbering follows the position in `results`, skipping failed items.
        for index, item in enumerate(results):
            if item.success:
                numbered_parts.append(f"片段 {index+1}:\n{item.output}")
        chunks = "\n\n".join(numbered_parts)
        prompt = self.prompt_template.format(chunks=chunks)
        request_headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        request_body: dict[str, Any] = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.3,
            "max_tokens": 500,
        }
        try:
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers=request_headers,
                json=request_body,
                timeout=60,
            )
            response.raise_for_status()
            body = response.json()
            return body["choices"][0]["message"]["content"].strip()
        except Exception as exc:
            logger.error("LLMReduceAggregator failed: %s", exc)
            # Best-effort fallback: return the raw concatenated chunks.
            return chunks


class MapReduceSummarizer:
    """Summarize long text map-reduce style: split, summarize chunks in parallel, merge."""

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model: str,
        chunk_size: int = 200,
        max_workers: int = 4,
        use_llm_reduce: bool = True,
    ):
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.chunk_size = chunk_size          # soft character budget per chunk
        self.max_workers = max_workers        # parallel fan-out width
        self.use_llm_reduce = use_llm_reduce  # LLM merge vs plain concatenation

    def _split_text(self, text: str) -> list[str]:
        """Greedily pack non-empty paragraphs into chunks of roughly chunk_size chars."""
        pieces = [segment.strip() for segment in text.split("\n") if segment.strip()]
        packed: list[str] = []
        buffer = ""
        for piece in pieces:
            would_overflow = len(buffer) + len(piece) + 1 > self.chunk_size
            if would_overflow and buffer:
                packed.append(buffer)
                buffer = piece
            elif buffer:
                buffer = f"{buffer}\n{piece}"
            else:
                buffer = piece
        if buffer:
            packed.append(buffer)
        # Degenerate input (blank text) still yields a single chunk.
        return packed or [text]

    def _summarize_chunk(self, chunk: str) -> str:
        """Call the chat API once to summarize a single chunk (the map step)."""
        request_headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        prompt = f"请对以下文本进行简要总结(保留核心观点):\n\n{chunk}\n\n摘要:"
        request_body: dict[str, Any] = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.3,
            "max_tokens": 300,
        }
        response = requests.post(
            f"{self.base_url}/chat/completions",
            headers=request_headers,
            json=request_body,
            timeout=60,
        )
        response.raise_for_status()
        body = response.json()
        return body["choices"][0]["message"]["content"].strip()

    def summarize(self, text: str) -> str:
        """Split the text, fan out chunk summaries, then aggregate (the reduce step)."""
        chunks = self._split_text(text)
        logger.info("MapReduceSummarizer: split into %d chunks", len(chunks))

        # Bind each chunk through a default argument to dodge late-binding closures.
        tasks = {}
        for index, chunk in enumerate(chunks):
            tasks[f"chunk_{index}"] = lambda bound=chunk: self._summarize_chunk(bound)

        if self.use_llm_reduce:
            aggregator: Aggregator = LLMReduceAggregator(
                api_key=self.api_key,
                base_url=self.base_url,
                model=self.model,
            )
        else:
            aggregator = ConcatAggregator()

        workflow = ParallelWorkflow(max_workers=self.max_workers)
        return workflow.run_tasks(tasks, aggregator=aggregator)

四、反思

反思模式概述:

反思模式是指智能体评估自身工作、输出或者内部状态,并利用该评估来提升或优化性能。这是自我纠正与自我改进的前提。

通常流程为:

  1. 执行
  2. 评估/评审
  3. 反思/优化:基于评审意见确定改进方向,一般可以记录下来或者直接调用优化agent进行改进
  4. 迭代:优化后的输出作为改进意见加入流程中,一般是写入某个环节的提示词中

注意:反思应该设置最大反思次数上限,防止盲目地陷入循环,消耗不必要的Token

示例代码(为上一个并行处理流程的反思示例代码):


from __future__ import annotations

import logging
from typing import Any

import requests

from parallel.core import Aggregator, ParallelWorkflow, TaskResult

logger = logging.getLogger(__name__)


class ReviewVoteAggregator(Aggregator):
    """Format parallel review results into a readable report with a pass/fail tally."""

    def aggregate(self, results: list[TaskResult]) -> str:
        report = ["===== 并行评审结果 =====", ""]
        pass_count = 0
        fail_count = 0
        for item in results:
            if item.success:
                pass_count += 1
                status = "通过"
            else:
                fail_count += 1
                status = "失败"
            report.append(f"【{item.task_id}】状态: {status}")
            report.append(f"评审意见: {item.output}")
            if item.error:
                report.append(f"错误: {item.error}")
            report.append("")
        report.append(f"汇总: {pass_count} 通过, {fail_count} 未通过")
        return "\n".join(report)


class FanOutReviewer:
    """Fan a piece of content out to several LLM reviewer personas in parallel."""

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model: str,
        max_workers: int = 4,
    ):
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.max_workers = max_workers

    def _review(
        self,
        content: str,
        reviewer_name: str,
        persona: str,
        criteria: list[str],
    ) -> str:
        """Run one reviewer persona over the content and return its verdict text."""
        bullet_list = "\n".join(f"- {criterion}" for criterion in criteria)
        review_prompt = (
            f"你是一位{persona}。请对以下内容进行评审。\n\n"
            f"评审标准:\n{bullet_list}\n\n"
            f"内容:\n{content}\n\n"
            f"请给出是否通过(是/否)以及简要理由。格式:\n通过: 是/否\n理由: ..."
        )
        request_body: dict[str, Any] = {
            "model": self.model,
            "messages": [{"role": "user", "content": review_prompt}],
            "temperature": 0.2,
            "max_tokens": 300,
        }
        response = requests.post(
            f"{self.base_url}/chat/completions",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            json=request_body,
            timeout=60,
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"].strip()

    def review(self, content: str) -> str:
        """Run all reviewer personas in parallel and aggregate their votes."""
        reviewer_configs = {
            "技术审核": {
                "persona": "资深技术专家",
                "criteria": [
                    "代码逻辑是否清晰",
                    "是否有潜在的边界情况未处理",
                    "命名是否规范",
                ],
            },
            "安全审核": {
                "persona": "安全工程师",
                "criteria": [
                    "是否存在注入风险(SQL/命令/代码注入)",
                    "是否有敏感信息泄露风险",
                    "输入是否经过验证",
                ],
            },
            "语法审核": {
                "persona": "严谨的语法检查员",
                "criteria": [
                    "语法是否正确",
                    "表达是否通顺",
                    "格式是否规范",
                ],
            },
        }

        # Bind loop variables via defaults so each task captures its own config.
        tasks = {}
        for reviewer_name, config in reviewer_configs.items():
            tasks[reviewer_name] = lambda c=content, n=reviewer_name, r=config: self._review(
                c, n, r["persona"], r["criteria"]
            )

        workflow = ParallelWorkflow(max_workers=self.max_workers)
        return workflow.run_tasks(tasks, aggregator=ReviewVoteAggregator())

五、工具使用

工具使用模式通常通过函数调用(Function Calling)机制实现,使智能体能够与外部的数据、接口进行交互

该流程通常包括(这个过程也可以不使用LLM,比如使用机器学习,规则等):

  1. 工具定义:向LLM定义并描述可供调用的外部函数或者能力。
  2. LLM决策:LLM接收用户的请求和可用的工具定义来决策是否要调用多个或者一个外部工具。
  3. 函数调用生成
  4. 工具执行
  5. 结果:工具执行之后将结果返回给智能体
  6. LLM处理:
    ........

示例代码(工具实现省略):


from __future__ import annotations

import json
import logging
from typing import Any

import requests

from tools.core import ToolExecutor

logger = logging.getLogger(__name__)


class ToolAgent:
    """Function-calling agent loop: let the LLM request tools, execute them, repeat.

    Stops when the model answers without tool calls, or when max_iterations
    round-trips have been used.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model: str,
        tool_executor: ToolExecutor,
        max_iterations: int = 3,
        system_prompt: str | None = None,
    ):
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.tool_executor = tool_executor
        # Cap on LLM round-trips to avoid unbounded tool-call loops.
        self.max_iterations = max_iterations
        self.system_prompt = system_prompt or (
            "你是一个 helpful assistant,可以使用外部工具来帮助用户解决问题。"
            "当你需要调用工具时,请使用 function_call。"
        )

    def _call_llm(self, messages: list[dict[str, Any]]) -> dict[str, Any]:
        """POST messages plus tool definitions; return the parsed JSON response."""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload: dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "tools": self.tool_executor.get_definitions(),
            "temperature": 0.3,
        }
        resp = requests.post(
            f"{self.base_url}/chat/completions",
            headers=headers,
            json=payload,
            timeout=60,
        )
        resp.raise_for_status()
        return resp.json()

    @staticmethod
    def _extract_tool_calls(message: dict[str, Any]) -> list[dict[str, Any]]:
        """Return the message's tool_calls list ([] when absent or null)."""
        return message.get("tool_calls") or []

    def run(self, user_query: str) -> str:
        """Drive the tool-calling loop for one user query; return the final answer."""
        messages: list[dict[str, Any]] = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": user_query},
        ]

        for iteration in range(self.max_iterations):
            logger.info("ToolAgent iteration %d/%d", iteration + 1, self.max_iterations)
            response = self._call_llm(messages)
            message = response["choices"][0]["message"]

            tool_calls = self._extract_tool_calls(message)
            if not tool_calls:
                # FIX: OpenAI-style APIs return "content": null in some
                # responses; .get("content", "") would pass None through.
                return message.get("content") or ""

            messages.append({
                "role": "assistant",
                # FIX: never propagate None content back into the transcript.
                "content": message.get("content") or "",
                "tool_calls": tool_calls,
            })

            for tc in tool_calls:
                fn = tc.get("function", {})
                tool_name = fn.get("name", "")
                try:
                    arguments = json.loads(fn.get("arguments", "{}"))
                except json.JSONDecodeError:
                    # Malformed arguments from the model: run the tool with none.
                    arguments = {}

                logger.info("Executing tool: %s(%s)", tool_name, arguments)
                result = self.tool_executor.execute(tool_name, arguments)
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc.get("id", ""),
                    "name": tool_name,
                    "content": result.output if result.success else f"Error: {result.error}",
                })

        return "达到最大迭代次数,未能完成请求。"

六、规划/Plan

规划是自主系统中的核心计算过程,使智能体综合一系列行动以实现指定目标,特别是在动态复杂的环境中。这个过程将高级目标转换为离散的、可以实现的步骤。规划可能衔接各个智能体或者工具,使得LLM在工程上具备了一定的实现可能。

示例代码:


from __future__ import annotations

import json
import logging
from dataclasses import dataclass
from typing import Any, Callable

import requests

logger = logging.getLogger(__name__)


@dataclass
class PlanStep:
    """A single step of an executable plan."""

    step_id: int  # 1-based position of the step within the plan
    description: str  # human-readable description of what the step does
    handler: Callable[[], str] | None = None  # optional callable that executes the step


class PlanExecutor:
    """Execute plan steps in order and collect their textual results."""

    def __init__(self, steps: list[PlanStep] | None = None):
        self.steps: list[PlanStep] = steps or []
        self.results: list[str] = []

    def add_step(self, step: PlanStep) -> None:
        """Append one step to the end of the plan."""
        self.steps.append(step)

    def execute(self) -> str:
        """Run every step sequentially; return a formatted step/result report."""
        self.results = []
        for current in self.steps:
            logger.info("Executing step %d: %s", current.step_id, current.description)
            if not current.handler:
                # No handler attached: echo the step itself as its result.
                outcome = f"[步骤 {current.step_id}] {current.description}"
            else:
                try:
                    outcome = current.handler()
                except Exception as exc:
                    outcome = f"步骤执行失败: {exc}"
                    logger.error("Step %d failed: %s", current.step_id, exc)
            self.results.append(outcome)
            logger.info("Step %d finished", current.step_id)
        report_lines = [
            f"步骤 {position + 1}: {item.description}\n结果: {res}"
            for position, (item, res) in enumerate(zip(self.steps, self.results))
        ]
        return "\n".join(report_lines)


class PlanAgent:
    """Ask an LLM to decompose a task into steps, then execute them in order."""

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model: str,
        max_steps: int = 5,
    ):
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.max_steps = max_steps  # upper bound passed to the planning prompt

    def _call_llm(self, messages: list[dict[str, Any]]) -> str:
        """POST the messages to the chat endpoint and return the reply text."""
        request_body: dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "temperature": 0.3,
            "max_tokens": 800,
        }
        response = requests.post(
            f"{self.base_url}/chat/completions",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            json=request_body,
            timeout=60,
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"].strip()

    def plan(self, task: str) -> list[PlanStep]:
        """Produce PlanSteps for the task; fall back to a generic plan on parse failure."""
        planning_prompt = (
            f"请将以下任务分解为不超过 {self.max_steps} 个可执行的步骤。"
            f"每个步骤应简洁明确,适合逐步执行。\n\n"
            f"任务: {task}\n\n"
            f"请用 JSON 数组格式返回步骤列表,每个元素包含 'description' 字段。\n"
            f"示例: [{{'description': '步骤1描述'}}, {{'description': '步骤2描述'}}]\n\n"
            f"步骤列表:"
        )
        conversation = [
            {
                "role": "system",
                "content": "你是一个任务规划助手,擅长将复杂任务分解为清晰的执行步骤。",
            },
            {"role": "user", "content": planning_prompt},
        ]
        reply = self._call_llm(conversation)
        try:
            # Keep only the first [...] span in case the model added prose.
            left = reply.find("[")
            right = reply.rfind("]")
            if left != -1 and right != -1:
                reply = reply[left : right + 1]
            parsed = json.loads(reply)
            return [
                PlanStep(step_id=index + 1, description=entry["description"])
                for index, entry in enumerate(parsed)
            ]
        except Exception as exc:
            logger.error("Failed to parse plan JSON: %s", exc)
            # Generic fallback plan so execution can still proceed.
            return [
                PlanStep(step_id=1, description="分析任务目标"),
                PlanStep(step_id=2, description="收集必要信息"),
                PlanStep(step_id=3, description="执行任务核心步骤"),
                PlanStep(step_id=4, description="验证结果"),
            ]

    def run(self, task: str) -> str:
        """Plan the task and execute the resulting steps, returning the report."""
        return PlanExecutor(self.plan(task)).execute()

七、多智能体协作

多智能体协作涉及设计系统,其中多个独立或者半独立的智能体协同工作以实现共同目标。每个智能体有明确定义的角色、与总体目标一致的特定目标,并且可以访问工具或者知识库。

协作模式可以采用多种形式:

  1. 顺序交接
  2. 并行处理
  3. 辩论与共识
  4. 层次结构
  5. 专家团队
  6. 反思

示例代码:


from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import Any

import requests

logger = logging.getLogger(__name__)


@dataclass
class CollaborativeAgent:
    """One role-playing agent in a multi-agent pipeline.

    Fields:
        name: display name used for logging.
        role: role description injected into the user prompt.
        system_prompt: system message sent with every request.
        api_key / base_url / model: chat-completions endpoint configuration.
    """

    name: str
    role: str
    system_prompt: str
    api_key: str
    base_url: str
    model: str

    def __post_init__(self) -> None:
        # Consistency fix: every other client class in this module strips a
        # trailing "/" from base_url; without it, "http://host/" produced a
        # double slash in the request URL below.
        self.base_url = self.base_url.rstrip("/")

    def run(self, input_text: str) -> str:
        """Send input_text to the LLM under this agent's role; return its output."""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        prompt = (
            f"你是{self.role}。请根据以下输入完成你的工作。\n\n"
            f"输入:\n{input_text}\n\n"
            f"输出:"
        )
        payload: dict[str, Any] = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": prompt},
            ],
            "temperature": 0.5,
            "max_tokens": 800,
        }
        resp = requests.post(
            f"{self.base_url}/chat/completions",
            headers=headers,
            json=payload,
            timeout=60,
        )
        resp.raise_for_status()
        data = resp.json()
        output = data["choices"][0]["message"]["content"].strip()
        logger.info("Agent '%s' finished", self.name)
        return output


class PipelineCollaborator:
    """Run agents sequentially, feeding each agent's output into the next."""

    def __init__(self, agents: list[CollaborativeAgent]):
        self.agents = agents

    def run(self, topic: str) -> str:
        """Pass the topic through every agent in order; return the final output."""
        current_text = topic
        for stage in self.agents:
            logger.info("Pipeline step: %s (%s)", stage.name, stage.role)
            current_text = stage.run(current_text)
        return current_text

八、记忆管理

由于LLM上下文限制,记忆功能对于智能体尤为重要。一般可以将记忆分为:

  1. 短期记忆:一般存在于一个工作流程之内,用于存放用户在这个工作流程中的操作信息,也称作上下文记忆。
  2. 长期记忆:一般在智能体工具调用,或者流程发生之前获取,比如:知识库,数据库,外部接口等。长期记忆可以由短期记忆转变而来,比如:需要缓存的一些高频信息,需要长期存储的一些用户习惯,反思中存在的问题等

代码实现(长期记忆暂存于文件中):


from __future__ import annotations

import json
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import requests

logger = logging.getLogger(__name__)


@dataclass
class ShortTermMemory:
    """Sliding-window conversation memory holding the most recent turns.

    Keeps at most ``max_turns`` user/assistant exchanges, i.e.
    ``max_turns * 2`` messages; older messages are discarded FIFO.
    """

    # Maximum number of user/assistant exchange pairs to retain.
    max_turns: int = 10
    # Chronological chat messages of shape {"role": ..., "content": ...}.
    messages: list[dict[str, str]] = field(default_factory=list)

    def add_user(self, content: str) -> None:
        """Record a user message, trimming any overflow."""
        self.messages.append({"role": "user", "content": content})
        self._trim()

    def add_assistant(self, content: str) -> None:
        """Record an assistant message, trimming any overflow."""
        self.messages.append({"role": "assistant", "content": content})
        self._trim()

    def _trim(self) -> None:
        """Drop the oldest messages beyond the window in one slice delete.

        FIX: was a ``while ...: pop(0)`` loop — each ``pop(0)`` shifts the
        whole list (O(n)); a single ``del`` of the excess prefix is O(k).
        """
        limit = self.max_turns * 2
        excess = len(self.messages) - limit
        if excess > 0:
            del self.messages[:excess]

    def get_messages(self) -> list[dict[str, str]]:
        """Return a shallow copy so callers cannot mutate the window in place."""
        return list(self.messages)

    def clear(self) -> None:
        """Forget all short-term messages."""
        self.messages.clear()


class LongTermMemory:
    """Durable list of memory strings persisted as a JSON file."""

    def __init__(self, storage_path: str = "long_term_memory.json"):
        self.storage_path = Path(storage_path)
        self._memories: list[str] = []
        self._load()

    def _load(self) -> None:
        """Populate the in-memory list from disk; missing/corrupt files yield []."""
        if not self.storage_path.exists():
            return
        try:
            loaded = json.loads(self.storage_path.read_text(encoding="utf-8"))
            self._memories = loaded if isinstance(loaded, list) else []
        except Exception as exc:
            logger.warning("Failed to load long-term memory: %s", exc)
            self._memories = []

    def save(self) -> None:
        """Write the current memories to disk as pretty-printed UTF-8 JSON."""
        try:
            payload = json.dumps(self._memories, ensure_ascii=False, indent=2)
            self.storage_path.write_text(payload, encoding="utf-8")
            logger.info("Long-term memory saved to %s", self.storage_path)
        except Exception as exc:
            logger.error("Failed to save long-term memory: %s", exc)

    def add(self, memory: str) -> None:
        """Append one memory and persist immediately."""
        self._memories.append(memory)
        self.save()

    def extend(self, memories: list[str]) -> None:
        """Append several memories and persist immediately."""
        self._memories.extend(memories)
        self.save()

    def get_all(self) -> list[str]:
        """Return a copy of all stored memories."""
        return list(self._memories)

    def to_prompt(self) -> str:
        """Render memories as a numbered section for a system prompt; "" when empty."""
        if not self._memories:
            return ""
        section = ["### 已知信息(长期记忆)", ""]
        section.extend(f"{index}. {item}" for index, item in enumerate(self._memories, 1))
        return "\n".join(section)


class MemoryExtractor:
    """Use an LLM to distill durable user facts from a conversation transcript."""

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model: str,
    ):
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.model = model

    def extract(self, messages: list[dict[str, str]]) -> list[str]:
        """Return a list of extracted memory strings; [] on any failure."""
        transcript_lines = []
        for msg in messages:
            speaker = "用户" if msg["role"] == "user" else "助手"
            transcript_lines.append(f"{speaker}: {msg['content']}")
        conversation = "\n".join(transcript_lines)
        extraction_prompt = (
            "请从以下对话中抽取关于用户的关键信息(如偏好、习惯、重要事实),"
            "每条信息用一句话概括。如果对话内容为空或无有效信息,返回空数组。\n\n"
            f"对话:\n{conversation}\n\n"
            "请用 JSON 数组格式返回,每条为一个字符串。示例: [\"用户喜欢 Python\", \"用户在学 AI\"]\n"
            "结果:"
        )
        request_body: dict[str, Any] = {
            "model": self.model,
            "messages": [
                {
                    "role": "system",
                    "content": "你是一个信息提取助手,擅长从对话中抽取关键事实。",
                },
                {"role": "user", "content": extraction_prompt},
            ],
            "temperature": 0.2,
            "max_tokens": 500,
        }
        try:
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json",
                },
                json=request_body,
                timeout=60,
            )
            response.raise_for_status()
            reply = response.json()["choices"][0]["message"]["content"].strip()
            # Keep only the bracketed JSON array in case the model added prose.
            left = reply.find("[")
            right = reply.rfind("]")
            if left != -1 and right != -1:
                reply = reply[left : right + 1]
            parsed = json.loads(reply)
            if isinstance(parsed, list):
                extracted = [str(entry) for entry in parsed if entry]
                logger.info("Extracted %d long-term memories", len(extracted))
                return extracted
        except Exception as exc:
            logger.error("Memory extraction failed: %s", exc)
        return []


class MemoryAgent:
    """Chat agent combining short-term window memory with persistent long-term memory."""

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model: str,
        short_term: ShortTermMemory | None = None,
        long_term: LongTermMemory | None = None,
        extractor: MemoryExtractor | None = None,
        system_prompt: str | None = None,
    ):
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.model = model
        # Fall back to default components when none are injected.
        self.short_term = short_term or ShortTermMemory()
        self.long_term = long_term or LongTermMemory()
        self.extractor = extractor or MemoryExtractor(api_key, base_url, model)
        self.system_prompt = system_prompt or (
            "你是一个 helpful assistant,能够记住之前的对话内容。"
        )

    def _build_system_content(self) -> str:
        """System prompt plus the rendered long-term memory section, when non-empty."""
        memory_section = self.long_term.to_prompt()
        if not memory_section:
            return self.system_prompt
        return f"{self.system_prompt}\n\n{memory_section}"

    def _call_llm(self, messages: list[dict[str, Any]]) -> str:
        """Send the message list to the chat endpoint and return the reply text."""
        request_body: dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "temperature": 0.5,
            "max_tokens": 800,
        }
        response = requests.post(
            f"{self.base_url}/chat/completions",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            json=request_body,
            timeout=60,
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"].strip()

    def chat(self, user_input: str) -> str:
        """One conversational turn: record input, query the LLM, record the reply."""
        self.short_term.add_user(user_input)
        conversation: list[dict[str, Any]] = [
            {"role": "system", "content": self._build_system_content()},
        ]
        conversation.extend(self.short_term.get_messages())
        reply = self._call_llm(conversation)
        self.short_term.add_assistant(reply)
        return reply

    def consolidate(self) -> list[str]:
        """Move durable facts from short-term into long-term memory, then reset the window."""
        extracted = self.extractor.extract(self.short_term.get_messages())
        if extracted:
            self.long_term.extend(extracted)
        self.short_term.clear()
        return extracted

九、学习与适应

智能体通过基于新经验或者数据改变思维方式、行动或知识来实现学习和适应。这让智能体从简单的遵循指令,逐步演变为更加贴近于具体业务的系统。常见的学习方式:

  1. 强化学习:通过奖励机制对积极结果进行奖励、对消极结果进行惩罚,从而在动态环境中学习最优行为。
  2. 监督学习:通过预先提供的标注示例等信息,来学习输入与输出之间的映射关系。
  3. 无监督学习:智能体在未学习标注信息的情况下,构建的输入与结果之间的映射关系。
  4. 基于LLM的少量样本、零样本学习:利用大模型作为智能基座,提供少量的指令或者标记数据,实现对输入与结果之间的映射。
  5. 在线学习:智能体在运行过程中持续从外部获取知识,更新知识库。
  6. 基于记忆的学习:智能体从短期记忆中抽取信息,增加到长期记忆中,或者对抽取的信息进行标记输入到智能体中,从而建立输入与结果之间的映射关系。

近端策略优化(PPO):

近端策略优化是一种强化学习方法,在于在动作范围连续的环境中训练智能体,例如:机器人关节或游戏角色。

策略:
  1. 收集信息
  2. 评估替代目标
  3. 剪裁机制:在当前策略周围创建一个“信任区域”或安全区,阻止算法进行与当前策略差异过大的更新。

直接策略优化(DPO)

策略:
  1. 训练奖励模型
  2. 使用DPO数据微调

代码示例(略):太多了


十、模型上下文协议(MCP)

MCP与工具调用具有一定相似性,都是向模型提供外部数据,如:数据库,知识库,服务等,一般界限是比较模糊的,但也具备一定的差异:

  1. 工具函数调用可以被视为LLM对特定预定义的工具或函数的直接请求。
  2. MCP协议作为LLM发现、通信和使用外部工具能力的标准化接口运行。它作为开放协议,促进与各个工具和系统的交互,旨在建立一个任何工具都可以被任何兼容LLM访问的生态体系。
    即:MCP相对于工具/函数调用,更加标准化,协议更加开放,范围也更加的广泛。

示例代码(无FastMCP服务):


from __future__ import annotations

import json
import logging
import subprocess
import sys
from typing import Any, Callable

import requests

logger = logging.getLogger(__name__)


class MCPServer:
    """In-process MCP server: a registry of named tools reachable through a
    minimal JSON request/response protocol, optionally served over stdio."""

    def __init__(self):
        # Tool metadata (name/description/parameters), keyed by tool name.
        self._tools: dict[str, dict[str, Any]] = {}
        # The callable implementing each tool, keyed by the same name.
        self._handlers: dict[str, Callable[..., str]] = {}

    def register_tool(
        self, name: str, description: str, parameters: dict[str, Any], handler: Callable[..., str]
    ) -> None:
        """Expose *handler* as a tool under *name* with the given JSON schema."""
        spec = {"name": name, "description": description, "parameters": parameters}
        self._tools[name] = spec
        self._handlers[name] = handler
        logger.info("MCP Server registered tool: %s", name)

    def list_tools(self) -> list[dict[str, Any]]:
        """Return metadata for every registered tool."""
        return list(self._tools.values())

    def call_tool(self, name: str, arguments: dict[str, Any]) -> dict[str, Any]:
        """Invoke tool *name* with keyword *arguments*.

        Never raises: failures are reported via ``{"success": False, ...}``.
        """
        if name not in self._handlers:
            return {"success": False, "error": f"Tool '{name}' not found"}
        try:
            produced = self._handlers[name](**arguments)
        except Exception as exc:
            logger.error("MCP tool %s failed: %s", name, exc)
            return {"success": False, "error": str(exc)}
        return {"success": True, "output": produced}

    def handle_request(self, raw: str) -> str:
        """Parse one JSON request line and return the JSON response string."""
        try:
            request = json.loads(raw)
            method = request.get("method")
            params = request.get("params", {})
            if method == "list_tools":
                outcome: Any = self.list_tools()
            elif method == "call_tool":
                outcome = self.call_tool(params.get("name", ""), params.get("arguments", {}))
            else:
                outcome = {"error": f"Unknown method: {method}"}
            return json.dumps({"result": outcome}, ensure_ascii=False)
        except Exception as exc:
            # Malformed JSON (or any other failure) becomes an error envelope.
            return json.dumps({"error": str(exc)}, ensure_ascii=False)

    def run_stdio(self) -> None:
        """Serve requests line-by-line from stdin, replying on stdout."""
        logger.info("MCP Server started on stdio")
        for raw_line in sys.stdin:
            stripped = raw_line.strip()
            if stripped:
                print(self.handle_request(stripped), flush=True)


class MCPClient:
    """Client that talks to an MCP stdio server (one JSON object per line).

    Fixes over the original:
    - ``connect`` no longer crashes with ``len(None)`` when the server
      returns a null/absent ``result`` for ``list_tools``.
    - ``_request`` raises a clear ``RuntimeError`` when the server closes
      the connection, instead of a confusing ``JSONDecodeError`` on "".
    - ``close`` closes the child's stdin first so a server blocked on
      ``readline`` can exit cleanly before being terminated.
    """

    def __init__(self, command: list[str] | None = None):
        # Command used to spawn the server; defaults to the bundled stdio
        # server module run with the current interpreter.
        self.command = command or [sys.executable, "-m", "mcp.server_stdio"]
        self._proc: subprocess.Popen | None = None
        self._tools: list[dict[str, Any]] = []

    def connect(self) -> None:
        """Spawn the server subprocess and discover its tools."""
        self._proc = subprocess.Popen(
            self.command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        logger.info("MCP Client connected to server")
        # `or []` guards against a server that replies with a null result.
        self._tools = self._request("list_tools", {}) or []
        logger.info("Discovered %d tools", len(self._tools))

    def _request(self, method: str, params: dict[str, Any]) -> Any:
        """Send one JSON request line and return the parsed "result" field.

        Raises:
            RuntimeError: not connected, server-reported error, or the
                server closed the connection.
        """
        if self._proc is None or self._proc.stdin is None or self._proc.stdout is None:
            raise RuntimeError("MCP client not connected")
        payload = json.dumps({"method": method, "params": params}, ensure_ascii=False)
        self._proc.stdin.write(payload + "\n")
        self._proc.stdin.flush()
        response = self._proc.stdout.readline().strip()
        if not response:
            # Empty read means the server exited; fail with a clear message.
            raise RuntimeError("MCP server closed the connection")
        data = json.loads(response)
        if "error" in data:
            raise RuntimeError(data["error"])
        return data.get("result")

    def get_tools(self) -> list[dict[str, Any]]:
        """Snapshot of the tools discovered at connect time."""
        return list(self._tools)

    def call_tool(self, name: str, arguments: dict[str, Any]) -> dict[str, Any]:
        """Invoke a remote tool and return its result envelope."""
        return self._request("call_tool", {"name": name, "arguments": arguments})

    def close(self) -> None:
        """Shut the server subprocess down and forget the connection."""
        if self._proc is not None:
            # Closing stdin lets a server blocked on readline exit cleanly
            # before we escalate to terminate().
            if self._proc.stdin is not None:
                self._proc.stdin.close()
            self._proc.terminate()
            self._proc.wait()
            self._proc = None
            logger.info("MCP Client disconnected")


class MCPAgent:
    """LLM agent that answers queries by calling tools exposed over MCP.

    Loops: ask the model (OpenAI-compatible chat completions with function
    calling), execute any requested MCP tools, feed results back, until the
    model produces a final text answer or the iteration budget runs out.

    Fixes over the original:
    - ``message.get("content") or ""`` — the API returns ``"content": null``
      alongside tool calls, which previously leaked ``None`` through the
      declared ``-> str`` return and into assistant messages.
    - ``fn.get("arguments") or "{}"`` — an explicit null ``arguments`` field
      previously made ``json.loads(None)`` raise an uncaught ``TypeError``.
    - tool-role message content is coerced to ``str`` as the API requires.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model: str,
        mcp_client: MCPClient,
        max_iterations: int = 3,
    ):
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")  # normalize to avoid double slashes
        self.model = model
        self.mcp_client = mcp_client
        self.max_iterations = max_iterations  # cap on LLM round-trips per run()

    def _build_tool_definitions(self) -> list[dict[str, Any]]:
        """Convert MCP tool metadata into OpenAI function-calling schema."""
        return [
            {
                "type": "function",
                "function": {
                    "name": t["name"],
                    "description": t["description"],
                    "parameters": t["parameters"],
                },
            }
            for t in self.mcp_client.get_tools()
        ]

    def _call_llm(self, messages: list[dict[str, Any]]) -> dict[str, Any]:
        """POST one chat-completion request and return the parsed JSON body."""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload: dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "tools": self._build_tool_definitions(),
            "temperature": 0.3,
        }
        resp = requests.post(
            f"{self.base_url}/chat/completions",
            headers=headers,
            json=payload,
            timeout=60,
        )
        resp.raise_for_status()
        return resp.json()

    @staticmethod
    def _extract_tool_calls(message: dict[str, Any]) -> list[dict[str, Any]]:
        """Return the message's tool calls, or [] when absent or null."""
        return message.get("tool_calls") or []

    def run(self, user_query: str) -> str:
        """Answer *user_query*, transparently executing requested MCP tools.

        Returns the model's final text, or a fallback message when the
        iteration budget is exhausted while the model keeps calling tools.
        """
        messages: list[dict[str, Any]] = [
            {
                "role": "system",
                "content": "你是一个 helpful assistant,可以通过 MCP 协议使用外部工具。",
            },
            {"role": "user", "content": user_query},
        ]

        for iteration in range(self.max_iterations):
            logger.info("MCPAgent iteration %d/%d", iteration + 1, self.max_iterations)
            response = self._call_llm(messages)
            message = response["choices"][0]["message"]

            tool_calls = self._extract_tool_calls(message)
            if not tool_calls:
                # `or ""` keeps the `-> str` contract when content is null.
                return message.get("content") or ""

            messages.append({
                "role": "assistant",
                "content": message.get("content") or "",
                "tool_calls": tool_calls,
            })

            for tc in tool_calls:
                fn = tc.get("function", {})
                tool_name = fn.get("name", "")
                try:
                    # `or "{}"` guards an explicit null arguments field.
                    arguments = json.loads(fn.get("arguments") or "{}")
                except json.JSONDecodeError:
                    arguments = {}

                logger.info("MCPAgent calling remote tool: %s(%s)", tool_name, arguments)
                result = self.mcp_client.call_tool(tool_name, arguments)
                # Tool-role content must be a string for the chat API.
                content = str(result["output"]) if result.get("success") else f"Error: {result.get('error')}"
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc.get("id", ""),
                    "name": tool_name,
                    "content": content,
                })

        return "达到最大迭代次数,未能完成请求。"

十一、目标设定与监控

在AI智能体的背景下,规划通常涉及智能体接受高层目标,并自主或半自主地生成一系列中间步骤或者子目标。这些步骤可以按照顺序执行,或者以更复杂的流程执行,可能涉及其他模式,如:工具使用、路由、多智能体协作。规划机制可能涉及复杂的搜索算法、逻辑推理,或者越来越多地利用大模型的能力,根据训练数据和对任务的理解生成合理且有效的计划。良好的规划能力能使智能体处理远不止简单单步查询的复杂问题。这是支撑许多高级智能体的基础模式,将简单的反应系统转变为能够主动朝着既定目标工作的系统。

场景:

  1. 客户支持自动化
  2. 个性化学习系统
  3. 项目管理助理
  4. 自动交易机器人
  5. 机器人和自动驾驶车辆
  6. 自动审查系统

编码实现:略,自主提升,报警,监控等等,场景过多


十二、异常处理和恢复

字面意思,添加拦截器或线程钩子,设置等待超时时间等

编码实现:略,方法太多


十三、人机协同(HITL)

同样字面意思,与十一雷同,与十二处理方式大致相同

吐槽一下:明明一个简单的工作流,与传统工作流不同的是介入AI支撑流程选择,路由控制,并行控制等,造那么多新词干什么


十四、知识检索(RAG)

RAG模式通过在生成响应之前给予LLM访问外部知识库的权限,显著增强了它们的能力。与仅依赖内部训练知识不同,RAG允许LLM查找信息。当用户使用AI的时候,AI可以自行判定是否使用RAG,或者在LLM之前,通过RAG获取相应的知识,作为前置提示词提供给LLM。

核心概念:

  1. 嵌入(Embedding):具体可以查找相应文章,这里不做具体概述
  2. 文本相似度:
  • 余弦相似度:常见的相似度
  • TF-IDF加权:结合词频与逆文档频率构建特征向量,能有效区分常用词与关键词的重要性
  • SimHash:生成文本的哈希签名并计算海明(Hamming)距离,对内容“相同”与否极为敏感,适合大规模去重
  • 词嵌入:Word2Vec,BFE等
  1. 语义相似度
  • Bert
  • USE
  1. 文档分块
  2. 向量数据库:Qdrant等
  3. 图RAG:与向量数据库不同,图RAG引入了知识图谱,也就是引入了信息结构,使得检索按照一定的结构进行
  4. Agentic RAG:检索过程中引入Agent,对检索出的内容使用agent进行处理

代码示例(Qdrant):


from __future__ import annotations

import logging
import math
from dataclasses import dataclass
from typing import Any

import requests

logger = logging.getLogger(__name__)


@dataclass
class Document:
    """A single indexed text chunk together with its embedding vector."""
    id: str  # unique chunk id, e.g. "<doc_id>_<i>" (see DocumentStore.add_document)
    content: str  # the chunk's raw text
    embedding: list[float] | None = None  # None until the chunk is embedded


class DocumentStore:
    """Splits raw text into overlapping chunks, embeds each chunk through an
    OpenAI-compatible /embeddings endpoint, and keeps them in memory."""

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model: str,
        chunk_size: int = 200,
        chunk_overlap: int = 50,
    ):
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.chunk_size = chunk_size        # max characters per chunk
        self.chunk_overlap = chunk_overlap  # characters carried into the next chunk
        self._documents: list[Document] = []

    def _get_embedding(self, text: str) -> list[float]:
        """Request the embedding vector for *text* from the API."""
        response = requests.post(
            f"{self.base_url}/embeddings",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            json={"model": self.model, "input": text},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["data"][0]["embedding"]

    def _split_text(self, text: str) -> list[str]:
        """Greedily pack non-empty paragraphs into chunks of at most
        ``chunk_size`` characters, carrying a ``chunk_overlap``-character
        tail of each emitted chunk into the next one."""
        pieces: list[str] = []
        buffer = ""
        for para in (segment.strip() for segment in text.split("\n")):
            if not para:
                continue
            if buffer and len(buffer) + len(para) + 1 > self.chunk_size:
                pieces.append(buffer)
                tail = buffer[max(0, len(buffer) - self.chunk_overlap):]
                buffer = tail + "\n" + para
            else:
                buffer = buffer + "\n" + para if buffer else para
        if buffer:
            pieces.append(buffer)
        # Never return an empty list; callers expect at least one chunk.
        return pieces or [text]

    def add_document(self, doc_id: str, text: str) -> None:
        """Chunk, embed and index *text*; failed chunks are logged and skipped."""
        for index, chunk in enumerate(self._split_text(text)):
            try:
                vector = self._get_embedding(chunk)
                self._documents.append(
                    Document(id=f"{doc_id}_{index}", content=chunk, embedding=vector)
                )
                logger.info("Indexed chunk %s_%d", doc_id, index)
            except Exception as exc:
                logger.error("Failed to embed chunk %s_%d: %s", doc_id, index, exc)

    def get_all(self) -> list[Document]:
        """Snapshot of every indexed chunk."""
        return list(self._documents)


class RAGRetriever:
    """Ranks documents by cosine similarity against a query embedding."""

    @staticmethod
    def _cosine_similarity(a: list[float], b: list[float]) -> float:
        """Cosine similarity of two vectors; 0.0 when either has zero norm."""
        dot_product = sum(u * v for u, v in zip(a, b))
        magnitude_a = math.sqrt(sum(u * u for u in a))
        magnitude_b = math.sqrt(sum(v * v for v in b))
        if magnitude_a == 0.0 or magnitude_b == 0.0:
            return 0.0
        return dot_product / (magnitude_a * magnitude_b)

    def retrieve(
        self,
        query_embedding: list[float],
        documents: list[Document],
        top_k: int = 3,
    ) -> list[Document]:
        """Return the *top_k* embedded documents most similar to the query.

        Documents without an embedding are skipped.
        """
        candidates = [
            (self._cosine_similarity(query_embedding, doc.embedding), doc)
            for doc in documents
            if doc.embedding is not None
        ]
        candidates.sort(key=lambda pair: pair[0], reverse=True)
        return [doc for _, doc in candidates[:top_k]]


class RAGAgent:
    """Retrieval-augmented agent: embeds the user query, pulls the closest
    chunks from the document store, and asks the chat model to answer
    grounded in those chunks."""

    def __init__(
        self,
        api_key: str,
        base_url: str,
        model: str,
        embedding_model: str,
        document_store: DocumentStore,
        retriever: RAGRetriever | None = None,
        top_k: int = 3,
    ):
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.embedding_model = embedding_model
        self.document_store = document_store
        # Fall back to the default cosine-similarity retriever.
        self.retriever = retriever or RAGRetriever()
        self.top_k = top_k

    def _get_embedding(self, text: str) -> list[float]:
        """Embed *text* via the embeddings endpoint."""
        response = requests.post(
            f"{self.base_url}/embeddings",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            json={"model": self.embedding_model, "input": text},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["data"][0]["embedding"]

    def _build_prompt(self, query: str, contexts: list[str]) -> str:
        """Assemble the grounded-QA prompt from the retrieved contexts."""
        numbered = [f"[文档 {i + 1}]\n{c}" for i, c in enumerate(contexts)]
        context_text = "\n\n".join(numbered)
        return (
            "请根据以下参考文档回答用户问题。如果文档中没有相关信息,请明确说明。\n\n"
            f"参考文档:\n{context_text}\n\n"
            f"用户问题: {query}\n\n"
            "回答:"
        )

    def query(self, user_query: str) -> str:
        """Retrieve supporting chunks and produce a grounded answer."""
        logger.info("RAGAgent querying: %s", user_query)
        embedded_query = self._get_embedding(user_query)
        top_docs = self.retriever.retrieve(
            embedded_query,
            self.document_store.get_all(),
            top_k=self.top_k,
        )
        contexts = [d.content for d in top_docs]
        logger.info("Retrieved %d documents", len(contexts))

        grounded_prompt = self._build_prompt(user_query, contexts)
        response = requests.post(
            f"{self.base_url}/chat/completions",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            json={
                "model": self.model,
                "messages": [{"role": "user", "content": grounded_prompt}],
                "temperature": 0.3,
                "max_tokens": 800,
            },
            timeout=60,
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"].strip()

十五、智能体间通讯(A2A)

A2A协议是一个实现不同AI智能体之间通信的开放标准协议,它确保了互操作性,允许使用LangGraph、CrewAI等技术开发的AI智能体协同工作,无论其来源或者框架差异如何。

核心概念:

  1. 核心参与者:
  • 用户:发起请求
  • A2A客户端
  • A2A服务端
  1. Agent卡片:Agent的数字身份由其智能体卡片定义,通常是JSON文件。此文件包含客户端交互和自动发现的关键信息,包括智能体身份、端点URL和版本。详细说明支持的功能、特定技能、输入/输出模式以及身份验证要求。
  2. Agent发现:Agent发现机制允许客户端找到描述可用的A2A服务能力的智能体卡片。
  3. 通讯和任务
  4. 交互机制

常见的框架:

  1. Nacos

示例代码:


from __future__ import annotations

import json
import logging
import threading
import time
from dataclasses import asdict, dataclass
from typing import Any, Callable

import requests

logger = logging.getLogger(__name__)


@dataclass
class AgentCard:
    """Self-describing identity document for an A2A agent (the JSON
    "agent card" served at /agent-card and stored in the registry)."""

    name: str
    description: str
    endpoint: str
    version: str = "1.0"
    skills: list[str] | None = None
    auth: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain, JSON-ready dict."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> AgentCard:
        """Build a card from a dict, tolerating missing optional keys."""
        optional = {
            "version": data.get("version", "1.0"),
            "skills": data.get("skills"),
            "auth": data.get("auth"),
        }
        return cls(data["name"], data["description"], data["endpoint"], **optional)


class NacosRegistry:
    """Thin wrapper over the Nacos v1 HTTP Open API for service instance
    registration and discovery. Network failures are logged and swallowed."""

    def __init__(self, nacos_url: str = "http://localhost:8848", namespace: str = "public"):
        self.nacos_url = nacos_url.rstrip("/")
        self.namespace = namespace

    def register(self, service_name: str, ip: str, port: int, metadata: dict[str, Any]) -> bool:
        """Register one healthy instance; True iff Nacos replies "ok"."""
        form = {
            "serviceName": service_name,
            "ip": ip,
            "port": port,
            "namespaceId": self.namespace,
            "metadata": json.dumps(metadata, ensure_ascii=False),
            "healthy": "true",
            "enabled": "true",
        }
        try:
            response = requests.post(
                f"{self.nacos_url}/nacos/v1/ns/instance", data=form, timeout=10
            )
            return response.text.strip() == "ok"
        except Exception as exc:
            logger.error("Nacos register failed: %s", exc)
            return False

    def deregister(self, service_name: str, ip: str, port: int) -> bool:
        """Remove the instance from the registry; True iff Nacos replies "ok"."""
        form = {
            "serviceName": service_name,
            "ip": ip,
            "port": port,
            "namespaceId": self.namespace,
        }
        try:
            response = requests.delete(
                f"{self.nacos_url}/nacos/v1/ns/instance", params=form, timeout=10
            )
            return response.text.strip() == "ok"
        except Exception as exc:
            logger.error("Nacos deregister failed: %s", exc)
            return False

    def discover(self, service_name: str) -> list[dict[str, Any]]:
        """List healthy instances with parsed metadata; [] on any failure."""
        query = {
            "serviceName": service_name,
            "namespaceId": self.namespace,
            "healthyOnly": "true",
        }
        try:
            response = requests.get(
                f"{self.nacos_url}/nacos/v1/ns/instance/list", params=query, timeout=10
            )
            instances: list[dict[str, Any]] = []
            for host in response.json().get("hosts", []):
                instances.append(
                    {
                        "ip": host.get("ip"),
                        "port": host.get("port"),
                        "metadata": json.loads(host.get("metadata", "{}")),
                    }
                )
            return instances
        except Exception as exc:
            logger.error("Nacos discover failed: %s", exc)
            return []


class A2AServer:
    """Minimal HTTP server exposing an agent card (GET /agent-card) and a
    task endpoint (POST /send-task) for A2A communication.

    Fixes over the original:
    - ``stop()`` now actually deregisters from Nacos (previously a dead
      ``pass`` with a TODO comment) using the host/port remembered at
      ``start()``.
    - the serve loop sets a handle timeout so ``stop()`` takes effect even
      when no request arrives, and the listening socket is closed on exit
      (previously it leaked).
    """

    def __init__(
        self,
        agent_card: AgentCard,
        handler: Callable[[str], str],
        registry: NacosRegistry | None = None,
    ):
        self.agent_card = agent_card  # identity served at /agent-card
        self.handler = handler        # callback executing an incoming task string
        self.registry = registry      # optional Nacos registry for discovery
        self._running = False
        self._thread: threading.Thread | None = None
        # Remembered at start() so stop() can deregister from Nacos.
        self._host: str | None = None
        self._port: int | None = None

    def _run_http(self, host: str, port: int) -> None:
        """Blocking serve loop; runs on the background thread."""
        from http.server import BaseHTTPRequestHandler, HTTPServer

        server = self

        class Handler(BaseHTTPRequestHandler):
            def log_message(self, fmt: str, *args: Any) -> None:
                # Route http.server's default stderr logging into our logger.
                logger.info(fmt, *args)

            def do_GET(self) -> None:
                if self.path == "/agent-card":
                    self._send_json(200, server.agent_card.to_dict())
                else:
                    self._send_json(404, {"error": "Not found"})

            def do_POST(self) -> None:
                if self.path == "/send-task":
                    content_length = int(self.headers.get("Content-Length", 0))
                    body = self.rfile.read(content_length).decode("utf-8")
                    try:
                        data = json.loads(body)
                        task = data.get("task", "")
                        result = server.handler(task)
                        self._send_json(200, {"result": result})
                    except Exception as exc:
                        self._send_json(500, {"error": str(exc)})
                else:
                    self._send_json(404, {"error": "Not found"})

            def _send_json(self, status: int, data: dict[str, Any]) -> None:
                self.send_response(status)
                self.send_header("Content-Type", "application/json")
                self.end_headers()
                self.wfile.write(json.dumps(data, ensure_ascii=False).encode("utf-8"))

        httpd = HTTPServer((host, port), Handler)
        # Periodic timeout so the loop re-checks _running and stop() takes
        # effect even without incoming traffic.
        httpd.timeout = 0.5
        try:
            while self._running:
                httpd.handle_request()
        finally:
            httpd.server_close()

    def start(self, host: str = "127.0.0.1", port: int = 8080) -> None:
        """Register with Nacos (if configured) and serve on a daemon thread."""
        self._running = True
        self._host, self._port = host, port
        if self.registry:
            success = self.registry.register(
                self.agent_card.name,
                host,
                port,
                {"card": json.dumps(self.agent_card.to_dict(), ensure_ascii=False)},
            )
            if success:
                logger.info("Registered to Nacos: %s@%s:%d", self.agent_card.name, host, port)
            else:
                logger.warning("Failed to register to Nacos")
        self._thread = threading.Thread(target=self._run_http, args=(host, port), daemon=True)
        self._thread.start()
        logger.info("A2A Server started at %s:%d", host, port)

    def stop(self) -> None:
        """Stop serving and deregister from Nacos when registered."""
        self._running = False
        if self.registry and self._host is not None and self._port is not None:
            self.registry.deregister(self.agent_card.name, self._host, self._port)
        logger.info("A2A Server stopped")


class A2AClient:
    """Client side of the A2A examples: discovers agents through an optional
    Nacos registry and dispatches tasks to their HTTP endpoints."""

    def __init__(self, registry: NacosRegistry | None = None):
        self.registry = registry

    def discover_agent(self, service_name: str) -> AgentCard | None:
        """Look up *service_name* in the registry and parse its agent card.

        Returns None when no registry is configured, no healthy instance
        exists, or the stored card cannot be parsed.
        """
        if self.registry is None:
            return None
        instances = self.registry.discover(service_name)
        if not instances:
            return None
        raw_card = instances[0].get("metadata", {}).get("card", "{}")
        try:
            return AgentCard.from_dict(json.loads(raw_card))
        except Exception as exc:
            logger.error("Failed to parse agent card: %s", exc)
            return None

    def send_task(self, agent_card: AgentCard, task: str) -> str:
        """POST *task* to the agent's /send-task endpoint.

        Returns the result text, or an ``"Error: ..."`` string on failure.
        """
        try:
            response = requests.post(
                f"{agent_card.endpoint}/send-task",
                json={"task": task},
                headers={"Content-Type": "application/json"},
                timeout=30,
            )
            response.raise_for_status()
            return response.json().get("result", "")
        except Exception as exc:
            logger.error("A2A send_task failed: %s", exc)
            return f"Error: {exc}"

十六、资源感知与优化

参考:十一、十二


十七、推理技术:

推理技术一般包含:

  1. 思维链COT(Chain of thought)
  2. 思维树TOT(Tree of Thought):是一种建立在思维链基础上的推理技术,它允许LLM通过分支到不同的中间步骤探索多个推理路径,形成树形结构。
  3. 自我纠正
  4. 程序辅助
  5. 可验证奖励强化学习RLVR:建立奖惩机制
  6. ReAct:推理与行动,与外部交互
  7. 辩论链路COD:通过引入多个角色,甚至多个LLM,对问题进行辩论,以获取更好的结果
  8. 辩论图GOD:COD引入图
  9. 多智能体系统搜索MASS
  • 块级提示词优化:该过程从对各个智能体类型或块的提示词进行局部优化开始,以确保各个组件在集成到更大系统之前能够有效地执行其角色。
  • 工作流拓扑优化
  • 工作流级提示词优化

结束语

以上为大多数智能体的技术特性。根据这些技术特性,研发者可以按照系统架构的思维,对智能体进行设计和编排以及优化。后面均是一些工程化内容,我觉得未必应该按照谷歌的工程思路进行,了解技术特性,掌握这些特性,去灵活运用才是一个合格的架构师或者一个合格的技术人员应该具备的能力。尤其是在现在极为浮躁的环境下。




Agents设计模式
https://www.lingyepro.com/archives/7f79a88e-1a8f-49ac-a762-3fadd28af498
作者
零叶独舞
发布于
2026年04月26日
许可协议