query.ts
67 KB · 1730 lines
src/query.ts
1// biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered2import type {3 ToolResultBlockParam,4 ToolUseBlock,5} from '@anthropic-ai/sdk/resources/index.mjs'6import type { CanUseToolFn } from './hooks/useCanUseTool.js'7import { FallbackTriggeredError } from './services/api/withRetry.js'8import {9 calculateTokenWarningState,10 isAutoCompactEnabled,11 type AutoCompactTrackingState,12} from './services/compact/autoCompact.js'13import { buildPostCompactMessages } from './services/compact/compact.js'14/* eslint-disable @typescript-eslint/no-require-imports */15const reactiveCompact = feature('REACTIVE_COMPACT')16 ? (require('./services/compact/reactiveCompact.js') as typeof import('./services/compact/reactiveCompact.js'))17 : null18const contextCollapse = feature('CONTEXT_COLLAPSE')19 ? (require('./services/contextCollapse/index.js') as typeof import('./services/contextCollapse/index.js'))20 : null21/* eslint-enable @typescript-eslint/no-require-imports */22import {23 logEvent,24 type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,25} from 'src/services/analytics/index.js'26import { ImageSizeError } from './utils/imageValidation.js'27import { ImageResizeError } from './utils/imageResizer.js'28import { findToolByName, type ToolUseContext } from './Tool.js'29import { asSystemPrompt, type SystemPrompt } from './utils/systemPromptType.js'30import type {31 AssistantMessage,32 AttachmentMessage,33 Message,34 RequestStartEvent,35 StreamEvent,36 ToolUseSummaryMessage,37 UserMessage,38 TombstoneMessage,39} from './types/message.js'40import { logError } from './utils/log.js'41import {42 PROMPT_TOO_LONG_ERROR_MESSAGE,43 isPromptTooLongMessage,44} from './services/api/errors.js'45import { logAntError, logForDebugging } from './utils/debug.js'46import {47 createUserMessage,48 createUserInterruptionMessage,49 normalizeMessagesForAPI,50 createSystemMessage,51 createAssistantAPIErrorMessage,52 getMessagesAfterCompactBoundary,53 
createToolUseSummaryMessage,54 createMicrocompactBoundaryMessage,55 stripSignatureBlocks,56} from './utils/messages.js'57import { generateToolUseSummary } from './services/toolUseSummary/toolUseSummaryGenerator.js'58import { prependUserContext, appendSystemContext } from './utils/api.js'59import {60 createAttachmentMessage,61 filterDuplicateMemoryAttachments,62 getAttachmentMessages,63 startRelevantMemoryPrefetch,64} from './utils/attachments.js'65/* eslint-disable @typescript-eslint/no-require-imports */66const skillPrefetch = feature('EXPERIMENTAL_SKILL_SEARCH')67 ? (require('./services/skillSearch/prefetch.js') as typeof import('./services/skillSearch/prefetch.js'))68 : null69const jobClassifier = feature('TEMPLATES')70 ? (require('./jobs/classifier.js') as typeof import('./jobs/classifier.js'))71 : null72/* eslint-enable @typescript-eslint/no-require-imports */73import {74 remove as removeFromQueue,75 getCommandsByMaxPriority,76 isSlashCommand,77} from './utils/messageQueueManager.js'78import { notifyCommandLifecycle } from './utils/commandLifecycle.js'79import { headlessProfilerCheckpoint } from './utils/headlessProfiler.js'80import {81 getRuntimeMainLoopModel,82 renderModelName,83} from './utils/model/model.js'84import {85 doesMostRecentAssistantMessageExceed200k,86 finalContextTokensFromLastResponse,87 tokenCountWithEstimation,88} from './utils/tokens.js'89import { ESCALATED_MAX_TOKENS } from './utils/context.js'90import { getFeatureValue_CACHED_MAY_BE_STALE } from './services/analytics/growthbook.js'91import { SLEEP_TOOL_NAME } from './tools/SleepTool/prompt.js'92import { executePostSamplingHooks } from './utils/hooks/postSamplingHooks.js'93import { executeStopFailureHooks } from './utils/hooks.js'94import type { QuerySource } from './constants/querySource.js'95import { createDumpPromptsFetch } from './services/api/dumpPrompts.js'96import { StreamingToolExecutor } from './services/tools/StreamingToolExecutor.js'97import { queryCheckpoint } from 
'./utils/queryProfiler.js'
import { runTools } from './services/tools/toolOrchestration.js'
import { applyToolResultBudget } from './utils/toolResultStorage.js'
import { recordContentReplacement } from './utils/sessionStorage.js'
import { handleStopHooks } from './query/stopHooks.js'
import { buildQueryConfig } from './query/config.js'
import { productionDeps, type QueryDeps } from './query/deps.js'
import type { Terminal, Continue } from './query/transitions.js'
import { feature } from 'bun:bundle'
import {
  getCurrentTurnTokenBudget,
  getTurnOutputTokens,
  incrementBudgetContinuationCount,
} from './bootstrap/state.js'
import { createBudgetTracker, checkTokenBudget } from './query/tokenBudget.js'
import { count } from './utils/array.js'

// Feature-gated modules use require() inside a feature() ternary so bun:bundle
// tree-shaking can drop the module (and its strings) from builds where the
// flag is off; feature() only works in if/ternary conditions, hence the
// conditional-require shape instead of a static import.
/* eslint-disable @typescript-eslint/no-require-imports */
const snipModule = feature('HISTORY_SNIP')
  ? (require('./services/compact/snipCompact.js') as typeof import('./services/compact/snipCompact.js'))
  : null
const taskSummaryModule = feature('BG_SESSIONS')
  ? (require('./utils/taskSummary.js') as typeof import('./utils/taskSummary.js'))
  : null
/* eslint-enable @typescript-eslint/no-require-imports */

/**
 * Yield one synthetic error `tool_result` user message for every `tool_use`
 * block found in `assistantMessages`.
 *
 * The API requires each tool_use to be answered by a matching tool_result;
 * when an in-flight request is abandoned (e.g. on model fallback, which calls
 * this with 'Model fallback triggered'), this fills the holes so the retried
 * conversation stays well-formed.
 *
 * @param assistantMessages Assistant messages whose tool calls were orphaned.
 * @param errorMessage Human-readable reason; used both as the tool_result
 *   content (with is_error) and as the stored toolUseResult.
 */
function* yieldMissingToolResultBlocks(
  assistantMessages: AssistantMessage[],
  errorMessage: string,
) {
  for (const assistantMessage of assistantMessages) {
    // Extract all tool use blocks from this assistant message
    const toolUseBlocks = assistantMessage.message.content.filter(
      content => content.type === 'tool_use',
    ) as ToolUseBlock[]

    // Emit an interruption message for each tool use
    for (const toolUse of toolUseBlocks) {
      yield createUserMessage({
        content: [
          {
            type: 'tool_result',
            content: errorMessage,
            is_error: true,
            tool_use_id: toolUse.id,
          },
        ],
        toolUseResult: errorMessage,
        // Link the synthetic result back to the assistant message that issued
        // the tool call.
        sourceToolAssistantUUID: assistantMessage.uuid,
      })
    }
  }
}

/**
 * The rules of thinking are lengthy and fortuitous.
 * They require plenty of thinking
 * of most long duration and deep meditation for a wizard to wrap one's noggin around.
 *
 * The rules follow:
 * 1. A message that contains a thinking or redacted_thinking block must be part of a query whose max_thinking_length > 0
 * 2. A thinking block may not be the last message in a block
 * 3. Thinking blocks must be preserved for the duration of an assistant trajectory (a single turn, or if that turn includes a tool_use block then also its subsequent tool_result and the following assistant message)
 *
 * Heed these rules well, young wizard. For they are the rules of thinking, and
 * the rules of thinking are the rules of the universe. If ye does not heed these
 * rules, ye will be punished with an entire day of debugging and hair pulling.
 */
// Cap on consecutive max_output_tokens recovery attempts, tracked in
// State.maxOutputTokensRecoveryCount. NOTE(review): the enforcement site is
// outside this chunk — confirm the comparison there before changing the value.
const MAX_OUTPUT_TOKENS_RECOVERY_LIMIT = 3

