diff --git a/cmd/entire/cli/agent/agent.go b/cmd/entire/cli/agent/agent.go index 306f5fa04..afc7c3fe8 100644 --- a/cmd/entire/cli/agent/agent.go +++ b/cmd/entire/cli/agent/agent.go @@ -187,6 +187,49 @@ type TextGenerator interface { GenerateText(ctx context.Context, prompt string, model string) (string, error) } +// ProgressPhase identifies a coarse stage in streaming text generation. +type ProgressPhase string + +const ( + // PhaseConnecting is emitted once when the CLI signals it is making the upstream request. + PhaseConnecting ProgressPhase = "connecting" + // PhaseFirstToken is emitted once when the upstream responds with the first event, + // carrying TTFT and input/cache token counts. + PhaseFirstToken ProgressPhase = "first-token" + // PhaseGenerating is emitted repeatedly as text or thinking deltas arrive. + // OutputTokens carries a running estimate based on delta sizes. + PhaseGenerating ProgressPhase = "generating" + // PhaseDone is emitted once when the final result event is received without error. + PhaseDone ProgressPhase = "done" +) + +// GenerationProgress reports a snapshot of streaming text generation progress. +// Fields not relevant to the current Phase may be zero-valued. +type GenerationProgress struct { + Phase ProgressPhase + OutputTokens int // running estimate during PhaseGenerating; final at PhaseDone + InputTokens int // populated at PhaseFirstToken + CachedInputTokens int // populated at PhaseFirstToken + TTFTms int // time-to-first-token, populated at PhaseFirstToken + DurationMs int // populated at PhaseDone (final result event) +} + +// ProgressFn receives streaming progress updates. It must not block — invoke it +// from the same goroutine that reads the stream and keep handlers fast. +type ProgressFn func(GenerationProgress) + +// StreamingTextGenerator is an optional interface for text generators whose +// underlying CLI exposes a streaming output mode. 
Callers can use AsStreamingTextGenerator +// to detect support and fall back to plain GenerateText when unavailable. +type StreamingTextGenerator interface { + Agent + + // GenerateTextStreaming invokes the agent's streaming text generation and + // calls progress for each phase update. progress may be nil to suppress + // reporting. The returned string is the final response text. + GenerateTextStreaming(ctx context.Context, prompt, model string, progress ProgressFn) (string, error) +} + // HookResponseWriter is implemented by agents that support structured hook responses. // Agents that implement this can output messages (e.g., banners) to the user via // the agent's response protocol. For example, Claude Code outputs JSON with a diff --git a/cmd/entire/cli/agent/capabilities.go b/cmd/entire/cli/agent/capabilities.go index 3b302dc1d..6443f6b8e 100644 --- a/cmd/entire/cli/agent/capabilities.go +++ b/cmd/entire/cli/agent/capabilities.go @@ -21,6 +21,7 @@ type DeclaredCaps struct { TranscriptPreparer bool `json:"transcript_preparer"` TokenCalculator bool `json:"token_calculator"` TextGenerator bool `json:"text_generator"` + StreamingTextGenerator bool `json:"streaming_text_generator"` HookResponseWriter bool `json:"hook_response_writer"` SubagentAwareExtractor bool `json:"subagent_aware_extractor"` } @@ -105,6 +106,22 @@ func AsTextGenerator(ag Agent) (TextGenerator, bool) { //nolint:ireturn // type- return tg, true } +// AsStreamingTextGenerator returns the agent as StreamingTextGenerator if it both +// implements the interface and (for CapabilityDeclarer agents) has declared the capability. 
+func AsStreamingTextGenerator(ag Agent) (StreamingTextGenerator, bool) { //nolint:ireturn // type-assertion helper must return interface + if ag == nil { + return nil, false + } + stg, ok := ag.(StreamingTextGenerator) + if !ok { + return nil, false + } + if cd, ok := ag.(CapabilityDeclarer); ok { + return stg, cd.DeclaredCapabilities().StreamingTextGenerator + } + return stg, true +} + // AsHookResponseWriter returns the agent as HookResponseWriter if it both // implements the interface and (for CapabilityDeclarer agents) has declared the capability. func AsHookResponseWriter(ag Agent) (HookResponseWriter, bool) { //nolint:ireturn // type-assertion helper must return interface diff --git a/cmd/entire/cli/agent/capabilities_test.go b/cmd/entire/cli/agent/capabilities_test.go index 7ed1c79c2..02366b071 100644 --- a/cmd/entire/cli/agent/capabilities_test.go +++ b/cmd/entire/cli/agent/capabilities_test.go @@ -94,6 +94,20 @@ func (m *mockFullAgent) CalculateTotalTokenUsage([]byte, int, string) (*TokenUsa return nil, nil //nolint:nilnil // test mock } +// StreamingTextGenerator +func (m *mockFullAgent) GenerateTextStreaming(context.Context, string, string, ProgressFn) (string, error) { + return "", nil +} + +// mockBuiltinStreamingAgent is a built-in agent that implements StreamingTextGenerator but NOT CapabilityDeclarer. +type mockBuiltinStreamingAgent struct { + mockBaseAgent +} + +func (m *mockBuiltinStreamingAgent) GenerateTextStreaming(context.Context, string, string, ProgressFn) (string, error) { + return "", nil +} + // mockBuiltinPromptAgent is a built-in agent that implements PromptExtractor but NOT CapabilityDeclarer. 
type mockBuiltinPromptAgent struct { mockBaseAgent @@ -371,3 +385,43 @@ func TestAsPromptExtractor(t *testing.T) { } }) } + +func TestAsStreamingTextGenerator(t *testing.T) { + t.Parallel() + + t.Run("not implemented", func(t *testing.T) { + t.Parallel() + ag := &mockBaseAgent{} + _, ok := AsStreamingTextGenerator(ag) + if ok { + t.Error("expected false for agent not implementing StreamingTextGenerator") + } + }) + + t.Run("builtin agent", func(t *testing.T) { + t.Parallel() + ag := &mockBuiltinStreamingAgent{} + stg, ok := AsStreamingTextGenerator(ag) + if !ok || stg == nil { + t.Error("expected true for built-in agent implementing StreamingTextGenerator") + } + }) + + t.Run("declared true", func(t *testing.T) { + t.Parallel() + ag := &mockFullAgent{caps: DeclaredCaps{StreamingTextGenerator: true}} + stg, ok := AsStreamingTextGenerator(ag) + if !ok || stg == nil { + t.Error("expected true when capability declared true") + } + }) + + t.Run("declared false", func(t *testing.T) { + t.Parallel() + ag := &mockFullAgent{caps: DeclaredCaps{StreamingTextGenerator: false}} + _, ok := AsStreamingTextGenerator(ag) + if ok { + t.Error("expected false when capability declared false") + } + }) +} diff --git a/cmd/entire/cli/agent/claudecode/generate_streaming.go b/cmd/entire/cli/agent/claudecode/generate_streaming.go new file mode 100644 index 000000000..bd9326dfb --- /dev/null +++ b/cmd/entire/cli/agent/claudecode/generate_streaming.go @@ -0,0 +1,195 @@ +package claudecode + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "log/slog" + "os" + "os/exec" + "strings" + + "github.com/entireio/cli/cmd/entire/cli/agent" + "github.com/entireio/cli/cmd/entire/cli/logging" +) + +// GenerateTextStreaming runs the Claude CLI in stream-json mode, dispatches +// progress events to the optional callback, and returns the final result text. +// Implements the agent.StreamingTextGenerator interface. 
+//
+// If the CLI rejects the stream-json flags (older Claude CLI), this falls back
+// to the non-streaming GenerateText path — without progress events.
+func (c *ClaudeCodeAgent) GenerateTextStreaming(
+	ctx context.Context,
+	prompt, model string,
+	progress agent.ProgressFn,
+) (string, error) {
+	if model == "" {
+		model = "haiku"
+	}
+
+	commandRunner := c.CommandRunner
+	if commandRunner == nil {
+		commandRunner = exec.CommandContext
+	}
+
+	cmd := commandRunner(ctx, "claude",
+		"--print",
+		"--output-format", "stream-json",
+		"--verbose", "--include-partial-messages", // partial messages are what produce the stream_event lines the dispatcher consumes
+		"--model", model,
+		"--setting-sources", "")
+
+	cmd.Dir = os.TempDir()
+	cmd.Env = agent.StripGitEnv(os.Environ())
+	cmd.Stdin = strings.NewReader(prompt)
+
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		return "", fmt.Errorf("claude stream stdout pipe: %w", err)
+	}
+	var stderr bytes.Buffer
+	cmd.Stderr = &stderr
+
+	if err := cmd.Start(); err != nil {
+		return "", fmt.Errorf("claude stream start: %w", err)
+	}
+
+	final, malformed, parseErr := streamClaudeResponse(stdout, makeProgressDispatcher(progress))
+	// Drain any unread stdout so the subprocess can exit cleanly even if the
+	// scanner aborted early (e.g. bufio.ErrTooLong on an oversized line).
+	// Without this, a blocked pipe would deadlock cmd.Wait() in interactive
+	// mode, which has no deadline. A copy error is only logged at debug level —
+	// stdout is about to be closed anyway.
+	if _, drainErr := io.Copy(io.Discard, stdout); drainErr != nil {
+		logging.Debug(ctx, "draining claude stream stdout", slog.String("error", drainErr.Error()))
+	}
+	waitErr := cmd.Wait()
+
+	if malformed > 0 {
+		logging.Warn(ctx, "skipped malformed claude stream lines",
+			slog.Int("count", malformed))
+	}
+
+	// A specific envelope error outranks a generic ctx-cancel message: if
+	// the stream produced an is_error result just before cancellation, the
+	// user deserves the specific cause (auth, rate-limit, etc.) rather than
+	// "canceled".
+	if final != nil && final.IsError {
+		return "", envelopeErrorMessage(final)
+	}
+
+	// Context errors pass through as sentinels so callers can use errors.Is.
+	if ctx.Err() != nil {
+		if errors.Is(ctx.Err(), context.DeadlineExceeded) {
+			return "", context.DeadlineExceeded
+		}
+		return "", context.Canceled
+	}
+
+	if final != nil {
+		if final.Result == nil {
+			return "", errors.New("claude returned empty result")
+		}
+		if progress != nil {
+			progress(agent.GenerationProgress{
+				Phase:        agent.PhaseDone,
+				OutputTokens: outputTokensFromUsage(final.Usage),
+				DurationMs:   final.DurationMs,
+			})
+		}
+		return *final.Result, nil
+	}
+
+	// No envelope: check if the CLI rejected streaming flags (older version).
+	// If so, fall back to the non-streaming path.
+	if waitErr != nil {
+		stderrStr := stderr.String()
+		if looksLikeUnrecognizedFlag(stderrStr) {
+			logging.Warn(ctx, "claude CLI rejected stream-json flags; falling back to non-streaming (no progress output)",
+				slog.String("stderr", strings.TrimSpace(stderrStr)))
+			return c.GenerateText(ctx, prompt, model)
+		}
+		if stderrStr != "" {
+			return "", fmt.Errorf("claude stream failed: %s: %w", strings.TrimSpace(stderrStr), waitErr)
+		}
+		return "", fmt.Errorf("claude stream failed: %w", waitErr)
+	}
+
+	if parseErr != nil {
+		return "", fmt.Errorf("claude stream parse: %w", parseErr)
+	}
+	return "", errors.New("claude exited without producing a result")
+}
+
+// envelopeErrorMessage formats an is_error result envelope as a user-facing
+// error. Preserves the CLI's result text and HTTP status when present.
+func envelopeErrorMessage(final *streamEvent) error {
+	msg := "claude CLI reported error"
+	if final.Result != nil && *final.Result != "" {
+		msg = fmt.Sprintf("%s: %s", msg, *final.Result)
+	}
+	if final.APIErrorStatus != nil {
+		msg = fmt.Sprintf("%s (HTTP %d)", msg, *final.APIErrorStatus)
+	}
+	return errors.New(msg)
+}
+
+// makeProgressDispatcher returns a per-event handler that translates raw
+// stream events into agent.GenerationProgress callbacks. PhaseDone is
+// emitted by GenerateTextStreaming after cmd.Wait, because it needs data
+// from the parsed final envelope (OutputTokens, DurationMs).
+func makeProgressDispatcher(progress agent.ProgressFn) func(streamEvent) {
+	if progress == nil {
+		return func(streamEvent) {} // progress disabled; scanner itself reads the stream
+	}
+	var deltaBytes int // running total of delta bytes; divided once per report so short deltas are not truncated to zero
+	return func(ev streamEvent) {
+		switch {
+		case ev.Type == "system" && ev.Subtype == "status" && ev.Status == "requesting":
+			progress(agent.GenerationProgress{Phase: agent.PhaseConnecting})
+		case ev.Type == "stream_event" && ev.Event.Type == "message_start":
+			p := agent.GenerationProgress{Phase: agent.PhaseFirstToken, TTFTms: ev.TTFTms}
+			if ev.Event.Message != nil && ev.Event.Message.Usage != nil {
+				p.InputTokens = ev.Event.Message.Usage.InputTokens
+				p.CachedInputTokens = ev.Event.Message.Usage.CacheReadInputTokens
+			}
+			progress(p)
+		case ev.Type == "stream_event" && ev.Event.Type == "content_block_delta" && ev.Event.Delta != nil:
+			text := ev.Event.Delta.Text
+			if text == "" {
+				text = ev.Event.Delta.Thinking
+			}
+			deltaBytes += len(text)
+			progress(agent.GenerationProgress{Phase: agent.PhaseGenerating, OutputTokens: deltaBytes / 4}) // rough estimate: ~4 chars/token
+		}
+	}
+}
+
+func outputTokensFromUsage(u *messageUsage) int {
+	if u == nil {
+		return 0
+	}
+	return u.OutputTokens
+}
+
+// looksLikeUnrecognizedFlag returns true if stderr indicates the CLI
+// rejected one of the streaming-specific flags (older Claude
CLI that +// doesn't support stream-json or --verbose). Requires both a rejection +// phrase AND a streaming flag name to avoid false-positives on unrelated +// errors that happen to contain "unknown option". +func looksLikeUnrecognizedFlag(stderr string) bool { + lower := strings.ToLower(stderr) + hasRejectPhrase := strings.Contains(lower, "unrecognized option") || + strings.Contains(lower, "unknown flag") || + strings.Contains(lower, "unknown option") || + strings.Contains(lower, "invalid option") + if !hasRejectPhrase { + return false + } + return strings.Contains(lower, "stream-json") || + strings.Contains(lower, "verbose") || + strings.Contains(lower, "include-partial") +} diff --git a/cmd/entire/cli/agent/claudecode/generate_streaming_test.go b/cmd/entire/cli/agent/claudecode/generate_streaming_test.go new file mode 100644 index 000000000..7c5c25d93 --- /dev/null +++ b/cmd/entire/cli/agent/claudecode/generate_streaming_test.go @@ -0,0 +1,184 @@ +package claudecode + +import ( + "context" + "errors" + "os/exec" + "strings" + "testing" + "time" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +func TestGenerateTextStreaming_SuccessEmitsPhases(t *testing.T) { + t.Parallel() + body := strings.Join([]string{ + `{"type":"system","subtype":"status","status":"requesting"}`, + `{"type":"stream_event","event":{"type":"message_start","message":{"usage":{"input_tokens":10,"cache_read_input_tokens":2000}}},"ttft_ms":1500}`, + `{"type":"stream_event","event":{"type":"content_block_delta","delta":{"type":"text_delta","text":"hello world"}}}`, + `{"type":"result","subtype":"success","is_error":false,"result":"hello world","duration_ms":1700}`, + }, "\n") + ag := newAgentWithStdout(body) + var phases []agent.ProgressPhase + got, err := ag.GenerateTextStreaming(context.Background(), "p", "", func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + }) + if err != nil { + t.Fatalf("err = %v; want nil", err) + } + if got != "hello world" { + t.Fatalf("got = 
%q; want %q", got, "hello world") + } + want := []agent.ProgressPhase{agent.PhaseConnecting, agent.PhaseFirstToken, agent.PhaseGenerating, agent.PhaseDone} + if !slicesEqual(phases, want) { + t.Fatalf("phases = %v; want %v", phases, want) + } +} + +func TestGenerateTextStreaming_EnvelopeErrorReturnsError(t *testing.T) { + t.Parallel() + body := `{"type":"result","subtype":"success","is_error":true,"api_error_status":401,"result":"Auth required"}` + ag := newAgentWithStdout(body) + _, err := ag.GenerateTextStreaming(context.Background(), "p", "", nil) + if err == nil { + t.Fatal("err = nil; want non-nil for envelope error") + } + if !strings.Contains(err.Error(), "Auth required") { + t.Errorf("err = %v; want error mentioning result text", err) + } + if !strings.Contains(err.Error(), "401") { + t.Errorf("err = %v; want error mentioning HTTP status", err) + } +} + +func TestGenerateTextStreaming_FallsBackToLegacy(t *testing.T) { + t.Parallel() + // Simulate an older CLI that rejects stream-json, then succeeds on legacy json. 
+ callCount := 0 + ag := &ClaudeCodeAgent{ + CommandRunner: func(ctx context.Context, _ string, args ...string) *exec.Cmd { + callCount++ + for _, a := range args { + if a == "stream-json" { + return exec.CommandContext(ctx, "sh", "-c", + "printf 'error: unknown option stream-json' 1>&2; exit 1") + } + } + return exec.CommandContext(ctx, "sh", "-c", + `printf '{"type":"result","result":"legacy result"}'`) + }, + } + got, err := ag.GenerateTextStreaming(context.Background(), "p", "haiku", nil) + if err != nil { + t.Fatalf("err = %v; want nil (legacy fallback should succeed)", err) + } + if got != "legacy result" { + t.Fatalf("got = %q; want %q", got, "legacy result") + } + if callCount < 2 { + t.Errorf("callCount = %d; want >= 2 (streaming + legacy fallback)", callCount) + } +} + +func TestGenerateTextStreaming_ContextCanceledPassesThrough(t *testing.T) { + t.Parallel() + ag := &ClaudeCodeAgent{ + CommandRunner: func(ctx context.Context, _ string, _ ...string) *exec.Cmd { + return exec.CommandContext(ctx, "sh", "-c", "sleep 5") + }, + } + ctx, cancel := context.WithCancel(context.Background()) + cancel() + _, err := ag.GenerateTextStreaming(ctx, "p", "", nil) + if !errors.Is(err, context.Canceled) { + t.Errorf("err = %v; want context.Canceled", err) + } +} + +func TestGenerateTextStreaming_ContextDeadlineExceededPassesThrough(t *testing.T) { + t.Parallel() + ag := &ClaudeCodeAgent{ + CommandRunner: func(ctx context.Context, _ string, _ ...string) *exec.Cmd { + return exec.CommandContext(ctx, "sh", "-c", "sleep 5") + }, + } + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + defer cancel() + _, err := ag.GenerateTextStreaming(ctx, "p", "", nil) + if !errors.Is(err, context.DeadlineExceeded) { + t.Errorf("err = %v; want context.DeadlineExceeded", err) + } +} + +func TestGenerateTextStreaming_EnvelopeErrorBeatsCtxCancel(t *testing.T) { + t.Parallel() + // A specific envelope error should be preserved even if ctx was canceled + // after the 
result event arrived — otherwise users see generic "canceled" + // instead of the specific failure (auth / rate-limit / HTTP 401 / etc.). + body := `{"type":"result","subtype":"success","is_error":true,"api_error_status":401,"result":"Auth required"}` + ctx, cancel := context.WithCancel(context.Background()) + cancel() // parent ctx is dead before the call + ag := &ClaudeCodeAgent{ + CommandRunner: func(_ context.Context, _ string, _ ...string) *exec.Cmd { + // Detach the subprocess from the canceled parent ctx so cmd.Start + // and the scan-to-EOF complete, letting us exercise the + // "envelope-present but ctx.Err() != nil" branch. + return exec.CommandContext(context.Background(), "sh", "-c", + "cat <<'ENDOFSTREAM'\n"+body+"\nENDOFSTREAM") + }, + } + _, err := ag.GenerateTextStreaming(ctx, "p", "", nil) + if err == nil { + t.Fatal("err = nil; want envelope error surfaced despite cancellation") + } + if !strings.Contains(err.Error(), "Auth required") { + t.Errorf("err = %q; want envelope result text preserved over cancellation", err) + } +} + +func TestLooksLikeUnrecognizedFlag(t *testing.T) { + t.Parallel() + tests := []struct { + name string + stderr string + want bool + }{ + {"stream-json unknown", "error: unknown option 'stream-json'", true}, + {"verbose unrecognized", "unrecognized option: --verbose", true}, + {"include-partial invalid", "invalid option: --include-partial-messages", true}, + {"unrelated unknown option", "error: unknown option 'foobar'", false}, + {"auth error", "Invalid API key", false}, + {"empty", "", false}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + if got := looksLikeUnrecognizedFlag(tc.stderr); got != tc.want { + t.Errorf("looksLikeUnrecognizedFlag(%q) = %v; want %v", tc.stderr, got, tc.want) + } + }) + } +} + +// newAgentWithStdout returns a ClaudeCodeAgent whose CommandRunner produces a +// subprocess that prints the given body to stdout and exits 0. 
+func newAgentWithStdout(body string) *ClaudeCodeAgent { + return &ClaudeCodeAgent{ + CommandRunner: func(ctx context.Context, _ string, _ ...string) *exec.Cmd { + return exec.CommandContext(ctx, "sh", "-c", "cat <<'ENDOFSTREAM'\n"+body+"\nENDOFSTREAM") + }, + } +} + +func slicesEqual[T comparable](a, b []T) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} diff --git a/cmd/entire/cli/agent/claudecode/response.go b/cmd/entire/cli/agent/claudecode/response.go index a1e10ee1d..6c0556e53 100644 --- a/cmd/entire/cli/agent/claudecode/response.go +++ b/cmd/entire/cli/agent/claudecode/response.go @@ -1,9 +1,11 @@ package claudecode import ( + "bufio" "encoding/json" "errors" "fmt" + "io" ) type responseEnvelope struct { @@ -32,3 +34,85 @@ func parseGenerateTextResponse(stdout []byte) (string, error) { return "", errors.New("unsupported Claude CLI JSON response: missing result item") } + +// streamBufferMax bounds a single NDJSON line. Stream events can be large +// (init carries the full tool list; deltas can carry long thinking chunks) +// so we lift the default scanner limit substantially. +const streamBufferMax = 4 * 1024 * 1024 // 4 MiB + +// streamEvent represents one decoded line from the stream-json NDJSON output. +// Fields are populated based on the event Type/Subtype. +type streamEvent struct { + Type string `json:"type"` + Subtype string `json:"subtype"` + Status string `json:"status"` // e.g. "requesting" for type=system,subtype=status + Event streamInnerEvent `json:"event"` // for type=stream_event + + // Fields populated for type=result. 
+ IsError bool `json:"is_error"` + APIErrorStatus *int `json:"api_error_status"` + Result *string `json:"result"` + DurationMs int `json:"duration_ms"` + TTFTms int `json:"ttft_ms,omitempty"` // time-to-first-token; on outer stream_event envelope + Usage *messageUsage `json:"usage"` +} + +// streamInnerEvent holds the nested "event" payload for type=stream_event. +type streamInnerEvent struct { + Type string `json:"type"` // "message_start" | "content_block_delta" | "message_delta" | ... + Delta *streamDelta `json:"delta,omitempty"` + Message *streamMessage `json:"message,omitempty"` +} + +// streamDelta carries the content-block delta payload. +type streamDelta struct { + Type string `json:"type"` // "text_delta" | "thinking_delta" | ... + Text string `json:"text,omitempty"` + Thinking string `json:"thinking,omitempty"` +} + +// streamMessage is the partial-message payload on message_start. Only the +// usage field is currently consumed by callers; other fields are ignored. +type streamMessage struct { + Usage *messageUsage `json:"usage,omitempty"` +} + +// streamClaudeResponse reads NDJSON-encoded events from r, invokes onEvent +// for every successfully decoded event, and returns the final result event +// once the stream ends. Malformed lines are skipped to keep the stream +// resilient against single-line corruption; the count is returned so callers +// can log schema drift even on otherwise-successful runs. 
+func streamClaudeResponse(r io.Reader, onEvent func(streamEvent)) (*streamEvent, int, error) { + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 64*1024), streamBufferMax) + var final *streamEvent + var malformedLines int + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var ev streamEvent + if err := json.Unmarshal(line, &ev); err != nil { + malformedLines++ + continue // best-effort: skip and keep streaming + } + if onEvent != nil { + onEvent(ev) + } + if ev.Type == "result" { + captured := ev + final = &captured + } + } + if err := scanner.Err(); err != nil { + return nil, malformedLines, fmt.Errorf("reading claude stream: %w", err) + } + if final == nil { + if malformedLines > 0 { + return nil, malformedLines, fmt.Errorf("claude stream ended without a result event (%d malformed lines skipped)", malformedLines) + } + return nil, 0, errors.New("claude stream ended without a result event") + } + return final, malformedLines, nil +} diff --git a/cmd/entire/cli/agent/claudecode/response_test.go b/cmd/entire/cli/agent/claudecode/response_test.go index 776e34f00..c6a026a09 100644 --- a/cmd/entire/cli/agent/claudecode/response_test.go +++ b/cmd/entire/cli/agent/claudecode/response_test.go @@ -1,6 +1,9 @@ package claudecode import ( + "bytes" + "errors" + "os" "strings" "testing" ) @@ -70,3 +73,98 @@ func TestParseGenerateTextResponse(t *testing.T) { }) } } + +func TestStreamClaudeResponse_SuccessFixture(t *testing.T) { + t.Parallel() + raw, err := os.ReadFile("testdata/stream_success.jsonl") + if err != nil { + t.Fatalf("read fixture: %v", err) + } + var phases []string + final, _, err := streamClaudeResponse(bytes.NewReader(raw), func(ev streamEvent) { + phases = append(phases, ev.Type+"/"+ev.Subtype+"/"+ev.Status) + }) + if err != nil { + t.Fatalf("streamClaudeResponse error = %v; want nil", err) + } + if final == nil { + t.Fatal("final event = nil; want result envelope") + } + const wantType = "result" + if 
final.Type != wantType { + t.Errorf("final.Type = %q; want %q", final.Type, wantType) + } + if final.IsError { + t.Errorf("final.IsError = true; want false for success fixture") + } + if final.Result == nil || *final.Result == "" { + t.Error("final.Result = nil/empty; want non-empty result string") + } + if len(phases) < 3 { + t.Errorf("observed %d events; want >= 3 phases", len(phases)) + } +} + +func TestStreamClaudeResponse_ErrorFixture(t *testing.T) { + t.Parallel() + raw, err := os.ReadFile("testdata/stream_error_404.jsonl") + if err != nil { + t.Fatalf("read fixture: %v", err) + } + final, _, err := streamClaudeResponse(bytes.NewReader(raw), nil) + if err != nil { + t.Fatalf("streamClaudeResponse error = %v; want nil (parsing succeeded)", err) + } + if final == nil { + t.Fatal("final event = nil; want result envelope with is_error:true") + } + if !final.IsError { + t.Error("final.IsError = false; want true for invalid-model fixture") + } + if final.APIErrorStatus == nil || *final.APIErrorStatus != 404 { + t.Errorf("APIErrorStatus = %v; want *404", final.APIErrorStatus) + } +} + +func TestStreamClaudeResponse_SkipsMalformedLine(t *testing.T) { + t.Parallel() + body := strings.Join([]string{ + `{"type":"system","subtype":"init"}`, + `not valid json at all`, + `{"type":"result","subtype":"success","is_error":false,"result":"ok"}`, + }, "\n") + final, malformed, err := streamClaudeResponse(strings.NewReader(body), nil) + if err != nil { + t.Fatalf("err = %v; want nil (malformed lines skipped)", err) + } + if final == nil || final.Result == nil || *final.Result != "ok" { + t.Errorf("final.Result = %v; want \"ok\"", final.Result) + } + if malformed != 1 { + t.Errorf("malformed = %d; want 1 (one invalid line)", malformed) + } +} + +func TestStreamClaudeResponse_NoFinalEvent(t *testing.T) { + t.Parallel() + body := `{"type":"system","subtype":"init"}` + final, _, err := streamClaudeResponse(strings.NewReader(body), nil) + if final != nil { + t.Errorf("final = %v; 
want nil when stream has no result event", final) + } + if err == nil { + t.Error("err = nil; want non-nil for stream with no result") + } +} + +func TestStreamClaudeResponse_ReaderError(t *testing.T) { + t.Parallel() + r := errReader{err: errors.New("boom")} + if _, _, err := streamClaudeResponse(r, nil); err == nil { + t.Error("err = nil; want propagated reader error") + } +} + +type errReader struct{ err error } + +func (e errReader) Read(_ []byte) (int, error) { return 0, e.err } diff --git a/cmd/entire/cli/agent/claudecode/testdata/stream_error_404.jsonl b/cmd/entire/cli/agent/claudecode/testdata/stream_error_404.jsonl new file mode 100644 index 000000000..fb3ca2821 --- /dev/null +++ b/cmd/entire/cli/agent/claudecode/testdata/stream_error_404.jsonl @@ -0,0 +1,4 @@ +{"type":"system","subtype":"init","cwd":"","session_id":"cf930b0a-5950-472c-9516-7d5bab135057","tools":["Task","AskUserQuestion","Bash","CronCreate","CronDelete","CronList","Edit","EnterPlanMode","EnterWorktree","ExitPlanMode","ExitWorktree","Glob","Grep","Monitor","NotebookEdit","PushNotification","Read","RemoteTrigger","ScheduleWakeup","Skill","TaskOutput","TaskStop","TodoWrite","ToolSearch","WebFetch","WebSearch","Write","mcp__claude_ai_Gmail__authenticate","mcp__claude_ai_Gmail__complete_authentication","mcp__claude_ai_Honeycomb__authenticate","mcp__claude_ai_Honeycomb__complete_authentication","mcp__claude_ai_Linear__create_attachment","mcp__claude_ai_Linear__create_document","mcp__claude_ai_Linear__create_issue_label","mcp__claude_ai_Linear__delete_attachment","mcp__claude_ai_Linear__delete_comment","mcp__claude_ai_Linear__extract_images","mcp__claude_ai_Linear__get_attachment","mcp__claude_ai_Linear__get_document","mcp__claude_ai_Linear__get_issue","mcp__claude_ai_Linear__get_issue_status","mcp__claude_ai_Linear__get_milestone","mcp__claude_ai_Linear__get_project","mcp__claude_ai_Linear__get_team","mcp__claude_ai_Linear__get_user","mcp__claude_ai_Linear__list_comments","mcp__claude_ai_Linear__
list_cycles","mcp__claude_ai_Linear__list_documents","mcp__claude_ai_Linear__list_issue_labels","mcp__claude_ai_Linear__list_issue_statuses","mcp__claude_ai_Linear__list_issues","mcp__claude_ai_Linear__list_milestones","mcp__claude_ai_Linear__list_project_labels","mcp__claude_ai_Linear__list_projects","mcp__claude_ai_Linear__list_teams","mcp__claude_ai_Linear__list_users","mcp__claude_ai_Linear__save_comment","mcp__claude_ai_Linear__save_issue","mcp__claude_ai_Linear__save_milestone","mcp__claude_ai_Linear__save_project","mcp__claude_ai_Linear__search_documentation","mcp__claude_ai_Linear__update_document","mcp__claude_ai_Slack__authenticate","mcp__claude_ai_Slack__complete_authentication"],"mcp_servers":[{"name":"claude.ai Gmail","status":"needs-auth"},{"name":"claude.ai Slack","status":"needs-auth"},{"name":"claude.ai Honeycomb","status":"needs-auth"},{"name":"claude.ai Linear","status":"connected"}],"model":"nonexistent","permissionMode":"default","slash_commands":["update-config","debug","simplify","batch","loop","schedule","claude-api","compact","context","cost","heapdump","init","review","security-review","extra-usage","insights","team-onboarding"],"apiKeySource":"none","claude_code_version":"2.1.110","output_style":"default","agents":["general-purpose","statusline-setup","Explore","Plan"],"skills":["update-config","debug","simplify","batch","loop","schedule","claude-api"],"plugins":[],"uuid":"e2c75f6d-31ac-49db-a6f4-d2d306f91f57","memory_paths":{"auto":""},"fast_mode_state":"off"} +{"type":"system","subtype":"status","status":"requesting","uuid":"107dda39-d81a-455d-b122-65e2ce68c9fe","session_id":"cf930b0a-5950-472c-9516-7d5bab135057"} 
+{"type":"assistant","message":{"id":"823af606-7910-49be-9bbc-1e2a55c11d05","container":null,"model":"","role":"assistant","stop_reason":"stop_sequence","stop_sequence":"","type":"message","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"server_tool_use":{"web_search_requests":0,"web_fetch_requests":0},"service_tier":null,"cache_creation":{"ephemeral_1h_input_tokens":0,"ephemeral_5m_input_tokens":0},"inference_geo":null,"iterations":null,"speed":null},"content":[{"type":"text","text":"There's an issue with the selected model (nonexistent). It may not exist or you may not have access to it. Run --model to pick a different model."}],"context_management":null},"parent_tool_use_id":null,"session_id":"cf930b0a-5950-472c-9516-7d5bab135057","uuid":"cf0ec5ae-cece-4c5d-b767-51fbd9132a48","error":"invalid_request"} +{"type":"result","subtype":"success","is_error":true,"api_error_status":404,"duration_ms":584,"duration_api_ms":0,"num_turns":1,"result":"There's an issue with the selected model (nonexistent). It may not exist or you may not have access to it. 
Run --model to pick a different model.","stop_reason":"stop_sequence","session_id":"cf930b0a-5950-472c-9516-7d5bab135057","total_cost_usd":0,"usage":{"input_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":0,"server_tool_use":{"web_search_requests":0,"web_fetch_requests":0},"service_tier":"standard","cache_creation":{"ephemeral_1h_input_tokens":0,"ephemeral_5m_input_tokens":0},"inference_geo":"","iterations":[],"speed":"standard"},"modelUsage":{},"permission_denials":[],"terminal_reason":"completed","fast_mode_state":"off","uuid":"50d07b72-de6e-4526-ac6c-0d478c46f2c0"} diff --git a/cmd/entire/cli/agent/claudecode/testdata/stream_success.jsonl b/cmd/entire/cli/agent/claudecode/testdata/stream_success.jsonl new file mode 100644 index 000000000..6b2191df7 --- /dev/null +++ b/cmd/entire/cli/agent/claudecode/testdata/stream_success.jsonl @@ -0,0 +1,24 @@ +{"type":"system","subtype":"init","cwd":"","session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","tools":["Task","AskUserQuestion","Bash","CronCreate","CronDelete","CronList","Edit","EnterPlanMode","EnterWorktree","ExitPlanMode","ExitWorktree","Glob","Grep","Monitor","NotebookEdit","PushNotification","Read","RemoteTrigger","ScheduleWakeup","Skill","TaskOutput","TaskStop","TodoWrite","ToolSearch","WebFetch","WebSearch","Write","mcp__claude_ai_Gmail__authenticate","mcp__claude_ai_Gmail__complete_authentication","mcp__claude_ai_Honeycomb__authenticate","mcp__claude_ai_Honeycomb__complete_authentication","mcp__claude_ai_Linear__create_attachment","mcp__claude_ai_Linear__create_document","mcp__claude_ai_Linear__create_issue_label","mcp__claude_ai_Linear__delete_attachment","mcp__claude_ai_Linear__delete_comment","mcp__claude_ai_Linear__extract_images","mcp__claude_ai_Linear__get_attachment","mcp__claude_ai_Linear__get_document","mcp__claude_ai_Linear__get_issue","mcp__claude_ai_Linear__get_issue_status","mcp__claude_ai_Linear__get_milestone","mcp__claude_ai_Linear__get_project","mcp__claude
_ai_Linear__get_team","mcp__claude_ai_Linear__get_user","mcp__claude_ai_Linear__list_comments","mcp__claude_ai_Linear__list_cycles","mcp__claude_ai_Linear__list_documents","mcp__claude_ai_Linear__list_issue_labels","mcp__claude_ai_Linear__list_issue_statuses","mcp__claude_ai_Linear__list_issues","mcp__claude_ai_Linear__list_milestones","mcp__claude_ai_Linear__list_project_labels","mcp__claude_ai_Linear__list_projects","mcp__claude_ai_Linear__list_teams","mcp__claude_ai_Linear__list_users","mcp__claude_ai_Linear__save_comment","mcp__claude_ai_Linear__save_issue","mcp__claude_ai_Linear__save_milestone","mcp__claude_ai_Linear__save_project","mcp__claude_ai_Linear__search_documentation","mcp__claude_ai_Linear__update_document","mcp__claude_ai_Slack__authenticate","mcp__claude_ai_Slack__complete_authentication"],"mcp_servers":[{"name":"claude.ai Gmail","status":"needs-auth"},{"name":"claude.ai Slack","status":"needs-auth"},{"name":"claude.ai Honeycomb","status":"needs-auth"},{"name":"claude.ai Linear","status":"connected"}],"model":"claude-haiku-4-5-20251001","permissionMode":"default","slash_commands":["update-config","debug","simplify","batch","loop","schedule","claude-api","compact","context","cost","heapdump","init","review","security-review","extra-usage","insights","team-onboarding"],"apiKeySource":"none","claude_code_version":"2.1.110","output_style":"default","agents":["general-purpose","statusline-setup","Explore","Plan"],"skills":["update-config","debug","simplify","batch","loop","schedule","claude-api"],"plugins":[],"uuid":"97b78fa0-0608-488e-8077-9d04885a1317","memory_paths":{"auto":""},"fast_mode_state":"off"} +{"type":"system","subtype":"status","status":"requesting","uuid":"816276d8-6625-4d81-8083-b83954841d47","session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425"} 
+{"type":"rate_limit_event","rate_limit_info":{"status":"rejected","resetsAt":1776308400,"rateLimitType":"five_hour","overageStatus":"allowed","overageResetsAt":1777593600,"isUsingOverage":true},"uuid":"f1f41e27-2cff-41ea-ad85-2d39901c5fe9","session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425"} +{"type":"stream_event","event":{"type":"message_start","message":{"model":"claude-haiku-4-5-20251001","id":"msg_01WYRi26EYPBZd6tuBrE49Xj","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"stop_details":null,"usage":{"input_tokens":9,"cache_creation_input_tokens":37251,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":37251,"ephemeral_1h_input_tokens":0},"output_tokens":5,"service_tier":"standard","inference_geo":"not_available"}}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"57f1708d-ca19-4d0c-bb20-39071d8bfad8","ttft_ms":935} +{"type":"stream_event","event":{"type":"content_block_start","index":0,"content_block":{"type":"thinking","thinking":"","signature":""}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"ec8fd42e-875d-4fc4-9278-41929f0d0838"} +{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"The user is asking me"}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"b38b8603-c4d4-4cd5-b23b-2e65b286c96c"} +{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" to write 3 sentences about Go programming. This is a straightforward request that doesn't require any tool usage. 
I should provide informative sentences"}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"599c8623-e1df-44ae-88ac-11a6feb19b45"} +{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" about Go.\n\nThis is not related to the software engineering task context of"}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"5c76b517-623b-4aa3-8000-2e34b6107e8e"} +{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" the environment or any of the tools available. It's just a general knowledge question about Go programming.\n\nLet me write"}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"0498a649-c720-4ded-89db-aa528f43958d"} +{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" 3 informative sentences about Go:"}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"5c152806-c3a7-4573-8f8d-d85bfe81e6f5"} 
+{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"EtsECmMIDBgCKkB+zWRkkDVWFdrREUwI7qayCKA25pV5RqzrYH/T9mOWwsuQZ7xyPPaF4XF8cfsSNOYwlHOOYCTxwTgN1nbuSy4eMhljbGF1ZGUtaGFpa3UtNC01LTIwMjUxMDAxOAASDOsjlKHxxUP1s1pk+hoM752RVjrtaYwQlsrHIjAv2rdvoxaZ/DBZXCDyI91YaZq9yMmhzcTCa5Neog9KqkZkah8Gqwsh7hutiJ1ROlkqpQPvRo+35DF/ZBTksgydhc8JGZxNBtCucmfaQOpglMu3DMGYXYlLzmyEMF8M9zLWMEOhL/aT3rchD1aFAX4q4xVMoadeMeeiR61APZgakwHJU7mP9ctHyAT+IswDIHry/r9toQTzNDquhkbeeAGmi8UJtEocp++Oghwl9rcBOs9nmhkSqacnj6hqSdcQsKI7rcZV0xYR5EhVmpviFsffYGp4CkUYdxIg7DFdyujRnoKciOWmz7C73khgfuHnvCUIheHgFz6GTG5mV4hZ2vTDeswdHktq0g+Fm77UgSjn4KWNwcILP5kU38xZqdL24gDKU4Myv/ckiCS/OKWlXX3vd1pZpQyGP1AFlF4ku8JnkAf2CS0hrdVXUy34x9/ouNHvDESdbpz++hUlxR6BAj+iT9k2uhl8BikWXxGVZlurMu2Ax3A1zpjYCkKfSNQsLWkq2xk40n+g08uDNYU4vOKh92uATYUXdxLviwLPUsee/X/gHFAK7BNjn9E9QQ+uI+THPcbspWyYFDHHrYCvwqO5G0Ci+eD6VHN14QWJcfoYK+0ruDW9cXAcGAE="}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"c39faae7-433f-4714-9230-3cccbef55c59"} +{"type":"assistant","message":{"model":"claude-haiku-4-5-20251001","id":"msg_01WYRi26EYPBZd6tuBrE49Xj","type":"message","role":"assistant","content":[{"type":"thinking","thinking":"The user is asking me to write 3 sentences about Go programming. This is a straightforward request that doesn't require any tool usage. I should provide informative sentences about Go.\n\nThis is not related to the software engineering task context of the environment or any of the tools available. 
It's just a general knowledge question about Go programming.\n\nLet me write 3 informative sentences about Go:","signature":"EtsECmMIDBgCKkB+zWRkkDVWFdrREUwI7qayCKA25pV5RqzrYH/T9mOWwsuQZ7xyPPaF4XF8cfsSNOYwlHOOYCTxwTgN1nbuSy4eMhljbGF1ZGUtaGFpa3UtNC01LTIwMjUxMDAxOAASDOsjlKHxxUP1s1pk+hoM752RVjrtaYwQlsrHIjAv2rdvoxaZ/DBZXCDyI91YaZq9yMmhzcTCa5Neog9KqkZkah8Gqwsh7hutiJ1ROlkqpQPvRo+35DF/ZBTksgydhc8JGZxNBtCucmfaQOpglMu3DMGYXYlLzmyEMF8M9zLWMEOhL/aT3rchD1aFAX4q4xVMoadeMeeiR61APZgakwHJU7mP9ctHyAT+IswDIHry/r9toQTzNDquhkbeeAGmi8UJtEocp++Oghwl9rcBOs9nmhkSqacnj6hqSdcQsKI7rcZV0xYR5EhVmpviFsffYGp4CkUYdxIg7DFdyujRnoKciOWmz7C73khgfuHnvCUIheHgFz6GTG5mV4hZ2vTDeswdHktq0g+Fm77UgSjn4KWNwcILP5kU38xZqdL24gDKU4Myv/ckiCS/OKWlXX3vd1pZpQyGP1AFlF4ku8JnkAf2CS0hrdVXUy34x9/ouNHvDESdbpz++hUlxR6BAj+iT9k2uhl8BikWXxGVZlurMu2Ax3A1zpjYCkKfSNQsLWkq2xk40n+g08uDNYU4vOKh92uATYUXdxLviwLPUsee/X/gHFAK7BNjn9E9QQ+uI+THPcbspWyYFDHHrYCvwqO5G0Ci+eD6VHN14QWJcfoYK+0ruDW9cXAcGAE="}],"stop_reason":null,"stop_sequence":null,"stop_details":null,"usage":{"input_tokens":9,"cache_creation_input_tokens":37251,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":37251,"ephemeral_1h_input_tokens":0},"output_tokens":5,"service_tier":"standard","inference_geo":"not_available"},"context_management":null},"parent_tool_use_id":null,"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","uuid":"de6647ac-eb10-40c9-adf8-e815f2beb07b"} +{"type":"stream_event","event":{"type":"content_block_stop","index":0},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"1d5a8e32-ad4d-4019-8cf2-521840cba0b5"} +{"type":"stream_event","event":{"type":"content_block_start","index":1,"content_block":{"type":"text","text":""}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"b908c007-9cb8-439e-bc53-a390eeb2a343"} +{"type":"stream_event","event":{"type":"content_block_delta","index":1,"delta":{"type":"text_delta","text":"Go is a statically-typed, compiled programming 
language created by Google that"}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"f1368a50-491e-4c13-b12d-e07eed7425f4"} +{"type":"stream_event","event":{"type":"content_block_delta","index":1,"delta":{"type":"text_delta","text":" emphasizes simplicity, concurrency, and fast compilation times. It features built-in support"}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"ce4d0db5-35f1-4ad6-9913-20c222be3ef4"} +{"type":"stream_event","event":{"type":"content_block_delta","index":1,"delta":{"type":"text_delta","text":" for concurrent programming through goroutines and channels, making it well-suited for building scalable systems and microservices. Go's minimal syntax and standard library provide"}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"25ebccab-3994-4fdb-855d-06175c6cf776"} +{"type":"stream_event","event":{"type":"content_block_delta","index":1,"delta":{"type":"text_delta","text":" everything needed for most common tasks, which helps developers write clean, maintainable code with"}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"a98ecc49-a307-4ef3-876a-796f901b1345"} +{"type":"stream_event","event":{"type":"content_block_delta","index":1,"delta":{"type":"text_delta","text":" less boilerplate than languages like Java or C++."}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"b1d3fecc-080e-446b-9b5a-02be20b467fe"} +{"type":"assistant","message":{"model":"claude-haiku-4-5-20251001","id":"msg_01WYRi26EYPBZd6tuBrE49Xj","type":"message","role":"assistant","content":[{"type":"text","text":"Go is a statically-typed, compiled programming language created by Google that emphasizes simplicity, concurrency, and fast compilation times. 
It features built-in support for concurrent programming through goroutines and channels, making it well-suited for building scalable systems and microservices. Go's minimal syntax and standard library provide everything needed for most common tasks, which helps developers write clean, maintainable code with less boilerplate than languages like Java or C++."}],"stop_reason":null,"stop_sequence":null,"stop_details":null,"usage":{"input_tokens":9,"cache_creation_input_tokens":37251,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":37251,"ephemeral_1h_input_tokens":0},"output_tokens":5,"service_tier":"standard","inference_geo":"not_available"},"context_management":null},"parent_tool_use_id":null,"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","uuid":"fa5e82d1-f560-497b-ba3e-2a965a4a8fd3"} +{"type":"stream_event","event":{"type":"content_block_stop","index":1},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"fd5b5d18-f122-4aac-9e87-6b7813ebf77f"} +{"type":"stream_event","event":{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null,"stop_details":null},"usage":{"input_tokens":9,"cache_creation_input_tokens":37251,"cache_read_input_tokens":0,"output_tokens":191,"iterations":[{"input_tokens":9,"output_tokens":191,"cache_read_input_tokens":0,"cache_creation_input_tokens":37251,"cache_creation":{"ephemeral_5m_input_tokens":37251,"ephemeral_1h_input_tokens":0},"type":"message"}]},"context_management":{"applied_edits":[]}},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"e4bf6cdc-e938-464e-987a-5d597b15ae47"} +{"type":"stream_event","event":{"type":"message_stop"},"session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","parent_tool_use_id":null,"uuid":"fdc9b469-a6c2-4824-80cb-aa14dc73527f"} +{"type":"result","subtype":"success","is_error":false,"api_error_status":null,"duration_ms":2509,"duration_api_ms":3176,"num_turns":1,"result":"Go is a 
statically-typed, compiled programming language created by Google that emphasizes simplicity, concurrency, and fast compilation times. It features built-in support for concurrent programming through goroutines and channels, making it well-suited for building scalable systems and microservices. Go's minimal syntax and standard library provide everything needed for most common tasks, which helps developers write clean, maintainable code with less boilerplate than languages like Java or C++.","stop_reason":"end_turn","session_id":"b1ad535f-7fc5-43ae-94fa-1382b6c13425","total_cost_usd":0.04793475,"usage":{"input_tokens":9,"cache_creation_input_tokens":37251,"cache_read_input_tokens":0,"output_tokens":191,"server_tool_use":{"web_search_requests":0,"web_fetch_requests":0},"service_tier":"standard","cache_creation":{"ephemeral_1h_input_tokens":0,"ephemeral_5m_input_tokens":37251},"inference_geo":"","iterations":[{"input_tokens":9,"output_tokens":191,"cache_read_input_tokens":0,"cache_creation_input_tokens":37251,"cache_creation":{"ephemeral_5m_input_tokens":37251,"ephemeral_1h_input_tokens":0},"type":"message"}],"speed":"standard"},"modelUsage":{"claude-haiku-4-5-20251001":{"inputTokens":356,"outputTokens":203,"cacheReadInputTokens":0,"cacheCreationInputTokens":37251,"webSearchRequests":0,"costUSD":0.04793475,"contextWindow":200000,"maxOutputTokens":32000}},"permission_denials":[],"terminal_reason":"completed","fast_mode_state":"off","uuid":"e8da9aa8-7c8b-4710-9ce4-c685aa4d3084"} diff --git a/cmd/entire/cli/explain.go b/cmd/entire/cli/explain.go index eaef81571..b7843f9e9 100644 --- a/cmd/entire/cli/explain.go +++ b/cmd/entire/cli/explain.go @@ -11,6 +11,7 @@ import ( "runtime" "sort" "strings" + "sync/atomic" "time" "github.com/entireio/cli/cmd/entire/cli/agent" @@ -37,11 +38,31 @@ import ( "golang.org/x/term" ) -const defaultCheckpointSummaryTimeout = 30 * time.Second +var generateTranscriptSummary = summarize.GenerateFromTranscript -var checkpointSummaryTimeout = 
defaultCheckpointSummaryTimeout +// deadlineMode describes how the summary deadline should be applied. +type deadlineMode int -var generateTranscriptSummary = summarize.GenerateFromTranscript +const ( + deadlineModeNone deadlineMode = iota // interactive TTY: no deadline, user Ctrl+C + deadlineModeIdle // non-interactive default: idle-based watchdog +) + +// summaryDeadlineResult is the resolved deadline policy for a single +// summary generation, computed once from TTY state. +type summaryDeadlineResult struct { + mode deadlineMode + duration time.Duration // zero when mode == deadlineModeNone +} + +// summaryDeadlineResolver is package-level so tests can substitute the +// resolution policy without touching stderr. +var summaryDeadlineResolver = func(errW io.Writer) summaryDeadlineResult { + if isTerminalWriter(errW) { + return summaryDeadlineResult{mode: deadlineModeNone} + } + return summaryDeadlineResult{mode: deadlineModeIdle, duration: defaultNonTTYSummaryDeadline} +} // interaction holds a single prompt and its responses for display. type interaction struct { @@ -516,10 +537,10 @@ func generateCheckpointSummary(ctx context.Context, w, errW io.Writer, v1Store * // Generate summary using shared helper logging.Info(ctx, "generating checkpoint summary") if errW != nil { - fmt.Fprintln(errW, "Generating checkpoint summary...") + fmt.Fprintf(errW, "Generating checkpoint summary... 
(transcript: %s)\n", humanizeBytes(len(scopedTranscript))) } - summary, err := generateCheckpointAISummary(ctx, scopedTranscript, cpSummary.FilesTouched, content.Metadata.Agent, provider.Generator) + summary, err := generateCheckpointAISummary(ctx, errW, scopedTranscript, cpSummary.FilesTouched, content.Metadata.Agent, provider.Generator) if err != nil { return fmt.Errorf("failed to generate summary: %w", err) } @@ -555,22 +576,58 @@ func generateCheckpointSummary(ctx context.Context, w, errW io.Writer, v1Store * return nil } -func generateCheckpointAISummary(ctx context.Context, scopedTranscript []byte, filesTouched []string, agentType types.AgentType, generator summarize.Generator) (*checkpoint.Summary, error) { - timeoutCtx, cancel := context.WithTimeout(ctx, checkpointSummaryTimeout) - timeoutDuration := checkpointSummaryTimeout - if deadline, ok := timeoutCtx.Deadline(); ok { - timeoutDuration = time.Until(deadline) - } - defer cancel() - - // scopedTranscript is read from checkpoint storage, which redacts on write. - summary, err := generateTranscriptSummary(timeoutCtx, redact.AlreadyRedacted(scopedTranscript), filesTouched, agentType, generator) +func generateCheckpointAISummary(ctx context.Context, errW io.Writer, scopedTranscript []byte, filesTouched []string, agentType types.AgentType, generator summarize.Generator) (*checkpoint.Summary, error) { + if errW == nil { + errW = io.Discard + } + progressWriter := newSummaryProgressWriter(errW) + + // Resolve deadline once based on TTY state. + resolved := summaryDeadlineResolver(errW) + + runCtx := ctx + var idleFired *atomic.Bool + if resolved.mode == deadlineModeIdle { + // Layer a hard wall-clock cap on top of the idle watchdog so a + // degraded-but-chatty Claude CLI cannot hold a CI job indefinitely. 
+ wallCtx, wallCancel := context.WithTimeout(ctx, maxWallClockSummaryDeadline) + defer wallCancel() + var stop func() + runCtx, idleFired, stop = startIdleWatchdog(wallCtx, resolved.duration, &progressWriter.lastActivity) + defer stop() + } + + // Bump lastActivity immediately before the call so the idle watchdog's + // clock starts fresh at the actual API call start, not at + // progress-writer construction. + progressWriter.lastActivity.Store(time.Now().UnixNano()) + + summary, err := generateTranscriptSummary( + runCtx, + redact.AlreadyRedacted(scopedTranscript), + filesTouched, + agentType, + generator, + agent.ProgressFn(func(p agent.GenerationProgress) { progressWriter.handle(p) }), + ) if err != nil { - if errors.Is(err, context.Canceled) || errors.Is(timeoutCtx.Err(), context.Canceled) { - return nil, fmt.Errorf("summary generation canceled: %w", context.Canceled) + // Idle watchdog fires via context.WithCancel (produces Canceled, not + // DeadlineExceeded). Check the fired flag first so CI users see the + // diagnostic timeout message instead of "summary generation canceled". + if idleFired != nil && idleFired.Load() { + return nil, fmt.Errorf("summary generation timed out after %s idle: %w", formatSummaryTimeout(resolved.duration), context.DeadlineExceeded) + } + if errors.Is(err, context.DeadlineExceeded) || errors.Is(runCtx.Err(), context.DeadlineExceeded) { + // In idle mode, DeadlineExceeded comes from the wall-clock cap, + // not the idle watchdog (which fires via Canceled, caught above). 
+ duration := resolved.duration + if resolved.mode == deadlineModeIdle { + duration = maxWallClockSummaryDeadline + } + return nil, fmt.Errorf("summary generation timed out after %s: %w", formatSummaryTimeout(duration), context.DeadlineExceeded) } - if errors.Is(err, context.DeadlineExceeded) || errors.Is(timeoutCtx.Err(), context.DeadlineExceeded) { - return nil, fmt.Errorf("summary generation timed out after %s: %w", formatSummaryTimeout(timeoutDuration), context.DeadlineExceeded) + if errors.Is(err, context.Canceled) || errors.Is(runCtx.Err(), context.Canceled) { + return nil, fmt.Errorf("summary generation canceled: %w", context.Canceled) } return nil, err } @@ -578,6 +635,140 @@ func generateCheckpointAISummary(ctx context.Context, scopedTranscript []byte, f return summary, nil } +const defaultNonTTYSummaryDeadline = 5 * time.Minute + +// maxWallClockSummaryDeadline is the absolute upper bound for summary +// generation, even when the idle watchdog keeps getting refreshed. +// Prevents a degraded but chatty Claude CLI from holding CI jobs open +// indefinitely. +const maxWallClockSummaryDeadline = 30 * time.Minute + +// startIdleWatchdog cancels the returned context if more than idle elapses +// between successive updates to *lastActivity. It is a no-op when idle <= 0. +// stop() must be called in a defer to release the goroutine; fired reports +// whether the watchdog was the reason for cancellation. 
+func startIdleWatchdog(parent context.Context, idle time.Duration, lastActivity *atomic.Int64) (ctx context.Context, fired *atomic.Bool, stop func()) { + fired = &atomic.Bool{} + if idle <= 0 { + return parent, fired, func() {} + } + ctx, cancel := context.WithCancel(parent) + done := make(chan struct{}) + go func() { + poll := idle / 4 + if poll < time.Millisecond { + poll = time.Millisecond + } + ticker := time.NewTicker(poll) + defer ticker.Stop() + for { + select { + case <-ticker.C: + ts := lastActivity.Load() + if ts > 0 && time.Since(time.Unix(0, ts)) >= idle { + fired.Store(true) + cancel() + return + } + case <-done: + return + case <-parent.Done(): + return + } + } + }() + return ctx, fired, func() { close(done); cancel() } +} + +// summaryProgressWriter renders agent.GenerationProgress events to the +// configured writer using the existing statusStyles helper. On a TTY (and +// outside ACCESSIBLE mode) it rewrites the current line for the running +// token count; otherwise it appends one line per non-deduplicated event. 
+type summaryProgressWriter struct { + w io.Writer + inplace bool + lastActivity atomic.Int64 // Unix-nano timestamp of last progress event + lastLine string + arrow string // precomputed styled glyph + check string // precomputed styled glyph +} + +func newSummaryProgressWriter(w io.Writer) *summaryProgressWriter { + styles := newStatusStyles(w) + pw := &summaryProgressWriter{ + w: w, + inplace: isTerminalWriter(w) && !IsAccessibleMode(), + arrow: styles.render(styles.cyan, "→"), + check: styles.render(styles.green, "✓"), + } + pw.lastActivity.Store(time.Now().UnixNano()) + return pw +} + +func (s *summaryProgressWriter) handle(p agent.GenerationProgress) { + s.lastActivity.Store(time.Now().UnixNano()) + switch p.Phase { + case agent.PhaseConnecting: + s.printLine(s.arrow + " Sending request to Anthropic...") + case agent.PhaseFirstToken: + s.printLine(fmt.Sprintf( + "%s Anthropic responded (TTFT %s, %s cached input tokens) -- generating...", + s.arrow, formatMs(p.TTFTms), formatTokenCount(p.CachedInputTokens))) + case agent.PhaseGenerating: + s.updateLine(fmt.Sprintf( + "%s Writing summary... (~%s tokens)", + s.arrow, formatTokenCount(p.OutputTokens))) + case agent.PhaseDone: + s.printLine(fmt.Sprintf( + "%s Summary generated (%s, %s output tokens)", + s.check, formatMs(p.DurationMs), formatTokenCount(p.OutputTokens))) + } +} + +func (s *summaryProgressWriter) printLine(line string) { + if s.inplace && s.lastLine != "" { + fmt.Fprint(s.w, "\r\033[2K") + } + fmt.Fprintln(s.w, line) + s.lastLine = "" +} + +func (s *summaryProgressWriter) updateLine(line string) { + if s.lastLine == line { + return + } + if s.inplace { + fmt.Fprintf(s.w, "\r\033[2K%s", line) + } else { + fmt.Fprintln(s.w, line) + } + s.lastLine = line +} + +// formatMs formats a millisecond count as "1.5s" / "120ms". 
+func formatMs(ms int) string { + if ms < 1000 { + return fmt.Sprintf("%dms", ms) + } + return fmt.Sprintf("%.1fs", float64(ms)/1000.0) +} + +// humanizeBytes formats a byte count as a short human-readable string +// (e.g., 0 B, 500 B, 1.5 KB, 47 KB, 1.2 MB). Uses 1024-based units. +func humanizeBytes(n int) string { + const unit = 1024 + if n < unit { + return fmt.Sprintf("%d B", n) + } + div, exp := int64(unit), 0 + for x := int64(n) / unit; x >= unit; x /= unit { + div *= unit + exp++ + } + suffix := []string{"KB", "MB", "GB", "TB"}[exp] + return fmt.Sprintf("%.1f %s", float64(n)/float64(div), suffix) +} + func formatSummaryTimeout(d time.Duration) string { if d < 0 { d = 0 diff --git a/cmd/entire/cli/explain_test.go b/cmd/entire/cli/explain_test.go index cb94c3133..fdbcef95d 100644 --- a/cmd/entire/cli/explain_test.go +++ b/cmd/entire/cli/explain_test.go @@ -4,10 +4,12 @@ import ( "bytes" "context" "errors" + "io" "os" "os/exec" "path/filepath" "strings" + "sync/atomic" "testing" "time" @@ -103,145 +105,90 @@ func TestExplainCmd_RejectsPositionalArgs(t *testing.T) { } } -func TestGenerateCheckpointAISummary_AddsDefaultTimeoutWithoutParentDeadline(t *testing.T) { - tmpTimeout := checkpointSummaryTimeout +func TestGenerateCheckpointAISummary_NoDeadlineWhenInteractive(t *testing.T) { + tmpResolver := summaryDeadlineResolver tmpGenerator := generateTranscriptSummary t.Cleanup(func() { - checkpointSummaryTimeout = tmpTimeout + summaryDeadlineResolver = tmpResolver generateTranscriptSummary = tmpGenerator }) - checkpointSummaryTimeout = 50 * time.Millisecond + summaryDeadlineResolver = func(io.Writer) summaryDeadlineResult { + return summaryDeadlineResult{mode: deadlineModeNone} + } - var gotDeadline time.Time + var hadDeadline bool generateTranscriptSummary = func( ctx context.Context, _ redact.RedactedBytes, _ []string, _ types.AgentType, _ summarize.Generator, + _ agent.ProgressFn, ) (*checkpoint.Summary, error) { - deadline, ok := ctx.Deadline() - if !ok { - 
return nil, errors.New("expected deadline on summary context") - } - gotDeadline = deadline + _, hadDeadline = ctx.Deadline() return &checkpoint.Summary{Intent: "intent", Outcome: "outcome"}, nil } - start := time.Now() - summary, err := generateCheckpointAISummary(context.Background(), []byte("transcript"), nil, agent.AgentTypeClaudeCode, nil) + summary, err := generateCheckpointAISummary(context.Background(), io.Discard, []byte("transcript"), nil, agent.AgentTypeClaudeCode, nil) if err != nil { t.Fatalf("generateCheckpointAISummary() error = %v", err) } if summary == nil { t.Fatal("expected summary") } - if gotDeadline.IsZero() { - t.Fatal("expected deadline to be set") - } - if remaining := gotDeadline.Sub(start); remaining < 30*time.Millisecond || remaining > 200*time.Millisecond { - t.Fatalf("deadline offset = %s, want around %s", remaining, checkpointSummaryTimeout) + if hadDeadline { + t.Error("interactive mode should run without a deadline") } } -func TestGenerateCheckpointAISummary_UsesParentDeadlineAndWrapsSentinel(t *testing.T) { - tmpTimeout := checkpointSummaryTimeout +func TestGenerateCheckpointAISummary_IdleDeadlineWrapsSentinel(t *testing.T) { + tmpResolver := summaryDeadlineResolver tmpGenerator := generateTranscriptSummary t.Cleanup(func() { - checkpointSummaryTimeout = tmpTimeout + summaryDeadlineResolver = tmpResolver generateTranscriptSummary = tmpGenerator }) - checkpointSummaryTimeout = 30 * time.Second - - parentCtx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) - defer cancel() - parentDeadline, _ := parentCtx.Deadline() + summaryDeadlineResolver = func(io.Writer) summaryDeadlineResult { + return summaryDeadlineResult{mode: deadlineModeIdle, duration: 20 * time.Millisecond} + } - var gotDeadline time.Time generateTranscriptSummary = func( ctx context.Context, _ redact.RedactedBytes, _ []string, _ types.AgentType, _ summarize.Generator, + _ agent.ProgressFn, ) (*checkpoint.Summary, error) { - gotDeadline, _ = 
ctx.Deadline() <-ctx.Done() return nil, ctx.Err() } - _, err := generateCheckpointAISummary(parentCtx, []byte("transcript"), nil, agent.AgentTypeClaudeCode, nil) + _, err := generateCheckpointAISummary(context.Background(), io.Discard, []byte("transcript"), nil, agent.AgentTypeClaudeCode, nil) if err == nil { t.Fatal("expected timeout error") } if !errors.Is(err, context.DeadlineExceeded) { t.Fatalf("expected DeadlineExceeded, got %v", err) } - if gotDeadline.IsZero() { - t.Fatal("expected deadline to be captured") - } - if delta := gotDeadline.Sub(parentDeadline); delta < -5*time.Millisecond || delta > 5*time.Millisecond { - t.Fatalf("deadline delta = %s, want near 0", delta) - } - if strings.Contains(err.Error(), "30s") { - t.Fatalf("timeout error should not report default timeout when parent deadline fired: %v", err) + if !strings.Contains(err.Error(), "idle") { + t.Errorf("expected message mentioning idle watchdog, got %q", err) } } -func TestGenerateCheckpointAISummary_ClampsLongParentDeadlineToDefaultTimeout(t *testing.T) { - tmpTimeout := checkpointSummaryTimeout +func TestGenerateCheckpointAISummary_UsesCancellationSentinel(t *testing.T) { + tmpResolver := summaryDeadlineResolver tmpGenerator := generateTranscriptSummary t.Cleanup(func() { - checkpointSummaryTimeout = tmpTimeout + summaryDeadlineResolver = tmpResolver generateTranscriptSummary = tmpGenerator }) - checkpointSummaryTimeout = 50 * time.Millisecond - - parentCtx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - var gotDeadline time.Time - generateTranscriptSummary = func( - ctx context.Context, - _ redact.RedactedBytes, - _ []string, - _ types.AgentType, - _ summarize.Generator, - ) (*checkpoint.Summary, error) { - deadline, ok := ctx.Deadline() - if !ok { - return nil, errors.New("expected deadline on summary context") - } - gotDeadline = deadline - return &checkpoint.Summary{Intent: "intent", Outcome: "outcome"}, nil - } - - start := time.Now() - summary, 
err := generateCheckpointAISummary(parentCtx, []byte("transcript"), nil, agent.AgentTypeClaudeCode, nil) - if err != nil { - t.Fatalf("generateCheckpointAISummary() error = %v", err) - } - if summary == nil { - t.Fatal("expected summary") + summaryDeadlineResolver = func(io.Writer) summaryDeadlineResult { + return summaryDeadlineResult{mode: deadlineModeNone} } - if gotDeadline.IsZero() { - t.Fatal("expected deadline to be set") - } - if remaining := gotDeadline.Sub(start); remaining < 30*time.Millisecond || remaining > 200*time.Millisecond { - t.Fatalf("deadline offset = %s, want around %s", remaining, checkpointSummaryTimeout) - } -} - -func TestGenerateCheckpointAISummary_UsesCancellationSentinel(t *testing.T) { - tmpTimeout := checkpointSummaryTimeout - tmpGenerator := generateTranscriptSummary - t.Cleanup(func() { - checkpointSummaryTimeout = tmpTimeout - generateTranscriptSummary = tmpGenerator - }) parentCtx, cancel := context.WithCancel(context.Background()) @@ -251,13 +198,14 @@ func TestGenerateCheckpointAISummary_UsesCancellationSentinel(t *testing.T) { _ []string, _ types.AgentType, _ summarize.Generator, + _ agent.ProgressFn, ) (*checkpoint.Summary, error) { cancel() <-ctx.Done() return nil, ctx.Err() } - _, err := generateCheckpointAISummary(parentCtx, []byte("transcript"), nil, agent.AgentTypeClaudeCode, nil) + _, err := generateCheckpointAISummary(parentCtx, io.Discard, []byte("transcript"), nil, agent.AgentTypeClaudeCode, nil) if err == nil { t.Fatal("expected cancellation error") } @@ -4860,3 +4808,109 @@ func createCommitWithTree(t *testing.T, repo *git.Repository, treeHash plumbing. } return hash } + +func TestSummaryProgressWriter_AllPhases(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + w := newSummaryProgressWriter(&buf) + // Force non-inplace mode so output is deterministic regardless of test environment. 
+ w.inplace = false + + w.handle(agent.GenerationProgress{Phase: agent.PhaseConnecting}) + w.handle(agent.GenerationProgress{Phase: agent.PhaseFirstToken, TTFTms: 1500, CachedInputTokens: 35900}) + w.handle(agent.GenerationProgress{Phase: agent.PhaseGenerating, OutputTokens: 200}) + w.handle(agent.GenerationProgress{Phase: agent.PhaseGenerating, OutputTokens: 1240}) + w.handle(agent.GenerationProgress{Phase: agent.PhaseDone, DurationMs: 3140, OutputTokens: 1890}) + + out := buf.String() + for _, want := range []string{ + "Sending request to Anthropic", + "Anthropic responded", + "1.5s", // TTFT + "35.9k", // cached input tokens + "Writing summary", + "1.2k", // generating tokens + "Summary generated", + "3.1s", // duration + "1.9k", // output tokens + } { + if !strings.Contains(out, want) { + t.Errorf("output missing %q\nfull output:\n%s", want, out) + } + } +} + +func TestSummaryProgressWriter_DedupsGenerating(t *testing.T) { + t.Parallel() + var buf bytes.Buffer + w := newSummaryProgressWriter(&buf) + w.inplace = false + w.handle(agent.GenerationProgress{Phase: agent.PhaseGenerating, OutputTokens: 100}) + w.handle(agent.GenerationProgress{Phase: agent.PhaseGenerating, OutputTokens: 100}) + w.handle(agent.GenerationProgress{Phase: agent.PhaseGenerating, OutputTokens: 200}) + + lines := strings.Split(strings.TrimSpace(buf.String()), "\n") + if len(lines) != 2 { + t.Errorf("got %d lines; want 2 (dedup of identical Generating updates)\nlines: %v", len(lines), lines) + } +} + +func TestSummaryProgressWriter_LastActivityTracked(t *testing.T) { + t.Parallel() + w := newSummaryProgressWriter(io.Discard) + before := time.Now().UnixNano() + w.handle(agent.GenerationProgress{Phase: agent.PhaseGenerating, OutputTokens: 1}) + got := w.lastActivity.Load() + if got < before { + t.Errorf("lastActivity = %d; want >= %d", got, before) + } +} + +func TestStartIdleWatchdog_FiresOnStaleness(t *testing.T) { + t.Parallel() + var last atomic.Int64 + last.Store(time.Now().UnixNano()) + 
ctx, fired, stop := startIdleWatchdog(context.Background(), 30*time.Millisecond, &last) + defer stop() + select { + case <-ctx.Done(): + if !fired.Load() { + t.Error("ctx canceled but watchdog flag not set") + } + case <-time.After(500 * time.Millisecond): + t.Fatal("watchdog did not fire within 500ms (idle=30ms)") + } +} + +func TestStartIdleWatchdog_DoesNotFireWhenBumped(t *testing.T) { + t.Parallel() + var last atomic.Int64 + last.Store(time.Now().UnixNano()) + ctx, fired, stop := startIdleWatchdog(context.Background(), 50*time.Millisecond, &last) + defer stop() + deadline := time.Now().Add(200 * time.Millisecond) + for time.Now().Before(deadline) { + last.Store(time.Now().UnixNano()) + time.Sleep(10 * time.Millisecond) + } + if fired.Load() { + t.Error("watchdog fired despite continuous bumps") + } + if ctx.Err() != nil { + t.Errorf("ctx.Err() = %v; want nil", ctx.Err()) + } +} + +func TestStartIdleWatchdog_ZeroIdleIsNoop(t *testing.T) { + t.Parallel() + var last atomic.Int64 + last.Store(time.Now().UnixNano()) + ctx, _, stop := startIdleWatchdog(context.Background(), 0, &last) + defer stop() + select { + case <-ctx.Done(): + t.Error("ctx canceled; want unaffected with idle=0") + case <-time.After(100 * time.Millisecond): + // OK + } +} diff --git a/cmd/entire/cli/strategy/manual_commit_condensation.go b/cmd/entire/cli/strategy/manual_commit_condensation.go index 1bc32abf6..122f44466 100644 --- a/cmd/entire/cli/strategy/manual_commit_condensation.go +++ b/cmd/entire/cli/strategy/manual_commit_condensation.go @@ -457,7 +457,7 @@ func generateSummary(ctx context.Context, redactedTranscript redact.RedactedByte generator := buildSummaryGenerator(summarizeCtx) // scopedTranscript is sliced from redactedTranscript, which was redacted earlier in CondenseSession. 
- summary, err := summarize.GenerateFromTranscript(summarizeCtx, redact.AlreadyRedacted(scopedTranscript), filesTouched, state.AgentType, generator) + summary, err := summarize.GenerateFromTranscript(summarizeCtx, redact.AlreadyRedacted(scopedTranscript), filesTouched, state.AgentType, generator, nil) if err != nil { logging.Warn(summarizeCtx, "summary generation failed", slog.String("session_id", state.SessionID), diff --git a/cmd/entire/cli/summarize/claude.go b/cmd/entire/cli/summarize/claude.go index cebc85a66..fb91c13b2 100644 --- a/cmd/entire/cli/summarize/claude.go +++ b/cmd/entire/cli/summarize/claude.go @@ -67,6 +67,10 @@ type ClaudeGenerator struct { // Model is the Claude model to use for summarization. // If empty, defaults to DefaultModel ("sonnet"). Model string + + // Progress receives streaming progress events when the underlying agent + // implements agent.StreamingTextGenerator. Optional; nil suppresses reporting. + Progress agent.ProgressFn } // Generate creates a summary from checkpoint data by calling the Claude CLI. 
@@ -91,7 +95,13 @@ func (g *ClaudeGenerator) Generate(ctx context.Context, input Input) (*checkpoin } } - resultJSON, err := textGenerator.GenerateText(ctx, prompt, model) + var resultJSON string + var err error + if streamer, ok := agent.AsStreamingTextGenerator(textGenerator); ok { + resultJSON, err = streamer.GenerateTextStreaming(ctx, prompt, model, g.Progress) + } else { + resultJSON, err = textGenerator.GenerateText(ctx, prompt, model) + } if err != nil { return nil, fmt.Errorf("failed to generate summary text: %w", err) } diff --git a/cmd/entire/cli/summarize/summarize.go b/cmd/entire/cli/summarize/summarize.go index 8160f8363..bb213f61b 100644 --- a/cmd/entire/cli/summarize/summarize.go +++ b/cmd/entire/cli/summarize/summarize.go @@ -29,9 +29,10 @@ import ( // - filesTouched: list of files modified during the session // - agentType: the agent type to determine transcript format // - generator: summary generator to use (if nil, uses default ClaudeGenerator) +// - progress: optional callback for streaming progress events (nil to suppress) // // Returns nil, error if transcript is empty or cannot be parsed. -func GenerateFromTranscript(ctx context.Context, transcriptBytes redact.RedactedBytes, filesTouched []string, agentType types.AgentType, generator Generator) (*checkpoint.Summary, error) { +func GenerateFromTranscript(ctx context.Context, transcriptBytes redact.RedactedBytes, filesTouched []string, agentType types.AgentType, generator Generator, progress agent.ProgressFn) (*checkpoint.Summary, error) { if transcriptBytes.Len() == 0 { return nil, errors.New("empty transcript") } @@ -51,7 +52,12 @@ func GenerateFromTranscript(ctx context.Context, transcriptBytes redact.Redacted } if generator == nil { - generator = &ClaudeGenerator{} + generator = &ClaudeGenerator{Progress: progress} + } else if cg, ok := generator.(*ClaudeGenerator); ok && cg.Progress == nil && progress != nil { + // Clone to avoid mutating the caller's struct. 
+ clone := *cg + clone.Progress = progress + generator = &clone } summary, err := generator.Generate(ctx, input) diff --git a/cmd/entire/cli/summarize/summarize_test.go b/cmd/entire/cli/summarize/summarize_test.go index 6ace80935..1266f57cb 100644 --- a/cmd/entire/cli/summarize/summarize_test.go +++ b/cmd/entire/cli/summarize/summarize_test.go @@ -660,7 +660,7 @@ func TestGenerateFromTranscript(t *testing.T) { transcript := []byte(`{"type":"user","message":{"content":"Hello"}} {"type":"assistant","message":{"content":[{"type":"text","text":"Hi there"}]}}`) - summary, err := GenerateFromTranscript(context.Background(), redact.AlreadyRedacted(transcript), []string{"file.go"}, "", mockGenerator) + summary, err := GenerateFromTranscript(context.Background(), redact.AlreadyRedacted(transcript), []string{"file.go"}, "", mockGenerator, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -673,7 +673,7 @@ func TestGenerateFromTranscript(t *testing.T) { func TestGenerateFromTranscript_EmptyTranscript(t *testing.T) { mockGenerator := &ClaudeGenerator{} - summary, err := GenerateFromTranscript(context.Background(), redact.AlreadyRedacted([]byte{}), []string{}, "", mockGenerator) + summary, err := GenerateFromTranscript(context.Background(), redact.AlreadyRedacted([]byte{}), []string{}, "", mockGenerator, nil) if err == nil { t.Error("expected error for empty transcript") } @@ -687,7 +687,7 @@ func TestGenerateFromTranscript_NilGenerator(t *testing.T) { // With nil generator, should use default ClaudeGenerator // This will fail because claude CLI isn't available in test, but tests the nil handling - _, err := GenerateFromTranscript(context.Background(), redact.AlreadyRedacted(transcript), []string{}, "", nil) + _, err := GenerateFromTranscript(context.Background(), redact.AlreadyRedacted(transcript), []string{}, "", nil, nil) // Error is expected (claude CLI not available), but function should not panic if err == nil { t.Log("Unexpectedly succeeded - claude CLI must 
be available")