diff --git a/.changeset/fix-compaction-edge-cases.md b/.changeset/fix-compaction-edge-cases.md new file mode 100644 index 00000000..001dcb95 --- /dev/null +++ b/.changeset/fix-compaction-edge-cases.md @@ -0,0 +1,6 @@ +--- +"@moonshot-ai/agent-core": patch +"@moonshot-ai/kimi-code": patch +--- + +Fix compaction to handle edge cases where no messages are compactable and improve retry logic. diff --git a/packages/agent-core/src/agent/compaction/config.ts b/packages/agent-core/src/agent/compaction/config.ts deleted file mode 100644 index 50d92f30..00000000 --- a/packages/agent-core/src/agent/compaction/config.ts +++ /dev/null @@ -1,19 +0,0 @@ -export interface CompactionConfig { - triggerRatio: number; - blockRatio: number; - reservedContextSize: number; - maxCompactionPerTurn: number; - maxRecentSteps: number; - maxRecentUserMessages: number; - maxRecentSizeRatio: number; -} - -export const DEFAULT_COMPACTION_CONFIG: CompactionConfig = { - triggerRatio: 0.85, - blockRatio: 0.85, // Same as triggerRatio to disable async compaction - reservedContextSize: 50_000, - maxCompactionPerTurn: 3, - maxRecentSteps: 3, - maxRecentUserMessages: Infinity, - maxRecentSizeRatio: 0.2, -}; diff --git a/packages/agent-core/src/agent/compaction/full.ts b/packages/agent-core/src/agent/compaction/full.ts index 4f20cb26..d3d008a4 100644 --- a/packages/agent-core/src/agent/compaction/full.ts +++ b/packages/agent-core/src/agent/compaction/full.ts @@ -8,132 +8,36 @@ import { import { APIEmptyResponseError, isRetryableGenerateError, - inputTotal, type GenerateResult, type Message, type TokenUsage, + APIContextOverflowError, } from '@moonshot-ai/kosong'; import type { Agent } from '..'; import { isAbortError } from '../../loop/errors'; import { - DEFAULT_MAX_RETRY_ATTEMPTS, retryBackoffDelays, sleepForRetry, } from '../../loop/retry'; -import type { TelemetryPropertyValue } from '../../telemetry'; -import { - applyCompletionBudget, - resolveCompletionBudget, -} from '../../utils/completion-budget'; import { renderPrompt } from '../../utils/render-prompt'; import { estimateTokens, - estimateTokensForMessage, estimateTokensForMessages, } from '../../utils/tokens'; -import { sliceCompleteMessages } from '../context/complete-slice'; import { project } from '../context/projector'; import compactionInstructionTemplate from './compaction-instruction.md'; -import { DEFAULT_COMPACTION_CONFIG, type CompactionConfig } from './config'; import { renderMessagesToText } from './render-messages'; import type { CompactionBeginData, CompactionResult } from './types'; +import { DEFAULT_COMPACTION_CONFIG, DefaultCompactionStrategy, type CompactionStrategy } from './strategy'; -export interface CompactionStrategy { - shouldCompact(usedSize: number, maxSize: number): boolean; - shouldBlock(usedSize: number, maxSize: number): boolean; - computeCompactCount(messages: readonly Message[], maxSize: number): number; - readonly checkAfterStep: boolean; - readonly maxCompactionPerTurn: number; -} - -export class DefaultCompactionStrategy implements CompactionStrategy { - constructor(protected readonly config: CompactionConfig = DEFAULT_COMPACTION_CONFIG) {} - - shouldCompact(usedSize: number, maxSize: number): boolean { - if (maxSize <= 0) return false; - return ( - usedSize >= maxSize * this.config.triggerRatio || - this.shouldUseReservedContext(maxSize, usedSize) - ); - } - - shouldBlock(usedSize: number, maxSize: number): boolean { - if (maxSize <= 0) return false; - return ( - usedSize >= maxSize * this.config.blockRatio || - this.shouldUseReservedContext(maxSize, usedSize) - ); - } - - private shouldUseReservedContext(maxSize: number, usedSize: number): boolean { - const reservedSize = this.config.reservedContextSize; - return reservedSize > 0 && reservedSize < maxSize && usedSize + reservedSize >= maxSize; - } - - computeCompactCount(messages: readonly Message[], maxSize: number) { - let splitAt = messages.length; - let recentSize = 0; - let userMessageCount = 0; - let onlySeenTrailingUsers = true; - for (let i = messages.length - 1; i >= 0; i--) { - const m1 = messages[i - 1]; - const m2 = messages[i]; - if (m2 === undefined) continue; - const isTrailingAssistantPlaceholder = - onlySeenTrailingUsers && - m2.role === 'assistant' && - m2.content.length === 0 && - m2.toolCalls.length === 0; - if (isTrailingAssistantPlaceholder) { - splitAt = i; - continue; - } - const isTrailingUserMessage = onlySeenTrailingUsers && m2.role === 'user'; - if (!isTrailingUserMessage && messages.length - i >= this.config.maxRecentSteps) break; - - if (m2.role === 'user') { - userMessageCount++; - if (!isTrailingUserMessage && userMessageCount > this.config.maxRecentUserMessages) { - break; - } - } - - recentSize += estimateTokensForMessage(m2); - if (isTrailingUserMessage) { - splitAt = i; - continue; - } - if (recentSize > maxSize * this.config.maxRecentSizeRatio) { - break; - } - const canSplitBeforeMessage = - m1?.role !== m2.role && !(m1?.role === 'user' && m2.role === 'assistant') && m2.role !== 'tool'; - if (canSplitBeforeMessage) { - splitAt = i; - } - if (m2.role !== 'user') { - onlySeenTrailingUsers = false; - } - } - - return splitAt; - } - - get checkAfterStep(): boolean { - return this.config.triggerRatio !== this.config.blockRatio; - } - - get maxCompactionPerTurn(): number { - return this.config.maxCompactionPerTurn; - } -} +type CompactionTelemetryTrigger = CompactionBeginData['source'] | 'manual-with-prompt' | 'unknown'; export interface CompactedHistory { text: string; } -type CompactionTelemetryTrigger = CompactionBeginData['source'] | 'manual-with-prompt' | 'unknown'; +export const MAX_COMPACTION_RETRY_ATTEMPTS = 5; export class FullCompaction { protected compactionCountInTurn = 0; @@ -152,23 +56,26 @@ export class FullCompaction { ) { this.strategy = strategy ?? - new DefaultCompactionStrategy({ - ...DEFAULT_COMPACTION_CONFIG, - reservedContextSize: - agent.providerManager?.config.loopControl?.reservedContextSize ?? - DEFAULT_COMPACTION_CONFIG.reservedContextSize, - }); + new DefaultCompactionStrategy( + () => agent.config.modelCapabilities.max_context_tokens, + { + ...DEFAULT_COMPACTION_CONFIG, + reservedContextSize: + agent.providerManager?.config.loopControl?.reservedContextSize ?? + DEFAULT_COMPACTION_CONFIG.reservedContextSize, + } + ); } get isCompacting(): boolean { return this.compacting !== null; } + get compactedHistory(): readonly CompactedHistory[] { + return this._compactedHistory; + } + begin(data: Readonly): void { - this.agent.records.logRecord({ - type: 'full_compaction.begin', - ...data, - }); if (this.compacting) return; if (data.source === 'manual') { this.compactionCountInTurn = 0; @@ -176,12 +83,24 @@ export class FullCompaction { this.compactionCountInTurn += 1; } if (this.compactionCountInTurn > this.strategy.maxCompactionPerTurn) return; - if (!this.agent.records.restoring) { - this.startCompactionWorker(data); + if (this.agent.records.restoring) { + return; + } + const compactedCount = this.strategy.computeCompactCount(this.agent.context.history, data.source); + if (compactedCount === 0) { + throw new KimiError(ErrorCodes.COMPACTION_UNABLE, 'No prefix that can be compacted in current history.'); } + this.agent.records.logRecord({ + type: 'full_compaction.begin', + ...data, + }); + this.startCompactionWorker(data, compactedCount); } - private startCompactionWorker(data: Readonly): void { + private startCompactionWorker( + data: Readonly, + compactedCount: number, + ): void { const abortController = new AbortController(); this.agent.emitEvent({ type: 'compaction.started', @@ -195,7 +114,7 @@ export class FullCompaction { promise: Promise.resolve(), }; this.compacting = active; - active.promise = this.compactionWorker(abortController.signal, data); + active.promise = this.compactionWorker(abortController.signal, data, compactedCount); } cancel(): void { @@ -212,55 +131,20 @@ export class FullCompaction { this.agent.emitEvent({ type: 'compaction.cancelled' }); } - complete( - result: CompactionResult, - llmUsage?: TokenUsage | undefined, - retryCount: number = 0, - ): void { + markCompleted() { this.agent.records.logRecord({ type: 'full_compaction.complete', - ...result, }); - const active = this.compacting; this.compacting = null; - const history = this.agent.context.history; this._compactedHistory.push({ - text: renderMessagesToText(history), + text: renderMessagesToText(this.agent.context.history), }); - this.agent.emitEvent({ type: 'compaction.completed', result }); - if (active !== null) { - const properties: Record = { - trigger_type: active.telemetryTrigger, - before_tokens: result.tokensBefore, - after_tokens: result.tokensAfter, - duration_ms: Date.now() - active.startedAt, - compacted_count: result.compactedCount, - retry_count: retryCount, - }; - if (llmUsage !== undefined) { - properties['llm_input_tokens'] = inputTotal(llmUsage); - properties['llm_output_tokens'] = llmUsage.output; - } - this.agent.telemetry.track('compaction_finished', properties); - } } private get tokenCountWithPending(): number { return this.agent.context.tokenCountWithPending; } - private get maxContextSize() { - return this.agent.config.modelCapabilities.max_context_tokens; - } - - private get shouldCompact(): boolean { - return this.strategy.shouldCompact(this.tokenCountWithPending, this.maxContextSize); - } - - private get shouldBlock(): boolean { - return this.strategy.shouldBlock(this.tokenCountWithPending, this.maxContextSize); - } - resetForTurn(): void { this.compactionCountInTurn = 0; } @@ -274,7 +158,7 @@ export class FullCompaction { async beforeStep(signal: AbortSignal): Promise { this.checkAutoCompaction(); - if (this.shouldBlock) { + if (this.strategy.shouldBlock(this.tokenCountWithPending)) { await this.block(signal); } } @@ -288,7 +172,7 @@ export class FullCompaction { private checkAutoCompaction(throwOnLimit: boolean = true): boolean { if (this.compacting) return true; - if (!this.shouldCompact) return false; + if (!this.strategy.shouldCompact(this.tokenCountWithPending)) return false; return this.beginAutoCompaction(throwOnLimit); } @@ -304,17 +188,8 @@ export class FullCompaction { } return false; } - const history = this.agent.context.history; - const compactedCount = this.computeCompactableCount(history); - if (compactedCount === 0) return false; - if ( - this.maxContextSize > 0 && - estimateTokensForMessages(project(history.slice(compactedCount))) >= this.maxContextSize - ) { - return false; - } - this.agent.fullCompaction.begin({ source: 'auto', instruction: undefined }); - return true; + this.begin({ source: 'auto', instruction: undefined }); + return this.compacting !== null; } private async block(signal: AbortSignal): Promise { @@ -336,46 +211,70 @@ export class FullCompaction { private async compactionWorker( signal: AbortSignal, data: Readonly, + initialCompactedCount: number, ): Promise { const startedAt = Date.now(); - let tokensBeforeForError = 0; - let retryCountForTelemetry = 0; + const originalHistory = [...this.agent.context.history]; + const tokensBefore = estimateTokensForMessages(originalHistory); + let retryCount = 0; try { - const originalHistory = [...this.agent.context.history]; - const tokensBefore = this.agent.context.tokenCount; - tokensBeforeForError = tokensBefore; - const compactedCount = this.computeCompactableCount(originalHistory); - if (compactedCount === 0) { - this.markCanceled(); - return undefined; - } - signal.throwIfAborted(); + let compactedCount = initialCompactedCount; + await this.triggerPreCompactHook(data, tokensBefore, signal); - signal.throwIfAborted(); const model = this.agent.config.model; - const messages = [ - ...project(originalHistory.slice(0, compactedCount)), - { - role: 'user', - content: [ - { - type: 'text', - text: COMPACTION_INSTRUCTION(data.instruction), - }, - ], - toolCalls: [], - } satisfies Message, - ]; - const { response, retryCount, summary } = await this.generateCompactionResponse({ - messages, - signal, - onRetry: (count) => { - retryCountForTelemetry = count; - }, - }); - if (response.usage !== null) { - this.agent.usage.record(model, response.usage); + + const delays = retryBackoffDelays(MAX_COMPACTION_RETRY_ATTEMPTS); + let usage: TokenUsage | null; + let summary: string; + while (true) { + const messagesToCompact = originalHistory.slice(0, compactedCount); + const messages = [ + ...project(messagesToCompact), + { + role: 'user', + content: [ + { + type: 'text', + text: COMPACTION_INSTRUCTION(data.instruction), + }, + ], + toolCalls: [], + } satisfies Message, + ]; + class TruncatedError extends Error {} + try { + const response = await this.agent.generate( + this.agent.config.provider, + this.agent.config.systemPrompt, + [...this.agent.tools.loopTools], + messages, + undefined, + { signal }, + ); + if (response.finishReason === 'truncated') { + throw new TruncatedError(); + } + usage = response.usage; + summary = extractCompactionSummary(response); + break; + } catch (error) { + if (error instanceof APIContextOverflowError || error instanceof TruncatedError) { + compactedCount = this.strategy.reduceCompactOnOverflow(messagesToCompact); + } + else if (!isRetryableGenerateError(error)) { + throw error; + } + if (retryCount + 1 >= MAX_COMPACTION_RETRY_ATTEMPTS) { + throw error; + } + await sleepForRetry(delays[retryCount]!, signal); + retryCount += 1; + } + } + + if (usage !== null) { + this.agent.usage.record(model, usage); } const newHistory = this.agent.context.history; @@ -388,7 +287,7 @@ export class FullCompaction { } const recent = originalHistory.slice(compactedCount); - const tokensAfter = estimateTokens(summary) + estimateTokensForMessages(project(recent)); + const tokensAfter = estimateTokens(summary) + estimateTokensForMessages(recent); const result: CompactionResult = { summary, @@ -397,7 +296,18 @@ export class FullCompaction { tokensAfter, }; - this.complete(result, response.usage ?? undefined, retryCount); + const active = this.compacting!; + this.agent.telemetry.track('compaction_finished', { + trigger_type: active.telemetryTrigger, + before_tokens: result.tokensBefore, + after_tokens: result.tokensAfter, + duration_ms: Date.now() - active.startedAt, + compacted_count: result.compactedCount, + retry_count: retryCount, + ...usage, + }); + this.markCompleted(); + this.agent.emitEvent({ type: 'compaction.completed', result }); this.agent.context.applyCompaction(result); this.triggerPostCompactHook(data, result); } catch (error) { @@ -417,76 +327,21 @@ export class FullCompaction { }); this.agent.telemetry.track('compaction_failed', { trigger_type: compactionTelemetryTrigger(data.source, data.instruction), - before_tokens: tokensBeforeForError, + before_tokens: tokensBefore, duration_ms: Date.now() - startedAt, - retry_count: retryCountForTelemetry, + retry_count: retryCount, error_type: error instanceof Error ? error.name : 'Unknown', }); } } } - private async generateCompactionResponse({ - messages, - signal, - onRetry, - }: { - readonly messages: Message[]; - readonly signal: AbortSignal; - readonly onRetry?: ((retryCount: number) => void) | undefined; - }): Promise<{ - readonly response: GenerateResult; - readonly summary: string; - readonly retryCount: number; - }> { - const maxAttempts = - this.agent.providerManager?.config.loopControl?.maxRetriesPerStep ?? - DEFAULT_MAX_RETRY_ATTEMPTS; - const delays = retryBackoffDelays(maxAttempts); - let retryCount = 0; - - const completionBudget = resolveCompletionBudget({ - reservedContextSize: - this.agent.providerManager?.config.loopControl?.reservedContextSize, - }); - const effectiveProvider = applyCompletionBudget({ - provider: this.agent.config.provider, - budget: completionBudget, - capability: this.agent.config.modelCapabilities, - }); - - for (let attempt = 1; ; attempt += 1) { - try { - const response = await this.agent.generate( - effectiveProvider, - this.agent.config.systemPrompt, - [...this.agent.tools.loopTools], - messages, - undefined, - { signal }, - ); - const summary = extractCompactionSummary(response); - return { response, summary, retryCount }; - } catch (error) { - if (attempt >= maxAttempts || !isRetryableGenerateError(error)) { - throw error; - } - retryCount += 1; - onRetry?.(retryCount); - await sleepForRetry(delays[attempt - 1] ?? 0, signal); - } - } - } - - get compactedHistory(): readonly CompactedHistory[] { - return this._compactedHistory; - } - private async triggerPreCompactHook( data: Readonly, tokenCount: number, signal: AbortSignal, ): Promise { + signal.throwIfAborted(); await this.agent.hooks?.trigger('PreCompact', { matcherValue: data.source, signal, @@ -495,6 +350,7 @@ export class FullCompaction { tokenCount, }, }); + signal.throwIfAborted(); } private triggerPostCompactHook( @@ -509,13 +365,6 @@ export class FullCompaction { }, }); } - - private computeCompactableCount(history: readonly Message[]): number { - return sliceCompleteMessages( - history, - this.strategy.computeCompactCount(history, this.maxContextSize), - ); - } } function extractCompactionSummary(response: GenerateResult): string { diff --git a/packages/agent-core/src/agent/compaction/index.ts b/packages/agent-core/src/agent/compaction/index.ts index d8da8bbd..876ee14e 100644 --- a/packages/agent-core/src/agent/compaction/index.ts +++ b/packages/agent-core/src/agent/compaction/index.ts @@ -1,3 +1,3 @@ export * from './full'; -export * from './config'; +export * from './strategy'; export * from './types'; diff --git a/packages/agent-core/src/agent/compaction/strategy.ts b/packages/agent-core/src/agent/compaction/strategy.ts new file mode 100644 index 00000000..98b3fd80 --- /dev/null +++ b/packages/agent-core/src/agent/compaction/strategy.ts @@ -0,0 +1,157 @@ +import type { Message } from "@moonshot-ai/kosong"; +import { estimateTokensForMessage } from "../../utils/tokens"; +import type { CompactionSource } from "./types"; + +export interface CompactionConfig { + triggerRatio: number; + blockRatio: number; + reservedContextSize: number; + maxCompactionPerTurn: number; + maxRecentMessages: number; + maxRecentUserMessages: number; + maxRecentSizeRatio: number; +} + +export const DEFAULT_COMPACTION_CONFIG: CompactionConfig = { + triggerRatio: 0.85, + blockRatio: 0.85, // Same as triggerRatio to disable async compaction + reservedContextSize: 50_000, + maxCompactionPerTurn: 3, + maxRecentMessages: 4, + maxRecentUserMessages: Infinity, + maxRecentSizeRatio: 0.2, +}; + +export interface CompactionStrategy { + shouldCompact(usedSize: number): boolean; + shouldBlock(usedSize: number): boolean; + computeCompactCount(messages: readonly Message[], source: CompactionSource): number; + reduceCompactOnOverflow(messages: readonly Message[]): number; + readonly checkAfterStep: boolean; + readonly maxCompactionPerTurn: number; +} + +export class DefaultCompactionStrategy implements CompactionStrategy { + constructor( + protected readonly maxSizeProvider: () => number, + protected readonly config: CompactionConfig = DEFAULT_COMPACTION_CONFIG + ) { } + + protected get maxSize(): number { + return this.maxSizeProvider(); + } + + shouldCompact(usedSize: number): boolean { + if (this.maxSize <= 0) return false; + return ( + usedSize >= this.maxSize * this.config.triggerRatio || + this.shouldUseReservedContext(usedSize) + ); + } + + shouldBlock(usedSize: number): boolean { + if (this.maxSize <= 0) return false; + return ( + usedSize >= this.maxSize * this.config.blockRatio || + this.shouldUseReservedContext(usedSize) + ); + } + + private shouldUseReservedContext(usedSize: number): boolean { + const reservedSize = this.config.reservedContextSize; + return reservedSize > 0 && reservedSize < this.maxSize && usedSize + reservedSize >= this.maxSize; + } + + computeCompactCount(messages: readonly Message[], source: CompactionSource): number { + // Return value: N messages to be compacted (0 means no compaction possible) + // LLM Input: messages.slice(0, N) + [user:instruction] + // Preserved recent messages: messages.slice(N) + + // Manual compaction + if (source === 'manual') { + for (let i = messages.length - 1; i > 0; i--) { + if (canSplitAfter(messages, i)) { + return i + 1; + } + } + return 0; + } + + // Auto compaction rules (in order of precedence): + // 1. The split after messages[N-1] must be safe per `canSplitAfter`: + // messages[N-1] is not a user or asst-with-tool-calls, and the retained + // suffix messages.slice(N) has no orphan tool result. + // 2. At least one recent message must be preserved + // 3. At most maxRecentMessages recent messages should be preserved + // 4. At most maxRecentUserMessages recent user messages should be preserved + // 5. At most maxRecentSizeRatio * maxSize recent messages should be preserved + // 6. N should be as small as possible + + let recentMessages = 1; + let recentUserMessages = 0; + let recentSize = 0; + let bestN: number | undefined; + + for (; recentMessages < messages.length; recentMessages++) { + const splitIndex = messages.length - recentMessages - 1; + const m2 = messages[messages.length - recentMessages]!; + + if (m2.role === 'user') { + recentUserMessages++; + } + recentSize += estimateTokensForMessage(m2); + + if (canSplitAfter(messages, splitIndex)) { + bestN = splitIndex + 1; + } + + const reachesMax = recentMessages >= this.config.maxRecentMessages + || recentUserMessages >= this.config.maxRecentUserMessages + || recentSize >= this.maxSize * this.config.maxRecentSizeRatio; + if (reachesMax && bestN !== undefined) { + break; + } + } + + return bestN ?? 0; + } + + reduceCompactOnOverflow(messages: readonly Message[]): number { + for (let i = messages.length - 2; i > 0; i--) { + if (canSplitAfter(messages, i)) { + return i + 1; + } + } + return messages.length; + } + + get checkAfterStep(): boolean { + return this.config.triggerRatio !== this.config.blockRatio; + } + + get maxCompactionPerTurn(): number { + return this.config.maxCompactionPerTurn; + } +} + +/** + * Decide whether a compaction split is safe to place immediately after + * `messages[index]`. A split is safe only when: + * - `messages[index]` itself is not a user message or an assistant message + * with pending tool calls (cutting either of those off from what follows + * would break the conversation), AND + * - the next message is not a tool result. The history is well-formed: + * tool results only appear after their owning `asst_w_tc` and all tool + * results for one exchange land consecutively before the next non-tool + * message. So if the suffix starts with a tool result, its `asst_w_tc` + * must be in the compacted prefix, which would orphan that result + * (e.g. splitting between tool_a and tool_b of a parallel call). + */ +function canSplitAfter(messages: readonly Message[], index: number): boolean { + const m = messages[index]; + if (m === undefined) return false; + if (m.role === 'user') return false; + if (m.role === 'assistant' && m.toolCalls.length > 0) return false; + if (messages[index + 1]?.role === 'tool') return false; + return true; +} diff --git a/packages/agent-core/src/agent/compaction/types.ts b/packages/agent-core/src/agent/compaction/types.ts index 3936faef..820365cd 100644 --- a/packages/agent-core/src/agent/compaction/types.ts +++ b/packages/agent-core/src/agent/compaction/types.ts @@ -5,7 +5,9 @@ export interface CompactionResult { tokensAfter: number; } +export type CompactionSource = 'manual' | 'auto'; + export interface CompactionBeginData { instruction?: string; - source: 'manual' | 'auto'; + source: CompactionSource; } diff --git a/packages/agent-core/src/agent/context/complete-slice.ts b/packages/agent-core/src/agent/context/complete-slice.ts deleted file mode 100644 index 258cdf90..00000000 --- a/packages/agent-core/src/agent/context/complete-slice.ts +++ /dev/null @@ -1,55 +0,0 @@ -import type { Message } from '@moonshot-ai/kosong'; - -export function sliceCompleteMessages( - messages: readonly Message[], - requestedEnd: number, -): number { - let normalized = Math.max(0, Math.min(messages.length, requestedEnd)); - - for (let i = 0; i < messages.length; i += 1) { - const message = messages[i]; - if (message?.role !== 'assistant' || message.toolCalls.length === 0) continue; - - const end = findToolExchangeEnd(messages, i); - if (end === undefined) { - if (normalized > i) { - normalized = includePromptForAssistant(messages, i); - } - continue; - } - - if (normalized > i && normalized < end) { - normalized = includePromptForAssistant(messages, i); - } - } - - return normalized; -} - -function findToolExchangeEnd( - messages: readonly Message[], - assistantIndex: number, -): number | undefined { - const assistant = messages[assistantIndex]; - if (assistant?.role !== 'assistant') return undefined; - - const pending = new Set(assistant.toolCalls.map((call) => call.id)); - if (pending.size === 0) return assistantIndex + 1; - - for (let i = assistantIndex + 1; i < messages.length; i += 1) { - const message = messages[i]; - if (message?.role !== 'tool') return undefined; - if (message.toolCallId !== undefined) { - pending.delete(message.toolCallId); - } - if (pending.size === 0) return i + 1; - } - - return undefined; -} - -function includePromptForAssistant(messages: readonly Message[], assistantIndex: number): number { - const previous = messages[assistantIndex - 1]; - if (previous?.role === 'user') return assistantIndex - 1; - return assistantIndex; -} diff --git a/packages/agent-core/src/agent/index.ts b/packages/agent-core/src/agent/index.ts index 88f95d90..4667d5e9 100644 --- a/packages/agent-core/src/agent/index.ts +++ b/packages/agent-core/src/agent/index.ts @@ -479,7 +479,7 @@ function buildLlmRequestMetadata( const estimatedInputTokens = estimateTokens(systemPrompt) + - estimateTokensForMessages([...history]) + + estimateTokensForMessages(history) + estimateTokensForTools(tools); const metadata: LlmRequestMetadata = { diff --git a/packages/agent-core/src/agent/records/index.ts b/packages/agent-core/src/agent/records/index.ts index 2e4ede74..fed074f6 100644 --- a/packages/agent-core/src/agent/records/index.ts +++ b/packages/agent-core/src/agent/records/index.ts @@ -54,7 +54,7 @@ function restoreAgentRecord(agent: Agent, input: AgentRecord): void { agent.fullCompaction.cancel(); return; case 'full_compaction.complete': - agent.fullCompaction.complete(input); + agent.fullCompaction.markCompleted(); return; case 'plan_mode.enter': agent.planMode.restoreEnter(input); diff --git a/packages/agent-core/src/agent/records/types.ts b/packages/agent-core/src/agent/records/types.ts index 3300bce5..88fbe6ef 100644 --- a/packages/agent-core/src/agent/records/types.ts +++ b/packages/agent-core/src/agent/records/types.ts @@ -63,7 +63,7 @@ export interface AgentRecordEvents { }; 'full_compaction.cancel': {}; - 'full_compaction.complete': CompactionResult; + 'full_compaction.complete': {}; 'context.append_message': { message: ContextMessage }; 'context.mark_last_user_prompt_blocked': { hookEvent: string }; diff --git a/packages/agent-core/src/errors/codes.ts b/packages/agent-core/src/errors/codes.ts index 2014dba2..97c5daad 100644 --- a/packages/agent-core/src/errors/codes.ts +++ b/packages/agent-core/src/errors/codes.ts @@ -51,6 +51,7 @@ export const ErrorCodes = { RECORDS_WRITE_FAILED: 'records.write_failed', COMPACTION_FAILED: 'compaction.failed', + COMPACTION_UNABLE: 'compaction.unable', BACKGROUND_TASK_ID_EMPTY: 'background.task_id_empty', MCP_SERVER_NOT_FOUND: 'mcp.server_not_found', @@ -307,6 +308,12 @@ export const KIMI_ERROR_INFO = { public: true, action: 'Inspect logs and consider increasing compaction limits.', }, + 'compaction.unable': { + title: 'Unable to compact', + retryable: false, + public: true, + action: 'The current history has no prefix that can be compacted (e.g. only a pending user message). Start a new turn or session instead.', + }, 'background.task_id_empty': { title: 'Background task id is empty', diff --git a/packages/agent-core/src/utils/tokens.ts b/packages/agent-core/src/utils/tokens.ts index a6547e96..1b4f7955 100644 --- a/packages/agent-core/src/utils/tokens.ts +++ b/packages/agent-core/src/utils/tokens.ts @@ -21,7 +21,7 @@ export function estimateTokens(text: string): number { return Math.ceil(asciiCount / 4) + nonAsciiCount; } -export function estimateTokensForMessages(messages: Message[]): number { +export function estimateTokensForMessages(messages: readonly Message[]): number { let total = 0; for (const message of messages) { total += estimateTokensForMessage(message); diff --git a/packages/agent-core/test/agent/compaction.test.ts b/packages/agent-core/test/agent/compaction.test.ts index 8b518981..020c58bb 100644 --- a/packages/agent-core/test/agent/compaction.test.ts +++ b/packages/agent-core/test/agent/compaction.test.ts @@ -46,7 +46,7 @@ describe('Agent compaction', () => { textMessage('user', `pending user ${'x'.repeat(1_200)}`), ]; - expect(strategy.computeCompactCount(messages, 1_000)).toBe(2); + expect(strategy.computeCompactCount(messages, 'auto')).toBe(2); }); it('keeps consecutive trailing user messages as recent', () => { @@ -58,10 +58,10 @@ describe('Agent compaction', () => { textMessage('user', `pending user two ${'x'.repeat(1_200)}`), ]; - expect(strategy.computeCompactCount(messages, 1_000)).toBe(2); + expect(strategy.computeCompactCount(messages, 'auto')).toBe(2); }); - it('does not keep an oversized completed exchange as recent', () => { + it('compacts the prefix when the trailing exchange itself is oversized', () => { const strategy = testCompactionStrategy(); const messages = [ textMessage('user', 'old user'), @@ -70,31 +70,85 @@ describe('Agent compaction', () => { textMessage('assistant', `recent assistant ${'x'.repeat(1_200)}`), ]; - expect(strategy.computeCompactCount(messages, 1_000)).toBe(messages.length); + expect(strategy.computeCompactCount(messages, 'auto')).toBe(2); + }); + + it('returns 0 when there is nothing to compact', () => { + const strategy = testCompactionStrategy(); + expect(strategy.computeCompactCount([], 'auto')).toBe(0); + expect(strategy.computeCompactCount([textMessage('user', 'only pending')], 'auto')).toBe(0); + expect( + strategy.computeCompactCount( + [ + textMessage('user', 'a'), + textMessage('user', 'b'), + textMessage('user', 'c'), + ], + 'auto', + ), + ).toBe(0); + }); + + it('returns 0 when no intermediate split exists and the last message is also unsplittable', () => { + const strategy = testCompactionStrategy(); + const messages: Message[] = [ + textMessage('user', 'inspect'), + { + role: 'assistant', + content: [], + toolCalls: [{ type: 'function', id: 'call_a', name: 'Lookup', arguments: '{}' }], + }, + ]; + + expect(strategy.computeCompactCount(messages, 'auto')).toBe(0); + }); + + it('does not split inside a parallel tool exchange', () => { + const strategy = testCompactionStrategy(); + const messages: Message[] = [ + textMessage('user', 'old user'), + textMessage('assistant', 'old assistant'), + textMessage('user', 'run both tools'), + { + role: 'assistant', + content: [], + toolCalls: [ + { type: 'function', id: 'call_a', name: 'Lookup', arguments: '{}' }, + { type: 'function', id: 'call_b', name: 'Lookup', arguments: '{}' }, + ], + }, + { role: 'tool', content: [{ type: 'text', text: 'a' }], toolCalls: [], toolCallId: 'call_a' }, + { role: 'tool', content: [{ type: 'text', text: 'b' }], toolCalls: [], toolCallId: 'call_b' }, + textMessage('user', 'next prompt'), + ]; + + // The only valid split is before the parallel exchange (after 'old assistant'), + // never between tool_a and tool_b — that would leave tool_b as an orphan. + expect(strategy.computeCompactCount(messages, 'auto')).toBe(2); }); it('reserves response context by default before the ratio threshold is reached', () => { - const strategy = new DefaultCompactionStrategy(); + const strategy = new DefaultCompactionStrategy(() => 256_000); - expect(strategy.shouldCompact(210_000, 256_000)).toBe(true); - expect(strategy.shouldBlock(210_000, 256_000)).toBe(true); + expect(strategy.shouldCompact(210_000)).toBe(true); + expect(strategy.shouldBlock(210_000)).toBe(true); }); it('ignores reserved context when the reserve is not smaller than the model window', () => { - const strategy = new DefaultCompactionStrategy({ + const strategy = new DefaultCompactionStrategy(() => 32_000, { triggerRatio: 0.85, blockRatio: 0.85, reservedContextSize: 50_000, maxCompactionPerTurn: 3, - maxRecentSteps: 3, + maxRecentMessages: 3, maxRecentUserMessages: Infinity, maxRecentSizeRatio: 0.2, }); - expect(strategy.shouldCompact(1, 32_000)).toBe(false); - expect(strategy.shouldBlock(1, 32_000)).toBe(false); - expect(strategy.shouldCompact(28_000, 32_000)).toBe(true); - expect(strategy.shouldBlock(28_000, 32_000)).toBe(true); + expect(strategy.shouldCompact(1)).toBe(false); + expect(strategy.shouldBlock(1)).toBe(false); + expect(strategy.shouldCompact(28_000)).toBe(true); + expect(strategy.shouldBlock(28_000)).toBe(true); }); it('runs manual compaction and applies the compacted context', async () => { @@ -123,12 +177,12 @@ describe('Agent compaction', () => { [wire] context.append_message { "message": { "role": "user", "content": [ { "type": "text", "text": "recent user three" } ], "toolCalls": [], "origin": { "kind": "user" } }, "time": "