/**
 * Is this a max_output_tokens error message? If so, the streaming loop should
 * withhold it from SDK callers until we know whether the recovery loop can
 * continue. Yielding early leaks an intermediate error to SDK callers (e.g.
 * cowork/desktop) that terminate the session on any `error` field — the
 * recovery loop keeps running but nobody is listening.
 *
 * Mirrors reactiveCompact.isWithheldPromptTooLong.
 */
function isWithheldMaxOutputTokens(
  msg: Message | StreamEvent | undefined,
): msg is AssistantMessage {
  return msg?.type === 'assistant' && msg.apiError === 'max_output_tokens'
}

/** Inputs for a single user turn driven by the query() generator. */
export type QueryParams = {
  // Conversation history the turn starts from.
  messages: Message[]
  systemPrompt: SystemPrompt
  userContext: { [k: string]: string }
  systemContext: { [k: string]: string }
  canUseTool: CanUseToolFn
  toolUseContext: ToolUseContext
  // Model switched to when the API throws FallbackTriggeredError mid-stream.
  fallbackModel?: string
  querySource: QuerySource
  maxOutputTokensOverride?: number
  maxTurns?: number
  skipCacheWrite?: boolean
  // API task_budget (output_config.task_budget, beta task-budgets-2026-03-13).
  // Distinct from the tokenBudget +500k auto-continue feature. `total` is the
  // budget for the whole agentic turn; `remaining` is computed per iteration
  // from cumulative API usage. See configureTaskBudgetParams in claude.ts.
  taskBudget?: { total: number }
  // Injectable dependencies; defaults to productionDeps() inside queryLoop.
  deps?: QueryDeps
}

// -- query loop state

// Mutable state carried between loop iterations
type State = {
  messages: Message[]
  toolUseContext: ToolUseContext
  autoCompactTracking: AutoCompactTrackingState | undefined
  // Consecutive max_output_tokens recoveries so far; bounded by
  // MAX_OUTPUT_TOKENS_RECOVERY_LIMIT.
  maxOutputTokensRecoveryCount: number
  hasAttemptedReactiveCompact: boolean
  maxOutputTokensOverride: number | undefined
  pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
  stopHookActive: boolean | undefined
  // Starts at 1; presumably incremented at the loop's continue sites (those
  // sites are outside this chunk — confirm).
  turnCount: number
  // Why the previous iteration continued.
// Undefined on first iteration.
  // Lets tests assert recovery paths fired without inspecting message contents.
  transition: Continue | undefined
}

/**
 * Run one user turn: delegates to queryLoop, relaying every yielded stream
 * event/message, and returns its Terminal result.
 *
 * On NORMAL completion only, each consumed queued command is notified as
 * 'completed'. A thrown error propagates through yield* and a .return() closes
 * both generators, so failures intentionally skip the notifications — the same
 * asymmetric started-without-completed signal as print.ts's drainCommandQueue.
 */
export async function* query(
  params: QueryParams,
): AsyncGenerator<
  | StreamEvent
  | RequestStartEvent
  | Message
  | TombstoneMessage
  | ToolUseSummaryMessage,
  Terminal
> {
  const consumedCommandUuids: string[] = []
  const terminal = yield* queryLoop(params, consumedCommandUuids)
  // Only reached if queryLoop returned normally. Skipped on throw (error
  // propagates through yield*) and on .return() (Return completion closes
  // both generators). This gives the same asymmetric started-without-completed
  // signal as print.ts's drainCommandQueue when the turn fails.
  for (const uuid of consumedCommandUuids) {
    notifyCommandLifecycle(uuid, 'completed')
  }
  return terminal
}

/**
 * The main agentic loop for a single user turn: compaction, model streaming,
 * tool execution, and recovery. Records the UUIDs of queued commands it
 * consumes into `consumedCommandUuids` (population happens later in the loop,
 * outside this view) so query() can report their lifecycle on normal return.
 */
async function* queryLoop(
  params: QueryParams,
  consumedCommandUuids: string[],
): AsyncGenerator<
  | StreamEvent
  | RequestStartEvent
  | Message
  | TombstoneMessage
  | ToolUseSummaryMessage,
  Terminal
