【从0到1构建一个ClaudeAgent】协作-Agent团队
一个 Agent 干不完怎么办?
Java实现代码
public class AgentTeamsSystem {
// --- 配置 ---
private static final Path WORKDIR = Paths.get(System.getProperty("user.dir"));
private static final Path TEAM_DIR = WORKDIR.resolve(".team");
private static final Path INBOX_DIR = TEAM_DIR.resolve("inbox");
private static final Gson gson = new GsonBuilder().setPrettyPrinting().create();
// 有效消息类型
private static final Set VALID_MSG_TYPES = Set.of(
"message", "broadcast", "shutdown_request",
"shutdown_response", "plan_approval_response"
);
// --- 消息系统(MessageBus)---
static class MessageBus {
private final Path inboxDir;
public MessageBus(Path inboxDir) {
this.inboxDir = inboxDir;
try {
Files.createDirectories(inboxDir);
} catch (IOException e) {
throw new RuntimeException("Failed to create inbox directory", e);
}
}
/**
* 发送消息到指定智能体
*/
public String send(String sender, String to, String content,
String msgType, Map extra) {
if (!VALID_MSG_TYPES.contains(msgType)) {
return String.format("Error: Invalid type '%s'. Valid: %s",
msgType, String.join(", ", VALID_MSG_TYPES));
}
Map message = new LinkedHashMap<>();
message.put("type", msgType);
message.put("from", sender);
message.put("content", content);
message.put("timestamp", System.currentTimeMillis() / 1000.0);
if (extra != null) {
message.putAll(extra);
}
Path inboxPath = inboxDir.resolve(to + ".jsonl");
try {
String jsonLine = gson.toJson(message) + "\n";
Files.writeString(inboxPath, jsonLine,
StandardOpenOption.CREATE, StandardOpenOption.APPEND);
return String.format("Sent %s to %s", msgType, to);
} catch (IOException e) {
return "Error: " + e.getMessage();
}
}
/**
* 读取并清空邮箱
*/
public List
这段代码引入了智能体团队系统,实现了多个 Agent 之间的协作
核心思想:持久队友 + 异步邮箱。
什么是持久队友(Persistent Teammates)?它们是长期存活、有身份意识的 Agent,通过基于文件的邮箱(JSONL 格式)异步通信,能够处理跨越单个执行周期的复杂任务委托。
多智能体系统架构
核心思想:从单智能体系统升级为多智能体协作系统,引入分布式、角色化、可通信的智能体团队,实现复杂的协同工作流和分布式问题解决。
// 系统配置
private static final Path TEAM_DIR = WORKDIR.resolve(".team");
private static final Path INBOX_DIR = TEAM_DIR.resolve("inbox");
// 团队持久化:.team目录存储团队配置
// 消息传递:inbox子目录实现智能体间通信
// 文件系统基础:通过文件系统实现简单的分布式通信
- 多智能体协同:多个智能体可以并行工作,协同解决问题
- 角色化分工:不同智能体担任不同角色,专业化分工
- 持久化团队:团队配置和状态可以持久化保存
- 去中心化通信:基于文件系统的轻量级消息传递
消息总线系统(MessageBus)
// 消息总线 - 智能体间通信基础设施
static class MessageBus {
private final Path inboxDir;
// 文件系统邮箱:每个智能体一个jsonl文件
// 异步通信:发送方不阻塞,接收方主动拉取
// 松耦合:智能体间通过邮箱解耦
/**
* 发送消息到指定智能体
*/
public String send(String sender, String to, String content,
String msgType, Map extra) {
if (!VALID_MSG_TYPES.contains(msgType)) {
return String.format("Error: Invalid type '%s'. Valid: %s",
msgType, String.join(", ", VALID_MSG_TYPES));
}
// 消息类型验证:确保消息结构符合协议
Map message = new LinkedHashMap<>();
message.put("type", msgType);
message.put("from", sender);
message.put("content", content);
message.put("timestamp", System.currentTimeMillis() / 1000.0);
// 结构化消息:类型、发送方、内容、时间戳
// 可扩展:支持额外字段
Path inboxPath = inboxDir.resolve(to + ".jsonl");
try {
String jsonLine = gson.toJson(message) + "\n";
Files.writeString(inboxPath, jsonLine,
StandardOpenOption.CREATE, StandardOpenOption.APPEND);
// 追加写入:支持多条消息
// JSONL格式:每行一个JSON对象,便于处理
}
}
/**
* 读取并清空邮箱
*/
public List> readInbox(String name) {
Path inboxPath = inboxDir.resolve(name + ".jsonl");
if (!Files.exists(inboxPath)) {
return new ArrayList<>();
}
try {
List> messages = new ArrayList<>();
List lines = Files.readAllLines(inboxPath);
for (String line : lines) {
if (!line.trim().isEmpty()) {
Type type = new TypeToken>(){}.getType();
Map message = gson.fromJson(line, type);
messages.add(message);
}
}
// 清空邮箱(消费模式)
Files.writeString(inboxPath, "");
// 消费一次:消息被读取后清空,避免重复处理
// 确保每个消息只被处理一次
return messages;
}
}
}
- 异步通信:发送和接收解耦,不阻塞发送方
- 文件系统存储:简单可靠,支持进程间通信
- 结构化消息:明确的消息格式,支持多种消息类型
- 消费模式:读取后清空,避免消息重复处理
- 可扩展协议:通过msgType支持不同的通信语义
智能体管理器(TeammateManager)
// 智能体管理器 - 多智能体生命周期管理
static class TeammateManager {
private final Path teamDir;
private final Path configPath;
private Map config;
private final Map threads = new ConcurrentHashMap<>();
private final Map stopFlags = new ConcurrentHashMap<>();
// 配置管理:团队配置持久化到文件
// 线程管理:每个智能体在自己的线程中运行
// 停止控制:支持优雅停止智能体
public String spawn(String name, String role, String prompt) {
Map member = findMember(name);
if (member != null) {
String status = (String) member.get("status");
if (!"idle".equals(status) && !"shutdown".equals(status)) {
return String.format("Error: '%s' is currently %s", name, status);
}
member.put("status", "working");
member.put("role", role);
} else {
member = new LinkedHashMap<>();
member.put("name", name);
member.put("role", role);
member.put("status", "working");
((List>) config.get("members")).add(member);
}
// 状态管理:智能体有明确的状态机
// 重用支持:可以重用已有的智能体
// 角色配置:为智能体分配特定角色
saveConfig();
// 创建新的停止标志
AtomicBoolean stopFlag = new AtomicBoolean(false);
stopFlags.put(name, stopFlag);
// 创建并启动新线程
Thread thread = new Thread(() -> teammateLoop(name, role, prompt, stopFlag),
"Teammate-" + name);
thread.setDaemon(true);
threads.put(name, thread);
thread.start();
// 独立线程:每个智能体在独立线程中运行
// 守护线程:不会阻止JVM退出
// 命名线程:便于调试和监控
return String.format("Spawned '%s' (role: %s)", name, role);
}
private void teammateLoop(String name, String role, String prompt, AtomicBoolean stopFlag) {
String systemPrompt = String.format(
"You are '%s', role: %s, at %s. " +
"Use send_message to communicate. Complete your task.",
name, role, WORKDIR
);
// 个性化系统提示:为每个智能体定制角色
// 明确角色:让智能体知道自己的身份和职责
List> messages = new ArrayList<>();
messages.add(Map.of("role", "user", "content", prompt));
// 初始化消息:从传入的prompt开始
// 最大迭代次数限制
for (int i = 0; i < 50 && !stopFlag.get(); i++) {
try {
// 检查邮箱
List> inbox = BUS.readInbox(name);
for (Map msg : inbox) {
messages.add(Map.of("role", "user", "content", gson.toJson(msg)));
}
// 邮箱检查:每次迭代前检查新消息
// 消息注入:将收到的消息加入上下文
// 持续通信:支持动态的任务调整
// 短暂休眠,避免 CPU 过度使用
Thread.sleep(100);
// 节能设计:避免忙等待
}
}
// 更新状态
Map member = findMember(name);
if (member != null && !"shutdown".equals(member.get("status"))) {
member.put("status", "idle");
saveConfig();
}
// 状态恢复:完成后状态恢复为idle
// 配置持久化:状态变化立即保存
}
}
- 生命周期管理:智能体的创建、运行、停止、销毁
- 状态持久化:智能体状态保存到文件,重启可恢复
- 独立执行:每个智能体在自己的线程中独立运行
- 通信集成:自动检查邮箱,支持动态通信
- 优雅停止:支持安全的停止机制
多智能体通信工具集
// 团队管理工具集
public enum ToolType {
SPAWN_TEAMMATE("spawn_teammate", "Spawn a persistent teammate that runs in its own thread."),
LIST_TEAMMATES("list_teammates", "List all teammates with name, role, status."),
SEND_MESSAGE("send_message", "Send a message to a teammate's inbox."),
READ_INBOX("read_inbox", "Read and drain the lead's inbox."),
BROADCAST("broadcast", "Send a message to all teammates.");
// 团队创建:动态生成新的智能体
// 状态查询:查看所有智能体状态
// 点对点通信:向特定智能体发送消息
// 广播通信:向所有智能体发送消息
// 邮箱读取:获取收到的消息
}
// 工具处理器
TOOL_HANDLERS.put(ToolType.SEND_MESSAGE.name, args -> {
String to = (String) args.get("to");
String content = (String) args.get("content");
String msgType = (String) args.get("msg_type");
if (msgType == null) msgType = "message";
return BUS.send("lead", to, content, msgType);
// 领导身份:所有消息都以"lead"身份发送
// 灵活消息类型:支持不同类型的消息
});
TOOL_HANDLERS.put(ToolType.BROADCAST.name, args -> {
String content = (String) args.get("content");
return BUS.broadcast("lead", content, TEAM_MANAGER.memberNames());
// 批量发送:向所有团队成员发送消息
// 排除自己:广播不包含发送者自己
});
- 完整的通信API:提供完整的智能体间通信能力
- 领导-成员模式:明确的领导智能体控制整个团队
- 灵活的通信模式:支持点对点、广播、邮箱读取
- 与现有系统集成:与基础工具无缝集成
领导智能体主循环
// Agent 主循环(领导智能体)
public static void agentLoop(List> messages) {
while (true) {
try {
// 检查领导邮箱
List> inbox = BUS.readInbox("lead");
if (!inbox.isEmpty()) {
String inboxJson = gson.toJson(inbox);
messages.add(Map.of(
"role", "user",
"content", "" + inboxJson + " "
));
messages.add(Map.of(
"role", "assistant",
"content", "Noted inbox messages."
));
// 自动邮箱检查:每次迭代前检查新消息
// 结构化注入:用XML标签包裹,便于解析
// 对话完整:添加assistant响应,保持结构
}
// 显示团队状态
int activeCount = TEAM_MANAGER.getActiveCount();
if (activeCount > 0) {
System.out.printf("[Active teammates: %d]%n", activeCount);
}
// 状态监控:实时显示活跃智能体数量
}
}
}
- 自动通信:领导智能体自动接收和处理消息
- 状态感知:实时了解团队状态
- 决策依据:基于团队反馈做出更好的决策
- 领导协调:领导智能体负责协调整个团队
架构演进与价值
从 BackgroundTasksSystem 到 AgentTeamsSystem 的升级:
| 维度 | BackgroundTasksSystem | AgentTeamsSystem |
|---|---|---|
| 架构模式 | 主从异步任务 | 多智能体协作 |
| 智能水平 | 被动执行任务 | 主动协作解决 |
| 通信方式 | 结果通知 | 结构化消息传递 |
| 角色分工 | 无 | 明确的角色化分工 |
| 决策机制 | 集中决策 | 分布式协同决策 |
原文地址: https://www.cveoy.top/t/topic/qGr8 著作权归作者所有。请勿转载和采集!