最近折腾了一个项目,看名字就是ai智能体了,liuzhixin405/cagentcli 借鉴了claude-code和hermes的设计理念。目前只是通过deepseek的api来做的测试,本人只是微测试,开发团队那就是
deepseek,hermes,claude,gpt一众大佬了。个人精力有限,上线bug,用户体验问题肯定少不了,望多担待或者一起修复。

image

 临近上线还是bug不断,让它自己来修。

image

该项目源码就不一一展示了,特征如下:

image

 鉴于golang语言的简介,实现起来非常简单,结构也非常明了,一看就知道实现了哪些功能。

image

 上手非常简单,有完整的使用文档,以及命令提示。

image

 以下纯属项目介绍:

# agentgo 运行时架构深度解析:一个 Go AI 编程助手的核心引擎设计

> 本文从代码层面剖析 agentgo 的运行时机制——Tool-Calling 循环、权限沙箱、护栏防循环、Git 检查点、记忆梦境整理、子代理架构等核心子系统。

---

## 0. 先看全貌:25 个内部包,最小依赖

agentgo 的 `internal/` 目录下挂着 25+ 个包,但 `go.mod` 只有两个外部依赖:

```
module github.com/agentgo
go 1.24.0
require golang.org/x/term v0.29.0
require golang.org/x/sys v0.30.0
```

`term` 用于终端原始模式(REPL 输入),`sys` 是 `term` 的传递依赖。没有 ORM、没有 Web 框架、没有 JSON schema 库——**一切从零手写**。

这个极简依赖策略带来了一个关键收益:**安全审计面积极小**。你可以在一小时内 review 完所有第三方代码。

---

## 1. 核心引擎循环:`Engine.RunMessageWithStream`

一切从 `engine.go` 的 `RunMessageWithStream` 开始。这是整个运行时的心脏,约 200 行实现了一个完整的 Agentic Loop:

```go
for iter := 0; iter < MaxIterations; iter++ {
    // 1. 检查 context 取消(Ctrl+C 中断)
    if ctx.Err() != nil { return "", ctx.Err() }

    // 2. 构建请求:系统提示词 + 历史消息 + 工具定义
    req := api.ChatRequest{
        Model:      e.config.Model,
        Messages:   reqMessages,
        SystemBase: sp,
        Tools:      toolDefs,
        MaxTokens:  64000,
    }

    // 3. 调用 LLM(支持流式 + 非流式两种路径)
    resp, err = e.provider.ChatStream(ctx, req, onDelta) // 或 Chat()

    // 4. 更新费用追踪
    e.costTracker.AddDetailed(e.config.Model, resp.InputTokens, resp.OutputTokens, ...)

    // 5. 处理截断响应(max_tokens stop reason → 注入续写提示)
    if resp.StopReason == "max_tokens" && len(resp.ToolCalls) == 0 {
        e.messages = append(e.messages, 
            api.Message{Role: "user", Content: "[system: previous response was truncated...]"})
        continue
    }

    // 6. 无工具调用 → 对话结束,返回
    if len(resp.ToolCalls) == 0 {
        e.runTurnEndPipeline() // 触发后台任务
        return resp.Content, nil
    }

    // 7. 执行工具调用(支持并行)
    // ...

    // 8. 护栏检查 + 熔断器
    // ...

    // 9. 上下文压缩检查
    if e.totalTokens > CompactTokenThreshold {
        e.Compact(ctx)
    }
}
```

### 设计要点:

**a) `MaxIterations = 200` 上限保护**

每一轮对话最多执行 200 次 LLM 调用。超过上限直接报错退出,防止无限循环烧钱。

**b) 截断续写机制**

LLM 可能在生成到一半时达到 max_tokens 上限,此时如果只是文本截断(没有完整的 tool_call),引擎会自动注入一条 system 消息让模型继续:

```go
e.messages = append(e.messages, api.Message{
    Role: "user", 
    Content: "[system: your previous response was truncated due to length. Please continue, writing one file at a time.]",
})
```

**c) `ctx.Err()` 快速退出**

每次迭代开始时第一件事就是检查 context——如果用户按了 Ctrl+C,立即退出而非继续消耗 API 调用。