> {
  // Immutable params — never reassigned during the query loop.
  const {
    systemPrompt,
    userContext,
    systemContext,
    canUseTool,
    fallbackModel,
    querySource,
    maxTurns,
    skipCacheWrite,
  } = params
  const deps = params.deps ?? productionDeps()

  // Mutable cross-iteration state. The loop body destructures this at the top
  // of each iteration so reads stay bare-name (`messages`, `toolUseContext`).
  // Continue sites write `state = { ...
}` instead of 9 separate assignments.268 let state: State = {269 messages: params.messages,270 toolUseContext: params.toolUseContext,271 maxOutputTokensOverride: params.maxOutputTokensOverride,272 autoCompactTracking: undefined,273 stopHookActive: undefined,274 maxOutputTokensRecoveryCount: 0,275 hasAttemptedReactiveCompact: false,276 turnCount: 1,277 pendingToolUseSummary: undefined,278 transition: undefined,279 }280 const budgetTracker = feature('TOKEN_BUDGET') ? createBudgetTracker() : null281282 // task_budget.remaining tracking across compaction boundaries. Undefined283 // until first compact fires — while context is uncompacted the server can284 // see the full history and handles the countdown from {total} itself (see285 // api/api/sampling/prompt/renderer.py:292). After a compact, the server sees286 // only the summary and would under-count spend; remaining tells it the287 // pre-compact final window that got summarized away. Cumulative across288 // multiple compacts: each subtracts the final context at that compact's289 // trigger point. Loop-local (not on State) to avoid touching the 7 continue290 // sites.291 let taskBudgetRemaining: number | undefined = undefined292293 // Snapshot immutable env/statsig/session state once at entry. See QueryConfig294 // for what's included and why feature() gates are intentionally excluded.295 const config = buildQueryConfig()296297 // Fired once per user turn — the prompt is invariant across loop iterations,298 // so per-iteration firing would ask sideQuery the same question N times.299 // Consume point polls settledAt (never blocks). `using` disposes on all300 // generator exit paths — see MemoryPrefetch for dispose/telemetry semantics.301 using pendingMemoryPrefetch = startRelevantMemoryPrefetch(302 state.messages,303 state.toolUseContext,304 )305306 // eslint-disable-next-line no-constant-condition307 while (true) {308 // Destructure state at the top of each iteration. 
toolUseContext alone309 // is reassigned within an iteration (queryTracking, messages updates);310 // the rest are read-only between continue sites.311 let { toolUseContext } = state312 const {313 messages,314 autoCompactTracking,315 maxOutputTokensRecoveryCount,316 hasAttemptedReactiveCompact,317 maxOutputTokensOverride,318 pendingToolUseSummary,319 stopHookActive,320 turnCount,321 } = state322323 // Skill discovery prefetch — per-iteration (uses findWritePivot guard324 // that returns early on non-write iterations). Discovery runs while the325 // model streams and tools execute; awaited post-tools alongside the326 // memory prefetch consume. Replaces the blocking assistant_turn path327 // that ran inside getAttachmentMessages (97% of those calls found328 // nothing in prod). Turn-0 user-input discovery still blocks in329 // userInputAttachments — that's the one signal where there's no prior330 // work to hide under.331 const pendingSkillPrefetch = skillPrefetch?.startSkillDiscoveryPrefetch(332 null,333 messages,334 toolUseContext,335 )336337 yield { type: 'stream_request_start' }338339 queryCheckpoint('query_fn_entry')340341 // Record query start for headless latency tracking (skip for subagents)342 if (!toolUseContext.agentId) {343 headlessProfilerCheckpoint('query_started')344 }345346 // Initialize or increment query chain tracking347 const queryTracking = toolUseContext.queryTracking348 ? {349 chainId: toolUseContext.queryTracking.chainId,350 depth: toolUseContext.queryTracking.depth + 1,351 }352 : {353 chainId: deps.uuid(),354 depth: 0,355 }356357 const queryChainIdForAnalytics =358 queryTracking.chainId as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS359360 toolUseContext = {361 ...toolUseContext,362 queryTracking,363 }364365 let messagesForQuery = [...getMessagesAfterCompactBoundary(messages)]366367 let tracking = autoCompactTracking368369 // Enforce per-message budget on aggregate tool result size. 
Runs BEFORE370 // microcompact — cached MC operates purely by tool_use_id (never inspects371 // content), so content replacement is invisible to it and the two compose372 // cleanly. No-ops when contentReplacementState is undefined (feature off).373 // Persist only for querySources that read records back on resume: agentId374 // routes to sidechain file (AgentTool resume) or session file (/resume).375 // Ephemeral runForkedAgent callers (agent_summary etc.) don't persist.376 const persistReplacements =377 querySource.startsWith('agent:') ||378 querySource.startsWith('repl_main_thread')379 messagesForQuery = await applyToolResultBudget(380 messagesForQuery,381 toolUseContext.contentReplacementState,382 persistReplacements383 ? records =>384 void recordContentReplacement(385 records,386 toolUseContext.agentId,387 ).catch(logError)388 : undefined,389 new Set(390 toolUseContext.options.tools391 .filter(t => !Number.isFinite(t.maxResultSizeChars))392 .map(t => t.name),393 ),394 )395396 // Apply snip before microcompact (both may run — they are not mutually exclusive).397 // snipTokensFreed is plumbed to autocompact so its threshold check reflects398 // what snip removed; tokenCountWithEstimation alone can't see it (reads usage399 // from the protected-tail assistant, which survives snip unchanged).400 let snipTokensFreed = 0401 if (feature('HISTORY_SNIP')) {402 queryCheckpoint('query_snip_start')403 const snipResult = snipModule!.snipCompactIfNeeded(messagesForQuery)404 messagesForQuery = snipResult.messages405 snipTokensFreed = snipResult.tokensFreed406 if (snipResult.boundaryMessage) {407 yield snipResult.boundaryMessage408 }409 queryCheckpoint('query_snip_end')410 }411412 // Apply microcompact before autocompact413 queryCheckpoint('query_microcompact_start')414 const microcompactResult = await deps.microcompact(415 messagesForQuery,416 toolUseContext,417 querySource,418 )419 messagesForQuery = microcompactResult.messages420 // For cached microcompact (cache editing), 
defer boundary message until after421 // the API response so we can use actual cache_deleted_input_tokens.422 // Gated behind feature() so the string is eliminated from external builds.423 const pendingCacheEdits = feature('CACHED_MICROCOMPACT')424 ? microcompactResult.compactionInfo?.pendingCacheEdits425 : undefined426 queryCheckpoint('query_microcompact_end')427428 // Project the collapsed context view and maybe commit more collapses.429 // Runs BEFORE autocompact so that if collapse gets us under the430 // autocompact threshold, autocompact is a no-op and we keep granular431 // context instead of a single summary.432 //433 // Nothing is yielded — the collapsed view is a read-time projection434 // over the REPL's full history. Summary messages live in the collapse435 // store, not the REPL array. This is what makes collapses persist436 // across turns: projectView() replays the commit log on every entry.437 // Within a turn, the view flows forward via state.messages at the438 // continue site (query.ts:1192), and the next projectView() no-ops439 // because the archived messages are already gone from its input.440 if (feature('CONTEXT_COLLAPSE') && contextCollapse) {441 const collapseResult = await contextCollapse.applyCollapsesIfNeeded(442 messagesForQuery,443 toolUseContext,444 querySource,445 )446 messagesForQuery = collapseResult.messages447 }448449 const fullSystemPrompt = asSystemPrompt(450 appendSystemContext(systemPrompt, systemContext),451 )452453 queryCheckpoint('query_autocompact_start')454 const { compactionResult, consecutiveFailures } = await deps.autocompact(455 messagesForQuery,456 toolUseContext,457 {458 systemPrompt,459 userContext,460 systemContext,461 toolUseContext,462 forkContextMessages: messagesForQuery,463 },464 querySource,465 tracking,466 snipTokensFreed,467 )468 queryCheckpoint('query_autocompact_end')469470 if (compactionResult) {471 const {472 preCompactTokenCount,473 postCompactTokenCount,474 truePostCompactTokenCount,475 
compactionUsage,476 } = compactionResult477478 logEvent('tengu_auto_compact_succeeded', {479 originalMessageCount: messages.length,480 compactedMessageCount:481 compactionResult.summaryMessages.length +482 compactionResult.attachments.length +483 compactionResult.hookResults.length,484 preCompactTokenCount,485 postCompactTokenCount,486 truePostCompactTokenCount,487 compactionInputTokens: compactionUsage?.input_tokens,488 compactionOutputTokens: compactionUsage?.output_tokens,489 compactionCacheReadTokens:490 compactionUsage?.cache_read_input_tokens ?? 0,491 compactionCacheCreationTokens:492 compactionUsage?.cache_creation_input_tokens ?? 0,493 compactionTotalTokens: compactionUsage494 ? compactionUsage.input_tokens +495 (compactionUsage.cache_creation_input_tokens ?? 0) +496 (compactionUsage.cache_read_input_tokens ?? 0) +497 compactionUsage.output_tokens498 : 0,499500 queryChainId: queryChainIdForAnalytics,501 queryDepth: queryTracking.depth,502 })503504 // task_budget: capture pre-compact final context window before505 // messagesForQuery is replaced with postCompactMessages below.506 // iterations[-1] is the authoritative final window (post server tool507 // loops); see #304930.508 if (params.taskBudget) {509 const preCompactContext =510 finalContextTokensFromLastResponse(messagesForQuery)511 taskBudgetRemaining = Math.max(512 0,513 (taskBudgetRemaining ?? params.taskBudget.total) - preCompactContext,514 )515 }516517 // Reset on every compact so turnCounter/turnId reflect the MOST RECENT518 // compact. 
recompactionInfo (autoCompact.ts:190) already captured the519 // old values for turnsSincePreviousCompact/previousCompactTurnId before520 // the call, so this reset doesn't lose those.521 tracking = {522 compacted: true,523 turnId: deps.uuid(),524 turnCounter: 0,525 consecutiveFailures: 0,526 }527528 const postCompactMessages = buildPostCompactMessages(compactionResult)529530 for (const message of postCompactMessages) {531 yield message532 }533534 // Continue on with the current query call using the post compact messages535 messagesForQuery = postCompactMessages536 } else if (consecutiveFailures !== undefined) {537 // Autocompact failed — propagate failure count so the circuit breaker538 // can stop retrying on the next iteration.539 tracking = {540 ...(tracking ?? { compacted: false, turnId: '', turnCounter: 0 }),541 consecutiveFailures,542 }543 }544545 //TODO: no need to set toolUseContext.messages during set-up since it is updated here546 toolUseContext = {547 ...toolUseContext,548 messages: messagesForQuery,549 }550551 const assistantMessages: AssistantMessage[] = []552 const toolResults: (UserMessage | AttachmentMessage)[] = []553 // @see https://docs.claude.com/en/docs/build-with-claude/tool-use554 // Note: stop_reason === 'tool_use' is unreliable -- it's not always set correctly.555 // Set during streaming whenever a tool_use block arrives — the sole556 // loop-exit signal. If false after streaming, we're done (modulo stop-hook retry).557 const toolUseBlocks: ToolUseBlock[] = []558 let needsFollowUp = false559560 queryCheckpoint('query_setup_start')561 const useStreamingToolExecution = config.gates.streamingToolExecution562 let streamingToolExecutor = useStreamingToolExecution563 ? 
new StreamingToolExecutor(564 toolUseContext.options.tools,565 canUseTool,566 toolUseContext,567 )568 : null569570 const appState = toolUseContext.getAppState()571 const permissionMode = appState.toolPermissionContext.mode572 let currentModel = getRuntimeMainLoopModel({573 permissionMode,574 mainLoopModel: toolUseContext.options.mainLoopModel,575 exceeds200kTokens:576 permissionMode === 'plan' &&577 doesMostRecentAssistantMessageExceed200k(messagesForQuery),578 })579580 queryCheckpoint('query_setup_end')581582 // Create fetch wrapper once per query session to avoid memory retention.583 // Each call to createDumpPromptsFetch creates a closure that captures the request body.584 // Creating it once means only the latest request body is retained (~700KB),585 // instead of all request bodies from the session (~500MB for long sessions).586 // Note: agentId is effectively constant during a query() call - it only changes587 // between queries (e.g., /clear command or session resume).588 const dumpPromptsFetch = config.gates.isAnt589 ? createDumpPromptsFetch(toolUseContext.agentId ?? 
config.sessionId)590 : undefined591592 // Block if we've hit the hard blocking limit (only applies when auto-compact is OFF)593 // This reserves space so users can still run /compact manually594 // Skip this check if compaction just happened - the compaction result is already595 // validated to be under the threshold, and tokenCountWithEstimation would use596 // stale input_tokens from kept messages that reflect pre-compaction context size.597 // Same staleness applies to snip: subtract snipTokensFreed (otherwise we'd598 // falsely block in the window where snip brought us under autocompact threshold599 // but the stale usage is still above blocking limit — before this PR that600 // window never existed because autocompact always fired on the stale count).601 // Also skip for compact/session_memory queries — these are forked agents that602 // inherit the full conversation and would deadlock if blocked here (the compact603 // agent needs to run to REDUCE the token count).604 // Also skip when reactive compact is enabled and automatic compaction is605 // allowed — the preempt's synthetic error returns before the API call,606 // so reactive compact would never see a prompt-too-long to react to.607 // Widened to walrus so RC can act as fallback when proactive fails.608 //609 // Same skip for context-collapse: its recoverFromOverflow drains610 // staged collapses on a REAL API 413, then falls through to611 // reactiveCompact. A synthetic preempt here would return before the612 // API call and starve both recovery paths. The isAutoCompactEnabled()613 // conjunct preserves the user's explicit "no automatic anything"614 // config — if they set DISABLE_AUTO_COMPACT, they get the preempt.615 let collapseOwnsIt = false616 if (feature('CONTEXT_COLLAPSE')) {617 collapseOwnsIt =618 (contextCollapse?.isContextCollapseEnabled() ?? false) &&619 isAutoCompactEnabled()620 }621 // Hoist media-recovery gate once per turn. 
Withholding (inside the622 // stream loop) and recovery (after) must agree; CACHED_MAY_BE_STALE can623 // flip during the 5-30s stream, and withhold-without-recover would eat624 // the message. PTL doesn't hoist because its withholding is ungated —625 // it predates the experiment and is already the control-arm baseline.626 const mediaRecoveryEnabled =627 reactiveCompact?.isReactiveCompactEnabled() ?? false628 if (629 !compactionResult &&630 querySource !== 'compact' &&631 querySource !== 'session_memory' &&632 !(633 reactiveCompact?.isReactiveCompactEnabled() && isAutoCompactEnabled()634 ) &&635 !collapseOwnsIt636 ) {637 const { isAtBlockingLimit } = calculateTokenWarningState(638 tokenCountWithEstimation(messagesForQuery) - snipTokensFreed,639 toolUseContext.options.mainLoopModel,640 )641 if (isAtBlockingLimit) {642 yield createAssistantAPIErrorMessage({643 content: PROMPT_TOO_LONG_ERROR_MESSAGE,644 error: 'invalid_request',645 })646 return { reason: 'blocking_limit' }647 }648 }649650 let attemptWithFallback = true651652 queryCheckpoint('query_api_loop_start')653 try {654 while (attemptWithFallback) {655 attemptWithFallback = false656 try {657 let streamingFallbackOccured = false658 queryCheckpoint('query_api_streaming_start')659 for await (const message of deps.callModel({660 messages: prependUserContext(messagesForQuery, userContext),661 systemPrompt: fullSystemPrompt,662 thinkingConfig: toolUseContext.options.thinkingConfig,663 tools: toolUseContext.options.tools,664 signal: toolUseContext.abortController.signal,665 options: {666 async getToolPermissionContext() {667 const appState = toolUseContext.getAppState()668 return appState.toolPermissionContext669 },670 model: currentModel,671 ...(config.gates.fastModeEnabled && {672 fastMode: appState.fastMode,673 }),674 toolChoice: undefined,675 isNonInteractiveSession:676 toolUseContext.options.isNonInteractiveSession,677 fallbackModel,678 onStreamingFallback: () => {679 streamingFallbackOccured = true680 },681 
querySource,682 agents: toolUseContext.options.agentDefinitions.activeAgents,683 allowedAgentTypes:684 toolUseContext.options.agentDefinitions.allowedAgentTypes,685 hasAppendSystemPrompt:686 !!toolUseContext.options.appendSystemPrompt,687 maxOutputTokensOverride,688 fetchOverride: dumpPromptsFetch,689 mcpTools: appState.mcp.tools,690 hasPendingMcpServers: appState.mcp.clients.some(691 c => c.type === 'pending',692 ),693 queryTracking,694 effortValue: appState.effortValue,695 advisorModel: appState.advisorModel,696 skipCacheWrite,697 agentId: toolUseContext.agentId,698 addNotification: toolUseContext.addNotification,699 ...(params.taskBudget && {700 taskBudget: {701 total: params.taskBudget.total,702 ...(taskBudgetRemaining !== undefined && {703 remaining: taskBudgetRemaining,704 }),705 },706 }),707 },708 })) {709 // We won't use the tool_calls from the first attempt710 // We could.. but then we'd have to merge assistant messages711 // with different ids and double up on full the tool_results712 if (streamingFallbackOccured) {713 // Yield tombstones for orphaned messages so they're removed from UI and transcript.714 // These partial messages (especially thinking blocks) have invalid signatures715 // that would cause "thinking blocks cannot be modified" API errors.716 for (const msg of assistantMessages) {717 yield { type: 'tombstone' as const, message: msg }718 }719 logEvent('tengu_orphaned_messages_tombstoned', {720 orphanedMessageCount: assistantMessages.length,721 queryChainId: queryChainIdForAnalytics,722 queryDepth: queryTracking.depth,723 })724725 assistantMessages.length = 0726 toolResults.length = 0727 toolUseBlocks.length = 0728 needsFollowUp = false729730 // Discard pending results from the failed streaming attempt and create731 // a fresh executor. 
This prevents orphan tool_results (with old tool_use_ids)732 // from being yielded after the fallback response arrives.733 if (streamingToolExecutor) {734 streamingToolExecutor.discard()735 streamingToolExecutor = new StreamingToolExecutor(736 toolUseContext.options.tools,737 canUseTool,738 toolUseContext,739 )740 }741 }742 // Backfill tool_use inputs on a cloned message before yield so743 // SDK stream output and transcript serialization see legacy/derived744 // fields. The original `message` is left untouched for745 // assistantMessages.push below — it flows back to the API and746 // mutating it would break prompt caching (byte mismatch).747 let yieldMessage: typeof message = message748 if (message.type === 'assistant') {749 let clonedContent: typeof message.message.content | undefined750 for (let i = 0; i < message.message.content.length; i++) {751 const block = message.message.content[i]!752 if (753 block.type === 'tool_use' &&754 typeof block.input === 'object' &&755 block.input !== null756 ) {757 const tool = findToolByName(758 toolUseContext.options.tools,759 block.name,760 )761 if (tool?.backfillObservableInput) {762 const originalInput = block.input as Record<string, unknown>763 const inputCopy = { ...originalInput }764 tool.backfillObservableInput(inputCopy)765 // Only yield a clone when backfill ADDED fields; skip if766 // it only OVERWROTE existing ones (e.g. file tools767 // expanding file_path). 
Overwrites change the serialized768 // transcript and break VCR fixture hashes on resume,769 // while adding nothing the SDK stream needs — hooks get770 // the expanded path via toolExecution.ts separately.771 const addedFields = Object.keys(inputCopy).some(772 k => !(k in originalInput),773 )774 if (addedFields) {775 clonedContent ??= [...message.message.content]776 clonedContent[i] = { ...block, input: inputCopy }777 }778 }779 }780 }781 if (clonedContent) {782 yieldMessage = {783 ...message,784 message: { ...message.message, content: clonedContent },785 }786 }787 }788 // Withhold recoverable errors (prompt-too-long, max-output-tokens)789 // until we know whether recovery (collapse drain / reactive790 // compact / truncation retry) can succeed. Still pushed to791 // assistantMessages so the recovery checks below find them.792 // Either subsystem's withhold is sufficient — they're793 // independent so turning one off doesn't break the other's794 // recovery path.795 //796 // feature() only works in if/ternary conditions (bun:bundle797 // tree-shaking constraint), so the collapse check is nested798 // rather than composed.799 let withheld = false800 if (feature('CONTEXT_COLLAPSE')) {801 if (802 contextCollapse?.isWithheldPromptTooLong(803 message,804 isPromptTooLongMessage,805 querySource,806 )807 ) {808 withheld = true809 }810 }811 if (reactiveCompact?.isWithheldPromptTooLong(message)) {812 withheld = true813 }814 if (815 mediaRecoveryEnabled &&816 reactiveCompact?.isWithheldMediaSizeError(message)817 ) {818 withheld = true819 }820 if (isWithheldMaxOutputTokens(message)) {821 withheld = true822 }823 if (!withheld) {824 yield yieldMessage825 }826 if (message.type === 'assistant') {827 assistantMessages.push(message)828829 const msgToolUseBlocks = message.message.content.filter(830 content => content.type === 'tool_use',831 ) as ToolUseBlock[]832 if (msgToolUseBlocks.length > 0) {833 toolUseBlocks.push(...msgToolUseBlocks)834 needsFollowUp = true835 }836837 if (838 
streamingToolExecutor &&839 !toolUseContext.abortController.signal.aborted840 ) {841 for (const toolBlock of msgToolUseBlocks) {842 streamingToolExecutor.addTool(toolBlock, message)843 }844 }845 }846847 if (848 streamingToolExecutor &&849 !toolUseContext.abortController.signal.aborted850 ) {851 for (const result of streamingToolExecutor.getCompletedResults()) {852 if (result.message) {853 yield result.message854 toolResults.push(855 ...normalizeMessagesForAPI(856 [result.message],857 toolUseContext.options.tools,858 ).filter(_ => _.type === 'user'),859 )860 }861 }862 }863 }864 queryCheckpoint('query_api_streaming_end')865866 // Yield deferred microcompact boundary message using actual API-reported867 // token deletion count instead of client-side estimates.868 // Entire block gated behind feature() so the excluded string869 // is eliminated from external builds.870 if (feature('CACHED_MICROCOMPACT') && pendingCacheEdits) {871 const lastAssistant = assistantMessages.at(-1)872 // The API field is cumulative/sticky across requests, so we873 // subtract the baseline captured before this request to get the delta.874 const usage = lastAssistant?.message.usage875 const cumulativeDeleted = usage876 ? ((usage as unknown as Record<string, number>)877 .cache_deleted_input_tokens ?? 
0)878 : 0879 const deletedTokens = Math.max(880 0,881 cumulativeDeleted - pendingCacheEdits.baselineCacheDeletedTokens,882 )883 if (deletedTokens > 0) {884 yield createMicrocompactBoundaryMessage(885 pendingCacheEdits.trigger,886 0,887 deletedTokens,888 pendingCacheEdits.deletedToolIds,889 [],890 )891 }892 }893 } catch (innerError) {894 if (innerError instanceof FallbackTriggeredError && fallbackModel) {895 // Fallback was triggered - switch model and retry896 currentModel = fallbackModel897 attemptWithFallback = true898899 // Clear assistant messages since we'll retry the entire request900 yield* yieldMissingToolResultBlocks(901 assistantMessages,902 'Model fallback triggered',903 )904 assistantMessages.length = 0905 toolResults.length = 0906 toolUseBlocks.length = 0907 needsFollowUp = false908909 // Discard pending results from the failed attempt and create a910 // fresh executor. This prevents orphan tool_results (with old911 // tool_use_ids) from leaking into the retry.912 if (streamingToolExecutor) {913 streamingToolExecutor.discard()914 streamingToolExecutor = new StreamingToolExecutor(915 toolUseContext.options.tools,916 canUseTool,917 toolUseContext,918 )919 }920921 // Update tool use context with new model922 toolUseContext.options.mainLoopModel = fallbackModel923924 // Thinking signatures are model-bound: replaying a protected-thinking925 // block (e.g. capybara) to an unprotected fallback (e.g. 
opus) 400s.926 // Strip before retry so the fallback model gets clean history.927 if (process.env.USER_TYPE === 'ant') {928 messagesForQuery = stripSignatureBlocks(messagesForQuery)929 }930931 // Log the fallback event932 logEvent('tengu_model_fallback_triggered', {933 original_model:934 innerError.originalModel as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,935 fallback_model:936 fallbackModel as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,937 entrypoint:938 'cli' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,939 queryChainId: queryChainIdForAnalytics,940 queryDepth: queryTracking.depth,941 })942943 // Yield system message about fallback — use 'warning' level so944 // users see the notification without needing verbose mode945 yield createSystemMessage(946 `Switched to ${renderModelName(innerError.fallbackModel)} due to high demand for ${renderModelName(innerError.originalModel)}`,947 'warning',948 )949950 continue951 }952 throw innerError953 }954 }955 } catch (error) {956 logError(error)957 const errorMessage =958 error instanceof Error ? error.message : String(error)959 logEvent('tengu_query_error', {960 assistantMessages: assistantMessages.length,961 toolUses: assistantMessages.flatMap(_ =>962 _.message.content.filter(content => content.type === 'tool_use'),963 ).length,964965 queryChainId: queryChainIdForAnalytics,966 queryDepth: queryTracking.depth,967 })968969 // Handle image size/resize errors with user-friendly messages970 if (971 error instanceof ImageSizeError ||972 error instanceof ImageResizeError973 ) {974 yield createAssistantAPIErrorMessage({975 content: error.message,976 })977 return { reason: 'image_error' }978 }979980 // Generally queryModelWithStreaming should not throw errors but instead981 // yield them as synthetic assistant messages. 
However if it does throw982 // due to a bug, we may end up in a state where we have already emitted983 // a tool_use block but will stop before emitting the tool_result.984 yield* yieldMissingToolResultBlocks(assistantMessages, errorMessage)985986 // Surface the real error instead of a misleading "[Request interrupted987 // by user]" — this path is a model/runtime failure, not a user action.988 // SDK consumers were seeing phantom interrupts on e.g. Node 18's missing989 // Array.prototype.with(), masking the actual cause.990 yield createAssistantAPIErrorMessage({991 content: errorMessage,992 })993994 // To help track down bugs, log loudly for ants995 logAntError('Query error', error)996 return { reason: 'model_error', error }997 }998999 // Execute post-sampling hooks after model response is complete1000 if (assistantMessages.length > 0) {1001 void executePostSamplingHooks(1002 [...messagesForQuery, ...assistantMessages],1003 systemPrompt,1004 userContext,1005 systemContext,1006 toolUseContext,1007 querySource,1008 )1009 }10101011 // We need to handle a streaming abort before anything else.1012 // When using streamingToolExecutor, we must consume getRemainingResults() so the1013 // executor can generate synthetic tool_result blocks for queued/in-progress tools.1014 // Without this, tool_use blocks would lack matching tool_result blocks.1015 if (toolUseContext.abortController.signal.aborted) {1016 if (streamingToolExecutor) {1017 // Consume remaining results - executor generates synthetic tool_results for1018 // aborted tools since it checks the abort signal in executeTool()1019 for await (const update of streamingToolExecutor.getRemainingResults()) {1020 if (update.message) {1021 yield update.message1022 }1023 }1024 } else {1025 yield* yieldMissingToolResultBlocks(1026 assistantMessages,1027 'Interrupted by user',1028 )1029 }1030 // chicago MCP: auto-unhide + lock release on interrupt. Same cleanup1031 // as the natural turn-end path in stopHooks.ts. 
Main thread only —1032 // see stopHooks.ts for the subagent-releasing-main's-lock rationale.1033 if (feature('CHICAGO_MCP') && !toolUseContext.agentId) {1034 try {1035 const { cleanupComputerUseAfterTurn } = await import(1036 './utils/computerUse/cleanup.js'1037 )1038 await cleanupComputerUseAfterTurn(toolUseContext)1039 } catch {1040 // Failures are silent — this is dogfooding cleanup, not critical path1041 }1042 }10431044 // Skip the interruption message for submit-interrupts — the queued1045 // user message that follows provides sufficient context.1046 if (toolUseContext.abortController.signal.reason !== 'interrupt') {1047 yield createUserInterruptionMessage({1048 toolUse: false,1049 })1050 }1051 return { reason: 'aborted_streaming' }1052 }10531054 // Yield tool use summary from previous turn — haiku (~1s) resolved during model streaming (5-30s)1055 if (pendingToolUseSummary) {1056 const summary = await pendingToolUseSummary1057 if (summary) {1058 yield summary1059 }1060 }10611062 if (!needsFollowUp) {1063 const lastMessage = assistantMessages.at(-1)10641065 // Prompt-too-long recovery: the streaming loop withheld the error1066 // (see withheldByCollapse / withheldByReactive above). Try collapse1067 // drain first (cheap, keeps granular context), then reactive compact1068 // (full summary). Single-shot on each — if a retry still 413's,1069 // the next stage handles it or the error surfaces.1070 const isWithheld413 =1071 lastMessage?.type === 'assistant' &&1072 lastMessage.isApiErrorMessage &&1073 isPromptTooLongMessage(lastMessage)1074 // Media-size rejections (image/PDF/many-image) are recoverable via1075 // reactive compact's strip-retry. Unlike PTL, media errors skip the1076 // collapse drain — collapse doesn't strip images. mediaRecoveryEnabled1077 // is the hoisted gate from before the stream loop (same value as the1078 // withholding check — these two must agree or a withheld message is1079 // lost). 
If the oversized media is in the preserved tail, the1080 // post-compact turn will media-error again; hasAttemptedReactiveCompact1081 // prevents a spiral and the error surfaces.1082 const isWithheldMedia =1083 mediaRecoveryEnabled &&1084 reactiveCompact?.isWithheldMediaSizeError(lastMessage)1085 if (isWithheld413) {1086 // First: drain all staged context-collapses. Gated on the PREVIOUS1087 // transition not being collapse_drain_retry — if we already drained1088 // and the retry still 413'd, fall through to reactive compact.1089 if (1090 feature('CONTEXT_COLLAPSE') &&1091 contextCollapse &&1092 state.transition?.reason !== 'collapse_drain_retry'1093 ) {1094 const drained = contextCollapse.recoverFromOverflow(1095 messagesForQuery,1096 querySource,1097 )1098 if (drained.committed > 0) {1099 const next: State = {1100 messages: drained.messages,1101 toolUseContext,1102 autoCompactTracking: tracking,1103 maxOutputTokensRecoveryCount,1104 hasAttemptedReactiveCompact,1105 maxOutputTokensOverride: undefined,1106 pendingToolUseSummary: undefined,1107 stopHookActive: undefined,1108 turnCount,1109 transition: {1110 reason: 'collapse_drain_retry',1111 committed: drained.committed,1112 },1113 }1114 state = next1115 continue1116 }1117 }1118 }1119 if ((isWithheld413 || isWithheldMedia) && reactiveCompact) {1120 const compacted = await reactiveCompact.tryReactiveCompact({1121 hasAttempted: hasAttemptedReactiveCompact,1122 querySource,1123 aborted: toolUseContext.abortController.signal.aborted,1124 messages: messagesForQuery,1125 cacheSafeParams: {1126 systemPrompt,1127 userContext,1128 systemContext,1129 toolUseContext,1130 forkContextMessages: messagesForQuery,1131 },1132 })11331134 if (compacted) {1135 // task_budget: same carryover as the proactive path above.1136 // messagesForQuery still holds the pre-compact array here (the1137 // 413-failed attempt's input).1138 if (params.taskBudget) {1139 const preCompactContext =1140 
finalContextTokensFromLastResponse(messagesForQuery)1141 taskBudgetRemaining = Math.max(1142 0,1143 (taskBudgetRemaining ?? params.taskBudget.total) -1144 preCompactContext,1145 )1146 }11471148 const postCompactMessages = buildPostCompactMessages(compacted)1149 for (const msg of postCompactMessages) {1150 yield msg1151 }1152 const next: State = {1153 messages: postCompactMessages,1154 toolUseContext,1155 autoCompactTracking: undefined,1156 maxOutputTokensRecoveryCount,1157 hasAttemptedReactiveCompact: true,1158 maxOutputTokensOverride: undefined,1159 pendingToolUseSummary: undefined,1160 stopHookActive: undefined,1161 turnCount,1162 transition: { reason: 'reactive_compact_retry' },1163 }1164 state = next1165 continue1166 }11671168 // No recovery — surface the withheld error and exit. Do NOT fall1169 // through to stop hooks: the model never produced a valid response,1170 // so hooks have nothing meaningful to evaluate. Running stop hooks1171 // on prompt-too-long creates a death spiral: error → hook blocking1172 // → retry → error → … (the hook injects more tokens each cycle).1173 yield lastMessage1174 void executeStopFailureHooks(lastMessage, toolUseContext)1175 return { reason: isWithheldMedia ? 'image_error' : 'prompt_too_long' }1176 } else if (feature('CONTEXT_COLLAPSE') && isWithheld413) {1177 // reactiveCompact compiled out but contextCollapse withheld and1178 // couldn't recover (staged queue empty/stale). Surface. Same1179 // early-return rationale — don't fall through to stop hooks.1180 yield lastMessage1181 void executeStopFailureHooks(lastMessage, toolUseContext)1182 return { reason: 'prompt_too_long' }1183 }11841185 // Check for max_output_tokens and inject recovery message. 
The error1186 // was withheld from the stream above; only surface it if recovery1187 // exhausts.1188 if (isWithheldMaxOutputTokens(lastMessage)) {1189 // Escalating retry: if we used the capped 8k default and hit the1190 // limit, retry the SAME request at 64k — no meta message, no1191 // multi-turn dance. This fires once per turn (guarded by the1192 // override check), then falls through to multi-turn recovery if1193 // 64k also hits the cap.1194 // 3P default: false (not validated on Bedrock/Vertex)1195 const capEnabled = getFeatureValue_CACHED_MAY_BE_STALE(1196 'tengu_otk_slot_v1',1197 false,1198 )1199 if (1200 capEnabled &&1201 maxOutputTokensOverride === undefined &&1202 !process.env.CLAUDE_CODE_MAX_OUTPUT_TOKENS1203 ) {1204 logEvent('tengu_max_tokens_escalate', {1205 escalatedTo: ESCALATED_MAX_TOKENS,1206 })1207 const next: State = {1208 messages: messagesForQuery,1209 toolUseContext,1210 autoCompactTracking: tracking,1211 maxOutputTokensRecoveryCount,1212 hasAttemptedReactiveCompact,1213 maxOutputTokensOverride: ESCALATED_MAX_TOKENS,1214 pendingToolUseSummary: undefined,1215 stopHookActive: undefined,1216 turnCount,1217 transition: { reason: 'max_output_tokens_escalate' },1218 }1219 state = next1220 continue1221 }12221223 if (maxOutputTokensRecoveryCount < MAX_OUTPUT_TOKENS_RECOVERY_LIMIT) {1224 const recoveryMessage = createUserMessage({1225 content:1226 `Output token limit hit. Resume directly — no apology, no recap of what you were doing. ` +1227 `Pick up mid-thought if that is where the cut happened. 
Break remaining work into smaller pieces.`,1228 isMeta: true,1229 })12301231 const next: State = {1232 messages: [1233 ...messagesForQuery,1234 ...assistantMessages,1235 recoveryMessage,1236 ],1237 toolUseContext,1238 autoCompactTracking: tracking,1239 maxOutputTokensRecoveryCount: maxOutputTokensRecoveryCount + 1,1240 hasAttemptedReactiveCompact,1241 maxOutputTokensOverride: undefined,1242 pendingToolUseSummary: undefined,1243 stopHookActive: undefined,1244 turnCount,1245 transition: {1246 reason: 'max_output_tokens_recovery',1247 attempt: maxOutputTokensRecoveryCount + 1,1248 },1249 }1250 state = next1251 continue1252 }12531254 // Recovery exhausted — surface the withheld error now.1255 yield lastMessage1256 }12571258 // Skip stop hooks when the last message is an API error (rate limit,1259 // prompt-too-long, auth failure, etc.). The model never produced a1260 // real response — hooks evaluating it create a death spiral:1261 // error → hook blocking → retry → error → …1262 if (lastMessage?.isApiErrorMessage) {1263 void executeStopFailureHooks(lastMessage, toolUseContext)1264 return { reason: 'completed' }1265 }12661267 const stopHookResult = yield* handleStopHooks(1268 messagesForQuery,1269 assistantMessages,1270 systemPrompt,1271 userContext,1272 systemContext,1273 toolUseContext,1274 querySource,1275 stopHookActive,1276 )12771278 if (stopHookResult.preventContinuation) {1279 return { reason: 'stop_hook_prevented' }1280 }12811282 if (stopHookResult.blockingErrors.length > 0) {1283 const next: State = {1284 messages: [1285 ...messagesForQuery,1286 ...assistantMessages,1287 ...stopHookResult.blockingErrors,1288 ],1289 toolUseContext,1290 autoCompactTracking: tracking,1291 maxOutputTokensRecoveryCount: 0,1292 // Preserve the reactive compact guard — if compact already ran and1293 // couldn't recover from prompt-too-long, retrying after a stop-hook1294 // blocking error will produce the same result. 
Resetting to false1295 // here caused an infinite loop: compact → still too long → error →1296 // stop hook blocking → compact → … burning thousands of API calls.1297 hasAttemptedReactiveCompact,1298 maxOutputTokensOverride: undefined,1299 pendingToolUseSummary: undefined,1300 stopHookActive: true,1301 turnCount,1302 transition: { reason: 'stop_hook_blocking' },1303 }1304 state = next1305 continue1306 }13071308 if (feature('TOKEN_BUDGET')) {1309 const decision = checkTokenBudget(1310 budgetTracker!,1311 toolUseContext.agentId,1312 getCurrentTurnTokenBudget(),1313 getTurnOutputTokens(),1314 )13151316 if (decision.action === 'continue') {1317 incrementBudgetContinuationCount()1318 logForDebugging(1319 `Token budget continuation #${decision.continuationCount}: ${decision.pct}% (${decision.turnTokens.toLocaleString()} / ${decision.budget.toLocaleString()})`,1320 )1321 state = {1322 messages: [1323 ...messagesForQuery,1324 ...assistantMessages,1325 createUserMessage({1326 content: decision.nudgeMessage,1327 isMeta: true,1328 }),1329 ],1330 toolUseContext,1331 autoCompactTracking: tracking,1332 maxOutputTokensRecoveryCount: 0,1333 hasAttemptedReactiveCompact: false,1334 maxOutputTokensOverride: undefined,1335 pendingToolUseSummary: undefined,1336 stopHookActive: undefined,1337 turnCount,1338 transition: { reason: 'token_budget_continuation' },1339 }1340 continue1341 }13421343 if (decision.completionEvent) {1344 if (decision.completionEvent.diminishingReturns) {1345 logForDebugging(1346 `Token budget early stop: diminishing returns at ${decision.completionEvent.pct}%`,1347 )1348 }1349 logEvent('tengu_token_budget_completed', {1350 ...decision.completionEvent,1351 queryChainId: queryChainIdForAnalytics,1352 queryDepth: queryTracking.depth,1353 })1354 }1355 }13561357 return { reason: 'completed' }1358 }13591360 let shouldPreventContinuation = false1361 let updatedToolUseContext = toolUseContext13621363 queryCheckpoint('query_tool_execution_start')136413651366 if 
(streamingToolExecutor) {1367 logEvent('tengu_streaming_tool_execution_used', {1368 tool_count: toolUseBlocks.length,1369 queryChainId: queryChainIdForAnalytics,1370 queryDepth: queryTracking.depth,1371 })1372 } else {1373 logEvent('tengu_streaming_tool_execution_not_used', {1374 tool_count: toolUseBlocks.length,1375 queryChainId: queryChainIdForAnalytics,1376 queryDepth: queryTracking.depth,1377 })1378 }13791380 const toolUpdates = streamingToolExecutor1381 ? streamingToolExecutor.getRemainingResults()1382 : runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)13831384 for await (const update of toolUpdates) {1385 if (update.message) {1386 yield update.message13871388 if (1389 update.message.type === 'attachment' &&1390 update.message.attachment.type === 'hook_stopped_continuation'1391 ) {1392 shouldPreventContinuation = true1393 }13941395 toolResults.push(1396 ...normalizeMessagesForAPI(1397 [update.message],1398 toolUseContext.options.tools,1399 ).filter(_ => _.type === 'user'),1400 )1401 }1402 if (update.newContext) {1403 updatedToolUseContext = {1404 ...update.newContext,1405 queryTracking,1406 }1407 }1408 }1409 queryCheckpoint('query_tool_execution_end')14101411 // Generate tool use summary after tool batch completes — passed to next recursive call1412 let nextPendingToolUseSummary:1413 | Promise<ToolUseSummaryMessage | null>1414 | undefined1415 if (1416 config.gates.emitToolUseSummaries &&1417 toolUseBlocks.length > 0 &&1418 !toolUseContext.abortController.signal.aborted &&1419 !toolUseContext.agentId // subagents don't surface in mobile UI — skip the Haiku call1420 ) {1421 // Extract the last assistant text block for context1422 const lastAssistantMessage = assistantMessages.at(-1)1423 let lastAssistantText: string | undefined1424 if (lastAssistantMessage) {1425 const textBlocks = lastAssistantMessage.message.content.filter(1426 block => block.type === 'text',1427 )1428 if (textBlocks.length > 0) {1429 const lastTextBlock = 
textBlocks.at(-1)1430 if (lastTextBlock && 'text' in lastTextBlock) {1431 lastAssistantText = lastTextBlock.text1432 }1433 }1434 }14351436 // Collect tool info for summary generation1437 const toolUseIds = toolUseBlocks.map(block => block.id)1438 const toolInfoForSummary = toolUseBlocks.map(block => {1439 // Find the corresponding tool result1440 const toolResult = toolResults.find(1441 result =>1442 result.type === 'user' &&1443 Array.isArray(result.message.content) &&1444 result.message.content.some(1445 content =>1446 content.type === 'tool_result' &&1447 content.tool_use_id === block.id,1448 ),1449 )1450 const resultContent =1451 toolResult?.type === 'user' &&1452 Array.isArray(toolResult.message.content)1453 ? toolResult.message.content.find(1454 (c): c is ToolResultBlockParam =>1455 c.type === 'tool_result' && c.tool_use_id === block.id,1456 )1457 : undefined1458 return {1459 name: block.name,1460 input: block.input,1461 output:1462 resultContent && 'content' in resultContent1463 ? resultContent.content1464 : null,1465 }1466 })14671468 // Fire off summary generation without blocking the next API call1469 nextPendingToolUseSummary = generateToolUseSummary({1470 tools: toolInfoForSummary,1471 signal: toolUseContext.abortController.signal,1472 isNonInteractiveSession: toolUseContext.options.isNonInteractiveSession,1473 lastAssistantText,1474 })1475 .then(summary => {1476 if (summary) {1477 return createToolUseSummaryMessage(summary, toolUseIds)1478 }1479 return null1480 })1481 .catch(() => null)1482 }14831484 // We were aborted during tool calls1485 if (toolUseContext.abortController.signal.aborted) {1486 // chicago MCP: auto-unhide + lock release when aborted mid-tool-call.1487 // This is the most likely Ctrl+C path for CU (e.g. 
slow screenshot).1488 // Main thread only — see stopHooks.ts for the subagent rationale.1489 if (feature('CHICAGO_MCP') && !toolUseContext.agentId) {1490 try {1491 const { cleanupComputerUseAfterTurn } = await import(1492 './utils/computerUse/cleanup.js'1493 )1494 await cleanupComputerUseAfterTurn(toolUseContext)1495 } catch {1496 // Failures are silent — this is dogfooding cleanup, not critical path1497 }1498 }1499 // Skip the interruption message for submit-interrupts — the queued1500 // user message that follows provides sufficient context.1501 if (toolUseContext.abortController.signal.reason !== 'interrupt') {1502 yield createUserInterruptionMessage({1503 toolUse: true,1504 })1505 }1506 // Check maxTurns before returning when aborted1507 const nextTurnCountOnAbort = turnCount + 11508 if (maxTurns && nextTurnCountOnAbort > maxTurns) {1509 yield createAttachmentMessage({1510 type: 'max_turns_reached',1511 maxTurns,1512 turnCount: nextTurnCountOnAbort,1513 })1514 }1515 return { reason: 'aborted_tools' }1516 }15171518 // If a hook indicated to prevent continuation, stop here1519 if (shouldPreventContinuation) {1520 return { reason: 'hook_stopped' }1521 }15221523 if (tracking?.compacted) {1524 tracking.turnCounter++1525 logEvent('tengu_post_autocompact_turn', {1526 turnId:1527 tracking.turnId as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,1528 turnCounter: tracking.turnCounter,15291530 queryChainId: queryChainIdForAnalytics,1531 queryDepth: queryTracking.depth,1532 })1533 }15341535 // Be careful to do this after tool calls are done, because the API1536 // will error if we interleave tool_result messages with regular user messages.15371538 // Instrumentation: Track message count before attachments1539 logEvent('tengu_query_before_attachments', {1540 messagesForQueryCount: messagesForQuery.length,1541 assistantMessagesCount: assistantMessages.length,1542 toolResultsCount: toolResults.length,1543 queryChainId: queryChainIdForAnalytics,1544 queryDepth: 
queryTracking.depth,1545 })15461547 // Get queued commands snapshot before processing attachments.1548 // These will be sent as attachments so Claude can respond to them in the current turn.1549 //1550 // Drain pending notifications. LocalShellTask completions are 'next'1551 // (when MONITOR_TOOL is on) and drain without Sleep. Other task types1552 // (agent/workflow/framework) still default to 'later' — the Sleep flush1553 // covers those. If all task types move to 'next', this branch could go.1554 //1555 // Slash commands are excluded from mid-turn drain — they must go through1556 // processSlashCommand after the turn ends (via useQueueProcessor), not be1557 // sent to the model as text. Bash-mode commands are already excluded by1558 // INLINE_NOTIFICATION_MODES in getQueuedCommandAttachments.1559 //1560 // Agent scoping: the queue is a process-global singleton shared by the1561 // coordinator and all in-process subagents. Each loop drains only what's1562 // addressed to it — main thread drains agentId===undefined, subagents1563 // drain their own agentId. User prompts (mode:'prompt') still go to main1564 // only; subagents never see the prompt stream.1565 // eslint-disable-next-line custom-rules/require-tool-match-name -- ToolUseBlock.name has no aliases1566 const sleepRan = toolUseBlocks.some(b => b.name === SLEEP_TOOL_NAME)1567 const isMainThread =1568 querySource.startsWith('repl_main_thread') || querySource === 'sdk'1569 const currentAgentId = toolUseContext.agentId1570 const queuedCommandsSnapshot = getCommandsByMaxPriority(1571 sleepRan ? 
'later' : 'next',1572 ).filter(cmd => {1573 if (isSlashCommand(cmd)) return false1574 if (isMainThread) return cmd.agentId === undefined1575 // Subagents only drain task-notifications addressed to them — never1576 // user prompts, even if someone stamps an agentId on one.1577 return cmd.mode === 'task-notification' && cmd.agentId === currentAgentId1578 })15791580 for await (const attachment of getAttachmentMessages(1581 null,1582 updatedToolUseContext,1583 null,1584 queuedCommandsSnapshot,1585 [...messagesForQuery, ...assistantMessages, ...toolResults],1586 querySource,1587 )) {1588 yield attachment1589 toolResults.push(attachment)1590 }15911592 // Memory prefetch consume: only if settled and not already consumed on1593 // an earlier iteration. If not settled yet, skip (zero-wait) and retry1594 // next iteration — the prefetch gets as many chances as there are loop1595 // iterations before the turn ends. readFileState (cumulative across1596 // iterations) filters out memories the model already Read/Wrote/Edited1597 // — including in earlier iterations, which the per-iteration1598 // toolUseBlocks array would miss.1599 if (1600 pendingMemoryPrefetch &&1601 pendingMemoryPrefetch.settledAt !== null &&1602 pendingMemoryPrefetch.consumedOnIteration === -11603 ) {1604 const memoryAttachments = filterDuplicateMemoryAttachments(1605 await pendingMemoryPrefetch.promise,1606 toolUseContext.readFileState,1607 )1608 for (const memAttachment of memoryAttachments) {1609 const msg = createAttachmentMessage(memAttachment)1610 yield msg1611 toolResults.push(msg)1612 }1613 pendingMemoryPrefetch.consumedOnIteration = turnCount - 11614 }161516161617 // Inject prefetched skill discovery. 
collectSkillDiscoveryPrefetch emits1618 // hidden_by_main_turn — true when the prefetch resolved before this point1619 // (should be >98% at AKI@250ms / Haiku@573ms vs turn durations of 2-30s).1620 if (skillPrefetch && pendingSkillPrefetch) {1621 const skillAttachments =1622 await skillPrefetch.collectSkillDiscoveryPrefetch(pendingSkillPrefetch)1623 for (const att of skillAttachments) {1624 const msg = createAttachmentMessage(att)1625 yield msg1626 toolResults.push(msg)1627 }1628 }16291630 // Remove only commands that were actually consumed as attachments.1631 // Prompt and task-notification commands are converted to attachments above.1632 const consumedCommands = queuedCommandsSnapshot.filter(1633 cmd => cmd.mode === 'prompt' || cmd.mode === 'task-notification',1634 )1635 if (consumedCommands.length > 0) {1636 for (const cmd of consumedCommands) {1637 if (cmd.uuid) {1638 consumedCommandUuids.push(cmd.uuid)1639 notifyCommandLifecycle(cmd.uuid, 'started')1640 }1641 }1642 removeFromQueue(consumedCommands)1643 }16441645 // Instrumentation: Track file change attachments after they're added1646 const fileChangeAttachmentCount = count(1647 toolResults,1648 tr =>1649 tr.type === 'attachment' && tr.attachment.type === 'edited_text_file',1650 )16511652 logEvent('tengu_query_after_attachments', {1653 totalToolResultsCount: toolResults.length,1654 fileChangeAttachmentCount,1655 queryChainId: queryChainIdForAnalytics,1656 queryDepth: queryTracking.depth,1657 })16581659 // Refresh tools between turns so newly-connected MCP servers become available1660 if (updatedToolUseContext.options.refreshTools) {1661 const refreshedTools = updatedToolUseContext.options.refreshTools()1662 if (refreshedTools !== updatedToolUseContext.options.tools) {1663 updatedToolUseContext = {1664 ...updatedToolUseContext,1665 options: {1666 ...updatedToolUseContext.options,1667 tools: refreshedTools,1668 },1669 }1670 }1671 }16721673 const toolUseContextWithQueryTracking = {1674 
...updatedToolUseContext,1675 queryTracking,1676 }16771678 // Each time we have tool results and are about to recurse, that's a turn1679 const nextTurnCount = turnCount + 116801681 // Periodic task summary for `claude ps` — fires mid-turn so a1682 // long-running agent still refreshes what it's working on. Gated1683 // only on !agentId so every top-level conversation (REPL, SDK, HFI,1684 // remote) generates summaries; subagents/forks don't.1685 if (feature('BG_SESSIONS')) {1686 if (1687 !toolUseContext.agentId &&1688 taskSummaryModule!.shouldGenerateTaskSummary()1689 ) {1690 taskSummaryModule!.maybeGenerateTaskSummary({1691 systemPrompt,1692 userContext,1693 systemContext,1694 toolUseContext,1695 forkContextMessages: [1696 ...messagesForQuery,1697 ...assistantMessages,1698 ...toolResults,1699 ],1700 })1701 }1702 }17031704 // Check if we've reached the max turns limit1705 if (maxTurns && nextTurnCount > maxTurns) {1706 yield createAttachmentMessage({1707 type: 'max_turns_reached',1708 maxTurns,1709 turnCount: nextTurnCount,1710 })1711 return { reason: 'max_turns', turnCount: nextTurnCount }1712 }17131714 queryCheckpoint('query_recursive_call')1715 const next: State = {1716 messages: [...messagesForQuery, ...assistantMessages, ...toolResults],1717 toolUseContext: toolUseContextWithQueryTracking,1718 autoCompactTracking: tracking,1719 turnCount: nextTurnCount,1720 maxOutputTokensRecoveryCount: 0,1721 hasAttemptedReactiveCompact: false,1722 pendingToolUseSummary: nextPendingToolUseSummary,1723 maxOutputTokensOverride: undefined,1724 stopHookActive,1725 transition: { reason: 'next_turn' },1726 }1727 state = next1728 } // while (true)1729}1730