The Streaming Tool Executor: How to Safely Let AI Operate Multiple Tools Simultaneously

Deep dive into StreamingToolExecutor's concurrency model — isConcurrencySafe declarations, queue scheduling, Sibling Abort cascading cancellation, progress buffering and ordered emission

The Problem

Imagine this scenario: you ask Claude Code to refactor a module. The model returns 5 tool_use calls in a single response — 3 file reads, 1 Bash command execution, and 1 file write. Now the questions arise:

  1. Should these 5 tools run serially or in parallel?
  2. If the Bash command fails, should the file reads running in parallel be cancelled?
  3. The file write depends on the Bash result — should it wait for Bash to complete before executing?
  4. The user presses ESC during tool execution — which tools should stop, and which should continue?
  5. Multiple tools produce progress messages simultaneously — how should the UI display them in order?

These questions seem simple, but each one involves core challenges of concurrency control. Serial execution is too slow — users don't want to wait for 3 independent file reads to complete one after another. Full parallelism is too dangerous — a write operation and a read operation accessing the same file simultaneously could cause a data race.

Claude Code's solution is StreamingToolExecutor — a carefully designed concurrency orchestrator that lets each tool declare whether it can run in parallel, then dynamically schedules execution based on those declarations. This article will dissect every design decision in detail.


Why a Streaming Tool Executor?

In the previous article, we covered the overall architecture of the tool system. But one key question was intentionally deferred to this article: when the model returns multiple tool calls in a single streaming response, how does the executor manage their lifecycles?

Traditional approaches fall into two extremes:

Approach A: Fully Serial

[Figure: Fully serial execution. Tools 1 through 4 run one after another on a 0T to 4T timeline; total time 4T.]

Safe but extremely slow. Each tool waits for the previous one to finish before starting. For 3 independent file reads, this means 3x the wait time.

Approach B: Fully Parallel

[Figure: Fully parallel execution. Tools 1 through 4 all run at once on a 0T to 1T timeline; total time T.]

Fast but dangerous. If Tool 1 is rm -rf build/ and Tool 2 is cat build/output.js, the result of parallel execution is unpredictable.

Approach C: Claude Code's Hybrid Scheduling

[Figure: Hybrid scheduling. Tools 1 to 3 (Read) run in parallel and Tool 4 (Bash) runs exclusively, on a 0T to 2T timeline; total time 2T.]

Reads run in parallel, writes get exclusive access. Safe and efficient.

This is the core problem StreamingToolExecutor solves.
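To make the three approaches above concrete, here is a minimal sketch that counts how many time units T each strategy needs. It assumes every tool takes exactly T and that the hybrid strategy lets consecutive concurrency-safe tools share one time slot. SketchTool and the three functions are hypothetical names for illustration, not part of Claude Code.

```typescript
type SketchTool = { name: string; safe: boolean }

// Serial: every tool runs alone, one after another.
function serialUnits(tools: SketchTool[]): number {
  return tools.length
}

// Fully parallel: everything starts at once, ignoring safety.
function parallelUnits(tools: SketchTool[]): number {
  return tools.length === 0 ? 0 : 1
}

// Hybrid: a run of consecutive safe tools shares one time unit;
// each unsafe tool gets a unit of its own.
function hybridUnits(tools: SketchTool[]): number {
  let units = 0
  let inSafeBatch = false
  for (const tool of tools) {
    if (tool.safe) {
      if (!inSafeBatch) units += 1
      inSafeBatch = true
    } else {
      units += 1
      inSafeBatch = false
    }
  }
  return units
}

const example: SketchTool[] = [
  { name: 'Read', safe: true },
  { name: 'Read', safe: true },
  { name: 'Read', safe: true },
  { name: 'Bash', safe: false },
]
console.log(serialUnits(example), parallelUnits(example), hybridUnits(example)) // 4 1 2
```

The three numbers match the totals in the figures: 4T, T, and 2T.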


Architecture Overview

StreamingToolExecutor lives in src/services/tools/StreamingToolExecutor.ts and is a class of roughly 530 lines. Its responsibilities are:

  1. Receive tool calls — accept tool_use blocks one by one as the streaming response arrives
  2. Determine scheduling strategy — based on each tool's concurrency safety declaration, decide whether to execute immediately or queue
  3. Manage lifecycles — track each tool from queuing to completion
  4. Handle error cascading — one tool's failure may require cancelling its sibling tools
  5. Emit results in order — progress messages are sent immediately, final results are emitted in sequence

Here is the overall architecture diagram:

...

TrackedTool: The Complete Lifecycle of a Tool

Every tool call that enters the executor is wrapped in a TrackedTool object. This structure is defined at lines 21-32 of StreamingToolExecutor.ts:

src/services/tools/StreamingToolExecutor.ts:19-32
TypeScript
// The four possible lifecycle states of a tool within the executor.
// Tools always progress linearly: queued -> executing -> completed -> yielded.
// This strict ordering ensures predictable state transitions and simplifies
// reasoning about concurrency: no tool can "go back" to a previous state.
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded'

// TrackedTool wraps every tool call that enters the executor, adding the
// metadata needed for lifecycle management, concurrency scheduling, and
// ordered result emission. This is the central data structure of the system.
type TrackedTool = {
  // Unique identifier matching the tool_use block's ID from the API response
  id: string
  // The raw tool_use block from the streaming API (contains tool name and input)
  block: ToolUseBlock
  // Reference to the parent assistant message that triggered this tool call
  assistantMessage: AssistantMessage
  // Current lifecycle state; drives scheduling and emission decisions
  status: ToolStatus
  // Pre-computed concurrency safety flag (evaluated once at addTool time,
  // not re-evaluated during execution, since input doesn't change)
  isConcurrencySafe: boolean
  // The execution Promise, set when the tool starts running and used by
  // getRemainingResults() to await completion via Promise.race
  promise?: Promise<void>
  // Buffered final results, stored here until ordered emission by
  // getCompletedResults(). Even if this tool finishes early, results
  // wait until all prior tools have been emitted first.
  results?: Message[]
  // Progress messages are stored separately and yielded immediately
  // (e.g., streaming Bash output). Unlike results, progress messages
  // bypass the ordering constraint for real-time user feedback.
  pendingProgress: Message[]
  // Optional functions that modify the shared execution context after
  // this tool completes. Only supported for non-concurrent tools to
  // avoid race conditions on shared state.
  contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>
}

Four Lifecycle States

ToolStatus is a union of four string literals (not a TypeScript enum), and each tool flows strictly through queued -> executing -> completed -> yielded:

...

queued (waiting): The tool was just added by addTool() and hasn't started executing yet. There may be other non-concurrency-safe tools currently running exclusively, so it must wait.

executing (running): The tool has started execution. Its promise field holds the execution Promise, and progress messages are collected in real time via the pendingProgress array.

completed (finished): Tool execution has ended (success, failure, or cancellation), and results are stored in the results field but haven't been emitted to the caller yet. This is the key to ordered emission — even if Tool 3 finishes first, it waits for Tool 1 and Tool 2's results to be emitted first.

yielded (emitted): Results have been emitted to the caller via getCompletedResults(), and this tool's lifecycle is completely over.
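The strictly linear lifecycle can be expressed as a small transition table. The following is a minimal sketch under that assumption; NEXT_STATUS and advance are hypothetical helpers that do not appear in the executor, which assigns tool.status directly.

```typescript
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded'

// The only legal successor of each state; 'yielded' is terminal.
const NEXT_STATUS: Record<ToolStatus, ToolStatus | null> = {
  queued: 'executing',
  executing: 'completed',
  completed: 'yielded',
  yielded: null,
}

// Returns the next state, or throws if the lifecycle is already over.
function advance(from: ToolStatus): ToolStatus {
  const next = NEXT_STATUS[from]
  if (next === null) {
    throw new Error(`Cannot advance past terminal state '${from}'`)
  }
  return next
}

let current: ToolStatus = 'queued'
current = advance(current) // 'executing'
current = advance(current) // 'completed'
current = advance(current) // 'yielded'
console.log(current)
```

Because each state has exactly one successor, a tool can never move backwards or skip ahead, which is the property the article relies on when reasoning about scheduling.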

Key Field Analysis

pendingProgress is a field worth special attention. Progress messages (like real-time output from a Bash command) need to be shown to the user immediately and can't wait until the tool completes. So progress messages and final results are stored separately — progress messages can be emitted at any time, while final results must be emitted in order.

contextModifiers stores the tool's modifications to the execution context. For example, a tool might need to update file history state. But note an important restriction in the code (lines 391-395):

src/services/tools/StreamingToolExecutor.ts:389-395
TypeScript
// NOTE: we currently don't support context modifiers for concurrent
// tools. None are actively being used, but if we want to use
// them in concurrent tools, we need to support that here.

// Only apply context modifications from non-concurrent (exclusive) tools.
// This guard prevents race conditions: if two concurrent tools both tried
// to modify toolUseContext simultaneously, the final state would be
// non-deterministic. By restricting modifiers to exclusive tools, we
// guarantee that context updates happen sequentially and predictably.
if (!tool.isConcurrencySafe && contextModifiers.length > 0) {
  // Apply each modifier in order: each one receives the context produced
  // by the previous modifier, forming a reduction chain.
  for (const modifier of contextModifiers) {
    this.toolUseContext = modifier(this.toolUseContext)
  }
}

Only non-concurrency-safe tools can modify the context. This is a deliberate design constraint — concurrent tools modifying shared context would introduce race conditions, so it's simply prohibited.
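The reduction chain is easy to see in isolation. This is a minimal sketch with a made-up context shape: SketchContext, its fileHistory field, and recordEdit are hypothetical stand-ins, since the real ToolUseContext is not shown here.

```typescript
type SketchContext = { fileHistory: string[] }
type ContextModifier = (context: SketchContext) => SketchContext

// Fold the modifiers over the context, left to right: each modifier
// receives the context produced by the one before it.
function applyModifiers(
  context: SketchContext,
  modifiers: ContextModifier[],
): SketchContext {
  let current = context
  for (const modifier of modifiers) {
    current = modifier(current)
  }
  return current
}

// A modifier factory that returns a new context instead of mutating the old one.
const recordEdit = (path: string): ContextModifier => context => ({
  fileHistory: [...context.fileHistory, path],
})

const before: SketchContext = { fileHistory: [] }
const after = applyModifiers(before, [recordEdit('a.ts'), recordEdit('b.ts')])
console.log(after.fileHistory) // [ 'a.ts', 'b.ts' ]
```

The result depends on the order in which modifiers run, which is why applying them from tools that finish in an unpredictable order would be unsafe.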


isConcurrencySafe: Tools Decide for Themselves Whether They Can Run in Parallel

The most fundamental design principle of StreamingToolExecutor is that tools declare their own concurrency safety. Not guessed by the scheduler, not defined in a global configuration table, but implemented by each tool in its isConcurrencySafe() method.

This method is defined at line 402 of src/Tool.ts:

src/Tool.ts:402
TypeScript
// Each tool implements this method to declare whether it can safely run
// alongside other tools. The input parameter is key: it allows the SAME
// tool to return different answers depending on what it's being asked to do
// (e.g., a Bash tool running "ls" is safe, but "rm -rf" is not).
isConcurrencySafe(input: z.infer<Input>): boolean

Note that it accepts an input parameter — this means the same tool may have different concurrency safety depending on the input.

Concurrency Safety Declarations Across Tools

Let's look at how various tools actually declare themselves in the code:

FileReadTool (file reading) — always concurrency-safe:

src/tools/FileReadTool/FileReadTool.ts:373-375
TypeScript
// File reads are purely read-only: no side effects, no shared state mutation.
isConcurrencySafe() {
  return true
},

File reading is a purely read-only operation; multiple reads running simultaneously produce no side effects.

GrepTool (search) — always concurrency-safe:

src/tools/GrepTool/GrepTool.ts:183-185
TypeScript
// Search/grep is read-only, safe to run multiple searches in parallel.
isConcurrencySafe() {
  return true
},

Search operations are likewise read-only, naturally supporting parallelism.

AgentTool (sub-agent) — always concurrency-safe:

src/tools/AgentTool/AgentTool.tsx:1273-1275
TypeScript
// Sub-agents run in fully isolated contexts (separate message history,
// separate tool instances), so they cannot interfere with each other.
isConcurrencySafe() {
  return true;
},

The sub-agent tool declares itself as concurrency-safe because each sub-agent runs in its own isolated context.

BashTool (command execution) — depends on input:

src/tools/BashTool/BashTool.tsx:434-436
TypeScript
// The Bash tool's safety depends on the COMMAND being run. The isReadOnly
// helper analyzes the command string to determine if it only reads data
// (e.g., ls, cat, grep) or has side effects (e.g., rm, mv, git commit).
// If isReadOnly is not defined, or returns null/undefined, the nullish
// coalescing (??) falls back to false: the conservative "assume unsafe"
// default. A thrown exception is NOT caught here; it propagates to the
// caller (addTool wraps this call in try-catch).
isConcurrencySafe(input) {
  return this.isReadOnly?.(input) ?? false;
},

This is the most interesting case. The Bash tool's concurrency safety depends on whether the command itself is read-only. ls, cat, grep are read-only and can run in parallel; rm, mv, git commit have side effects and must run exclusively.
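To illustrate input-dependent safety, here is a deliberately naive sketch. The article does not show how the real isReadOnly analyzes a command, so the allowlist, isReadOnlyCommand, and sketchBashTool below are assumptions made purely for illustration.

```typescript
type BashInput = { command: string }

// Hypothetical allowlist; the real analysis is not shown in this article.
const READ_ONLY_COMMANDS = new Set(['ls', 'cat', 'grep'])

// Naive check: a command counts as read-only only if it is a single simple
// command whose first word is on the allowlist. Anything containing shell
// operators (pipes, redirects, substitutions) is treated as unsafe.
function isReadOnlyCommand(input: BashInput): boolean {
  const command = input.command.trim()
  if (/[;&|><`$]/.test(command)) return false
  const firstWord = command.split(/\s+/)[0] ?? ''
  return READ_ONLY_COMMANDS.has(firstWord)
}

type InputAwareTool = {
  isReadOnly?: (input: BashInput) => boolean
  isConcurrencySafe(input: BashInput): boolean
}

// Same delegation shape as the BashTool snippet above.
const sketchBashTool: InputAwareTool = {
  isReadOnly: isReadOnlyCommand,
  isConcurrencySafe(input) {
    return this.isReadOnly?.(input) ?? false
  },
}

console.log(sketchBashTool.isConcurrencySafe({ command: 'ls -la' }))        // true
console.log(sketchBashTool.isConcurrencySafe({ command: 'rm -rf build/' })) // false
```

A production classifier would need real shell parsing; the point here is only that the same tool can answer differently for different inputs.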

Default behavior — assume unsafe (line 759):

src/Tool.ts:757-759
TypeScript
// Default values applied to all tools created via buildTool().
// The underscore prefix on _input signals that the parameter is
// intentionally unused: the default always returns false.
const TOOL_DEFAULTS = {
  // ...
  // Conservative default: any tool that doesn't explicitly opt in to
  // concurrency safety is treated as unsafe and will run exclusively.
  // This "fail-safe" approach means new tools are safe by default:
  // developers must consciously declare parallelism support.
  isConcurrencySafe: (_input?: unknown) => false,
  // ...
}

Tools built through buildTool() that don't explicitly declare isConcurrencySafe default to returning false. This is a conservatively safe design — better to sacrifice performance than risk concurrency issues.
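How buildTool() merges the defaults is not shown above, so the following is only a sketch of one plausible shape: use the tool's own declaration when present, otherwise fall back to the conservative default. SketchToolDef, SKETCH_DEFAULTS, and buildToolSketch are hypothetical names.

```typescript
type SafetyCheck = (input?: unknown) => boolean
type SketchToolDef = { name: string; isConcurrencySafe?: SafetyCheck }
type BuiltSketchTool = { name: string; isConcurrencySafe: SafetyCheck }

const SKETCH_DEFAULTS: { isConcurrencySafe: SafetyCheck } = {
  // Conservative default: unsafe unless the tool says otherwise.
  isConcurrencySafe: (_input?: unknown) => false,
}

// Assumption: an explicit declaration wins, a missing one gets the default.
function buildToolSketch(def: SketchToolDef): BuiltSketchTool {
  return {
    name: def.name,
    isConcurrencySafe: def.isConcurrencySafe ?? SKETCH_DEFAULTS.isConcurrencySafe,
  }
}

const newTool = buildToolSketch({ name: 'NewTool' })
const reader = buildToolSketch({ name: 'Reader', isConcurrencySafe: () => true })
console.log(newTool.isConcurrencySafe(), reader.isConcurrencySafe()) // false true
```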

Safety Calculation in addTool

When a tool is added to the executor, the isConcurrencySafe calculation process is worth careful examination. See lines 104-121 of StreamingToolExecutor.ts:

src/services/tools/StreamingToolExecutor.ts:104-121
TypeScript
// LAYER 1: Validate the tool's input against its Zod schema.
// safeParse returns { success: true, data } or { success: false, error }
// without throwing, so invalid input is handled gracefully.
const parsedInput = toolDefinition.inputSchema.safeParse(block.input)

// Determine concurrency safety through a three-layer defensive chain:
const isConcurrencySafe = parsedInput?.success
  ? (() => {
      try {
        // LAYER 2: Call the tool's isConcurrencySafe with validated input.
        // Wrapped in try-catch because a bug in the tool's implementation
        // could throw, and we never want a tool definition error to crash
        // the entire executor.
        // LAYER 3: Boolean() coercion normalizes whatever the tool returns
        // to a strict boolean, so undefined or null become false. (A truthy
        // non-boolean such as a non-empty string still coerces to true.)
        return Boolean(toolDefinition.isConcurrencySafe(parsedInput.data))
      } catch {
        // Any exception in the safety check defaults to unsafe:
        // if we can't determine safety, assume the worst.
        return false
      }
    })()
  : false // Invalid input => treat as unsafe (will likely error during execution anyway)

// Create the TrackedTool entry and add it to the queue.
// Every tool starts in the 'queued' state and waits for processQueue()
// to determine when it can begin executing.
this.tools.push({
  id: block.id,
  block,
  assistantMessage,
  status: 'queued',
  isConcurrencySafe,
  pendingProgress: [],
})

There are three layers of defense here:

  1. Input validation: First validate input using the Zod schema. If the input format is invalid, it's immediately marked as non-concurrency-safe.
  2. try-catch wrapper: Even if the input is valid, isConcurrencySafe() itself might throw an exception (e.g., a bug in the tool definition). Any exception falls back to false.
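  3. Boolean coercion: The result is wrapped in Boolean(), so the stored flag is always a strict true or false even if a tool returns a non-boolean. Falsy values such as undefined become false; note that a truthy value such as a non-empty string still coerces to true.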

This "defense in depth" pattern is ubiquitous in Claude Code — on code paths related to concurrency and safety, always assume the worst case.
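The three layers can be exercised in a standalone sketch. To stay self-contained it replaces the Zod schema with a tiny hand-written parser; ParseResult, SketchToolDefinition, computeConcurrencySafety, and the sample tools are hypothetical stand-ins, not the executor's actual types.

```typescript
type ParseResult<T> = { success: true; data: T } | { success: false }

type SketchToolDefinition<T> = {
  safeParse(input: unknown): ParseResult<T>
  // Deliberately loose return type: a buggy tool may return anything.
  isConcurrencySafe(input: T): unknown
}

function computeConcurrencySafety<T>(
  def: SketchToolDefinition<T>,
  rawInput: unknown,
): boolean {
  const parsed = def.safeParse(rawInput) // Layer 1: validate the input
  if (!parsed.success) return false
  try {
    // Layer 2 (try-catch) and Layer 3 (Boolean coercion)
    return Boolean(def.isConcurrencySafe(parsed.data))
  } catch {
    return false // a throwing declaration is treated as unsafe
  }
}

type PathInput = { path: string }

// Minimal validator: accepts only objects with a string `path`.
function parsePath(input: unknown): ParseResult<PathInput> {
  if (typeof input === 'object' && input !== null) {
    const candidate = (input as { path?: unknown }).path
    if (typeof candidate === 'string') {
      return { success: true, data: { path: candidate } }
    }
  }
  return { success: false }
}

const wellBehaved: SketchToolDefinition<PathInput> = {
  safeParse: parsePath,
  isConcurrencySafe: () => true,
}
const throwing: SketchToolDefinition<PathInput> = {
  safeParse: parsePath,
  isConcurrencySafe: () => {
    throw new Error('bug in tool definition')
  },
}
const returnsNothing: SketchToolDefinition<PathInput> = {
  safeParse: parsePath,
  isConcurrencySafe: () => undefined,
}

console.log(computeConcurrencySafety(wellBehaved, { path: 'a.ts' }))    // true
console.log(computeConcurrencySafety(wellBehaved, { path: 42 }))        // false
console.log(computeConcurrencySafety(throwing, { path: 'a.ts' }))       // false
console.log(computeConcurrencySafety(returnsNothing, { path: 'a.ts' })) // false
```

Every failure mode collapses to false, so the only way to be scheduled in parallel is a valid input plus a declaration that returns normally with a truthy value.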


canExecuteTool: The Core Scheduling Decision

Given each tool's concurrency safety declaration, how does the scheduler decide whether a tool can execute immediately? The logic is remarkably concise, only a few lines of code (lines 129-135):

src/services/tools/StreamingToolExecutor.ts:129-135
TypeScript
// The core scheduling decision: implements a read-write lock pattern
// in a few lines. This is called for every queued tool to determine
// if it can start executing right now.
private canExecuteTool(isConcurrencySafe: boolean): boolean {
  // Snapshot of all currently running tools
  const executingTools = this.tools.filter(t => t.status === 'executing')
  return (
    // Condition 1: No tools running. The executor is idle, so any
    // tool (safe or unsafe) can start immediately.
    executingTools.length === 0 ||
    // Condition 2: Both the new tool AND all running tools are
    // concurrency-safe. This is the "multiple readers" case:
    // safe tools can freely coexist. If ANY running tool is unsafe,
    // this condition fails and the new tool must wait.
    (isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe))
  )
}

In plain language: a tool can execute if and only if one of the following two conditions holds:

  1. No tools are currently executing (idle state, any tool can start)
  2. The current tool is concurrency-safe, and all currently executing tools are also concurrency-safe

This logic implies an important corollary: as long as any non-concurrency-safe tool is executing, all other tools must wait. Non-concurrency-safe tools get exclusive access.

Let's visualize with a table:

Currently Executing Tools | New Tool (safe) | New Tool (unsafe)
None (idle)               | Can execute     | Can execute
All safe                  | Can execute     | Wait
Includes unsafe           | Wait            | Wait

This is a classic read-write lock pattern: concurrency-safe tools are like read locks (multiple can coexist), non-concurrency-safe tools are like write locks (must be exclusive).
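The decision table can be reproduced with a standalone version of the predicate. This sketch mirrors the logic of canExecuteTool but takes the executing set as a parameter; canExecute and Running are hypothetical names used only here.

```typescript
type Running = { isConcurrencySafe: boolean }

// Same two conditions as canExecuteTool: idle, or everyone involved is safe.
function canExecute(newToolIsSafe: boolean, executing: Running[]): boolean {
  return (
    executing.length === 0 ||
    (newToolIsSafe && executing.every(t => t.isConcurrencySafe))
  )
}

const scenarios: Array<{ label: string; executing: Running[] }> = [
  { label: 'None (idle)', executing: [] },
  { label: 'All safe', executing: [{ isConcurrencySafe: true }, { isConcurrencySafe: true }] },
  { label: 'Includes unsafe', executing: [{ isConcurrencySafe: false }] },
]

for (const { label, executing } of scenarios) {
  const safe = canExecute(true, executing) ? 'Can execute' : 'Wait'
  const unsafe = canExecute(false, executing) ? 'Can execute' : 'Wait'
  console.log(`${label}: safe=${safe}, unsafe=${unsafe}`)
}
```

Running it prints one line per table row, with the same Can execute / Wait outcomes.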


processQueue: The Subtleties of Queue Scheduling

The processQueue() method (lines 140-151) is responsible for traversing the queue and starting executable tools:

src/services/tools/StreamingToolExecutor.ts:140-151
TypeScript
// Scans the tool queue front-to-back, starting any tools that are
// eligible to execute. Called both when new tools are added and when
// running tools complete, creating a self-driving scheduling loop.
private async processQueue(): Promise<void> {
  for (const tool of this.tools) {
    // Skip tools that are already running, finished, or emitted;
    // only 'queued' tools need scheduling decisions.
    if (tool.status !== 'queued') continue

    if (this.canExecuteTool(tool.isConcurrencySafe)) {
      // Tool can run now, so start it. Note: executeTool() sets up the
      // Promise but does NOT await completion, so multiple tools can
      // be kicked off within a single processQueue() pass.
      await this.executeTool(tool)
    } else {
      // Can't execute this tool yet, and since we need to maintain
      // order for non-concurrent tools, stop here
      //
      // CRITICAL: For unsafe tools, we BREAK, not continue. This prevents
      // tools AFTER this unsafe tool from being scheduled out of order.
      // Without this break, a later safe tool might leapfrog this unsafe
      // tool and execute before it, violating ordering guarantees.
      // Safe tools just skip (implicit continue) because they don't
      // impose ordering constraints on subsequent tools.
      if (!tool.isConcurrencySafe) break
    }
  }
}

This code has an easily overlooked but critically important detail: the break statement. When the scheduler encounters a non-concurrency-safe tool that can't execute, it stops traversing the queue. Why?

Consider the following tool sequence:

[Figure: Tool sequence with concurrency safety, on a 0T to 4T timeline. Safe: Read A, Read B, Read C. Unsafe: Bash "git add .", Bash "git commit".]

Without the break, the scheduler would skip Bash "git add ." when it can't execute and go on to check Read C. Read C is concurrency-safe, so it could start alongside the reads that are already running. That is the problem: Read C would run before git add ., even though the model issued it afterwards, and could observe state from before the command ran.

The break preserves ordering around non-concurrency-safe tools. Once the scheduler reaches a queued non-concurrency-safe tool that cannot start yet, no subsequent tools (safe or not) are started in that pass.

Conversely, what if the tool that can't execute is concurrency-safe? It is simply skipped (an implicit continue) and does not block scheduling of the tools after it. When would a concurrency-safe tool be unable to execute? When a non-concurrency-safe tool currently has exclusive access. Once the exclusive tool completes, the queued concurrency-safe tools can start together, up to the next non-concurrency-safe tool in the queue.
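The effect of the break can be checked with a single simulated scheduling pass. This sketch assumes the queue order Read A, Read B, git add, Read C, git commit, which is the order the discussion above implies; schedulingPass and its useBreak switch are hypothetical and exist only to compare the two behaviours.

```typescript
type QueuedTool = { name: string; safe: boolean; status: 'queued' | 'executing' }

// One pass over the queue. `useBreak` toggles the early exit; the real
// processQueue() always breaks.
function schedulingPass(tools: QueuedTool[], useBreak: boolean): string[] {
  const started: string[] = []
  for (const tool of tools) {
    if (tool.status !== 'queued') continue
    const executing = tools.filter(t => t.status === 'executing')
    const canRun =
      executing.length === 0 || (tool.safe && executing.every(t => t.safe))
    if (canRun) {
      tool.status = 'executing'
      started.push(tool.name)
    } else if (!tool.safe && useBreak) {
      break // a blocked unsafe tool also blocks everything queued after it
    }
  }
  return started
}

const makeQueue = (): QueuedTool[] => [
  { name: 'Read A', safe: true, status: 'queued' },
  { name: 'Read B', safe: true, status: 'queued' },
  { name: 'Bash git add .', safe: false, status: 'queued' },
  { name: 'Read C', safe: true, status: 'queued' },
  { name: 'Bash git commit', safe: false, status: 'queued' },
]

console.log(schedulingPass(makeQueue(), true))  // [ 'Read A', 'Read B' ]
console.log(schedulingPass(makeQueue(), false)) // [ 'Read A', 'Read B', 'Read C' ]
```

Without the break, Read C leapfrogs git add; with it, the pass stops at the first blocked unsafe tool.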

When processQueue Is Triggered

processQueue() is called in two places:

  1. In addTool() (line 123): every time a new tool is added, immediately try to schedule it.
  2. When executeTool() completes (lines 402-404): after a tool finishes, trigger a new round of scheduling.

src/services/tools/StreamingToolExecutor.ts:398-404
TypeScript
// Start the tool's execution and capture the resulting Promise.
// This Promise resolves when the tool finishes (success, error, or abort).
const promise = collectResults()
// Store the Promise on the TrackedTool so getRemainingResults() can
// use Promise.race() to efficiently wait for the next tool to complete.
tool.promise = promise

// Process more queue when done
// This .finally() callback creates the self-driving scheduling loop:
// when this tool completes, it triggers another scheduling pass, which
// may start queued tools that were waiting for this one to finish.
// The void keyword discards the returned Promise; we don't need to
// await the next processQueue() call here.
void promise.finally(() => {
  void this.processQueue()
})

This creates a self-driving loop: tool completes -> try to schedule -> new tool starts -> new tool completes -> schedule again... until the queue is empty.
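The two triggers can be traced with a synchronous toy scheduler. It is a sketch, not the real executor: MiniScheduler is a hypothetical class, and its complete() method stands in for a tool's promise settling and the .finally() callback firing.

```typescript
type LoopTool = {
  name: string
  safe: boolean
  status: 'queued' | 'executing' | 'completed'
}

class MiniScheduler {
  readonly startOrder: string[] = []
  private tools: LoopTool[] = []

  add(name: string, safe: boolean): void {
    this.tools.push({ name, safe, status: 'queued' })
    this.processQueue() // trigger 1: a tool was added
  }

  // Stand-in for the tool's promise settling.
  complete(name: string): void {
    const tool = this.tools.find(t => t.name === name && t.status === 'executing')
    if (!tool) throw new Error(`${name} is not executing`)
    tool.status = 'completed'
    this.processQueue() // trigger 2: a tool finished
  }

  private processQueue(): void {
    for (const tool of this.tools) {
      if (tool.status !== 'queued') continue
      const executing = this.tools.filter(t => t.status === 'executing')
      if (executing.length === 0 || (tool.safe && executing.every(t => t.safe))) {
        tool.status = 'executing'
        this.startOrder.push(tool.name)
      } else if (!tool.safe) {
        break
      }
    }
  }
}

const mini = new MiniScheduler()
mini.add('Read A', true)
mini.add('Bash build', false)
mini.add('Read B', true)
const startedAfterAdds = [...mini.startOrder] // only 'Read A' so far
mini.complete('Read A')     // frees the executor: 'Bash build' starts
mini.complete('Bash build') // exclusive tool done: 'Read B' starts
console.log(startedAfterAdds, mini.startOrder)
```

No external driver is needed: every completion schedules whatever became eligible, until nothing is left in the queued state.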


Sibling AbortController: Cascading Cancellation of Errors

One of the trickiest problems with concurrent execution is error handling. When multiple tools are running in parallel, how should one tool's failure affect the others?

Claude Code's design is: only Bash tool errors cascade-cancel sibling tools. This design stems from a practical observation — Bash commands often have implicit dependency chains (mkdir fails, so the subsequent cd and touch are pointless), while Read, Grep, WebFetch and other tools are independent — one file read failure shouldn't affect another file's read.

Three-Layer AbortController Architecture

Error cascading relies on a carefully designed three-layer AbortController architecture:

...

Layer 1: Query-Level AbortController (toolUseContext.abortController)

This is the lifecycle controller for the entire query turn. When the user presses ESC or submits a new message, this controller is aborted, causing the entire turn to end.

Layer 2: Sibling-Level AbortController (siblingAbortController)

This is created by StreamingToolExecutor during construction as a child controller of the query-level controller (lines 59-61):

src/services/tools/StreamingToolExecutor.ts:59-61
TypeScript
// Create the sibling-level AbortController as a CHILD of the query-level
// controller. This parent-child relationship means:
// - If the query-level controller aborts (user presses ESC), the sibling
//   controller is also aborted, cascading to all tools.
// - If the sibling controller aborts (Bash error), it does NOT propagate
//   upward; the query continues so the model can see the error and react.
this.siblingAbortController = createChildAbortController(
  toolUseContext.abortController,
)

Key property: aborting the sibling-level controller does not abort the parent controller. This means a Bash error can cancel all sibling tools without terminating the entire query turn — the model will still receive the error information and continue reasoning.

Layer 3: Tool-Level AbortController (toolAbortController)

Each tool creates its own controller during execution as a child of the sibling-level controller (lines 301-302):

src/services/tools/StreamingToolExecutor.ts:301-302
TypeScript
// Each tool gets its own AbortController, created as a child of the
// sibling-level controller. This gives each tool an independent abort
// handle while still responding to group-level and query-level aborts
// via the parent-child chain: query -> sibling -> tool.
const toolAbortController = createChildAbortController(
  this.siblingAbortController,
)
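The implementation of createChildAbortController is not shown in this article. As a rough sketch of the one-way relationship it is described as providing, a child controller can be built on the standard AbortController API like this. createChildAbortControllerSketch is a hypothetical reimplementation, and the real helper may differ (for example in how it cleans up listeners).

```typescript
function createChildAbortControllerSketch(parent: AbortController): AbortController {
  const child = new AbortController()
  if (parent.signal.aborted) {
    // Parent already aborted: the child starts out aborted too.
    child.abort(parent.signal.reason)
    return child
  }
  // Parent -> child only. Nothing here ever calls parent.abort(),
  // so aborting the child cannot affect the parent.
  parent.signal.addEventListener(
    'abort',
    () => child.abort(parent.signal.reason),
    { once: true },
  )
  return child
}

// Build the three layers: query -> sibling -> tool.
const queryController = new AbortController()
const siblingController = createChildAbortControllerSketch(queryController)
const toolA = createChildAbortControllerSketch(siblingController)
const toolB = createChildAbortControllerSketch(siblingController)

// A Bash error aborts the sibling layer only.
siblingController.abort('sibling_error')
console.log(toolA.signal.aborted, toolB.signal.reason, queryController.signal.aborted)
// true sibling_error false
```

Both tools see the abort and its reason, while the query-level controller stays live so the turn can continue.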

Bash Error Cascade Path

When a Bash tool execution fails, the complete cascade path is as follows (lines 354-363):

src/services/tools/StreamingToolExecutor.ts:354-363
TypeScript
// When a tool produces an error result, determine whether it should
// cascade-cancel sibling tools or just fail in isolation.
if (isErrorResult) {
  // Mark this tool as errored so it won't also receive a duplicate
  // synthetic "sibling_error" message (see deduplication logic below).
  thisToolErrored = true
  // Only Bash errors cancel siblings. Bash commands often have implicit
  // dependency chains (e.g. mkdir fails -> subsequent commands pointless).
  // Read/WebFetch/etc are independent; one failure shouldn't nuke the rest.
  if (tool.block.name === BASH_TOOL_NAME) {
    // Set executor-wide error state so getAbortReason() returns
    // 'sibling_error' for all other tools on their next check.
    this.hasErrored = true
    // Record which tool caused the error, so synthetic error messages
    // for cancelled siblings can include a helpful description like
    // "Cancelled: parallel tool call Bash(mkdir /tmp/test) errored".
    this.erroredToolDescription = this.getToolDescription(tool)
    // Abort the sibling controller. This cascades to ALL tool-level
    // controllers via the parent-child chain, signaling every other
    // running tool to stop. Does NOT propagate to the query-level
    // controller, so the model can still process the error.
    this.siblingAbortController.abort('sibling_error')
  }
}

Execution flow:

  1. The Bash tool's execution result contains a tool_result with is_error: true
  2. The hasErrored flag is set to true
  3. erroredToolDescription records the description of the errored tool (e.g., Bash(mkdir /tmp/test...))
  4. siblingAbortController.abort('sibling_error') is called
  5. This abort signal propagates through createChildAbortController's parent-child relationship to all other tools' toolAbortController
  6. Executing tools that receive the abort signal generate synthetic error messages (lines 189-204)

Tool-Level Abort Upward Propagation

The tool-level AbortController has a subtle event listener (lines 304-317) that handles a special case: an abort that does not come from a sibling error, such as a denied permission dialog:

src/services/tools/StreamingToolExecutor.ts:304-317
TypeScript
// Listen for this tool's abort signal to handle UPWARD propagation:
// certain abort reasons need to bubble up from tool-level to query-level.
// This was added to fix regression #21056 where permission denials
// weren't properly terminating the query turn.
toolAbortController.signal.addEventListener(
  'abort',
  () => {
    // Only propagate upward if ALL three conditions are met:
    if (
      // 1. The abort was NOT caused by a sibling error (those are
      //    intentionally contained at the sibling level).
      toolAbortController.signal.reason !== 'sibling_error' &&
      // 2. The query-level controller is not already aborted
      //    (no need to abort something already aborted).
      !this.toolUseContext.abortController.signal.aborted &&
      // 3. The executor is not in discard mode (streaming fallback
      //    handles cleanup differently).
      !this.discarded
    ) {
      // Propagate the abort reason (e.g., permission denial) up to
      // the query-level controller, terminating the entire turn.
      this.toolUseContext.abortController.abort(
        toolAbortController.signal.reason,
      )
    }
  },
  // { once: true } ensures this handler fires at most once and is
  // automatically removed, preventing memory leaks.
  { once: true },
)

This code means: if the tool is aborted for a reason other than a sibling error (such as a permission denial), the abort bubbles up to the query-level controller and terminates the entire turn, provided the query-level controller is not already aborted and the executor is not in discard mode. The code comments mention regression #21056: this upward-bubbling logic was added to fix that specific bug.
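The two directions can be compared side by side in a small sketch built on the standard AbortController API. linkChild is a hypothetical stand-in for createChildAbortController, createToolController is a made-up wrapper around the listener shown above, and 'permission_denied' is an invented reason string used only for the demonstration.

```typescript
// Hypothetical one-way link: parent aborts cascade to the child, not back.
function linkChild(parent: AbortController): AbortController {
  const child = new AbortController()
  if (parent.signal.aborted) {
    child.abort(parent.signal.reason)
    return child
  }
  parent.signal.addEventListener('abort', () => child.abort(parent.signal.reason), {
    once: true,
  })
  return child
}

// Tool-level controller with the upward-propagation listener attached.
function createToolController(
  query: AbortController,
  sibling: AbortController,
  isDiscarded: () => boolean,
): AbortController {
  const tool = linkChild(sibling)
  tool.signal.addEventListener(
    'abort',
    () => {
      if (
        tool.signal.reason !== 'sibling_error' && // contained at sibling level
        !query.signal.aborted &&                  // nothing left to abort
        !isDiscarded()                            // discard mode cleans up itself
      ) {
        query.abort(tool.signal.reason)
      }
    },
    { once: true },
  )
  return tool
}

// Scenario 1: a sibling error stays contained.
const queryOne = new AbortController()
const siblingOne = linkChild(queryOne)
const toolOne = createToolController(queryOne, siblingOne, () => false)
siblingOne.abort('sibling_error')
console.log(toolOne.signal.aborted, queryOne.signal.aborted) // true false

// Scenario 2: any other tool-level abort bubbles up to the query.
const queryTwo = new AbortController()
const siblingTwo = linkChild(queryTwo)
const toolTwo = createToolController(queryTwo, siblingTwo, () => false)
toolTwo.abort('permission_denied')
console.log(queryTwo.signal.aborted, queryTwo.signal.reason) // true permission_denied
```

In the second scenario the query-level abort also cascades back down to the sibling layer, so every other tool in the batch stops as well.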

Synthetic Error Messages

Cancelled tools aren't simply discarded — they receive a synthetic error message so the model knows these tools didn't execute successfully. The createSyntheticErrorMessage method (lines 153-205) generates different error messages based on the cancellation reason:

src/services/tools/StreamingToolExecutor.ts:153-205
TypeScript
// Generates a synthetic tool_result error message for tools that were
// cancelled before completing. The model receives these messages in its
// conversation history so it understands WHY tools didn't produce results.
// Each cancellation reason produces a distinct message tailored to help
// the model make the best next decision.
private createSyntheticErrorMessage(
  toolUseId: string,
  reason: 'sibling_error' | 'user_interrupted' | 'streaming_fallback',
  assistantMessage: AssistantMessage,
): Message {
  // Case 1: User actively cancelled (pressed ESC or submitted new message).
  // The memory correction hint tells the model not to retry the same action,
  // since the user explicitly chose to reject it.
  if (reason === 'user_interrupted') {
    return createUserMessage({
      content: [{
        type: 'tool_result',
        content: withMemoryCorrectionHint(REJECT_MESSAGE),
        is_error: true,
        tool_use_id: toolUseId,
      }],
      toolUseResult: 'User rejected tool use',
      // ...
    })
  }
  // Case 2: Streaming connection failed and the executor is discarding
  // all in-flight work. This is a transient infrastructure error, not
  // a tool logic error; the model will typically retry.
  if (reason === 'streaming_fallback') {
    return createUserMessage({
      content: [{
        type: 'tool_result',
        content: '<tool_use_error>Error: Streaming fallback - tool execution discarded</tool_use_error>',
        is_error: true,
        tool_use_id: toolUseId,
      }],
      // ...
    })
  }
  // Case 3: A sibling Bash tool errored, causing this tool to be cancelled.
  // Include the description of the ERRORED tool so the model knows which
  // sibling caused the cascade and can reason about the dependency.
  // sibling_error
  const desc = this.erroredToolDescription
  const msg = desc
    ? `Cancelled: parallel tool call ${desc} errored`
    : 'Cancelled: parallel tool call errored'
  return createUserMessage({
    content: [{
      type: 'tool_result',
      content: `<tool_use_error>${msg}</tool_use_error>`,
      is_error: true,
      tool_use_id: toolUseId,
    }],
    // ...
  })
}

Three cancellation reasons produce three different messages:

Reason             | Message Content                                      | Purpose
sibling_error      | Cancelled: parallel tool call Bash(mkdir...) errored | Model knows which sibling tool failed
user_interrupted   | User rejected tool use + memory correction hint      | Model knows the user actively cancelled
streaming_fallback | Streaming fallback - tool execution discarded        | Silent cancellation during streaming fallback

Preventing Duplicate Error Messages

The thisToolErrored flag (lines 330-345) implements a neat piece of deduplication logic:

src/services/tools/StreamingToolExecutor.ts:328-345
TypeScript
// Track if this specific tool has produced an error result.
// This prevents the tool from receiving a duplicate "sibling error"
// message when it is the one that caused the error.
// Without this flag, a Bash tool that errors would: (1) produce its own
// error result, (2) trigger siblingAbortController.abort(), (3) then on
// the next loop iteration, getAbortReason() returns 'sibling_error' for
// THIS tool too, generating a SECOND spurious error. The flag prevents step 3.
let thisToolErrored = false

// Consume the tool's execution generator, processing each update as it arrives.
// This is the main per-tool execution loop where progress, results, and abort
// signals are all handled in real time.
for await (const update of generator) {
  // On every iteration, check if this tool should be cancelled.
  // getAbortReason() checks (in priority order): discard mode, sibling
  // errors, and user interrupts.
  const abortReason = this.getAbortReason(tool)
  // Only inject a synthetic error if the tool hasn't already produced its
  // own error. This is the deduplication guard: the originating error tool
  // already has a real error message and doesn't need a synthetic one.
  if (abortReason && !thisToolErrored) {
    messages.push(
      this.createSyntheticErrorMessage(
        tool.id,
        abortReason,
        tool.assistantMessage,
      ),
    )
    // Stop consuming updates; this tool is done.
    break
  }
  // ...
  if (isErrorResult) {
    // Mark this tool as errored BEFORE the abort cascades, so the
    // deduplication guard above will prevent a synthetic message.
    thisToolErrored = true
    // ...
  }
}

If Tool A is a Bash tool that errors, it triggers siblingAbortController.abort(). At this point, getAbortReason() would also return sibling_error for Tool A itself. But because thisToolErrored has already been set to true, Tool A won't receive an additional synthetic error message — it already has its own real error result.
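
To see the guard in isolation, here is a minimal synchronous sketch. It is not the executor's code: `SharedFlagState`, `runToolSketch`, and the message strings are hypothetical stand-ins, and a shared boolean takes the place of the sibling abort signal.

```typescript
// Hypothetical, simplified model of the deduplication guard.
type ToolUpdateSketch = { text: string; isError: boolean }

interface SharedFlagState {
  hasErrored: boolean // stands in for the sibling abort signal
}

function runToolSketch(
  toolId: string,
  updates: ToolUpdateSketch[],
  shared: SharedFlagState,
): string[] {
  const messages: string[] = []
  let thisToolErrored = false
  for (const update of updates) {
    // Inject a synthetic error only when a sibling (not this tool) errored.
    if (shared.hasErrored && !thisToolErrored) {
      messages.push(`${toolId}: cancelled (sibling_error)`)
      break
    }
    messages.push(`${toolId}: ${update.text}`)
    if (update.isError) {
      // Set the local flag first, then start the cascade.
      thisToolErrored = true
      shared.hasErrored = true
    }
  }
  return messages
}

const sharedFlags: SharedFlagState = { hasErrored: false }
// The erroring tool keeps only its own real error output.
const bashMessages = runToolSketch(
  'bash',
  [
    { text: 'exit code 1', isError: true },
    { text: 'trailing update', isError: false },
  ],
  sharedFlags,
)
// A sibling that runs afterwards receives exactly one synthetic error.
const editMessages = runToolSketch(
  'edit',
  [{ text: 'wrote file', isError: false }],
  sharedFlags,
)
```

In this sketch the erroring tool never sees a "cancelled" message, while its sibling sees one and stops immediately.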


Progress Buffering and Ordered Emission

Concurrent execution introduces an output ordering problem. Suppose Tool 1 and Tool 2 are running in parallel, and Tool 2 finishes first — should its results be emitted before Tool 1's?

Claude Code's answer is to treat two types of output differently:

  1. Progress messages: emitted immediately, no ordering required
  2. Final results: must be emitted in tool addition order

Immediate Emission of Progress Messages

In the execution loop of the executeTool() method (lines 366-374), progress messages are stored in the pendingProgress array:

src/services/tools/StreamingToolExecutor.ts:366-374
TypeScript
// Route each message to the appropriate buffer based on its type.
// This two-path design is fundamental to the output strategy:
// progress is real-time, results are ordered.
if (update.message) {
  // Progress messages go to pendingProgress for immediate yielding
  // (e.g., streaming Bash stdout lines). Users see these in real time.
  if (update.message.type === 'progress') {
    tool.pendingProgress.push(update.message)
    // Signal that progress is available. This resolves the Promise
    // that getRemainingResults() is awaiting via Promise.race(),
    // waking it up to emit the new progress immediately.
    if (this.progressAvailableResolve) {
      this.progressAvailableResolve()
      // Clear the resolver so the next wait creates a fresh Promise.
      this.progressAvailableResolve = undefined
    }
  } else {
    // Non-progress messages (tool_result, etc.) are final results, so
    // buffer them for ordered emission by getCompletedResults().
    messages.push(update.message)
  }
}

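Note progressAvailableResolve: it acts as a one-shot wake-up signal rather than a counting semaphore. When a new progress message arrives, calling it resolves the Promise that getRemainingResults() is waiting on, and the resolver is then cleared so that the next wait creates a fresh Promise.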

Ordered Emission of Results

The getCompletedResults() method (lines 412-440) implements ordered emission logic:

src/services/tools/StreamingToolExecutor.ts:412-440
TypeScript
// A synchronous generator that emits completed tool results in the order
// tools were added (not the order they completed). This preserves a
// deterministic output order regardless of parallel execution timing.
*getCompletedResults(): Generator<MessageUpdate, void> {
  // If the executor has been discarded (streaming fallback), produce nothing.
  // Any residual results would be stale and potentially confusing.
  if (this.discarded) {
    return
  }

  // Walk through tools in insertion order (the order the model requested them).
  for (const tool of this.tools) {
    // Always yield pending progress messages immediately,
    // regardless of tool status.
    // Progress is always emitted eagerly, even for tools that haven't
    // completed yet or have already been yielded. shift() drains the
    // buffer one message at a time.
    while (tool.pendingProgress.length > 0) {
      const progressMessage = tool.pendingProgress.shift()!
      yield { message: progressMessage, newContext: this.toolUseContext }
    }

    // Already emitted: nothing more to do for this tool.
    if (tool.status === 'yielded') {
      continue
    }

    if (tool.status === 'completed' && tool.results) {
      // Transition to 'yielded': this tool's lifecycle is now complete.
      tool.status = 'yielded'

      // Emit all buffered final results for this tool.
      for (const message of tool.results) {
        yield { message, newContext: this.toolUseContext }
      }

      // Notify the context that this tool is done (used for UI updates,
      // permission tracking, etc.)
      markToolUseAsComplete(this.toolUseContext, tool.id)
    } else if (tool.status === 'executing' && !tool.isConcurrencySafe) {
      // CRITICAL BREAK: An unsafe tool is still running. We must NOT emit
      // any results from tools that come AFTER it, because the unsafe tool's
      // contextModifiers may change the context that later results depend on.
      // Wait for it to complete before continuing emission.
      break
    }
    // If a safe tool is still executing, we simply skip past it (no break)
    // and continue checking later tools. Safe tools don't modify context,
    // so there's no ordering dependency.
  }
}

The traversal logic in this code is quite elegant. Let's illustrate with an example:

Tool | Type | Concurrency | Status | Note
Tool 1 | Read | safe | yielded | -
Tool 2 | Read | safe | completed | results pending emission
Tool 3 | Read | safe | executing | -
Tool 4 | Bash | unsafe | queued | -

Traversal process:

  1. Tool 1: yielded, skip (but emit any pending progress first)
  2. Tool 2: completed, emit results, mark as yielded
  3. Tool 3: executing, concurrency-safe, don't break, continue traversal (emit pending progress)
  4. Tool 4: queued, doesn't match any condition, natural end

What if Tool 3 were non-concurrency-safe?

Tool | Type | Concurrency | Status | Note
Tool 1 | Read | safe | yielded | -
Tool 2 | Read | safe | completed | -
Tool 3 | Bash | unsafe | executing | still running
Tool 4 | Read | safe | completed | -

Traversal process:

  1. Tool 1: yielded, skip
  2. Tool 2: completed, emit results
  3. Tool 3: executing and !isConcurrencySafe, break!
  4. Tool 4's results will NOT be emitted, even though it's already completed

Why? Because the non-concurrency-safe tool's results may have changed the context (via contextModifiers), and Tool 4's results might depend on this modified context. So we must wait for Tool 3 to complete and the context to update before emitting Tool 4's results.
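
The two traversals above can be replayed with a small sketch. It keeps only the status checks from getCompletedResults() and leaves out progress handling and context updates; `SketchTool` and `emittableIds` are illustrative names, not part of the real executor.

```typescript
type SketchStatus = 'queued' | 'executing' | 'completed' | 'yielded'

interface SketchTool {
  id: string
  isConcurrencySafe: boolean
  status: SketchStatus
}

// Returns the ids whose results would be emitted in one traversal pass.
function emittableIds(tools: SketchTool[]): string[] {
  const emitted: string[] = []
  for (const tool of tools) {
    if (tool.status === 'yielded') continue
    if (tool.status === 'completed') {
      tool.status = 'yielded'
      emitted.push(tool.id)
    } else if (tool.status === 'executing' && !tool.isConcurrencySafe) {
      break // ordering barrier: an unsafe tool is still running
    }
    // Safe tools that are still executing (and queued tools) are skipped.
  }
  return emitted
}

// First table: Tool 3 is a safe Read that is still executing.
const firstScenario: SketchTool[] = [
  { id: 'Tool 1', isConcurrencySafe: true, status: 'yielded' },
  { id: 'Tool 2', isConcurrencySafe: true, status: 'completed' },
  { id: 'Tool 3', isConcurrencySafe: true, status: 'executing' },
  { id: 'Tool 4', isConcurrencySafe: false, status: 'queued' },
]
const firstPass = emittableIds(firstScenario)

// Second table: Tool 3 is an unsafe Bash call that is still executing.
const runningBash: SketchTool = { id: 'Tool 3', isConcurrencySafe: false, status: 'executing' }
const secondScenario: SketchTool[] = [
  { id: 'Tool 1', isConcurrencySafe: true, status: 'yielded' },
  { id: 'Tool 2', isConcurrencySafe: true, status: 'completed' },
  runningBash,
  { id: 'Tool 4', isConcurrencySafe: true, status: 'completed' },
]
const blockedPass = emittableIds(secondScenario)

// Once the Bash call completes, the barrier lifts and Tool 4 follows it.
runningBash.status = 'completed'
const unblockedPass = emittableIds(secondScenario)
```

Under these assumptions both first passes emit only Tool 2, and Tool 4 in the second scenario is released only in the pass after Tool 3 completes.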

getRemainingResults Wait Mechanism

getRemainingResults() is an AsyncGenerator (lines 453-490) that continuously waits until all tools have finished:

src/services/tools/StreamingToolExecutor.ts:453-490
TypeScript
// The main consumer-facing async generator. Callers use
// `for await (const result of executor.getRemainingResults()) { ... }`
// to receive all results and progress as they become available.
// This method drives the entire execution loop to completion.
async *getRemainingResults(): AsyncGenerator<MessageUpdate, void> {
  // Early exit if executor was discarded (streaming fallback).
  if (this.discarded) {
    return
  }

  // Main loop: keep running as long as any tools are queued or executing.
  while (this.hasUnfinishedTools()) {
    // Trigger scheduling, which may start new tools if queue conditions allow.
    await this.processQueue()

    // Emit any results and progress that are ready right now.
    for (const result of this.getCompletedResults()) {
      yield result
    }

    // If tools are still executing but there's nothing new to emit,
    // we need to WAIT efficiently rather than busy-polling.
    if (
      this.hasExecutingTools() &&
      !this.hasCompletedResults() &&
      !this.hasPendingProgress()
    ) {
      // Collect all executing tools' Promises. When any one resolves,
      // it means a tool has finished and we should check for new results.
      const executingPromises = this.tools
        .filter(t => t.status === 'executing' && t.promise)
        .map(t => t.promise!)

      // Create a Promise that resolves when ANY tool produces new progress.
      // This is the "semaphore" that executeTool() resolves when it pushes
      // a progress message into pendingProgress.
      const progressPromise = new Promise<void>(resolve => {
        this.progressAvailableResolve = resolve
      })

      if (executingPromises.length > 0) {
        // Promise.race: wake up when EITHER a tool completes OR new
        // progress arrives, whichever comes first. This is event-driven,
        // not polling, so there's zero CPU waste while waiting.
        await Promise.race([...executingPromises, progressPromise])
      }
    }
  }

  // Final drain: emit any remaining results after all tools have finished.
  // This catches results from the last tool to complete.
  for (const result of this.getCompletedResults()) {
    yield result
  }
}

Promise.race is the key — it simultaneously waits for two types of events:

  1. Any executing tool to complete
  2. Any tool to produce new progress messages

Whichever happens first wakes up the loop, allowing it to emit new results or progress. This implements an event-driven reactive loop — not polling, but passively waiting for notifications.
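
The wake-up mechanism can be reproduced outside the executor. The following is a minimal sketch under stated assumptions: `ProgressSignal`, `sleepMs`, and `raceDemo` are hypothetical names, the timings are arbitrary, and a single simulated tool replaces the real tool list.

```typescript
// Hypothetical one-shot wake-up signal, modeled on progressAvailableResolve.
class ProgressSignal {
  private resolveWaiter: (() => void) | undefined

  // Returns a promise that resolves on the next notify() call.
  wait(): Promise<void> {
    return new Promise<void>(resolve => {
      this.resolveWaiter = resolve
    })
  }

  // Wakes the current waiter, if any, then clears the resolver.
  notify(): void {
    if (this.resolveWaiter) {
      this.resolveWaiter()
      this.resolveWaiter = undefined
    }
  }
}

const sleepMs = (ms: number) =>
  new Promise<void>(resolve => setTimeout(resolve, ms))

async function raceDemo(): Promise<string[]> {
  const signal = new ProgressSignal()
  const wakeUps: string[] = []
  const pendingProgress: string[] = []
  let toolDone = false

  // Simulated tool: one progress message, then completion a little later.
  const toolPromise = (async () => {
    await sleepMs(10)
    pendingProgress.push('50%')
    signal.notify()
    await sleepMs(20)
    toolDone = true
  })()

  while (!toolDone) {
    // Sleep until the tool finishes OR new progress arrives.
    await Promise.race([toolPromise, signal.wait()])
    if (pendingProgress.length > 0) {
      wakeUps.push(`progress ${pendingProgress.shift()}`)
    } else {
      wakeUps.push('tool completed')
    }
  }
  return wakeUps
}

const raceDemoResult = raceDemo()
```

The loop wakes exactly twice here, once for the progress message and once for completion, and does no work in between.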


interruptBehavior: Strategy Selection on User Interruption

When a user presses ESC or submits a new message during tool execution, different tools should react differently. Some tools should stop immediately (like a long-running search), while others should continue running to completion (like a file write in progress — stopping midway could corrupt the file).

cancel vs block

The interruptBehavior method is defined at lines 408-416 of src/Tool.ts:

src/Tool.ts:408-416
TypeScript
// Declares how this tool should behave when the user interrupts execution
// (e.g., pressing ESC or submitting a new message). This is optional:
// tools that don't implement it inherit the safe default of 'block'.
/**
 * What should happen when the user submits a new message while this tool
 * is running.
 *
 * - 'cancel': stop the tool and discard its result
 * - 'block': keep running; the new message waits
 *
 * Defaults to 'block' when not implemented.
 */
interruptBehavior?(): 'cancel' | 'block'

  • cancel: The tool can safely stop midway. On user interruption, a synthetic error message is generated and partial results are discarded.
  • block: The tool is performing a non-interruptible operation. The user's new message must wait until this tool completes before being sent.

The default behavior is block, which is again a conservatively safe design.
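
As an illustration of the opt-in design, the sketch below shows one tool that declares 'cancel' and one that declares nothing. The interface is trimmed to the single method under discussion, and `resolveInterruptBehavior` is a hypothetical helper; the executor's own getToolInterruptBehavior() is not shown in this article and may differ.

```typescript
type InterruptBehaviorSketch = 'cancel' | 'block'

// Trimmed-down tool shape: only the optional method under discussion.
interface InterruptibleToolSketch {
  toolName: string
  interruptBehavior?(): InterruptBehaviorSketch
}

// Hypothetical helper: falls back to 'block' when the tool is silent.
function resolveInterruptBehavior(
  tool: InterruptibleToolSketch,
): InterruptBehaviorSketch {
  return tool.interruptBehavior?.() ?? 'block'
}

// A long-running search can be abandoned safely, so it opts in to 'cancel'.
const searchToolSketch: InterruptibleToolSketch = {
  toolName: 'Search',
  interruptBehavior: () => 'cancel',
}

// A file write declares nothing and therefore inherits 'block'.
const writeToolSketch: InterruptibleToolSketch = { toolName: 'Write' }

const searchBehavior = resolveInterruptBehavior(searchToolSketch)
const writeBehavior = resolveInterruptBehavior(writeToolSketch)
```

A tool author who forgets the method gets the non-interruptible behavior, which is the safer failure mode.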

Implementation in StreamingToolExecutor

The getAbortReason() method (lines 210-230) handles interruptBehavior:

src/services/tools/StreamingToolExecutor.ts:210-230
TypeScript
// Checks whether a tool should be aborted and returns the reason why.
// Called on every iteration of the execution loop to detect cancellation
// as early as possible. Returns null if the tool should keep running.
// The priority order matters: higher-priority reasons are checked first.
private getAbortReason(
  tool: TrackedTool,
): 'sibling_error' | 'user_interrupted' | 'streaming_fallback' | null {
  // PRIORITY 1 (highest): Streaming fallback. The entire streaming
  // connection failed, so all tools must be discarded unconditionally.
  if (this.discarded) {
    return 'streaming_fallback'
  }
  // PRIORITY 2: A sibling Bash tool errored, so cascade-cancel all tools.
  if (this.hasErrored) {
    return 'sibling_error'
  }
  // PRIORITY 3: The query-level abort signal fired (user action).
  if (this.toolUseContext.abortController.signal.aborted) {
    // Special case: if the reason is 'interrupt' (user submitted a new
    // message), we check the tool's interruptBehavior. Tools that declare
    // 'block' return null here: they continue running and the user's
    // new message waits. Only 'cancel' tools get interrupted.
    if (this.toolUseContext.abortController.signal.reason === 'interrupt') {
      return this.getToolInterruptBehavior(tool) === 'cancel'
        ? 'user_interrupted'
        : null // Tool declares 'block', so let it finish
    }
    // For any other abort reason (e.g., ESC pressed), all tools are cancelled.
    return 'user_interrupted'
  }
  // No abort condition detected, so the tool should continue executing.
  return null
}

Note the priority hierarchy here:

  1. First check discarded (streaming fallback) — highest priority
  2. Then check hasErrored (sibling error) — second highest
  3. Finally check the abort signal:
    • If the reason is 'interrupt' (user submitted a new message), only cancel tools will be cancelled
    • If the reason is something else (user pressed ESC), all tools will be cancelled
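
The hierarchy is easier to check when written as a pure function over plain values. This sketch mirrors the branch order of getAbortReason() shown above; `AbortInputsSketch` and `abortReasonSketch` are illustrative names, and the flags are passed in rather than read from the executor.

```typescript
type AbortReasonSketch =
  | 'sibling_error'
  | 'user_interrupted'
  | 'streaming_fallback'
  | null

interface AbortInputsSketch {
  discarded: boolean
  hasErrored: boolean
  signalAborted: boolean
  signalReason?: string
  interruptBehavior: 'cancel' | 'block'
}

function abortReasonSketch(s: AbortInputsSketch): AbortReasonSketch {
  if (s.discarded) return 'streaming_fallback' // priority 1
  if (s.hasErrored) return 'sibling_error' // priority 2
  if (s.signalAborted) {
    // priority 3: only an 'interrupt' consults the tool's declaration
    if (s.signalReason === 'interrupt') {
      return s.interruptBehavior === 'cancel' ? 'user_interrupted' : null
    }
    return 'user_interrupted'
  }
  return null
}

const idleInputs: AbortInputsSketch = {
  discarded: false,
  hasErrored: false,
  signalAborted: false,
  interruptBehavior: 'block',
}

// Discard wins even when every other condition is also true.
const discardWins = abortReasonSketch({
  ...idleInputs,
  discarded: true,
  hasErrored: true,
  signalAborted: true,
})
// A 'block' tool survives a new-message interrupt...
const blockSurvives = abortReasonSketch({
  ...idleInputs,
  signalAborted: true,
  signalReason: 'interrupt',
})
// ...but not an abort with any other reason.
const blockCancelled = abortReasonSketch({
  ...idleInputs,
  signalAborted: true,
  signalReason: 'escape', // hypothetical reason value, for illustration only
})
```

The reason string 'escape' is made up for the example; the source only establishes that any reason other than 'interrupt' cancels every tool.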

Interruptible State Updates

The updateInterruptibleState() method (lines 254-260) maintains a global state that tells the UI whether all tools can currently be interrupted:

src/services/tools/StreamingToolExecutor.ts:254-260
TypeScript
// Updates the UI-facing flag that indicates whether the current execution
// can be interrupted. The UI uses this to decide whether to show an
// "interrupt" button or a "please wait" indicator.
private updateInterruptibleState(): void {
  const executing = this.tools.filter(t => t.status === 'executing')
  // The entire turn is interruptible ONLY if:
  // 1. At least one tool is executing (otherwise there's nothing to interrupt)
  // 2. ALL executing tools support cancellation ('cancel' behavior)
  // Even one 'block' tool makes the whole turn non-interruptible: we can't
  // partially interrupt, so we wait for the blocking tool to finish.
  this.toolUseContext.setHasInterruptibleToolInProgress?.(
    executing.length > 0 &&
      executing.every(t => this.getToolInterruptBehavior(t) === 'cancel'),
  )
}

Only when all executing tools are of the cancel type does the UI show an "interruptible" indicator. If any block tool is running, the entire turn is considered non-interruptible.
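
The `executing.length > 0` check is not redundant: `Array.prototype.every` returns true for an empty array, so without the guard an idle executor would report itself as interruptible. A small sketch of the same predicate over plain values (`isTurnInterruptible` is an illustrative name):

```typescript
type TurnBehaviorSketch = 'cancel' | 'block'

// Same predicate as updateInterruptibleState(), over plain values.
function isTurnInterruptible(executing: TurnBehaviorSketch[]): boolean {
  return executing.length > 0 && executing.every(b => b === 'cancel')
}

const noToolsRunning = isTurnInterruptible([])
const allCancellable = isTurnInterruptible(['cancel', 'cancel'])
const oneBlocking = isTurnInterruptible(['cancel', 'block'])

// Without the length guard, every() alone is vacuously true on [].
const emptyBehaviors: TurnBehaviorSketch[] = []
const vacuousEvery = emptyBehaviors.every(b => b === 'cancel')
```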


Discardable Mode: Tool Discard During Streaming Fallback

Claude Code uses streaming to receive model responses, but streaming can fail (network errors, server issues, etc.). When a streaming fallback occurs, the executor needs to discard results from tools that have already started but haven't completed.

The discard() method (lines 69-71) is very simple:

src/services/tools/StreamingToolExecutor.ts:64-71
TypeScript
/**
 * Discards all pending and in-progress tools. Called when streaming fallback
 * occurs and results from the failed attempt should be abandoned.
 * Queued tools won't start, and in-progress tools will receive synthetic errors.
 */
// Remarkably simple: just sets a boolean flag. The actual cleanup happens
// lazily, as each tool checks this flag via getAbortReason() on its next
// loop iteration. This avoids the complexity of forcefully terminating
// running tools and instead lets them self-terminate gracefully.
discard(): void {
  this.discarded = true
}

It only sets a flag. This flag propagates to all tools through getAbortReason():

  • Queued tools: processQueue() -> executeTool() -> detects abort reason -> immediately generates synthetic error
  • Executing tools: detects abort reason in the next iteration loop -> generates synthetic error and breaks
  • Completed tools: getCompletedResults() checks this.discarded and returns immediately

getRemainingResults() also checks this.discarded (lines 454-456):

src/services/tools/StreamingToolExecutor.ts:453-456
TypeScript
// Early exit guard: if the executor has been discarded, produce no output.
// The async generator immediately returns (closes), so the caller's
// for-await-of loop ends without yielding any stale results.
async *getRemainingResults(): AsyncGenerator<MessageUpdate, void> {
  if (this.discarded) {
    return
  }
  // ...
}

This guarantees that after a streaming fallback, no residual results leak into subsequent processing.
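
The lazy, flag-based style of cancellation can be sketched with a synchronous generator. `DiscardableRunner` is a hypothetical class that keeps only the flag and the per-iteration check; the real executor works on async generators and builds proper synthetic error messages.

```typescript
// Hypothetical runner showing cooperative, flag-based cancellation.
class DiscardableRunner {
  private discarded = false

  // Nothing is torn down here; running loops notice the flag themselves.
  discard(): void {
    this.discarded = true
  }

  // Consumes a tool's updates, checking the flag before each one.
  *run(toolId: string, updates: Iterable<string>): Generator<string, void> {
    for (const update of updates) {
      if (this.discarded) {
        yield `${toolId}: synthetic error (streaming_fallback)`
        return
      }
      yield `${toolId}: ${update}`
    }
  }
}

const discardRunner = new DiscardableRunner()
const readSteps = discardRunner.run('read', ['chunk 1', 'chunk 2', 'chunk 3'])

const beforeDiscard = readSteps.next().value
discardRunner.discard() // takes effect on the next loop iteration
const afterDiscard = readSteps.next().value
const finalStep = readSteps.next()
```

The trade-off of this design is latency: a tool stops only when it next reaches the check, not at the moment discard() is called.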


Complete Execution Flow

Let's tie all the components together with an end-to-end example. Suppose the model returns the following tool calls:

[Figure: Complete Execution Flow: Scheduling 5 Tools (time axis 0T to 5T). Concurrency-safe (Phase 1): Read("src/main.ts"), Grep("TODO"), Read("src/utils.ts"). Not concurrency-safe (Phase 4-5): Bash("npm test"), Edit("src/main.ts").]

Phase 1-3: Concurrent Reads + Queuing

The three concurrency-safe tools (two Reads and one Grep) pass through addTool() → processQueue() → canExecuteTool() and begin executing simultaneously. The subsequently arriving Bash("npm test") and Edit("src/main.ts"), both unsafe, enter the queue: Bash can't acquire exclusive access while safe tools are executing, and Edit is blocked behind the queued Bash because the queue scan breaks at an unsafe tool that cannot start.

Phase 4: Reads Complete, Bash Starts

Once all reads complete, processQueue() runs again. No tools are executing at this point, so Bash can acquire exclusive execution access.

Phase 5: Ordered Result Emission

getRemainingResults() emits results strictly in tool addition order: Read → Grep → Read → wait for Bash → Bash result → wait for Edit → Edit result.

Exception Path: Bash Fails

If npm test returns is_error: true:

[Figure: Exception Path: Cascading Cancellation After Bash Failure (time axis 0T to 5T). Completed: Read("src/main.ts"), Grep("TODO"), Read("src/utils.ts"). Failed: Bash("npm test"). Cancelled: Edit("src/main.ts").]

The sequence is: hasErrored = true → siblingAbortController.abort('sibling_error') → Edit detects the abort at executeTool() entry → a synthetic error message is generated: "Cancelled: parallel tool call Bash(npm test) errored". The model receives two error messages, one with Bash's real error and one with Edit's cancellation notice, and decides its next steps accordingly.


Comparison with toolOrchestration

There's another tool orchestration implementation in src/services/tools/toolOrchestration.ts called runTools(). How does it differ from StreamingToolExecutor?

runTools() uses a partition-batch model (lines 19-80):

src/services/tools/toolOrchestration.ts:19-30
TypeScript
// A simpler, non-streaming alternative to StreamingToolExecutor.
// This function requires ALL tool calls upfront (no incremental addition)
// and uses a batch-partition model: group tools by concurrency safety,
// then execute each group sequentially (with safe groups running in parallel).
export async function* runTools(
  toolUseMessages: ToolUseBlock[], // All tool calls, known in advance
  assistantMessages: AssistantMessage[],
  canUseTool: CanUseToolFn,
  toolUseContext: ToolUseContext,
): AsyncGenerator<MessageUpdate, void> {
  // Mutable context reference, updated by non-concurrent tools' modifiers
  let currentContext = toolUseContext
  // partitionToolCalls groups consecutive tools: safe tools are batched
  // together for parallel execution, unsafe tools form single-item groups.
  // Each partition is then executed as a unit.
  for (const { isConcurrencySafe, blocks } of partitionToolCalls(
    toolUseMessages,
    currentContext,
  )) {
    // ... (excerpt ends here)
  }
}

It first partitions all tool calls by concurrency safety, then executes them batch by batch. This is a simpler model — but it requires all tool calls to be known before execution begins.

StreamingToolExecutor's advantage is its support for incremental addition — tool calls are added one by one as the streaming response arrives, without waiting for all tool calls to be parsed. This is critical in streaming scenarios, because the model may still be generating the 5th tool call while the first 3 can already start executing.
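
The partition step can be sketched as follows. The real partitionToolCalls() is not shown in this article, so this is an assumption based only on the comment above ("safe tools are batched together, unsafe tools form single-item groups"); `ToolCallSketch` and `partitionSketch` are illustrative names, and safety is a precomputed boolean instead of being derived from the tool and its input.

```typescript
interface ToolCallSketch {
  id: string
  isConcurrencySafe: boolean
}

interface PartitionSketch {
  isConcurrencySafe: boolean
  blocks: ToolCallSketch[]
}

// Groups consecutive safe calls into one batch; each unsafe call stands alone.
function partitionSketch(calls: ToolCallSketch[]): PartitionSketch[] {
  const partitions: PartitionSketch[] = []
  for (const call of calls) {
    const last = partitions[partitions.length - 1]
    if (call.isConcurrencySafe && last !== undefined && last.isConcurrencySafe) {
      last.blocks.push(call) // extend the current parallel batch
    } else {
      partitions.push({
        isConcurrencySafe: call.isConcurrencySafe,
        blocks: [call],
      })
    }
  }
  return partitions
}

// The five calls from the end-to-end example, in model order.
const batchPlan = partitionSketch([
  { id: 'read-main', isConcurrencySafe: true },
  { id: 'grep-todo', isConcurrencySafe: true },
  { id: 'read-utils', isConcurrencySafe: true },
  { id: 'bash-test', isConcurrencySafe: false },
  { id: 'edit-main', isConcurrencySafe: false },
])
const batchShape = batchPlan.map(p => p.blocks.map(b => b.id).join('+'))
```

Note that this function needs the complete list before it can return anything, which is exactly the constraint the streaming executor removes.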

Feature | runTools() | StreamingToolExecutor
Tool addition timing | All at once | Incremental
Scheduling strategy | Partition-batch | Real-time queue scheduling
Progress messages | No special handling | Separate storage, immediate emission
Error cascading | None | Sibling AbortController
Discard mode | None | Supported
Interrupt behavior | None | cancel/block strategy

Memory Safety of createChildAbortController

StreamingToolExecutor makes extensive use of createChildAbortController() (defined in src/utils/abortController.ts). This utility method deserves a closer look because it solves an easily overlooked memory leak problem.

The standard parent-child AbortController relationship is typically implemented like this:

TypeScript
// Naive implementation: demonstrates the memory leak problem.
// The closure captures `child`, creating a strong reference from parent to child.
// As long as `parent` is alive, `child` can never be garbage collected,
// even if nothing else references it.
parent.signal.addEventListener('abort', () => {
  child.abort(parent.signal.reason)
})

The problem is: parent holds a strong reference to child through the closure. Even if child is discarded at the application level, as long as parent is alive, child can't be garbage collected. In StreamingToolExecutor, each tool creates a toolAbortController (child), while siblingAbortController (parent) lives throughout the entire tool execution phase. If the model returns 20 tool calls, there are 20 children strongly held by the parent.

createChildAbortController() solves this with WeakRef (lines 68-99):

src/utils/abortController.ts:68-99
TypeScript
// Creates a child AbortController that is automatically aborted when the
// parent is aborted, but uses WeakRef to avoid memory leaks. This is the
// foundation of the three-layer abort architecture used by StreamingToolExecutor.
export function createChildAbortController(
  parent: AbortController,
  maxListeners?: number,
): AbortController {
  // Create a fresh controller, optionally with a higher max listener count
  // (Node.js warns at 10 listeners by default; high-concurrency tools may exceed this).
  const child = createAbortController(maxListeners)

  // Edge case: if the parent is ALREADY aborted at creation time,
  // immediately abort the child and return. No need to set up listeners.
  if (parent.signal.aborted) {
    child.abort(parent.signal.reason)
    return child
  }

  // KEY INSIGHT: Use WeakRef for BOTH directions to prevent memory leaks.
  // Without WeakRef, the parent's event listener closure would hold a strong
  // reference to child, preventing GC even after the child is no longer needed.
  const weakChild = new WeakRef(child)
  const weakParent = new WeakRef(parent)
  // propagateAbort is a standalone function (not a closure) that dereferences
  // the WeakRefs at call time. If the child has been GC'd, it's a no-op.
  // Using .bind() avoids creating a closure that captures strong references.
  const handler = propagateAbort.bind(weakParent, weakChild)

  // When parent aborts, propagate to child. { once: true } auto-removes
  // the listener after firing, preventing duplicate abort propagation.
  parent.signal.addEventListener('abort', handler, { once: true })

  // Auto-cleanup: remove parent listener when child is aborted
  // If the child is aborted independently (not via parent), we clean up
  // the listener we added to parent. This prevents listener accumulation
  // when many short-lived children are created for a long-lived parent.
  child.signal.addEventListener(
    'abort',
    removeAbortHandler.bind(weakParent, new WeakRef(handler)),
    { once: true },
  )

  return child
}

Key design decisions:

  1. WeakRef holds child: The parent's event listener references child through WeakRef, not preventing GC
  2. WeakRef holds parent: The child's cleanup logic also references parent through WeakRef, avoiding reverse strong references
  3. Auto-cleanup: When child is aborted, it automatically removes its listener from parent, preventing listener accumulation
  4. {once: true}: Ensures the event handler is called only once

These measures ensure no memory leaks occur in high-concurrency tool execution scenarios.
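
The excerpt binds two helpers, propagateAbort and removeAbortHandler, without showing their bodies. The sketch below fills them in with plausible implementations so the pattern can be run end to end. The helper bodies and the `Sketch`-suffixed names are assumptions, not the source's code, and the maxListeners handling is omitted.

```typescript
// Assumed body: `this` is the weak parent, the argument is the weak child.
function propagateAbortSketch(
  this: WeakRef<AbortController>,
  weakChild: WeakRef<AbortController>,
): void {
  const parent = this.deref()
  const child = weakChild.deref()
  // If either side has been garbage collected, there is nothing to do.
  if (parent && child) child.abort(parent.signal.reason)
}

// Assumed body: detaches the propagation listener from the parent.
function removeAbortHandlerSketch(
  this: WeakRef<AbortController>,
  weakHandler: WeakRef<() => void>,
): void {
  const parent = this.deref()
  const handler = weakHandler.deref()
  if (parent && handler) parent.signal.removeEventListener('abort', handler)
}

function createChildSketch(parent: AbortController): AbortController {
  const child = new AbortController()
  if (parent.signal.aborted) {
    child.abort(parent.signal.reason)
    return child
  }
  // The bound function holds only WeakRefs, never the controllers themselves.
  const handler = propagateAbortSketch.bind(
    new WeakRef(parent),
    new WeakRef(child),
  )
  parent.signal.addEventListener('abort', handler, { once: true })
  child.signal.addEventListener(
    'abort',
    removeAbortHandlerSketch.bind(new WeakRef(parent), new WeakRef(handler)),
    { once: true },
  )
  return child
}

// Downward propagation: aborting the parent aborts the child.
const cascadeParent = new AbortController()
const cascadeChild = createChildSketch(cascadeParent)
cascadeParent.abort('sibling_error')

// No upward propagation: aborting a child leaves the parent untouched.
const isolatedParent = new AbortController()
const isolatedChild = createChildSketch(isolatedParent)
isolatedChild.abort('tool_finished')
```

The sketch cannot demonstrate garbage collection itself, since collection timing is not observable in a deterministic way; it only shows that propagation still works when every reference in the listener chain is weak.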


Transferable Patterns: Implementing Similar Architecture in Your Projects

StreamingToolExecutor's concurrency model isn't unique to Claude Code — it's fundamentally a declarative concurrency scheduler. If you need to implement similar tool orchestration in your own projects, here are the core patterns you can adopt:

Pattern 1: Self-Declared Concurrency Safety

Let each operation declare for itself whether it can run in parallel, rather than hard-coding rules in the scheduler:

TypeScript
// Pattern: each operation encapsulates its own concurrency knowledge.
// The scheduler never needs to know implementation details. It just
// asks "can you run in parallel?" and respects the answer.
interface Operation {
  // The operation decides for itself whether it can run in parallel.
  // Accepts input so the decision can be context-dependent
  // (e.g., read-only commands vs. write commands).
  isConcurrencySafe(input: unknown): boolean
  // AbortSignal allows the scheduler to cancel the operation externally
  // when errors cascade or the user interrupts.
  execute(input: unknown, signal: AbortSignal): Promise<Result>
}

Benefit: the scheduler doesn't need to understand the details of each operation, and adding new operations doesn't require modifying the scheduler code.

Pattern 2: Read-Write Lock Scheduling

TypeScript
// Implements the read-write lock pattern: safe operations are "readers"
// that can coexist, unsafe operations are "writers" that need exclusive access.
// This single function encapsulates the entire scheduling policy.

// isSafe caches the operation's concurrency-safety declaration, evaluated
// once for its input when the operation is enqueued.
interface ScheduledOp {
  isSafe: boolean
}

function canExecute(
  newOp: ScheduledOp,
  executingOps: ScheduledOp[],
): boolean {
  // No operations executing: always allowed (idle state)
  if (executingOps.length === 0) return true
  // New operation and all executing operations are concurrency-safe: allowed
  // (multiple readers can coexist without conflict)
  if (newOp.isSafe && executingOps.every(op => op.isSafe)) return true
  // Otherwise: wait (a writer is running, or we are a writer)
  return false
}

Pattern 3: Layered AbortController

TypeScript
// Demonstrates the three-layer AbortController pattern:
// parent (query-level) -> group (sibling-level) -> operation (tool-level).
// Aborting a layer cancels all its descendants but NOT its ancestors.
// createChild stands for a parent-to-child linking helper such as the
// createChildAbortController() shown earlier.
class OperationGroup {
  private groupController: AbortController
  private operations: Map<string, AbortController> = new Map()

  constructor(parentController: AbortController) {
    // group controller is a child of the parent controller
    // If the parent aborts (e.g., user cancels everything), this group
    // and all its operations are automatically aborted via the parent-child chain.
    this.groupController = createChild(parentController)
  }

  addOperation(id: string): AbortSignal {
    // each operation's controller is a child of the group
    // This gives each operation an independent abort handle while still
    // responding to group-level cancellation.
    const opController = createChild(this.groupController)
    this.operations.set(id, opController)
    // Return the signal (not the controller): operations can listen
    // for abort but cannot abort themselves (principle of least privilege).
    return opController.signal
  }

  cancelGroup(reason: string): void {
    // cancel all operations in the group without affecting the parent
    // This is the key property: error cascading is SCOPED to this group.
    // The parent (and other groups) continue unaffected.
    this.groupController.abort(reason)
  }
}

Pattern 4: Separating Progress from Results

TypeScript
// The dual-channel output pattern: progress is real-time (out of order),
// results are sequential (in insertion order). This gives users immediate
// feedback while maintaining deterministic final output.
// ProgressEvent and Result are application-defined types.
interface TrackedOperation {
  status: 'queued' | 'executing' | 'completed' | 'yielded'
  // Concurrency-safety declaration; unsafe operations act as ordering barriers
  isSafe: boolean
  // Progress messages stored separately, can be emitted out of order
  // (e.g., streaming stdout from a long-running command)
  pendingProgress: ProgressEvent[]
  // Final results emitted in order, buffered until all prior
  // operations have been yielded
  results?: Result[]
}

// Generator that walks operations in order, emitting what's ready
// and stopping at ordering barriers.
function* yieldInOrder(operations: TrackedOperation[]) {
  for (const op of operations) {
    // Progress is always emitted immediately. splice(0) removes and
    // returns every buffered message in a single call
    yield* op.pendingProgress.splice(0)

    if (op.status === 'completed') {
      // This operation is done: emit its results and mark as yielded
      yield* op.results ?? []
      op.status = 'yielded'
    } else if (op.status === 'executing' && !op.isSafe) {
      // Non-safe operations block subsequent result emission.
      // We can't emit later results because this unsafe operation
      // might modify shared context that later results depend on.
      break
    }
    // Safe operations that are still executing are simply skipped.
    // They don't block emission of later completed operations.
  }
}

Pattern 5: Conservative Defaults

TypeScript
// Conservative defaults: in safety-critical systems, the safest behavior
// should require zero configuration. Developers must explicitly OPT IN
// to more permissive behavior (parallelism, interruptibility).
const DEFAULTS = {
  // Default to unsafe: runs exclusively
  isConcurrencySafe: () => false,
  // Default to non-interruptible: finishes before new input
  // (`as const` keeps the literal type 'block' instead of widening to string)
  interruptBehavior: () => 'block' as const,
}

In safety-related scenarios, always make the default behavior the most conservative. Tool developers must proactively declare safety, rather than safety being assumed by default.

Complete Mini Implementation

Combining the patterns above, a minimal viable concurrency scheduler fits in under 100 lines of code:

TypeScript
// A minimal implementation combining the scheduling patterns above.
// This compact scheduler demonstrates the core concepts of
// StreamingToolExecutor without the streaming, progress, or interrupt
// complexity. Suitable as a starting point for your own projects.

// Four-state lifecycle, same as StreamingToolExecutor's TrackedTool
type OperationStatus = 'queued' | 'executing' | 'completed' | 'yielded'

interface TrackedOp<T> {
  id: string
  isSafe: boolean // Concurrency safety declaration
  status: OperationStatus // Current lifecycle state
  execute: (signal: AbortSignal) => Promise<T> // The actual work to perform
  result?: T // Buffered result (set on completion)
  error?: Error // Captured error (set on failure)
  promise?: Promise<void> // Execution handle for awaiting
}

// Generic scheduler parameterized by result type T.
// Implements the read-write lock pattern with ordered result emission.
class ConcurrentScheduler<T> {
  private ops: TrackedOp<T>[] = []
  // Group-level AbortController: aborting this cancels all operations,
  // analogous to StreamingToolExecutor's siblingAbortController.
  private groupAbort = new AbortController()

  // Add an operation and immediately attempt to schedule it.
  // Operations can be added incrementally (one at a time); the scheduler
  // handles ordering and concurrency automatically. Callers don't supply
  // a status, because every operation starts out 'queued'.
  add(op: Omit<TrackedOp<T>, 'status'>): void {
    this.ops.push({ ...op, status: 'queued' })
    this.processQueue()
  }

  // Read-write lock check: safe ops can coexist, unsafe ops need exclusivity.
  private canExecute(isSafe: boolean): boolean {
    const executing = this.ops.filter(o => o.status === 'executing')
    return executing.length === 0 ||
      (isSafe && executing.every(o => o.isSafe))
  }

  // Scan the queue and start eligible operations. The break on unsafe ops
  // preserves ordering: no operation after an unsafe one can leapfrog it.
  private processQueue(): void {
    for (const op of this.ops) {
      if (op.status !== 'queued') continue
      if (this.canExecute(op.isSafe)) {
        // Keep the handle so callers can await the operation if needed.
        op.promise = this.executeOp(op)
      } else if (!op.isSafe) {
        break // Maintain order for non-safe operations
      }
    }
  }

  // Execute a single operation with error cascading for unsafe operations.
  private async executeOp(op: TrackedOp<T>): Promise<void> {
    // Set synchronously, so canExecute() sees it within the same queue scan.
    op.status = 'executing'
    try {
      // Pass the group abort signal so operations can check for cancellation
      op.result = await op.execute(this.groupAbort.signal)
    } catch (e) {
      op.error = e as Error
      // Only unsafe operation errors cascade; safe operations fail in isolation
      if (!op.isSafe) {
        this.groupAbort.abort('operation_error')
      }
    }
    op.status = 'completed'
    // Self-driving loop: completion triggers another scheduling pass,
    // which may unblock queued operations waiting for this one.
    this.processQueue()
  }

  // Emit results in insertion order, with the same barrier logic as
  // StreamingToolExecutor's getCompletedResults().
  *getResults(): Generator<{ id: string; result?: T; error?: Error }> {
    for (const op of this.ops) {
      if (op.status === 'yielded') continue
      if (op.status === 'completed') {
        op.status = 'yielded'
        yield { id: op.id, result: op.result, error: op.error }
      } else if (op.status === 'executing' && !op.isSafe) {
        // Unsafe operation still running: stop emission to preserve
        // ordering. Results after this point may depend on its outcome.
        break
      }
    }
  }
}

Design Trade-offs

Looking back at the entire StreamingToolExecutor design, there are several trade-offs worth discussing:

Why Do Only Bash Errors Cascade?

The code comment says it clearly (lines 357-359):

Bash commands often have implicit dependency chains (e.g. mkdir fails -> subsequent commands pointless). Read/WebFetch/etc are independent — one failure shouldn't nuke the rest.

This is a pragmatic choice. In theory, each tool could declare "whether my errors should cascade," but in practice, only the Bash tool has this kind of implicit dependency relationship. Over-engineering would only increase the cognitive burden on tool developers.
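The rule described above can be sketched in a few lines. This is a minimal illustration, not the actual source: the `ToolOutcome` type, the `shouldCascade` and `handleOutcome` helpers, the tool-name check against `'Bash'`, and the `'sibling_error'` abort reason are all assumptions made for the example. Only the idea of a sibling-level AbortController comes from the article.

```typescript
// Hypothetical shape for a finished tool call (not from the real source).
type ToolOutcome = { toolName: string; ok: boolean }

// Assumption: the Bash tool is identified by its name. Only a failed Bash
// call is treated as a reason to cancel siblings.
function shouldCascade(outcome: ToolOutcome): boolean {
  return !outcome.ok && outcome.toolName === 'Bash'
}

// Abort the shared sibling controller only when the failure should cascade.
function handleOutcome(
  outcome: ToolOutcome,
  siblingAbortController: AbortController,
): void {
  if (shouldCascade(outcome)) {
    siblingAbortController.abort('sibling_error') // hypothetical reason string
  }
}

// A failed Read leaves its siblings alone...
const afterReadFailure = new AbortController()
handleOutcome({ toolName: 'Read', ok: false }, afterReadFailure)

// ...while a failed Bash call aborts them.
const afterBashFailure = new AbortController()
handleOutcome({ toolName: 'Bash', ok: false }, afterBashFailure)
```

Keeping the decision in one small predicate means tool authors never have to think about cascading; if another tool ever needed the same behavior, the predicate is the only place that would change.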

Why Not Support contextModifier for Concurrent Tools?

The code comment (lines 389-390) acknowledges this is a feature gap:

NOTE: we currently don't support context modifiers for concurrent tools. None are actively being used, but if we want to use them in concurrent tools, we need to support that here.

Concurrent tools modifying shared context requires solving race conditions — what happens when two tools simultaneously modify the same context field? The current approach simply prohibits it, waiting for actual demand before designing a solution. This is a textbook application of "YAGNI" (You Aren't Gonna Need It).
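To see why concurrent context modifiers are hard, consider a lost-update scenario. The sketch below is purely illustrative: the `Context` shape, the `ContextModifier` signature, and both `apply...` helpers are assumptions, not the real types. It contrasts serial application, where each modifier sees the previous one's output, with a naive concurrent merge, where every modifier starts from the same snapshot and the last writer wins.

```typescript
// Hypothetical context shape and modifier signature (assumptions for this sketch).
type Context = { cwd: string; env: Record<string, string> }
type ContextModifier = (ctx: Context) => Context

// Serial tools: each modifier receives the context produced by the previous one.
function applySerially(ctx: Context, modifiers: ContextModifier[]): Context {
  return modifiers.reduce((current, modify) => modify(current), ctx)
}

// Naive concurrent merge: every tool started from the same snapshot, so each
// modifier is computed against stale state and the last result overwrites the rest.
function applyFromSharedSnapshot(snapshot: Context, modifiers: ContextModifier[]): Context {
  let merged = snapshot
  for (const modify of modifiers) {
    merged = modify(snapshot) // note: snapshot, not merged
  }
  return merged
}

const base: Context = { cwd: '/repo', env: {} }
const setA: ContextModifier = ctx => ({ ...ctx, env: { ...ctx.env, A: '1' } })
const setB: ContextModifier = ctx => ({ ...ctx, env: { ...ctx.env, B: '2' } })

const serial = applySerially(base, [setA, setB])               // env has A and B
const concurrent = applyFromSharedSnapshot(base, [setA, setB]) // env has only B
```

Supporting modifiers for concurrent tools would require choosing a merge policy (ordering, field-level merging, or conflict detection), which is exactly the design work the current code defers.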

Why Does interruptBehavior Default to block?

Cancelling a write operation midway could corrupt data. block means "let the tool finish," which at worst costs a few extra seconds of waiting; cancel at worst leaves a half-written file. Safety takes priority over responsiveness.
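A conservative default like this is usually a one-line fallback. The following sketch assumes that a tool exposes `interruptBehavior` as an optional method returning `'cancel'` or `'block'`; the `ToolLike` interface, both helper functions, and the example tool names are hypothetical and exist only to show the shape of the idea.

```typescript
type InterruptBehavior = 'cancel' | 'block'

// Hypothetical minimal tool shape: the declaration is optional.
interface ToolLike {
  name: string
  interruptBehavior?: () => InterruptBehavior
}

// No declaration means 'block': the tool is allowed to finish.
function resolveInterruptBehavior(tool: ToolLike): InterruptBehavior {
  return tool.interruptBehavior?.() ?? 'block'
}

// On a user interrupt, only tools that explicitly opted in are cancelled.
function toolsToCancelOnInterrupt(running: ToolLike[]): string[] {
  return running
    .filter(tool => resolveInterruptBehavior(tool) === 'cancel')
    .map(tool => tool.name)
}

const running: ToolLike[] = [
  { name: 'SlowSearch', interruptBehavior: () => 'cancel' }, // opted in
  { name: 'FileWriter' },                                    // no declaration
  { name: 'Migrator', interruptBehavior: () => 'block' },    // explicit block
]

const cancelled = toolsToCancelOnInterrupt(running)
```

With this fallback, a tool author who forgets the declaration gets the safe behavior, and interruptibility becomes an explicit opt-in.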

Why Generators Instead of Callbacks?

getCompletedResults() returns a Generator, and getRemainingResults() returns an AsyncGenerator. This design lets callers consume results naturally with for...of and for await...of, without registering callbacks. Generators are also pull-based: the caller decides when to take the next result and can stop iterating early, so nothing is emitted that the caller has not asked for.
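The two consumption styles look like this in practice. The functions below are simplified stand-ins, not the real getCompletedResults() and getRemainingResults(): `ToolResult`, `completedResults`, `remainingResults`, and `drain` are hypothetical names, and the inputs are plain arrays rather than the executor's internal state.

```typescript
type ToolResult = { id: string; output: string }

// Synchronous generator: yield what is already available, stop at the first gap.
// An undefined slot stands for a tool that has not finished yet.
function* completedResults(slots: Array<ToolResult | undefined>): Generator<ToolResult> {
  for (const slot of slots) {
    if (slot === undefined) break
    yield slot
  }
}

// Async generator: wait for each pending result in order and yield it.
async function* remainingResults(
  pending: Array<Promise<ToolResult>>,
): AsyncGenerator<ToolResult> {
  for (const p of pending) {
    yield await p
  }
}

// Caller side, synchronous: a plain for...of loop, no callbacks.
const readyIds: string[] = []
for (const r of completedResults([{ id: 'a', output: '1' }, undefined, { id: 'c', output: '3' }])) {
  readyIds.push(r.id)
}

// Caller side, asynchronous: for await...of inside an async function.
async function drain(): Promise<string[]> {
  const ids: string[] = []
  const pending = [
    Promise.resolve({ id: 'b', output: '2' }),
    Promise.resolve({ id: 'c', output: '3' }),
  ]
  for await (const r of remainingResults(pending)) {
    ids.push(r.id)
  }
  return ids
}
```

In both cases the caller drives the iteration, which is what makes it easy to interleave result handling with other work in the same loop.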


Summary

StreamingToolExecutor is the concurrency orchestration component in Claude Code that handles the deceptively complex problem of "letting AI operate multiple tools simultaneously." Its core design principles are:

  1. Self-declared concurrency safety: Tools know whether they can run in parallel; the scheduler merely executes their declarations
  2. Read-write lock scheduling: Concurrency-safe tools share access, non-concurrency-safe tools get exclusive access
  3. Layered cancellation: Three-layer AbortController architecture for precise error cascading
  4. Ordered emission: Progress is immediately visible, results are output in order
  5. Conservative defaults: Without a declaration, assume unsafe and non-interruptible

These principles apply not only to AI tool orchestration but to any system that mixes concurrency strategies: database operation scheduling, microservice orchestration, CI/CD pipeline management, and more. In 530 lines, StreamingToolExecutor packs the essentials of production-grade concurrency orchestration.

In the next article, we'll dive into the permission system — exploring how Claude Code ensures every tool call undergoes a security review through its six-layer evaluation chain.