**d) `CompactTokenThreshold = 64000`**

当累计 token 超过 64000 且已执行 5+ 次迭代时,自动触发上下文压缩。

---

## 2. 权限系统:三层决策链

agentgo 的权限模型不是简单的「允许/拒绝」二元开关,而是一条**三层决策链**:

```go
// 第一层:工具自身声明权限意向
toolDecision := t.CheckPermissions(tc.Input, tctx)

// 第二层:权限管理器综合判断
decision, reason := e.perm.Check(tc.Name, tc.Input, mapToolDecision(toolDecision.Decision))

// 第三层:用户交互确认
if decision == permission.DAsk {
    approved := e.PermissionPrompt(tc.Name, tc.Input, reason)
}
```

### 权限管理器核心逻辑 (`permission/permission.go`):

```go
func (m *Manager) Check(toolName string, toolInput map[string]any, defaultDecision Decision) (Decision, string) {
    // 1. Deny 规则优先:黑名单中的操作直接拒绝
    for _, r := range m.deny {
        if matchRule(r, toolName, toolInput) { return DDeny, "denied by policy rule" }
    }
    // 2. Bypass 模式:完全绕过
    if m.mode == Bypass && m.bypassAvailable { return DBypass, "bypass mode" }
    // 3. Allow 规则:白名单放行
    for _, r := range m.allow { ... return DAllow }
    // 4. Ask 规则:需要确认
    for _, r := range m.ask { ... return DAsk }
    // 5. 模式默认行为
    switch m.mode {
    case Auto:  // 安全工具自动放行,危险工具仍需确认
    case Plan:  // 只读工具放行,写操作直接拒绝
    default:    // default 模式:尊重工具自身声明
    }
}
```

四种模式的行为矩阵:


| 模式      | 只读工具 (read/grep/glob) | 写文件 (write/edit) | 执行命令 (bash) | 危险命令 (rm -rf) |
| --------- | ------------------------- | ------------------- | --------------- | ----------------- |
| `default` | ✅ 自动                   | ⚠️ 询问           | ⚠️ 询问       | ❌ 阻断           |
| `plan`    | ✅ 自动                   | ❌ 拒绝             | ❌ 拒绝         | ❌ 阻断           |
| `auto`    | ✅ 自动                   | ✅ 自动             | ⚠️ 询问       | ❌ 阻断           |
| `bypass`  | ✅ 自动                   | ✅ 自动             | ✅ 自动         | ✅ 自动           |

其中「危险命令」由 `classifier.Classify(cmd)` 判定(`permission/classifier.go`),独立于模式设置——**任何模式都阻止 `rm -rf /`、`format c:` 这类操作**---

## 3. 护栏系统:防死循环的三路检测器

`guardrail/guardrail.go` 实现了一个轻量级的工具调用异常检测器,在每次工具执行前进行拦截:

```go
type Tracker struct {
    exactFailures   map[signature]int    // 相同工具+参数连续失败次数
    sameToolFails   map[string]int       // 同一工具名连续失败次数
    idempotentSeen  map[signature]string // 只读工具上次返回结果哈希
    idempotentCount map[signature]int    // 相同结果连续返回次数
}
```

### 三路检测逻辑:

1. **精确失败检测**:同一工具 + 同一参数连续失败 ≥ 5 次 → **Block**,"请换一种方法"
2. **工具级失败检测**:同一工具连续失败 ≥ 8 次 → **Block**,"尝试其他工具"
3. **只读无进展检测**:`read`/`grep`/`glob`/`webfetch` 返回相同结果 ≥ 5 次 → **Block**,"无需重复调用"

签名生成使用SHA256取前8位:

```go
func makeSignature(name string, args map[string]any) signature {
    data, _ := json.Marshal(args)
    h := sha256.Sum256(data)
    return signature{Name: name, ArgsHash: hex.EncodeToString(h[:8])}
}
```

此外,引擎层还有一个**熔断器**(Circuit Breaker):

```go
if allFailed && len(results) > 0 {
    e.consecutiveErrors++
    if e.consecutiveErrors >= 3 {
        // 注入 system 消息引导模型换方案
        e.messages = append(e.messages, api.Message{
            Role: "user",
            Content: "[system: The last 3+ tool calls all failed. Please try a different approach...]",
        })
    }
}
```

