The Streaming Tool Executor: How to Safely Let AI Operate Multiple Tools Simultaneously

Deep dive into StreamingToolExecutor's concurrency model — isConcurrencySafe declarations, queue scheduling, Sibling Abort cascading cancellation, progress buffering and ordered emission

The Problem

Imagine this scenario: you ask Claude Code to refactor a module. The model returns 5 tool_use calls in a single response — 3 file reads, 1 Bash command execution, and 1 file write. Now the questions arise:

  1. Should these 5 tools run serially or in parallel?
  2. If the Bash command fails, should the file reads running in parallel be cancelled?
  3. The file write depends on the Bash result — should it wait for Bash to complete before executing?
  4. The user presses ESC during tool execution — which tools should stop, and which should continue?
  5. Multiple tools produce progress messages simultaneously — how should the UI display them in order?

These questions seem simple, but each one involves core challenges of concurrency control. Serial execution is too slow — users don't want to wait for 3 independent file reads to complete one after another. Full parallelism is too dangerous — a write operation and a read operation accessing the same file simultaneously could cause a data race.

Claude Code's solution is StreamingToolExecutor — a carefully designed concurrency orchestrator that lets each tool declare whether it can run in parallel, then dynamically schedules execution based on those declarations. This article will dissect every design decision in detail.


Why a Streaming Tool Executor?

In the previous article, we covered the overall architecture of the tool system. But one key question was intentionally deferred to this article: when the model returns multiple tool calls in a single streaming response, how does the executor manage their lifecycles?

Traditional approaches fall into two extremes:

Approach A: Fully Serial

Text
Tool 1 ──────────► Tool 2 ──────────► Tool 3 ──────────► Tool 4 ──────────►
Total time: 4T

Safe but extremely slow. Each tool waits for the previous one to finish before starting. For 3 independent file reads, this means 3x the wait time.

Approach B: Fully Parallel

Text
Tool 1 ──────────►
Tool 2 ──────────►
Tool 3 ──────────►
Tool 4 ──────────►
Total time: T

Fast but dangerous. If Tool 1 is rm -rf build/ and Tool 2 is cat build/output.js, the result of parallel execution is unpredictable.

Approach C: Claude Code's Hybrid Scheduling

Text
Tool 1 (Read) ──────────►
Tool 2 (Read) ──────────►
Tool 3 (Read) ──────────►
                          Tool 4 (Bash) ──────────►
Total time: 2T

Reads run in parallel, writes get exclusive access. Safe and efficient.

This is the core problem StreamingToolExecutor solves.
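To make the time savings concrete, here is a minimal sketch (not Claude Code's actual code) that estimates wall-clock cost in units of T, under the simplifying assumption that every tool takes exactly one unit and that consecutive concurrency-safe tools share a step:

```typescript
// Sketch only: estimate the wall-clock cost of each strategy, assuming
// every tool takes one unit of time T.
type Call = { name: string; safe: boolean };

// Approach A — fully serial: every tool is its own step.
function serialTime(calls: Call[]): number {
  return calls.length;
}

// Approach C — hybrid: a run of consecutive safe tools shares one step,
// while each unsafe tool takes an exclusive step of its own.
function hybridTime(calls: Call[]): number {
  let steps = 0;
  let inSafeRun = false;
  for (const call of calls) {
    if (call.safe) {
      if (!inSafeRun) {
        steps++; // first safe tool of a new parallel batch
        inSafeRun = true;
      }
    } else {
      steps++; // unsafe tool: exclusive step
      inSafeRun = false;
    }
  }
  return steps;
}
```

For the three-reads-plus-Bash example above, serialTime gives 4T while hybridTime gives 2T — the savings the hybrid scheduler is after.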


Architecture Overview

StreamingToolExecutor lives in src/services/tools/StreamingToolExecutor.ts and is a class of roughly 530 lines. Its responsibilities are:

  1. Receive tool calls — accept tool_use blocks one by one as the streaming response arrives
  2. Determine scheduling strategy — based on each tool's concurrency safety declaration, decide whether to execute immediately or queue
  3. Manage lifecycles — track each tool from queuing to completion
  4. Handle error cascading — one tool's failure may require cancelling its sibling tools
  5. Emit results in order — progress messages are sent immediately, final results are emitted in sequence

Here is the overall architecture diagram:

...

TrackedTool: The Complete Lifecycle of a Tool

Every tool call that enters the executor is wrapped in a TrackedTool object. This structure is defined at lines 21-32 of StreamingToolExecutor.ts:

src/services/tools/StreamingToolExecutor.ts:19-32
TypeScript
19  type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded'
20
21  type TrackedTool = {
22    id: string
23    block: ToolUseBlock
24    assistantMessage: AssistantMessage
25    status: ToolStatus
26    isConcurrencySafe: boolean
27    promise?: Promise<void>
28    results?: Message[]
29    // Progress messages are stored separately and yielded immediately
30    pendingProgress: Message[]
31    contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>
32  }

Four Lifecycle States

ToolStatus is a union of four string literals, and each tool flows strictly through queued -> executing -> completed -> yielded:

...

queued (waiting): The tool was just added by addTool() and hasn't started executing yet. There may be other non-concurrency-safe tools currently running exclusively, so it must wait.

executing (running): The tool has started execution. Its promise field holds the execution Promise, and progress messages are collected in real time via the pendingProgress array.

completed (finished): Tool execution has ended (success, failure, or cancellation), and results are stored in the results field but haven't been emitted to the caller yet. This is the key to ordered emission — even if Tool 3 finishes first, it waits for Tool 1 and Tool 2's results to be emitted first.

yielded (emitted): Results have been emitted to the caller via getCompletedResults(), and this tool's lifecycle is completely over.
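The strict forward-only progression can be sketched as a tiny transition table — an illustration of the state machine, not the executor's actual code:

```typescript
type ToolStatus = 'queued' | 'executing' | 'completed' | 'yielded';

// Each status has exactly one legal successor; 'yielded' is terminal.
const NEXT: Record<ToolStatus, ToolStatus | null> = {
  queued: 'executing',
  executing: 'completed',
  completed: 'yielded',
  yielded: null,
};

function advance(status: ToolStatus): ToolStatus {
  const next = NEXT[status];
  if (next === null) {
    throw new Error(`'${status}' is a terminal state`);
  }
  return next;
}
```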

Key Field Analysis

pendingProgress is a field worth special attention. Progress messages (like real-time output from a Bash command) need to be shown to the user immediately and can't wait until the tool completes. So progress messages and final results are stored separately — progress messages can be emitted at any time, while final results must be emitted in order.

contextModifiers stores the tool's modifications to the execution context. For example, a tool might need to update file history state. But note an important restriction in the code (lines 391-395):

src/services/tools/StreamingToolExecutor.ts:389-395
TypeScript
389  // NOTE: we currently don't support context modifiers for concurrent
390  // tools. None are actively being used, but if we want to use
391  // them in concurrent tools, we need to support that here.
392  if (!tool.isConcurrencySafe && contextModifiers.length > 0) {
393    for (const modifier of contextModifiers) {
394      this.toolUseContext = modifier(this.toolUseContext)
395    }
396  }

Only non-concurrency-safe tools can modify the context. This is a deliberate design constraint — concurrent tools modifying shared context would introduce race conditions, so it's simply prohibited.


isConcurrencySafe: Tools Decide for Themselves Whether They Can Run in Parallel

The most fundamental design principle of StreamingToolExecutor is that tools declare their own concurrency safety. Not guessed by the scheduler, not defined in a global configuration table, but implemented by each tool in its isConcurrencySafe() method.

This method is defined at line 402 of src/Tool.ts:

src/Tool.ts:402
TypeScript
402  isConcurrencySafe(input: z.infer<Input>): boolean

Note that it accepts an input parameter — this means the same tool may have different concurrency safety depending on the input.

Concurrency Safety Declarations Across Tools

Let's look at how various tools actually declare themselves in the code:

FileReadTool (file reading) — always concurrency-safe:

src/tools/FileReadTool/FileReadTool.ts:373-375
TypeScript
373  isConcurrencySafe() {
374    return true
375  },

File reading is a purely read-only operation; multiple reads running simultaneously produce no side effects.

GrepTool (search) — always concurrency-safe:

src/tools/GrepTool/GrepTool.ts:183-185
TypeScript
183  isConcurrencySafe() {
184    return true
185  },

Search operations are likewise read-only, naturally supporting parallelism.

AgentTool (sub-agent) — always concurrency-safe:

src/tools/AgentTool/AgentTool.tsx:1273-1275
TypeScript
1273  isConcurrencySafe() {
1274    return true;
1275  },

The sub-agent tool declares itself as concurrency-safe because each sub-agent runs in its own isolated context.

BashTool (command execution) — depends on input:

src/tools/BashTool/BashTool.tsx:434-436
TypeScript
434  isConcurrencySafe(input) {
435    return this.isReadOnly?.(input) ?? false;
436  },

This is the most interesting case. The Bash tool's concurrency safety depends on whether the command itself is read-only. ls, cat, grep are read-only and can run in parallel; rm, mv, git commit have side effects and must run exclusively.
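A toy version of input-dependent safety might look like the following. The command list and prefix-based classifier here are hypothetical — the real isReadOnly analysis in BashTool is more sophisticated than a first-word lookup:

```typescript
// Hypothetical read-only classifier; BashTool's actual isReadOnly logic
// is more involved than a prefix lookup.
const READ_ONLY_COMMANDS = new Set(['ls', 'cat', 'grep', 'pwd', 'head', 'tail']);

function isReadOnlyCommand(command: string): boolean {
  const executable = command.trim().split(/\s+/)[0];
  return READ_ONLY_COMMANDS.has(executable);
}

// Sketch of a Bash-like tool whose concurrency safety depends on its input.
const bashLikeTool = {
  isConcurrencySafe(input: { command: string }): boolean {
    return isReadOnlyCommand(input.command);
  },
};
```

The same tool object reports true for a read-only invocation and false for a mutating one — exactly the per-input flexibility the input parameter enables.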

Default behavior — assume unsafe (line 759):

src/Tool.ts:757-759
TypeScript
757  const TOOL_DEFAULTS = {
758    // ...
759    isConcurrencySafe: (_input?: unknown) => false,
760    // ...
761  }

Tools built through buildTool() that don't explicitly declare isConcurrencySafe default to returning false. This is a conservatively safe design — better to sacrifice performance than risk concurrency issues.

Safety Calculation in addTool

When a tool is added to the executor, the isConcurrencySafe calculation process is worth careful examination. See lines 104-121 of StreamingToolExecutor.ts:

src/services/tools/StreamingToolExecutor.ts:104-121
TypeScript
104  const parsedInput = toolDefinition.inputSchema.safeParse(block.input)
105  const isConcurrencySafe = parsedInput?.success
106    ? (() => {
107        try {
108          return Boolean(toolDefinition.isConcurrencySafe(parsedInput.data))
109        } catch {
110          return false
111        }
112      })()
113    : false
114  this.tools.push({
115    id: block.id,
116    block,
117    assistantMessage,
118    status: 'queued',
119    isConcurrencySafe,
120    pendingProgress: [],
121  })

There are three layers of defense here:

  1. Input validation: First validate input using the Zod schema. If the input format is invalid, it's immediately marked as non-concurrency-safe.
  2. try-catch wrapper: Even if the input is valid, isConcurrencySafe() itself might throw an exception (e.g., a bug in the tool definition). Any exception falls back to false.
  3. Boolean coercion: The result is wrapped in Boolean() to prevent tools from accidentally returning truthy values (like non-empty strings).

This "defense in depth" pattern is ubiquitous in Claude Code — on code paths related to concurrency and safety, always assume the worst case.


canExecuteTool: The Core Scheduling Decision

Given each tool's concurrency safety declaration, how does the scheduler decide whether a tool can execute immediately? The logic is remarkably concise, just 6 lines of code (lines 129-135):

src/services/tools/StreamingToolExecutor.ts:129-135
TypeScript
129  private canExecuteTool(isConcurrencySafe: boolean): boolean {
130    const executingTools = this.tools.filter(t => t.status === 'executing')
131    return (
132      executingTools.length === 0 ||
133      (isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe))
134    )
135  }

In plain language: a tool can execute if and only if one of the following two conditions holds:

  1. No tools are currently executing (idle state, any tool can start)
  2. The current tool is concurrency-safe, and all currently executing tools are also concurrency-safe

This logic implies an important corollary: as long as any non-concurrency-safe tool is executing, all other tools must wait. Non-concurrency-safe tools get exclusive access.

Let's visualize with a table:

Currently Executing Tools | New Tool (safe) | New Tool (unsafe)
None (idle)               | Can execute     | Can execute
All safe                  | Can execute     | Wait
Includes unsafe           | Wait            | Wait

This is a classic read-write lock pattern: concurrency-safe tools are like read locks (multiple can coexist), non-concurrency-safe tools are like write locks (must be exclusive).
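The table can be checked directly against a standalone copy of the predicate (restated here for illustration, outside the class):

```typescript
// Standalone restatement of the scheduling predicate.
type Tracked = { status: 'queued' | 'executing'; isConcurrencySafe: boolean };

function canExecuteTool(tools: Tracked[], isConcurrencySafe: boolean): boolean {
  const executing = tools.filter(t => t.status === 'executing');
  return (
    executing.length === 0 ||
    (isConcurrencySafe && executing.every(t => t.isConcurrencySafe))
  );
}
```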


processQueue: The Subtleties of Queue Scheduling

The processQueue() method (lines 140-151) is responsible for traversing the queue and starting executable tools:

src/services/tools/StreamingToolExecutor.ts:140-151
TypeScript
140  private async processQueue(): Promise<void> {
141    for (const tool of this.tools) {
142      if (tool.status !== 'queued') continue
143
144      if (this.canExecuteTool(tool.isConcurrencySafe)) {
145        await this.executeTool(tool)
146      } else {
147        // Can't execute this tool yet, and since we need to maintain
148        // order for non-concurrent tools, stop here
149        if (!tool.isConcurrencySafe) break
150      }
151    }
152  }

This code has an easily overlooked but critically important detail — the break statement. When it encounters a non-concurrency-safe tool that can't execute, the scheduler stops traversal. Why?

Consider the following tool sequence:

Text
[Read A] [Read B] [Bash "git add ."] [Read C] [Bash "git commit"]
  safe     safe        unsafe          safe        unsafe

Without the break, the scheduler would skip Bash "git add ." when it can't execute and continue checking Read C. Read C is concurrency-safe and might be started. But this is problematic — Read C would execute before git add ., potentially reading file contents not yet staged.

The break ensures ordering between non-concurrency-safe tools. Once a queued non-concurrency-safe tool is encountered, no subsequent tools (safe or not) will be started.

Conversely: what if the tool that can't execute is a concurrency-safe one? It's simply skipped (continue) and doesn't prevent scheduling of subsequent tools. When would a concurrency-safe tool be unable to execute? When a non-concurrency-safe tool currently has exclusive access. Once the exclusive tool completes, all queued concurrency-safe tools can start together.
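Here is a synchronous toy model of the queue walk (the real method is async and actually runs the tools); it returns which queued tools would be started in one pass:

```typescript
type QueuedTool = {
  id: string;
  status: 'queued' | 'executing';
  safe: boolean;
};

// Toy model: one pass over the queue, honoring the break on blocked
// unsafe tools. Mutates statuses the way processQueue() would.
function startEligibleTools(tools: QueuedTool[]): string[] {
  const started: string[] = [];
  const canRun = (safe: boolean): boolean => {
    const executing = tools.filter(t => t.status === 'executing');
    return executing.length === 0 || (safe && executing.every(t => t.safe));
  };
  for (const tool of tools) {
    if (tool.status !== 'queued') continue;
    if (canRun(tool.safe)) {
      tool.status = 'executing';
      started.push(tool.id);
    } else if (!tool.safe) {
      break; // an unsafe tool that can't run blocks everything behind it
    }
  }
  return started;
}
```

With a safe read already executing, a queued safe read starts, a queued unsafe Bash blocks, and everything behind the Bash stays put — exactly the git add scenario above.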

When processQueue Is Triggered

processQueue() is called in two places:

  1. In addTool() (line 123): every time a new tool is added, immediately try to schedule it.
  2. When executeTool() completes (lines 402-404): after a tool finishes, trigger a new round of scheduling.

src/services/tools/StreamingToolExecutor.ts:398-404
TypeScript
398  const promise = collectResults()
399  tool.promise = promise
400
401  // Process more queue when done
402  void promise.finally(() => {
403    void this.processQueue()
404  })

This creates a self-driving loop: tool completes -> try to schedule -> new tool starts -> new tool completes -> schedule again... until the queue is empty.


Sibling AbortController: Cascading Cancellation of Errors

One of the trickiest problems with concurrent execution is error handling. When multiple tools are running in parallel, how should one tool's failure affect the others?

Claude Code's design is: only Bash tool errors cascade-cancel sibling tools. This design stems from a practical observation — Bash commands often have implicit dependency chains (mkdir fails, so the subsequent cd and touch are pointless), while Read, Grep, WebFetch and other tools are independent — one file read failure shouldn't affect another file's read.

Three-Layer AbortController Architecture

Error cascading relies on a carefully designed three-layer AbortController architecture:

...

Layer 1: Query-Level AbortController (toolUseContext.abortController)

This is the lifecycle controller for the entire query turn. When the user presses ESC or submits a new message, this controller is aborted, causing the entire turn to end.

Layer 2: Sibling-Level AbortController (siblingAbortController)

This is created by StreamingToolExecutor during construction as a child controller of the query-level controller (lines 59-61):

src/services/tools/StreamingToolExecutor.ts:59-61
TypeScript
59  this.siblingAbortController = createChildAbortController(
60    toolUseContext.abortController,
61  )

Key property: aborting the sibling-level controller does not abort the parent controller. This means a Bash error can cancel all sibling tools without terminating the entire query turn — the model will still receive the error information and continue reasoning.
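createChildAbortController itself isn't shown in this article, but its contract can be sketched as follows — an assumption about its shape, consistent with the one-way propagation just described:

```typescript
// Sketch: the child follows the parent, but aborting the child never
// touches the parent. This one-way link is what makes sibling
// cancellation non-fatal to the whole query turn.
function createChildAbortController(parent: AbortController): AbortController {
  const child = new AbortController();
  if (parent.signal.aborted) {
    child.abort(parent.signal.reason); // parent already dead: mirror it
  } else {
    parent.signal.addEventListener(
      'abort',
      () => child.abort(parent.signal.reason), // forward parent aborts down
      { once: true },
    );
  }
  return child;
}
```

Aborting the child leaves the parent running; aborting the parent cascades down with the same reason.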

Layer 3: Tool-Level AbortController (toolAbortController)

Each tool creates its own controller during execution as a child of the sibling-level controller (lines 301-302):

src/services/tools/StreamingToolExecutor.ts:301-302
TypeScript
301  const toolAbortController = createChildAbortController(
302    this.siblingAbortController,
303  )

Bash Error Cascade Path

When a Bash tool execution fails, the complete cascade path is as follows (lines 354-363):

src/services/tools/StreamingToolExecutor.ts:354-363
TypeScript
354  if (isErrorResult) {
355    thisToolErrored = true
356    // Only Bash errors cancel siblings. Bash commands often have implicit
357    // dependency chains (e.g. mkdir fails -> subsequent commands pointless).
358    // Read/WebFetch/etc are independent — one failure shouldn't nuke the rest.
359    if (tool.block.name === BASH_TOOL_NAME) {
360      this.hasErrored = true
361      this.erroredToolDescription = this.getToolDescription(tool)
362      this.siblingAbortController.abort('sibling_error')
363    }
364  }

Execution flow:

  1. The Bash tool's execution result contains a tool_result with is_error: true
  2. The hasErrored flag is set to true
  3. erroredToolDescription records the description of the errored tool (e.g., Bash(mkdir /tmp/test...))
  4. siblingAbortController.abort('sibling_error') is called
  5. This abort signal propagates through createChildAbortController's parent-child relationship to all other tools' toolAbortController
  6. Executing tools that receive the abort signal generate synthetic error messages (lines 189-204)

Tool-Level Abort Upward Propagation

The tool-level AbortController has a subtle event listener (lines 304-317) that handles a special case — when permission dialog denial occurs:

src/services/tools/StreamingToolExecutor.ts:304-317
TypeScript
304  toolAbortController.signal.addEventListener(
305    'abort',
306    () => {
307      if (
308        toolAbortController.signal.reason !== 'sibling_error' &&
309        !this.toolUseContext.abortController.signal.aborted &&
310        !this.discarded
311      ) {
312        this.toolUseContext.abortController.abort(
313          toolAbortController.signal.reason,
314        )
315      }
316    },
317    { once: true },
318  )

This code means: if the tool is aborted for any reason other than a sibling error (such as a permission denial), the abort must bubble up to the query-level controller to terminate the entire turn. The source comments mention a #21056 regression — this upward bubbling logic was added to fix that specific bug.

Synthetic Error Messages

Cancelled tools aren't simply discarded — they receive a synthetic error message so the model knows these tools didn't execute successfully. The createSyntheticErrorMessage method (lines 153-205) generates different error messages based on the cancellation reason:

src/services/tools/StreamingToolExecutor.ts:153-205
TypeScript
153  private createSyntheticErrorMessage(
154    toolUseId: string,
155    reason: 'sibling_error' | 'user_interrupted' | 'streaming_fallback',
156    assistantMessage: AssistantMessage,
157  ): Message {
158    if (reason === 'user_interrupted') {
159      return createUserMessage({
160        content: [{
161          type: 'tool_result',
162          content: withMemoryCorrectionHint(REJECT_MESSAGE),
163          is_error: true,
164          tool_use_id: toolUseId,
165        }],
166        toolUseResult: 'User rejected tool use',
167        // ...
168      })
169    }
170    if (reason === 'streaming_fallback') {
171      return createUserMessage({
172        content: [{
173          type: 'tool_result',
174          content: '<tool_use_error>Error: Streaming fallback - tool execution discarded</tool_use_error>',
175          is_error: true,
176          tool_use_id: toolUseId,
177        }],
178        // ...
179      })
180    }
181    // sibling_error
182    const desc = this.erroredToolDescription
183    const msg = desc
184      ? `Cancelled: parallel tool call ${desc} errored`
185      : 'Cancelled: parallel tool call errored'
186    return createUserMessage({
187      content: [{
188        type: 'tool_result',
189        content: `<tool_use_error>${msg}</tool_use_error>`,
190        is_error: true,
191        tool_use_id: toolUseId,
192      }],
193      // ...
194    })
195  }

Three cancellation reasons produce three different messages:

Reason             | Message Content                                      | Purpose
sibling_error      | Cancelled: parallel tool call Bash(mkdir...) errored | Model knows which sibling tool failed
user_interrupted   | User rejected tool use + memory correction hint      | Model knows the user actively cancelled
streaming_fallback | Streaming fallback - tool execution discarded        | Silent cancellation during streaming fallback

Preventing Duplicate Error Messages

There's an elegant deduplication logic in the code — the thisToolErrored flag (lines 330-345):

src/services/tools/StreamingToolExecutor.ts:328-345
TypeScript
328  // Track if this specific tool has produced an error result.
329  // This prevents the tool from receiving a duplicate "sibling error"
330  // message when it is the one that caused the error.
331  let thisToolErrored = false
332
333  for await (const update of generator) {
334    const abortReason = this.getAbortReason(tool)
335    if (abortReason && !thisToolErrored) {
336      messages.push(
337        this.createSyntheticErrorMessage(
338          tool.id,
339          abortReason,
340          tool.assistantMessage,
341        ),
342      )
343      break
344    }
345    // ...
346    if (isErrorResult) {
347      thisToolErrored = true
348      // ...
349    }
350  }

If Tool A is a Bash tool that errors, it triggers siblingAbortController.abort(). At this point, getAbortReason() would also return sibling_error for Tool A itself. But because thisToolErrored has already been set to true, Tool A won't receive an additional synthetic error message — it already has its own real error result.


Progress Buffering and Ordered Emission

Concurrent execution introduces an output ordering problem. Suppose Tool 1 and Tool 2 are running in parallel, and Tool 2 finishes first — should its results be emitted before Tool 1's?

Claude Code's answer is to treat two types of output differently:

  1. Progress messages: emitted immediately, no ordering required
  2. Final results: must be emitted in tool addition order

Immediate Emission of Progress Messages

In the execution loop of the executeTool() method (lines 366-374), progress messages are stored in the pendingProgress array:

src/services/tools/StreamingToolExecutor.ts:366-374
TypeScript
366  if (update.message) {
367    // Progress messages go to pendingProgress for immediate yielding
368    if (update.message.type === 'progress') {
369      tool.pendingProgress.push(update.message)
370      // Signal that progress is available
371      if (this.progressAvailableResolve) {
372        this.progressAvailableResolve()
373        this.progressAvailableResolve = undefined
374      }
375    } else {
376      messages.push(update.message)
377    }
378  }

Note the progressAvailableResolve semaphore — when new progress messages arrive, it wakes up the waiting getRemainingResults().
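This pattern — stash a resolve function, call it when data arrives — is a one-shot semaphore. A minimal standalone version (illustrative, not the executor's code):

```typescript
// One-shot wake-up: the consumer parks on a promise, and the producer
// resolves it as soon as the next progress message lands.
class ProgressBuffer {
  pending: string[] = [];
  private wake?: () => void;

  // Consumer side: returns a promise that settles on the next push().
  waitForProgress(): Promise<void> {
    return new Promise(resolve => {
      this.wake = resolve;
    });
  }

  // Producer side: queue the message and fire the stored resolver once.
  push(message: string): void {
    this.pending.push(message);
    if (this.wake) {
      this.wake();
      this.wake = undefined; // one-shot: re-armed only by waitForProgress()
    }
  }
}
```

Clearing the resolver after firing is what makes it one-shot — the consumer must explicitly park again, which is exactly how getRemainingResults() loops.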

Ordered Emission of Results

The getCompletedResults() method (lines 412-440) implements ordered emission logic:

src/services/tools/StreamingToolExecutor.ts:412-440
TypeScript
412  *getCompletedResults(): Generator<MessageUpdate, void> {
413    if (this.discarded) {
414      return
415    }
416
417    for (const tool of this.tools) {
418      // Always yield pending progress messages immediately,
419      // regardless of tool status
420      while (tool.pendingProgress.length > 0) {
421        const progressMessage = tool.pendingProgress.shift()!
422        yield { message: progressMessage, newContext: this.toolUseContext }
423      }
424
425      if (tool.status === 'yielded') {
426        continue
427      }
428
429      if (tool.status === 'completed' && tool.results) {
430        tool.status = 'yielded'
431
432        for (const message of tool.results) {
433          yield { message, newContext: this.toolUseContext }
434        }
435
436        markToolUseAsComplete(this.toolUseContext, tool.id)
437      } else if (tool.status === 'executing' && !tool.isConcurrencySafe) {
438        break
439      }
440    }
441  }

The traversal logic in this code is quite elegant. Let's illustrate with an example:

Text
tools = [
  Tool 1 (Read, safe)   - status: yielded
  Tool 2 (Read, safe)   - status: completed   <- results pending emission
  Tool 3 (Read, safe)   - status: executing
  Tool 4 (Bash, unsafe) - status: queued
]

Traversal process:

  1. Tool 1: yielded, skip (but emit any pending progress first)
  2. Tool 2: completed, emit results, mark as yielded
  3. Tool 3: executing, concurrency-safe, don't break, continue traversal (emit pending progress)
  4. Tool 4: queued, doesn't match any condition, natural end

What if Tool 3 were non-concurrency-safe?

Text
1tools = [
2 Tool 1 (Read, safe) - status: yielded
3 Tool 2 (Read, safe) - status: completed
4 Tool 3 (Bash, unsafe) - status: executing <- still running
5 Tool 4 (Read, safe) - status: completed
6]

Traversal process:

  1. Tool 1: yielded, skip
  2. Tool 2: completed, emit results
  3. Tool 3: executing and !isConcurrencySafe, break!
  4. Tool 4's results will NOT be emitted, even though it's already completed

Why? Because the non-concurrency-safe tool's results may have changed the context (via contextModifiers), and Tool 4's results might depend on this modified context. So we must wait for Tool 3 to complete and the context to update before emitting Tool 4's results.

getRemainingResults Wait Mechanism

getRemainingResults() is an AsyncGenerator (lines 453-490) that continuously waits until all tools have finished:

src/services/tools/StreamingToolExecutor.ts:453-490
TypeScript
453  async *getRemainingResults(): AsyncGenerator<MessageUpdate, void> {
454    if (this.discarded) {
455      return
456    }
457
458    while (this.hasUnfinishedTools()) {
459      await this.processQueue()
460
461      for (const result of this.getCompletedResults()) {
462        yield result
463      }
464
465      if (
466        this.hasExecutingTools() &&
467        !this.hasCompletedResults() &&
468        !this.hasPendingProgress()
469      ) {
470        const executingPromises = this.tools
471          .filter(t => t.status === 'executing' && t.promise)
472          .map(t => t.promise!)
473
474        const progressPromise = new Promise<void>(resolve => {
475          this.progressAvailableResolve = resolve
476        })
477
478        if (executingPromises.length > 0) {
479          await Promise.race([...executingPromises, progressPromise])
480        }
481      }
482    }
483
484    for (const result of this.getCompletedResults()) {
485      yield result
486    }
487  }

Promise.race is the key — it simultaneously waits for two types of events:

  1. Any executing tool to complete
  2. Any tool to produce new progress messages

Whichever happens first wakes up the loop, allowing it to emit new results or progress. This implements an event-driven reactive loop — not polling, but passively waiting for notifications.


interruptBehavior: Strategy Selection on User Interruption

When a user presses ESC or submits a new message during tool execution, different tools should react differently. Some tools should stop immediately (like a long-running search), while others should continue running to completion (like a file write in progress — stopping midway could corrupt the file).

cancel vs block

The interruptBehavior method is defined at lines 408-416 of src/Tool.ts:

src/Tool.ts:408-416
TypeScript
408  /**
409   * What should happen when the user submits a new message while this tool
410   * is running.
411   *
412   * - 'cancel' — stop the tool and discard its result
413   * - 'block' — keep running; the new message waits
414   *
415   * Defaults to 'block' when not implemented.
416   */
417  interruptBehavior?(): 'cancel' | 'block'

  • cancel: The tool can safely stop midway. On user interruption, a synthetic error message is generated and partial results are discarded.
  • block: The tool is performing a non-interruptible operation. The user's new message must wait until this tool completes before being sent.

The default behavior is block, which is again a conservatively safe design.

Implementation in StreamingToolExecutor

The getAbortReason() method (lines 210-230) handles interruptBehavior:

src/services/tools/StreamingToolExecutor.ts:210-230
TypeScript
210  private getAbortReason(
211    tool: TrackedTool,
212  ): 'sibling_error' | 'user_interrupted' | 'streaming_fallback' | null {
213    if (this.discarded) {
214      return 'streaming_fallback'
215    }
216    if (this.hasErrored) {
217      return 'sibling_error'
218    }
219    if (this.toolUseContext.abortController.signal.aborted) {
220      if (this.toolUseContext.abortController.signal.reason === 'interrupt') {
221        return this.getToolInterruptBehavior(tool) === 'cancel'
222          ? 'user_interrupted'
223          : null
224      }
225      return 'user_interrupted'
226    }
227    return null
228  }

Note the priority hierarchy here:

  1. First check discarded (streaming fallback) — highest priority
  2. Then check hasErrored (sibling error) — second highest
  3. Finally check the abort signal:
    • If the reason is 'interrupt' (user submitted a new message), only cancel tools will be cancelled
    • If the reason is something else (user pressed ESC), all tools will be cancelled

Interruptible State Updates

The updateInterruptibleState() method (lines 254-260) maintains a global state that tells the UI whether all tools can currently be interrupted:

src/services/tools/StreamingToolExecutor.ts:254-260
TypeScript
254  private updateInterruptibleState(): void {
255    const executing = this.tools.filter(t => t.status === 'executing')
256    this.toolUseContext.setHasInterruptibleToolInProgress?.(
257      executing.length > 0 &&
258        executing.every(t => this.getToolInterruptBehavior(t) === 'cancel'),
259    )
260  }

Only when all executing tools are of the cancel type does the UI show an "interruptible" indicator. If any block tool is running, the entire turn is considered non-interruptible.
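Extracted as a pure function, the rule is easy to verify (a standalone restatement for illustration, not the class method itself):

```typescript
type ExecState = {
  status: 'queued' | 'executing' | 'completed' | 'yielded';
  interruptBehavior: 'cancel' | 'block';
};

// True only when at least one tool is executing AND every executing
// tool opted into 'cancel'.
function hasInterruptibleToolInProgress(tools: ExecState[]): boolean {
  const executing = tools.filter(t => t.status === 'executing');
  return (
    executing.length > 0 &&
    executing.every(t => t.interruptBehavior === 'cancel')
  );
}
```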


Discardable Mode: Tool Discard During Streaming Fallback

Claude Code uses streaming to receive model responses, but streaming can fail (network errors, server issues, etc.). When a streaming fallback occurs, the executor needs to discard results from tools that have already started but haven't completed.

The discard() method (lines 69-71) is very simple:

src/services/tools/StreamingToolExecutor.ts:64-71
TypeScript
64  /**
65   * Discards all pending and in-progress tools. Called when streaming fallback
66   * occurs and results from the failed attempt should be abandoned.
67   * Queued tools won't start, and in-progress tools will receive synthetic errors.
68   */
69  discard(): void {
70    this.discarded = true
71  }

It only sets a flag. This flag propagates to all tools through getAbortReason():

  • Queued tools: processQueue() -> executeTool() -> detects abort reason -> immediately generates synthetic error
  • Executing tools: detects abort reason in the next iteration loop -> generates synthetic error and breaks
  • Completed tools: getCompletedResults() checks this.discarded and returns immediately

getRemainingResults() also checks this.discarded (lines 454-456):

src/services/tools/StreamingToolExecutor.ts:453-456
TypeScript
453  async *getRemainingResults(): AsyncGenerator<MessageUpdate, void> {
454    if (this.discarded) {
455      return
456    }
457    // ...
458  }

This guarantees that after a streaming fallback, no residual results leak into subsequent processing.


Complete Execution Flow

Let's tie all the components together with an end-to-end example. Suppose the model returns the following tool calls:

Text
1. Read("src/main.ts") — concurrency-safe
2. Grep("TODO") — concurrency-safe
3. Read("src/utils.ts") — concurrency-safe
4. Bash("npm test") — not concurrency-safe
5. Edit("src/main.ts", ...) — not concurrency-safe

Phase 1: Concurrent Reads

Text
t=0  addTool(Read main.ts)  -> queued -> processQueue() -> canExecute? YES -> executing
t=1  addTool(Grep TODO)     -> queued -> processQueue() -> canExecute? YES -> executing
t=2  addTool(Read utils.ts) -> queued -> processQueue() -> canExecute? YES -> executing

Three concurrency-safe tools begin executing simultaneously.

Phase 2: Bash Queued

Text
t=3  addTool(Bash npm test) -> queued -> processQueue() -> canExecute? NO (safe tools executing, but self is unsafe)

The Bash tool enters the queue.

Phase 3: Edit Queued

Text
t=4  addTool(Edit main.ts) -> queued -> processQueue() -> canExecute? NO -> break (unsafe tool blocks)

The Edit tool also enters the queue. Encountering the queued Bash (unsafe), break prevents further processing.

Phase 4: Reads Complete, Bash Starts

Text
t=5  Read(main.ts) completed
t=6  Grep completed
t=7  Read(utils.ts) completed -> processQueue() -> Bash canExecute? YES -> executing

Each read's completion triggers processQueue(). Bash is the first queued tool, but it can only start once no other tool is still executing — so it stays queued until the last read finishes, here at t=7, and then begins its exclusive execution.

Phase 5: Ordered Result Emission

Text
getRemainingResults() emits:
  -> Read(main.ts) result
  -> Grep(TODO) result
  -> Read(utils.ts) result
  (waiting for Bash to complete...)
  -> Bash(npm test) result
  (waiting for Edit to complete...)
  -> Edit(src/main.ts) result

Results are emitted strictly in the order tools were added.
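The ordered-emission behavior can be sketched with a plain async generator (the names and shapes here are illustrative, not the actual StreamingToolExecutor API): each tracked tool carries a promise for its final result, and the generator awaits those promises in add order, so a slow early tool delays later results even when those finished first.

```typescript
interface Tracked<T> {
  id: string
  promise: Promise<T>
}

// Awaiting in add order guarantees add-order emission regardless of
// completion order.
async function* emitInOrder<T>(
  tools: Tracked<T>[],
): AsyncGenerator<{ id: string; result: T }> {
  for (const tool of tools) {
    yield { id: tool.id, result: await tool.promise }
  }
}
```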

Exception Path: Bash Fails

If npm test fails:

Text
t=8 Bash(npm test) returns is_error: true
  -> hasErrored = true
  -> siblingAbortController.abort('sibling_error')
  -> Edit(src/main.ts) detects abort at executeTool() entry
  -> Generates synthetic error: "Cancelled: parallel tool call Bash(npm test) errored"

The Edit tool won't be executed, and the model will receive two error messages — one with Bash's real error, and one with Edit's cancellation notice. The model can then decide its next steps based on this information.
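The entry check can be sketched like this (runOrCancel is a hypothetical helper for illustration, not the actual executeTool() signature): if the sibling signal is already aborted when a tool is about to run, execution is skipped entirely and an error result is synthesized for the model.

```typescript
interface ToolResult {
  is_error: boolean
  content: string
}

// Sketch: check the sibling abort signal before running the tool body.
async function runOrCancel(
  toolName: string,
  siblingSignal: AbortSignal,
  run: () => Promise<ToolResult>,
): Promise<ToolResult> {
  if (siblingSignal.aborted) {
    // The model sees why this call never ran
    return {
      is_error: true,
      content: `Cancelled: parallel tool call errored (${String(siblingSignal.reason)})`,
    }
  }
  return run()
}
```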


Comparison with toolOrchestration

There's another tool orchestration implementation in src/services/tools/toolOrchestration.ts called runTools(). How does it differ from StreamingToolExecutor?

runTools() uses a partition-batch model (lines 19-80):

src/services/tools/toolOrchestration.ts:19-30
TypeScript
19  export async function* runTools(
20    toolUseMessages: ToolUseBlock[],
21    assistantMessages: AssistantMessage[],
22    canUseTool: CanUseToolFn,
23    toolUseContext: ToolUseContext,
24  ): AsyncGenerator<MessageUpdate, void> {
25    let currentContext = toolUseContext
26    for (const { isConcurrencySafe, blocks } of partitionToolCalls(
27      toolUseMessages,
28      currentContext,
29    )) {

It first partitions all tool calls by concurrency safety, then executes them batch by batch. This is a simpler model — but it requires all tool calls to be known before execution begins.
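partitionToolCalls() itself isn't shown above; under the assumption that it groups consecutive calls by their concurrency-safety verdict, it could look roughly like this (names and shapes are illustrative):

```typescript
interface CallLike {
  name: string
  isConcurrencySafe: boolean // assumed precomputed for this sketch
}

// Group consecutive calls that share the same safety verdict, so each batch
// can run fully parallel (all safe) or exclusively (unsafe).
function partitionBySafety(
  calls: CallLike[],
): { isConcurrencySafe: boolean; blocks: CallLike[] }[] {
  const batches: { isConcurrencySafe: boolean; blocks: CallLike[] }[] = []
  for (const call of calls) {
    const last = batches[batches.length - 1]
    if (last && last.isConcurrencySafe === call.isConcurrencySafe) {
      last.blocks.push(call)
    } else {
      batches.push({ isConcurrencySafe: call.isConcurrencySafe, blocks: [call] })
    }
  }
  return batches
}
```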

StreamingToolExecutor's advantage is its support for incremental addition — tool calls are added one by one as the streaming response arrives, without waiting for all tool calls to be parsed. This is critical in streaming scenarios, because the model may still be generating the 5th tool call while the first 3 can already start executing.
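Incremental addition can be sketched as a consumer loop over the streaming response (the event names here are invented for illustration): each tool_use block is handed to the executor the moment it finishes parsing, instead of buffering the whole response first.

```typescript
interface StreamEvent {
  type: 'text' | 'tool_use_complete'
  toolUse?: { id: string; name: string; input: unknown }
}

// Feed each fully parsed tool_use block to the executor immediately;
// execution may begin while later blocks are still being generated.
async function feedExecutor(
  stream: AsyncIterable<StreamEvent>,
  executor: { addTool(t: { id: string; name: string; input: unknown }): void },
): Promise<void> {
  for await (const event of stream) {
    if (event.type === 'tool_use_complete' && event.toolUse) {
      executor.addTool(event.toolUse)
    }
  }
}
```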

Feature               | runTools()           | StreamingToolExecutor
Tool addition timing  | All at once          | Incremental
Scheduling strategy   | Partition-batch      | Real-time queue scheduling
Progress messages     | No special handling  | Separate storage, immediate emission
Error cascading       | None                 | Sibling AbortController
Discard mode          | None                 | Supported
Interrupt behavior    | None                 | cancel/block strategy

Memory Safety of createChildAbortController

StreamingToolExecutor makes extensive use of createChildAbortController() (defined in src/utils/abortController.ts). This utility function deserves a closer look because it solves an easily overlooked memory-leak problem.

The standard parent-child AbortController relationship is typically implemented like this:

TypeScript
// Naive implementation
parent.signal.addEventListener('abort', () => {
  child.abort(parent.signal.reason)
})

The problem is: parent holds a strong reference to child through the closure. Even if child is discarded at the application level, as long as parent is alive, child can't be garbage collected. In StreamingToolExecutor, each tool creates a toolAbortController (child), while siblingAbortController (parent) lives throughout the entire tool execution phase. If the model returns 20 tool calls, there are 20 children strongly held by the parent.

createChildAbortController() solves this with WeakRef (lines 68-99):

src/utils/abortController.ts:68-99
TypeScript
68  export function createChildAbortController(
69    parent: AbortController,
70    maxListeners?: number,
71  ): AbortController {
72    const child = createAbortController(maxListeners)
73
74    if (parent.signal.aborted) {
75      child.abort(parent.signal.reason)
76      return child
77    }
78
79    const weakChild = new WeakRef(child)
80    const weakParent = new WeakRef(parent)
81    const handler = propagateAbort.bind(weakParent, weakChild)
82
83    parent.signal.addEventListener('abort', handler, { once: true })
84
85    // Auto-cleanup: remove parent listener when child is aborted
86    child.signal.addEventListener(
87      'abort',
88      removeAbortHandler.bind(weakParent, new WeakRef(handler)),
89      { once: true },
90    )
91
92    return child
93  }

Key design decisions:

  1. WeakRef holds the child: the parent's abort listener references the child only through a WeakRef, so it doesn't prevent the child from being garbage collected
  2. WeakRef holds the parent: the child's cleanup logic likewise references the parent through a WeakRef, avoiding a reverse strong reference
  3. Auto-cleanup: when the child is aborted, it removes its own listener from the parent, preventing listener accumulation
  4. {once: true}: ensures each handler fires at most once and is removed automatically after firing

These measures ensure no memory leaks occur in high-concurrency tool execution scenarios.


Transferable Patterns: Implementing Similar Architecture in Your Projects

StreamingToolExecutor's concurrency model isn't unique to Claude Code — it's fundamentally a declarative concurrency scheduler. If you need to implement similar tool orchestration in your own projects, here are the core patterns you can adopt:

Pattern 1: Self-Declared Concurrency Safety

Let each operation declare for itself whether it can run in parallel, rather than hard-coding rules in the scheduler:

TypeScript
// Placeholder result type for the sketch
type Result = unknown

interface Operation {
  // The operation decides for itself whether it can run in parallel
  isConcurrencySafe(input: unknown): boolean
  execute(input: unknown, signal: AbortSignal): Promise<Result>
}

Benefit: the scheduler doesn't need to understand the details of each operation, and adding new operations doesn't require modifying the scheduler code.
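For example, a hypothetical file operation can make the decision input-dependent, so the same operation is safe or unsafe depending on what it's asked to do:

```typescript
// Hypothetical file operation: reads parallelize safely, writes don't.
type FileInput = { mode: 'read' | 'write'; path: string }

const fileOperation = {
  isConcurrencySafe(input: FileInput): boolean {
    return input.mode === 'read' // safety depends on the concrete input
  },
  async execute(input: FileInput, signal: AbortSignal): Promise<string> {
    if (signal.aborted) throw new Error('aborted')
    return `${input.mode} ${input.path}`
  },
}
```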

Pattern 2: Read-Write Lock Scheduling

TypeScript
function canExecute(
  newOp: { isSafe: boolean },
  executingOps: { isSafe: boolean }[]
): boolean {
  // No operations executing: always allowed
  if (executingOps.length === 0) return true
  // New operation and all executing operations are concurrency-safe: allowed
  if (newOp.isSafe && executingOps.every(op => op.isSafe)) return true
  // Otherwise: wait
  return false
}

Pattern 3: Layered AbortController

TypeScript
class OperationGroup {
  private groupController: AbortController
  private operations: Map<string, AbortController> = new Map()

  constructor(parentController: AbortController) {
    // group controller is a child of the parent controller
    this.groupController = createChild(parentController)
  }

  addOperation(id: string): AbortSignal {
    // each operation's controller is a child of the group
    const opController = createChild(this.groupController)
    this.operations.set(id, opController)
    return opController.signal
  }

  cancelGroup(reason: string): void {
    // cancel all operations in the group without affecting the parent
    this.groupController.abort(reason)
  }
}

Pattern 4: Separating Progress from Results

TypeScript
// Placeholder types for the sketch
type ProgressEvent = { message: string }
type Result = unknown

interface TrackedOperation {
  status: 'queued' | 'executing' | 'completed' | 'yielded'
  isSafe: boolean
  // Progress messages stored separately, can be emitted out of order
  pendingProgress: ProgressEvent[]
  // Final results emitted in order
  results?: Result[]
}

function* yieldInOrder(operations: TrackedOperation[]) {
  for (const op of operations) {
    // Progress is always emitted immediately
    yield* op.pendingProgress.splice(0)

    if (op.status === 'completed') {
      yield* op.results!
      op.status = 'yielded'
    } else if (op.status === 'executing' && !op.isSafe) {
      // Non-safe operations block subsequent result emission
      break
    }
  }
}

Pattern 5: Conservative Defaults

TypeScript
const DEFAULTS = {
  isConcurrencySafe: () => false, // Default to unsafe
  interruptBehavior: () => 'block', // Default to non-interruptible
}

In safety-related scenarios, always make the default behavior the most conservative. Tool developers must proactively declare safety, rather than safety being assumed by default.

Complete Mini Implementation

Combining the patterns above, the core of a minimal viable concurrency scheduler fits in under a hundred lines:

TypeScript
type OperationStatus = 'queued' | 'executing' | 'completed' | 'yielded'

interface TrackedOp<T> {
  id: string
  isSafe: boolean
  status: OperationStatus
  execute: (signal: AbortSignal) => Promise<T>
  result?: T
  error?: Error
  promise?: Promise<void>
}

class ConcurrentScheduler<T> {
  private ops: TrackedOp<T>[] = []
  private groupAbort = new AbortController()

  add(op: TrackedOp<T>): void {
    this.ops.push({ ...op, status: 'queued' })
    this.processQueue()
  }

  private canExecute(isSafe: boolean): boolean {
    const executing = this.ops.filter(o => o.status === 'executing')
    return executing.length === 0 ||
      (isSafe && executing.every(o => o.isSafe))
  }

  private processQueue(): void {
    for (const op of this.ops) {
      if (op.status !== 'queued') continue
      if (this.canExecute(op.isSafe)) {
        this.executeOp(op)
      } else if (!op.isSafe) {
        break // Maintain order for non-safe operations
      }
    }
  }

  private async executeOp(op: TrackedOp<T>): Promise<void> {
    op.status = 'executing'
    try {
      op.result = await op.execute(this.groupAbort.signal)
    } catch (e) {
      op.error = e as Error
      if (!op.isSafe) {
        this.groupAbort.abort('operation_error')
      }
    }
    op.status = 'completed'
    this.processQueue()
  }

  *getResults(): Generator<{ id: string; result?: T; error?: Error }> {
    for (const op of this.ops) {
      if (op.status === 'yielded') continue
      if (op.status === 'completed') {
        op.status = 'yielded'
        yield { id: op.id, result: op.result, error: op.error }
      } else if (op.status === 'executing' && !op.isSafe) {
        break
      }
    }
  }
}

Design Trade-offs

Looking back at the entire StreamingToolExecutor design, there are several trade-offs worth discussing:

Why Do Only Bash Errors Cascade?

The code comment says it clearly (lines 357-359):

Bash commands often have implicit dependency chains (e.g. mkdir fails -> subsequent commands pointless). Read/WebFetch/etc are independent — one failure shouldn't nuke the rest.

This is a pragmatic choice. In theory, each tool could declare "whether my errors should cascade," but in practice, only the Bash tool has this kind of implicit dependency relationship. Over-engineering would only increase the cognitive burden on tool developers.

Why Not Support contextModifier for Concurrent Tools?

The code comment (lines 389-390) acknowledges this is a feature gap:

NOTE: we currently don't support context modifiers for concurrent tools. None are actively being used, but if we want to use them in concurrent tools, we need to support that here.

Concurrent tools modifying shared context requires solving race conditions — what happens when two tools simultaneously modify the same context field? The current approach simply prohibits it, waiting for actual demand before designing a solution. This is a textbook application of "YAGNI" (You Aren't Gonna Need It).

Why Does interruptBehavior Default to block?

Because cancelling a write operation midway could cause data corruption. block means "let the tool finish," which in the worst case only means waiting a few more seconds. cancel in the worst case could result in a half-written file. Safety > performance.

Why Generators Instead of Callbacks?

getCompletedResults() returns a Generator, and getRemainingResults() returns an AsyncGenerator. This design lets callers naturally consume results using for...of and for await...of, without needing to register callbacks. The lazy evaluation property of Generators also means unneeded results won't be computed.


Summary

StreamingToolExecutor is an elegant concurrency orchestration component in Claude Code that solves the seemingly simple but actually complex problem of "letting AI operate multiple tools simultaneously." Its core design principles include:

  1. Self-declared concurrency safety: Tools know whether they can run in parallel; the scheduler merely executes their declarations
  2. Read-write lock scheduling: Concurrency-safe tools share access, non-concurrency-safe tools get exclusive access
  3. Layered cancellation: Three-layer AbortController architecture for precise error cascading
  4. Ordered emission: Progress is immediately visible, results are output in order
  5. Conservative defaults: Without a declaration, assume unsafe and non-interruptible

These principles apply not only to AI tool orchestration but to any system requiring mixed concurrency strategies — database operation scheduling, microservice orchestration, CI/CD pipeline management, and more. The 530 lines of StreamingToolExecutor distill the core wisdom of production-grade concurrency orchestration.

In the next article, we'll dive into the permission system — exploring how Claude Code ensures every tool call undergoes a security review through its six-layer evaluation chain.