feat: 实时数据展示优化
This commit is contained in:
35
app/run.go
35
app/run.go
@@ -122,6 +122,9 @@ func Run(cfg Config, refs tools.References, prompts Prompts, styles map[string]s
|
||||
// registerSubscription 注册 coordinator 事件订阅,包含确定性控制和可选的 UIEvent/Delta 转发。
|
||||
func registerSubscription(coordinator *agentcore.Agent, store *state.Store, provider string, emit emitFn, onDelta deltaFn, onClear clearFn) {
|
||||
var lastProgressSummary string
|
||||
agentExt := newFieldExtractor("agent") // Coordinator → subagent 目标 agent 名称
|
||||
taskExt := newFieldExtractor("task") // Coordinator → subagent 调度指令
|
||||
subFilter := newStreamFilter("content") // SubAgent:文本透传 + JSON 提取 content
|
||||
|
||||
coordinator.Subscribe(func(ev agentcore.Event) {
|
||||
switch ev.Type {
|
||||
@@ -135,7 +138,9 @@ func registerSubscription(coordinator *agentcore.Agent, store *state.Store, prov
|
||||
// 区分流式 delta 和进度摘要
|
||||
if delta, ok := parseStreamDelta(ev); ok {
|
||||
if onDelta != nil {
|
||||
onDelta(delta)
|
||||
if text := subFilter.Feed(delta); text != "" {
|
||||
onDelta(text)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -153,15 +158,23 @@ func registerSubscription(coordinator *agentcore.Agent, store *state.Store, prov
|
||||
}
|
||||
|
||||
case agentcore.EventMessageStart:
|
||||
// 新一轮 LLM 输出开始,清空流式缓冲
|
||||
// 新一轮 LLM 输出开始,重置提取器 + 清空流式缓冲
|
||||
agentExt.Reset()
|
||||
taskExt.Reset()
|
||||
subFilter.Reset()
|
||||
if onClear != nil {
|
||||
onClear()
|
||||
}
|
||||
|
||||
case agentcore.EventMessageUpdate:
|
||||
// Coordinator 自身思考时的流式 token
|
||||
// Coordinator 的流式 token:先提取 agent 名称做标题,再提取 task 内容
|
||||
if ev.Delta != "" && onDelta != nil {
|
||||
onDelta(ev.Delta)
|
||||
if name := agentExt.Feed(ev.Delta); name != "" {
|
||||
onDelta("\n▸ " + agentLabel(name) + "\n")
|
||||
}
|
||||
if text := taskExt.Feed(ev.Delta); text != "" {
|
||||
onDelta(text)
|
||||
}
|
||||
}
|
||||
|
||||
case agentcore.EventToolExecEnd:
|
||||
@@ -806,6 +819,20 @@ func extractToolErrorText(result json.RawMessage) string {
|
||||
return truncateLog(string(result), 160)
|
||||
}
|
||||
|
||||
// agentLabel 将内部 agent 名称映射为用户友好的标签。
|
||||
func agentLabel(name string) string {
|
||||
switch name {
|
||||
case "architect_short", "architect_mid", "architect_long":
|
||||
return "Architect 规划中"
|
||||
case "writer":
|
||||
return "Writer 创作中"
|
||||
case "editor":
|
||||
return "Editor 审阅中"
|
||||
default:
|
||||
return name
|
||||
}
|
||||
}
|
||||
|
||||
func truncateLog(s string, maxRunes int) string {
|
||||
runes := []rune(s)
|
||||
if len(runes) <= maxRunes {
|
||||
|
||||
216
app/stream_extract.go
Normal file
216
app/stream_extract.go
Normal file
@@ -0,0 +1,216 @@
|
||||
package app
|
||||
|
||||
import "strings"
|
||||
|
||||
// jsonFieldExtractor 从流式 JSON 碎片中提取指定字段的字符串值。
|
||||
//
|
||||
// LLM 流式生成 tool call 时,参数是逐片段到达的(OpenAI/Anthropic)
|
||||
// 或一次性到达的(Gemini)。本提取器用状态机逐字符扫描,
|
||||
// 检测到目标 key 后提取其字符串值,处理 JSON 转义。
|
||||
type jsonFieldExtractor struct {
|
||||
key string // 匹配目标,如 `"content"` 或 `"task"`
|
||||
state extractState
|
||||
matchPos int
|
||||
escape bool
|
||||
buf strings.Builder
|
||||
}
|
||||
|
||||
type extractState int
|
||||
|
||||
const (
|
||||
stateScan extractState = iota // 扫描,寻找目标 key
|
||||
stateColon // 已匹配 key,等冒号和开头引号
|
||||
stateExtract // 提取字符串值中
|
||||
)
|
||||
|
||||
func newFieldExtractor(fieldName string) *jsonFieldExtractor {
|
||||
return &jsonFieldExtractor{key: `"` + fieldName + `"`}
|
||||
}
|
||||
|
||||
// Feed 处理一段 delta,返回提取到的文本(可能为空)。
|
||||
func (e *jsonFieldExtractor) Feed(delta string) string {
|
||||
e.buf.Reset()
|
||||
for _, r := range delta {
|
||||
switch e.state {
|
||||
case stateScan:
|
||||
e.feedScan(r)
|
||||
case stateColon:
|
||||
e.feedColon(r)
|
||||
case stateExtract:
|
||||
e.feedExtract(r)
|
||||
}
|
||||
}
|
||||
return e.buf.String()
|
||||
}
|
||||
|
||||
func (e *jsonFieldExtractor) feedScan(r rune) {
|
||||
if e.matchPos < len(e.key) && byte(r) == e.key[e.matchPos] {
|
||||
e.matchPos++
|
||||
if e.matchPos == len(e.key) {
|
||||
e.state = stateColon
|
||||
e.matchPos = 0
|
||||
}
|
||||
return
|
||||
}
|
||||
e.matchPos = 0
|
||||
if byte(r) == e.key[0] {
|
||||
e.matchPos = 1
|
||||
}
|
||||
}
|
||||
|
||||
func (e *jsonFieldExtractor) feedColon(r rune) {
|
||||
switch r {
|
||||
case ':', ' ', '\t':
|
||||
// 跳过
|
||||
case '"':
|
||||
e.state = stateExtract
|
||||
e.escape = false
|
||||
default:
|
||||
e.state = stateScan
|
||||
e.matchPos = 0
|
||||
if byte(r) == e.key[0] {
|
||||
e.matchPos = 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *jsonFieldExtractor) feedExtract(r rune) {
|
||||
if e.escape {
|
||||
e.escape = false
|
||||
switch r {
|
||||
case 'n':
|
||||
e.buf.WriteByte('\n')
|
||||
case 't':
|
||||
e.buf.WriteByte('\t')
|
||||
case 'r':
|
||||
e.buf.WriteByte('\r')
|
||||
case '"', '\\', '/':
|
||||
e.buf.WriteRune(r)
|
||||
default:
|
||||
e.buf.WriteByte('\\')
|
||||
e.buf.WriteRune(r)
|
||||
}
|
||||
return
|
||||
}
|
||||
switch r {
|
||||
case '\\':
|
||||
e.escape = true
|
||||
case '"':
|
||||
e.state = stateScan
|
||||
e.matchPos = 0
|
||||
default:
|
||||
e.buf.WriteRune(r)
|
||||
}
|
||||
}
|
||||
|
||||
// Reset 重置状态(新 LLM 消息轮次时调用)。
|
||||
func (e *jsonFieldExtractor) Reset() {
|
||||
e.state = stateScan
|
||||
e.matchPos = 0
|
||||
e.escape = false
|
||||
}
|
||||
|
||||
// ThinkingSep 是思考文本与正文之间的分隔标记。
|
||||
// streamFilter 在思考文本段前插入此标记,TUI 据此切换渲染样式。
|
||||
const ThinkingSep = "\x02"
|
||||
|
||||
// streamFilter 区分 SubAgent 的文本回复和 JSON 工具调用。
|
||||
// 文本回复标记为思考内容(前缀 ThinkingSep);JSON 工具调用只提取指定字段。
|
||||
//
|
||||
// 判断依据:遇到 { 进入 JSON 模式(追踪大括号深度),
|
||||
// 深度归零后回到文本模式。
|
||||
type streamFilter struct {
|
||||
fieldExt *jsonFieldExtractor
|
||||
mode filterMode
|
||||
braceDepth int
|
||||
inString bool // 在 JSON 字符串内(大括号不计数)
|
||||
escJSON bool // JSON 字符串内的转义
|
||||
thinking bool // 当前处于思考文本段
|
||||
buf strings.Builder
|
||||
}
|
||||
|
||||
type filterMode int
|
||||
|
||||
const (
|
||||
filterText filterMode = iota // 文本回复,直接透传
|
||||
filterJSON // JSON 工具调用,提取目标字段
|
||||
)
|
||||
|
||||
func newStreamFilter(fieldName string) *streamFilter {
|
||||
return &streamFilter{fieldExt: newFieldExtractor(fieldName)}
|
||||
}
|
||||
|
||||
// Feed 处理一段 delta,返回可展示文本。
|
||||
// 文本回复直接输出;JSON 中的目标字段值被提取输出;其余 JSON 结构丢弃。
|
||||
func (f *streamFilter) Feed(delta string) string {
|
||||
f.buf.Reset()
|
||||
for _, r := range delta {
|
||||
switch f.mode {
|
||||
case filterText:
|
||||
if r == '{' {
|
||||
f.thinking = false
|
||||
f.mode = filterJSON
|
||||
f.braceDepth = 1
|
||||
f.inString = false
|
||||
f.escJSON = false
|
||||
f.fieldExt.Reset()
|
||||
f.feedExtractor(r)
|
||||
} else {
|
||||
if !f.thinking {
|
||||
f.thinking = true
|
||||
f.buf.WriteString(ThinkingSep)
|
||||
}
|
||||
f.buf.WriteRune(r)
|
||||
}
|
||||
case filterJSON:
|
||||
f.feedExtractor(r)
|
||||
f.trackBraces(r)
|
||||
}
|
||||
}
|
||||
return f.buf.String()
|
||||
}
|
||||
|
||||
// feedExtractor 将单个字符喂给 fieldExt,提取结果写入 buf。
|
||||
func (f *streamFilter) feedExtractor(r rune) {
|
||||
if text := f.fieldExt.Feed(string(r)); text != "" {
|
||||
f.buf.WriteString(text)
|
||||
}
|
||||
}
|
||||
|
||||
// trackBraces 追踪 JSON 大括号深度,深度归零时切回文本模式。
|
||||
func (f *streamFilter) trackBraces(r rune) {
|
||||
if f.escJSON {
|
||||
f.escJSON = false
|
||||
return
|
||||
}
|
||||
if f.inString {
|
||||
switch r {
|
||||
case '\\':
|
||||
f.escJSON = true
|
||||
case '"':
|
||||
f.inString = false
|
||||
}
|
||||
return
|
||||
}
|
||||
switch r {
|
||||
case '"':
|
||||
f.inString = true
|
||||
case '{':
|
||||
f.braceDepth++
|
||||
case '}':
|
||||
f.braceDepth--
|
||||
if f.braceDepth <= 0 {
|
||||
f.mode = filterText
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reset 重置状态。
|
||||
func (f *streamFilter) Reset() {
|
||||
f.mode = filterText
|
||||
f.braceDepth = 0
|
||||
f.inString = false
|
||||
f.escJSON = false
|
||||
f.thinking = false
|
||||
f.fieldExt.Reset()
|
||||
}
|
||||
194
app/stream_extract_test.go
Normal file
194
app/stream_extract_test.go
Normal file
@@ -0,0 +1,194 @@
|
||||
package app
|
||||
|
||||
import "testing"
|
||||
|
||||
// --- jsonFieldExtractor tests ---
|
||||
|
||||
func TestFieldExtractor_SingleFeed(t *testing.T) {
|
||||
e := newFieldExtractor("content")
|
||||
got := e.Feed(`{"chapter":1,"content":"hello world","mode":"write"}`)
|
||||
if got != "hello world" {
|
||||
t.Errorf("got %q, want %q", got, "hello world")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFieldExtractor_CrossDelta(t *testing.T) {
|
||||
e := newFieldExtractor("content")
|
||||
var result string
|
||||
for _, d := range []string{`{"chapter":1,"con`, `tent":"`, `第三章`, `\n\n夜幕低垂`, `","mode":"write"}`} {
|
||||
result += e.Feed(d)
|
||||
}
|
||||
if want := "第三章\n\n夜幕低垂"; result != want {
|
||||
t.Errorf("got %q, want %q", result, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFieldExtractor_JSONEscape(t *testing.T) {
|
||||
e := newFieldExtractor("content")
|
||||
got := e.Feed(`{"content":"line1\nline2\t\"quoted\"\\end"}`)
|
||||
if want := "line1\nline2\t\"quoted\"\\end"; got != want {
|
||||
t.Errorf("got %q, want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFieldExtractor_NoTargetField(t *testing.T) {
|
||||
e := newFieldExtractor("content")
|
||||
got := e.Feed(`{"chapter":1,"summary":"test","characters":["A"]}`)
|
||||
if got != "" {
|
||||
t.Errorf("got %q, want empty", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFieldExtractor_ColonWithSpace(t *testing.T) {
|
||||
e := newFieldExtractor("content")
|
||||
got := e.Feed(`{"content" : "spaced"}`)
|
||||
if got != "spaced" {
|
||||
t.Errorf("got %q, want %q", got, "spaced")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFieldExtractor_Reset(t *testing.T) {
|
||||
e := newFieldExtractor("content")
|
||||
e.Feed(`{"content":"partial`)
|
||||
e.Reset()
|
||||
got := e.Feed(`{"content":"fresh"}`)
|
||||
if got != "fresh" {
|
||||
t.Errorf("got %q, want %q", got, "fresh")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFieldExtractor_TaskField(t *testing.T) {
|
||||
e := newFieldExtractor("task")
|
||||
got := e.Feed(`{"agent":"writer","task":"写第1章。核心事件:林尘目睹斗法"}`)
|
||||
if want := "写第1章。核心事件:林尘目睹斗法"; got != want {
|
||||
t.Errorf("got %q, want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFieldExtractor_TaskCrossDelta(t *testing.T) {
|
||||
e := newFieldExtractor("task")
|
||||
var result string
|
||||
for _, d := range []string{`{"agent":"writer","ta`, `sk":"写第`, `1章"}`, `extra`} {
|
||||
result += e.Feed(d)
|
||||
}
|
||||
if want := "写第1章"; result != want {
|
||||
t.Errorf("got %q, want %q", result, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFieldExtractor_Chinese(t *testing.T) {
|
||||
e := newFieldExtractor("content")
|
||||
var result string
|
||||
for _, d := range []string{`{"content":"`, `林远站在窗前,`, `望着远处的山峦。`, `"}`} {
|
||||
result += e.Feed(d)
|
||||
}
|
||||
if want := "林远站在窗前,望着远处的山峦。"; result != want {
|
||||
t.Errorf("got %q, want %q", result, want)
|
||||
}
|
||||
}
|
||||
|
||||
// --- streamFilter tests ---
|
||||
|
||||
func TestStreamFilter_TextPassthrough(t *testing.T) {
|
||||
f := newStreamFilter("content")
|
||||
got := f.Feed("好的,我来加载上下文信息。")
|
||||
if want := ThinkingSep + "好的,我来加载上下文信息。"; got != want {
|
||||
t.Errorf("got %q, want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamFilter_JSONExtractContent(t *testing.T) {
|
||||
f := newStreamFilter("content")
|
||||
got := f.Feed(`{"chapter":1,"content":"第一章 晨曦","mode":"write"}`)
|
||||
if want := "第一章 晨曦"; got != want {
|
||||
t.Errorf("got %q, want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamFilter_JSONNoContent(t *testing.T) {
|
||||
f := newStreamFilter("content")
|
||||
got := f.Feed(`{"chapter":1,"title":"暗流","goal":"揭示线索"}`)
|
||||
if got != "" {
|
||||
t.Errorf("got %q, want empty", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamFilter_TextThenJSON(t *testing.T) {
|
||||
f := newStreamFilter("content")
|
||||
var result string
|
||||
result += f.Feed("规划完成,开始写作。")
|
||||
result += f.Feed(`{"chapter":1,"content":"正文","mode":"write"}`)
|
||||
if want := ThinkingSep + "规划完成,开始写作。正文"; result != want {
|
||||
t.Errorf("got %q, want %q", result, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamFilter_JSONThenText(t *testing.T) {
|
||||
f := newStreamFilter("content")
|
||||
var result string
|
||||
result += f.Feed(`{"chapter":1,"summary":"摘要"}`)
|
||||
result += f.Feed("提交完成。")
|
||||
if want := ThinkingSep + "提交完成。"; result != want {
|
||||
t.Errorf("got %q, want %q", result, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamFilter_CrossDeltaMixed(t *testing.T) {
|
||||
f := newStreamFilter("content")
|
||||
var result string
|
||||
deltas := []string{
|
||||
"好的,开始",
|
||||
"写作。",
|
||||
`{"chapter":1`,
|
||||
`,"content":"`,
|
||||
"第一章",
|
||||
"\n\n正文",
|
||||
`","mode":"write"}`,
|
||||
"已写入。",
|
||||
}
|
||||
for _, d := range deltas {
|
||||
result += f.Feed(d)
|
||||
}
|
||||
want := ThinkingSep + "好的,开始写作。第一章\n\n正文" + ThinkingSep + "已写入。"
|
||||
if result != want {
|
||||
t.Errorf("got %q, want %q", result, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamFilter_NestedBraces(t *testing.T) {
|
||||
f := newStreamFilter("content")
|
||||
got := f.Feed(`{"summary":"摘要","foreshadow":[{"type":"plant"}]}`)
|
||||
if got != "" {
|
||||
t.Errorf("got %q, want empty (nested JSON should be fully consumed)", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamFilter_BracesInString(t *testing.T) {
|
||||
f := newStreamFilter("content")
|
||||
got := f.Feed(`{"content":"文中有{大括号}和\"引号\""}后续文本`)
|
||||
want := "文中有{大括号}和\"引号\"" + ThinkingSep + "后续文本"
|
||||
if got != want {
|
||||
t.Errorf("got %q, want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamFilter_Reset(t *testing.T) {
|
||||
f := newStreamFilter("content")
|
||||
f.Feed(`{"content":"半截`)
|
||||
f.Reset()
|
||||
got := f.Feed("重新开始")
|
||||
if want := ThinkingSep + "重新开始"; got != want {
|
||||
t.Errorf("got %q, want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamFilter_ThinkingMarkerOnce(t *testing.T) {
|
||||
// 连续文本只在段首插入一次标记
|
||||
f := newStreamFilter("content")
|
||||
var result string
|
||||
result += f.Feed("好的")
|
||||
result += f.Feed(",继续")
|
||||
if want := ThinkingSep + "好的,继续"; result != want {
|
||||
t.Errorf("got %q, want %q", result, want)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user