---

## 4. Git 检查点:用 Bare Repo 实现无侵入快照

`checkpoint/checkpoint.go` 是一个精巧的设计——它在 `~/.agentgo/checkpoints/store` 下初始化一个 **bare Git 仓库**,用一个 **临时 index 文件** 来做快照,完全不污染工作目录的 `.git`:

```go
env := []string{
    "GIT_DIR=" + m.storeDir,      // 独立的 bare repo
    "GIT_WORK_TREE=" + m.workDir,  // 指向实际工作目录
    "GIT_INDEX_FILE=" + indexFile, // 临时索引,不影响真实 git 状态
}
```

每个工作目录通过 SHA256 哈希映射到独立的 ref:

```go
h := sha256.Sum256([]byte(workDir))
refName := "refs/agentgo/" + hex.EncodeToString(h[:8])
```

自动排除大文件和构建产物:

```go
var excludePatterns = []string{
    "node_modules", ".git", ".venv", "__pycache__",
    "*.exe", "*.dll", "*.so", "*.dylib",
    "target/", "dist/", "build/", ".next/",
}
```

在引擎中,写文件和编辑文件之前自动创建检查点:

```go
if e.checkpointMgr != nil && (tc.Name == "write" || tc.Name == "edit" || tc.Name == "bash") {
    e.checkpointMgr.Create(tc.Name + ": " + shortPath(tc.Input))
}
```

用户可以用 `/undo` 回退,用 `/checkpoints` 查看历史。

---

## 5. 多供应商抽象:一个 Interface,两种实现

`api/provider.go` 定义了一个统一的 `Provider` 接口:

```go
type Provider interface {
    Name() string
    DisplayName() string
    Chat(ctx context.Context, req ChatRequest) (*ChatResponse, error)
    ChatStream(ctx context.Context, req ChatRequest, handler StreamHandler) (*ChatResponse, error)
    Validate() error
}
```

实际上只有两个具体实现:

- **`anthropicProvider`** — 调用 Anthropic Messages API
- **`openAICompatProvider`** — 覆盖所有 OpenAI-compatible API(DeepSeek、GLM、Kimi、Qwen、Groq 等 15+ 个供应商)

`DetectProvider` 根据模型名自动推断:

```go
func DetectProvider(model string, cfg ProviderConfig) Provider {
    if cfg.Name != "" { return NewProvider(cfg) }        // 用户指定
    if containsAny(model, "deepseek") { ... }             // 模型名包含 deepseek
    if containsAny(model, "gpt-", "o1-", "o3-", "o4-") { ... }  // OpenAI 系列
    return newAnthropicProvider(cfg)                     // 默认走 Anthropic
}
```

### 密钥池:多 Key 轮转 + 自动冷却

`api/keypool.go` 实现了一个带健康状态的 API Key 池:

```go
type PoolKey struct {
    Key       string
    Status    KeyStatus   // KeyOK | KeyExhausted | KeyDead
    CoolUntil time.Time   // 冷却到期时间
    UseCount  int
    LastError string
}
```

- Round-robin 轮转取 key
- 收到 429 (Rate Limit) → 自动标记 `KeyExhausted` + 设定冷却时间
- 冷却期满自动复活
- 认证失败 (401/403) → 标记 `KeyDead`,不再使用
- `/ratelimit` 命令可查看当前密钥池健康状态

---

## 6. 记忆系统:磁盘文件 + XML 注入 + 梦境整理

### 6.1 存储层 (`memory/store.go`)

记忆以**独立文件**存储在 `~/.agentgo/memory/` 目录,同时自动加载工作目录下的 `CLAUDE.md`:

```go
func (s *Store) All() []Entry {
    // 1. 读取 ~/.agentgo/memory/ 下所有文件
    // 2. 读取项目根目录的 CLAUDE.md
    // 3. 读取 .claude/CLAUDE.md(如有)
}
```

容量限制:

- 单个条目 ≤ 25KB
- 总容量 ≤ 100KB
- 自动 truncation

### 6.2 注入到 System Prompt

记忆内容以 XML 格式注入到系统提示词中,放在 `` 标签内:

```xml


environment.md
...


```

带 30 秒 TTL 的缓存:

```go
if s.cachedAll != nil && time.Since(s.cacheTime) < s.cacheTTL {
    return s.cachedAll  // 避免每次构建 system prompt 都读磁盘
}
```

### 6.3 Dream 梦境整理 (`dream/dream.go`)

这是一个**后台运行的独立 LLM 代理**,专门负责记忆文件的增删改和整理。

触发门控(三重检查):

```
1. 时间门控:距上次整理 ≥ MinHours 小时(默认 24h)
2. 扫描节流:两次扫描间隔 ≥ 10 分钟(避免高频检查)
3. 会话门控:新会话数 ≥ MinSessions(默认 3 个)
```

触发后独立运行一个 Tool-Calling 循环(最多 30 次迭代),只授予受限工具:

```
允许:bash(ls/find/grep/cat/stat/wc/head/tail), read, write, edit, glob, grep
禁止:任何写命令(rm/mv/cp)、网络请求
```

且 `write`/`edit` 被限制只能操作 `~/.agentgo/memory/` 目录:

```go
if !isInsideMemoryDir(absPath, r.memoryRoot) {
    return "Error: dream mode can only write to memory directory"
}
```

加锁机制防止多个 agentgo 进程同时运行 dream:

```go
priorMtime, acquired, err := TryAcquireConsolidationLock()
if !acquired { return }
defer RollbackConsolidationLock(task.PriorMtime) // 失败回滚
```

---

## 7. 并行工具执行:区分并发安全与串行

当 LLM 返回多个 tool_call 时,引擎自动并行执行并发安全的工具:

```go
if len(resp.ToolCalls) > 1 {
    var wg sync.WaitGroup
    for i, tc := range resp.ToolCalls {
        t, _ := e.registry.Find(tc.Name)
        safe := t != nil && t.Def().IsConcurrencySafe

        // write/edit 到不同文件也可以并行
        if !safe && (tc.Name == "write" || tc.Name == "edit") {
            if fp, ok := tc.Input["filePath"].(string); ok && fp != "" {
                if !serialFilePaths[fp] {
                    serialFilePaths[fp] = true
                    safe = true
                }
            }
        }

        if safe {
            wg.Add(1)
            go func(idx int, tcall api.ToolCall) {
                defer wg.Done()
                defer func() {
                    if r := recover(); r != nil {
                        results[idx] = toolResult{...Error: "tool panicked: ..."}
                    }
                }()
                results[idx] = toolResult{...e.executeTool(ctx, tcall)}
            }(i, tc)
        } else {
            results[i] = toolResult{...e.executeTool(ctx, tc)}  // 串行
        }
    }
    wg.Wait()
}
```

关键设计:

- 每个并行 goroutine 包裹 `recover()` 防止 panic 导致进程崩溃
- `write`/`edit` 到不同文件路径视为安全的并发操作
- `bash` 命令默认串行(有副作用)

---

## 8. 上下文窗口管理:三级压缩策略

### 8.1 工具输出自适应截断

不同工具的输出采用不同的 token 上限:

```go
switch tc.Name {
case "read", "grep":
    maxTokens = 6000  // 源代码上下文更有价值
case "bash":
    maxTokens = 3000  // 构建/测试输出通常重复
case "webfetch":
    maxTokens = 3000
default:
    maxTokens = 4000
}
```

### 8.2 旧工具结果瘦身

`trimOldToolResults` 将早期的 verbose 工具输出替换为一行摘要:

```go
m.Content = fmt.Sprintf("[%s] %s (%d chars原始输出已压缩)", m.Name, firstLine, len(m.Content))
```

### 8.3 全量压缩 (`Compact`)

当 token 超过 64000 时,调用 LLM 对早期对话进行摘要,保留最近的消息段:

```go
// 结构: [第一条用户消息] + [LLM 摘要] + [最近消息]
newMsgs := make([]api.Message, 0, 2+keepTail)
newMsgs = append(newMsgs, e.messages[0])          // 第一个 user 消息
newMsgs = append(newMsgs, api.Message{
    Role: "user",
    Content: "[Context Summary]\n" + compactResp.Content,
})
newMsgs = append(newMsgs, e.messages[splitIdx:]...)  // 保留尾部
```

---

## 9. Turn-End Pipeline:后台任务流水线

每轮对话结束后,`runTurnEndPipeline` 启动五个并行的后台 goroutine:

```go
func (e *Engine) runTurnEndPipeline() {
    // 1. 会话笔记落盘(无 API 开销)
    go func() { e.sessionNotes.Flush() }()

    // 2. Dream 梦境整理检查(条件触发)
    go func() { e.dreamRunner.ExecuteAutoDream(context.Background()) }()

    // 3. 后台回顾学习
    go func() { e.backgroundReview() }()

    if !e.autoExtract { return }

    // 4. 记忆提取(调用 LLM 从对话中提取关键信息)
    go func() { e.extractRunner.Extract(context.Background(), msgs) }()

    // 5. 后续建议生成(带 5 秒超时)
    go func() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        sug := e.suggestRunner.Generate(ctx, msgs)
        e.lastSuggestions = sug
    }()
}
```

所有 goroutine 都包裹了 `recover()`,保证一个后台 panic 不会炸掉主进程。

---

## 10. 子代理系统:递归的引擎实例化

`agent/runner.go` 实现了子代理调度器:

```go
func (r *Runner) Run(ctx context.Context, name string, task string) (*api.AgentRunResult, error) {
    def := r.defs[name]
    agentTools := r.filterTools(def.Tools)

    // 创建独立的 Engine 实例
    eng, _ := engine.New(engine.Config{
        Model:          def.Model,
        PermissionMode: "auto",   // 子代理默认 auto 模式
        Tools:          agentTools,
        Provider:       r.config.Provider,
    })

    eng.SetSystemOverride(strings.Join([]string{
        fmt.Sprintf("[Sub-agent: %s]", def.Name),
        def.Description,
        def.Prompt,
    }, "\n"))

    output, err := eng.Run(ctx, task)
    return &api.AgentRunResult{Output: output, Cost: ..., Success: err == nil}
}
```

预定义了五种子代理类型:

- `general` — 通用任务
- `explore` — 代码探索
- `plan` — 方案设计
- `review` — 代码审查
- `test` — 测试编写

每个子代理是**全新的 Engine 实例**,有自己的消息历史、费用追踪和工具集。这相当于在主会话中 fork 出一个隔离的沙箱执行环境。

---

## 11. 费用追踪:精算到 Token 级别

`cost/tracker.go` 硬编码了主流模型的实时价格:

```go
var Prices = map[string]Price{
    "claude-sonnet-4-20250514": {Input: 3.0, Output: 15.0},     // $3/$15 per 1M tokens
    "deepseek-v4-pro":          {Input: 0.435, Output: 0.87},   // $0.435/$0.87
    "gpt-4o":                   {Input: 2.5, Output: 10.0},
    // ...
}
```

支持 Anthropic 的 Prompt Cache 分层计费:

```go
t.TotalCost += (float64(cacheMiss)/1e6)*p.Input + 
               (float64(cacheHit)/1e6)*p.InputCacheHit + 
               (float64(output)/1e6)*p.Output
```

每次 API 调用后更新,`OverBudget()` 检查在下一轮迭代前生效:

```go
if e.costTracker.OverBudget() {
    return "", fmt.Errorf("budget exceeded: %s", e.costTracker.Summary())
}
```

持久化的 `CostHistory` 提供 24 小时、7 天、累计统计。

---

## 结语

agentgo 的核心引擎设计体现了一种 **「深度防御 + 激进优化」** 的哲学:

- **深度防御**:Classifier → Permission Manager → Guardrail → Circuit Breaker → MaxIterations,五层保护
- **激进优化**:并行工具执行、自适应截断、多级压缩、Prompt 缓存
- **最小依赖**:两个外部包,一切从零手写

它证明了一件事:用 8,000 行 Go 代码 + 两个第三方依赖,完全可以构建出一个功能不输于任何竞品的 AI 编程助手运行时。

---

 

代码仓库:liuzhixin405/cagentcli


原文地址: https://www.cveoy.top/t/topic/qGLY 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录