Status: Approved (Sections 1-5)
Date: 2026-06-10
Scope: Implementation of the executeLocal() harness loop in PumpStation.kt
Context
PumpStation is the agentic-loop container in TPipe — the harness that drives a judge/dispatch/path cycle over multiple turns. The class has all the configuration scaffolding in place (agents, DITL hooks, events, loop guards, memory modes, pause/resume, snapshot/restore) but the runtime loop body that orchestrates these is unwritten. executeLocal() currently does a partial health check then returns the input as-is (PumpStation.kt:1444-1445 has the TODO marker).
This design specifies exactly what the loop must do, in what order, with what data flowing at each phase boundary, and what scaffolding is missing to make it work.
Design Philosophy
PumpStation is “quick and dirty” — auto-inject what the developer doesn’t supply. Unlike Manifold (which has “magic contracts” the developer must remember and inject into their agents), PumpStation:
- Auto-injects default system prompts when the developer doesn’t supply custom ones
- Reads standard MultimodalContent flags (terminatePipeline, passPipeline, interuptPipeline) for loop control, with no magic contracts
- Auto-invokes builder functions per turn so fresh, thread-safe agents are created when needed
- Provides sensible defaults for every config field; the developer overrides only what they need
The pipe class’s prompt injection system is the primary mechanism for prompt composition. enableHarnessMode() auto-injects the path descriptor protocol. setSystemPrompt() + applySystemPrompt() handle the layered injection. The pump station just orchestrates the pipe’s existing capabilities.
MultimodalContent flags are the canonical control pattern TPipe developers are taught early. The loop leans on these flags as much as possible.
Two-tier history management: turnHistory is the curated, compacted, agent-facing history. rawTurnHistory is the full, append-only event log used by the goal agent, DITL hooks, and developers.
Section 1: Architecture Overview
Main Loop Structure
runHarnessLoop()
├── runPreInitPhase(content)            ← ONCE at start
├── while (turnIndex < maxTurns) {
│   ├── checkPauseGuards(BeforeJudge)
│   ├── runTurn()                       ← returns Continue | Halt(reason)
│   └── turnIndex++
│   }
└── runFinalizationPhase()              ← ONCE at end
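The outline above can be sketched as a small driver function. This is a minimal, non-suspending illustration and not the PumpStation implementation: SketchExitReason, TurnResultSketch, the MaxTurnsReached value, and the lambda parameters are hypothetical stand-ins, and the sketch assumes that a Halt result or a failed pause guard breaks out of the loop before finalization runs.

```kotlin
// Hypothetical stand-ins for the design's exit reasons and turn results.
enum class SketchExitReason { JudgeComplete, MaxTurnsReached, KillSwitchTripped }

sealed class TurnResultSketch {
    object Continue : TurnResultSketch()
    data class Halt(val reason: SketchExitReason) : TurnResultSketch()
}

// Runs pre-init once, then at most maxTurns turns, then finalization once.
// Returns the exit reason so the caller can see why the loop stopped.
fun runHarnessLoopSketch(
    maxTurns: Int,
    preInit: () -> Unit,
    pauseGuard: (turnIndex: Int) -> Boolean,      // false means "stop now"
    runTurn: (turnIndex: Int) -> TurnResultSketch,
    finalization: (SketchExitReason) -> Unit
): SketchExitReason {
    preInit()
    // Assumed default when the turn budget runs out without a Halt.
    var exitReason = SketchExitReason.MaxTurnsReached
    var turnIndex = 0
    while (turnIndex < maxTurns) {
        if (!pauseGuard(turnIndex)) {
            exitReason = SketchExitReason.KillSwitchTripped
            break
        }
        val result = runTurn(turnIndex)
        if (result is TurnResultSketch.Halt) {
            exitReason = result.reason
            break
        }
        turnIndex++
    }
    finalization(exitReason)
    return exitReason
}
```

Passing the phases in as lambdas keeps the control flow testable on its own; in the real class they would be the member functions named in the outline.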
runTurn() Body
private suspend fun runTurn(): TurnResult {
    refreshAgentInstances()        // R.1
    refreshPipelinesPrompts()      // R.2
    refreshSettingsPropagation()   // R.3
    runHealthCheckPhase()
    detectAndHandleContextBlowout(PumpStationPhase.HealthCheck)
    val judgeVerdict = runJudgePhase()
    detectAndHandleContextBlowout(PumpStationPhase.Judge)
    if (judgeVerdict.shouldHalt) return TurnResult.Halt(judgeVerdict.reason)
    if (judgeVerdict.isComplete) return runExitFlow()
    val pathRequest = runDispatchPhase() ?: return TurnResult.Continue
    detectAndHandleContextBlowout(PumpStationPhase.Dispatch)
    if (pathRequest.pathName.isBlank()) return TurnResult.Continue
    if (!checkPauseGuards(PumpStationPausePhase.BeforePathExecution)) {
        return TurnResult.Halt(PumpStationExitReason.KillSwitchTripped)
    }
    runPathFlow(pathRequest)
    detectAndHandleContextBlowout(PumpStationPhase.PathExecution)
    runForegroundAgentsPhase()
    detectAndHandleContextBlowout(PumpStationPhase.ForegroundAgents)
    runBackgroundAgentsPhase()
    runMemoryUpdatePhase()
    runCompactionPhase()
    return TurnResult.Continue
}
Per-Turn Refresh (R.1, R.2, R.3)
Why per-turn, not just PreInit: Lazy-loaded paths, reserve path reveals, todo list updates, builder function results, and memory agent outputs all change during a turn. The dispatch pipe’s applySystemPrompt() reads the current state — so the loop must call it every turn to refresh.
R.1 — Refresh Agent Instances:
private fun refreshAgentInstances() {
    judgeAgentBuilderFunction?.let { fn ->
        judgeAgent = fn(this)
        judgeAgent?.setParentInterface(this)
        judgeAgent?.P2PInit()
    }
    dispatchAgentBuilderFunction?.let { fn ->
        dispatchAgent = fn(this)
        dispatchAgent?.setParentInterface(this)
        dispatchAgent?.P2PInit()
    }
}
R.2 — Refresh Pipeline Prompts:
private fun refreshPipelinesPrompts() {
    applyPromptsToPipeline(judgeAgent, buildJudgeSystemPrompt(), buildJudgeFooter())
    applyPromptsToPipeline(dispatchAgent, buildDispatchSystemPrompt(), buildDispatchFooter())
    if (goalAgent is Pipeline) applyPromptsToPipeline(goalAgent as Pipeline, buildGoalSystemPrompt(), null)
}
private fun applyPromptsToPipeline(agent: Pipeline?, customPrompt: String?, customFooter: String?) {
    if (agent == null) return
    for (pipe in agent.getPipes()) {
        pipe.setSystemPrompt(customPrompt ?: defaultPromptFor(agent))
        pipe.setFooterPrompt(customFooter ?: defaultFooterFor(agent))
        if (agent == dispatchAgent) pipe.enableHarnessMode()
        pipe.applySystemPrompt() // refreshes layered injections
    }
}
R.3 — Refresh Settings: calls existing propagateSettingsToAllAgents().
Stage 1: Pre-Initialization
private suspend fun runPreInitPhase(content: MultimodalContent) {
    taskState.originalInput = content
    taskState.latestContent = content
    taskState.status = PumpStationStatus.Running
    taskState.phase = PumpStationPhase.PreInit
    refreshAgentInstances()        // initial builder-function invocation
    refreshPipelinesPrompts()      // initial prompt application
    refreshSettingsPropagation()   // initial budget/pipe settings
    if (preInitAgent != null) taskState.latestContent = preInitAgent!!.executeLocal(content)
    if (preInitFunction != null) taskState.latestContent = preInitFunction!!.invoke(content, this)
    emit(HarnessStarted(runId, turnIndex = 0, originalInput = content))
    taskState.phase = PumpStationPhase.Judge
}
Stage 3: Exit Flow
private suspend fun runExitFlow(): TurnResult {
    if (!checkPauseGuards(PumpStationPausePhase.BeforeGoalValidation)) {
        return TurnResult.Halt(PumpStationExitReason.KillSwitchTripped)
    }
    if (goalAgent == null) return TurnResult.Halt(PumpStationExitReason.JudgeComplete)
    val agent = goalAgentBuilderFunction?.invoke(this) ?: goalAgent!!
    agent.setParentInterface(this)
    agent.P2PInit()
    refreshPipelinesPrompts()
    emit(GoalValidationStarted(...))
    val goalContent = buildGoalContent()
    val result = agent.executeLocal(goalContent)
    emit(GoalValidationCompleted(...))
    if (result.terminatePipeline) {
        turnHistory.add(ConverseData(role = ConverseRole.assistant, content = result))
        taskState.goalFailCount++
        if (taskState.goalFailCount > maxGoalFailAttempts) {
            return TurnResult.Halt(PumpStationExitReason.GoalValidationFailed)
        }
        return TurnResult.Continue // loop again with goal's feedback in history
    }
    return TurnResult.Halt(PumpStationExitReason.JudgeComplete)
}
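The goal-failure check above increments the counter first and then compares with `>`, so it is worth seeing what that means for a given budget. The following is a minimal sketch of only that counter logic; GoalFailBudgetSketch is a hypothetical helper, not part of PumpStation.

```kotlin
// Hypothetical helper that isolates the increment-then-compare check used in runExitFlow().
class GoalFailBudgetSketch(private val maxGoalFailAttempts: Int) {
    var goalFailCount = 0
        private set

    /** Records one failed goal validation; returns true when the harness should halt. */
    fun recordFailure(): Boolean {
        goalFailCount++
        return goalFailCount > maxGoalFailAttempts
    }
}
```

Under this reading, maxGoalFailAttempts = 3 lets the loop continue after the first three failed validations and halts on the fourth. If the intent is to halt on the third failure, the comparison would need to be `>=`; the source does not say which is meant.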
Stage 4: Finalization
private suspend fun runFinalizationPhase(): MultimodalContent {
    drainBackgroundEventQueue()
    backgroundJobs.forEach { it.join() }
    backgroundJobs.clear()
    // Final memory update
    if (contextFillRatio() > compactionThreshold) runCompactionPhase()
    if (taskState.status == PumpStationStatus.Failed || taskState.exitReason in failureReasons) {
        emit(HarnessFailed(
            error = taskState.lastError ?: UnknownPath,
            errorMessage = taskState.lastError?.name,
            exitReason = taskState.exitReason ?: Error
        ))
    } else {
        emit(HarnessCompleted(
            exitReason = taskState.exitReason ?: JudgeComplete,
            finalOutput = taskState.latestContent
        ))
    }
    return taskState.latestContent ?: MultimodalContent()
}
Section 2: Component Breakdown (Phase Methods)
runJudgePhase(): JudgeVerdict
private suspend fun runJudgePhase(): JudgeVerdict {
    taskState.phase = PumpStationPhase.Judge
    emit(JudgeStarted(...))
    if (preInvokeFunction?.invoke(contextWindow, miniBank, this) == false) {
        return JudgeVerdict(shouldHalt = true, reason = PumpStationExitReason.InterventionTerminated)
    }
    val baseInput = buildTurnContent()
    val input = preValidationJudgeFunction?.invoke(baseInput, miniBank, this)
        ?.let { baseInput.copy(miniBankContext = it) } ?: baseInput
    val result = judgeAgent!!.executeLocal(input)
    val postResult = postJudgeFunction?.invoke(result, this) ?: result
    val verdict = parseJudgeVerdict(postResult).withFlagCheck(postResult)
    taskState.latestContent = postResult
    emit(JudgeCompleted(verdict.isComplete, verdict.shouldTerminate))
    return verdict
}
runDispatchPhase(): PathRequest?
private suspend fun runDispatchPhase(): PathRequest? {
    taskState.phase = PumpStationPhase.Dispatch
    emit(DispatchStarted(...))
    val baseInput = taskState.latestContent ?: buildTurnContent()
    val input = preValidationDispatchFunction?.invoke(baseInput, contextWindow, miniBank, this)
        ?.let { baseInput.copy(miniBankContext = it) } ?: baseInput
    var result = dispatchAgent!!.executeLocal(input)
    var repairAttempts = 0
    // Parse the initial output, then each repaired output. Checking the attempt
    // budget after parsing means the initial output is parsed even when
    // maxDispatchRepairAttempts is 0, and the last repair is never discarded unparsed.
    while (true) {
        if (checkMultimodalFlags(result, "Dispatch").shouldHalt) {
            taskState.lastError = PumpStationError.P2PRequestInvalid
            return null
        }
        val pathRequest = parseDispatchOutput(result)
        if (pathRequest != null) {
            emit(DispatchCompleted(selectedPathName = pathRequest.pathName, pathRequest = pathRequest))
            return pathRequest
        }
        if (!failurePolicy.repairInvalidDispatchJson) break
        if (repairAttempts >= failurePolicy.maxDispatchRepairAttempts) break
        repairAttempts++
        result = dispatchAgent!!.executeLocal(buildRepairPrompt(result))
    }
    emit(DispatchCompleted(null, null))
    emit(PathFailed(...reason = PumpStationError.DispatchJsonRepairFailed))
    if (failurePolicy.stopHarnessOnInvalidPathRequest) {
        taskState.lastError = PumpStationError.DispatchJsonRepairFailed
    }
    return null
}
runPathFlow(request: PathRequest)
private suspend fun runPathFlow(request: PathRequest) {
    val path = resolvePath(request.pathName) ?: run {
        emit(PathFailed(...error = PumpStationError.UnknownPath, errorMessage = "Path not found"))
        taskState.latestContent = buildLlmErrorMessage(
            PumpStationError.UnknownPath,
            mapOf("pathName" to request.pathName, "availablePaths" to getVisiblePathNames())
        )
        return
    }
    val input = buildPathInput(path, request)
    invokePath(path, input)
}
runHealthCheckPhase() (conditional)
Fires if healthAgent is set AND (turnsSinceLast >= healthAgentTurnInterval OR errorRatio >= healthAgentErrorRatioThreshold).
runForegroundAgentsPhase() (conditional)
For each HarnessAgentSlot with concurrency == Blocking: fire if turnIndex % foregroundTurnInterval == 0.
runBackgroundAgentsPhase() (conditional, async)
For each HarnessAgentSlot with concurrency == Async: queue via launch if turnIndex % backgroundTurnInterval == 0.
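The firing conditions for the health check, foreground, and background phases are plain predicates over turn counters, so they can be sketched as pure functions. This is an illustration under assumptions: SlotConcurrency and SlotSketch are simplified, hypothetical stand-ins for HarnessAgentSlot, and intervals are assumed to be positive.

```kotlin
// Hypothetical, simplified stand-ins; the real HarnessAgentSlot also carries
// an agent and a builder function.
enum class SlotConcurrency { Blocking, Async }

data class SlotSketch(val name: String, val concurrency: SlotConcurrency)

// Health check: requires an agent AND (interval elapsed OR error ratio too high).
fun shouldRunHealthCheck(
    hasHealthAgent: Boolean,
    turnsSinceLast: Int,
    healthAgentTurnInterval: Int,
    errorRatio: Double,
    healthAgentErrorRatioThreshold: Double
): Boolean = hasHealthAgent &&
    (turnsSinceLast >= healthAgentTurnInterval || errorRatio >= healthAgentErrorRatioThreshold)

// Names of the slots due on this turn, returned as (blocking, async).
fun slotsDueThisTurn(
    slots: List<SlotSketch>,
    turnIndex: Int,
    foregroundTurnInterval: Int,
    backgroundTurnInterval: Int
): Pair<List<String>, List<String>> {
    require(foregroundTurnInterval > 0 && backgroundTurnInterval > 0) { "intervals must be positive" }
    val blocking = slots
        .filter { it.concurrency == SlotConcurrency.Blocking && turnIndex % foregroundTurnInterval == 0 }
        .map { it.name }
    val async = slots
        .filter { it.concurrency == SlotConcurrency.Async && turnIndex % backgroundTurnInterval == 0 }
        .map { it.name }
    return blocking to async
}
```

One consequence of the modulo rule is that every slot fires on turn 0, since 0 % n == 0 for any positive interval.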
runMemoryUpdatePhase() (conditional)
Queue memory agents at interval; block if contextFillRatio() > compactionThreshold.
runCompactionPhase() (conditional)
Fires per compactionStrategy (Whole / Chunked / Hybrid). Triggers preCompactionFunction → strategy → postCompactionFunction.
detectAndHandleContextBlowout(afterPhase: PumpStationPhase): Boolean
The emergency brake. Called at every phase boundary where a child agent returned content. If total context size exceeds blowoutThreshold:
- Stash oversized content (if failurePolicy.stashOversizedOutputs)
- Replace taskState.latestContent with stash placeholder
- Run preCompactionFunction → runCompactionPhase() → postCompactionFunction
- Fire onContextTruncated callback
- Return true (blowout was handled)
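The stash-and-placeholder step in the list above can be illustrated in isolation. This sketch makes several assumptions that the source does not state: content is a plain String rather than ConverseData, size is measured in characters, and the stash id format and placeholder wording are invented for the example.

```kotlin
// Hypothetical manifest entry; the real StashEntry fields are not given in the source.
data class StashEntrySketch(val stashId: String, val sizeChars: Int, val afterPhase: String)

class StashSketch {
    val stash = mutableMapOf<String, String>()
    val manifest = mutableListOf<StashEntrySketch>()
    private var nextId = 0

    // Returns what should remain in latestContent: the original content if it
    // fits under the threshold, otherwise a short placeholder naming the stash entry.
    fun stashIfOversized(content: String, currentSize: Int, blowoutThreshold: Int, afterPhase: String): String {
        if (currentSize + content.length <= blowoutThreshold) return content
        val id = "stash-${nextId++}"
        stash[id] = content
        manifest += StashEntrySketch(id, content.length, afterPhase)
        return "[Stashed output $id: ${content.length} chars. Request it by id if needed.]"
    }
}
```

Keeping a manifest alongside the map lets a later reader (a path or the goal agent) discover what was stashed without loading the oversized bodies.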
checkPauseGuards(phase: PumpStationPausePhase): Boolean
Called at 11 named phase boundaries. Returns false if kill switch tripped, exit reason set, or pause requested at this phase. Suspends via Channel<Unit> until resume() is called.
Phase Method Summary Table
| Method | Phase | Conditional? | Emits Started/Completed? |
|---|---|---|---|
| runPreInitPhase | PreInit | No (once) | No / No (emits HarnessStarted) |
| refreshAgentInstances | (R.1) | No | No |
| refreshPipelinesPrompts | (R.2) | No | No |
| refreshSettingsPropagation | (R.3) | No | No |
| runHealthCheckPhase | HealthCheck | Yes (interval) | Yes / Yes |
| runJudgePhase | Judge | No | Yes / Yes |
| runDispatchPhase | Dispatch | No | Yes / Yes |
| runPathFlow | PathExecution | No (after dispatch) | (delegated to invokePath) |
| runForegroundAgentsPhase | ForegroundAgents | Yes (interval) | Yes / Yes (per agent) |
| runBackgroundAgentsPhase | BackgroundAgents | Yes (interval, async) | (agents emit) |
| runMemoryUpdatePhase | MemoryUpdate | Yes (interval) | Yes / Yes |
| runCompactionPhase | Compaction | Yes (strategy) | Yes / Yes |
| runExitFlow | GoalValidation | Yes (judge said complete) | Yes / Yes |
| runFinalizationPhase | Exit | No (once) | No / No (emits HarnessCompleted/Failed) |
| detectAndHandleContextBlowout | (per-phase) | Yes (over threshold) | (StashCreated, onContextTruncated callback) |
| checkPauseGuards | (per-phase) | No (always checked) | HarnessSuspended / HarnessResumed |
Section 3: Data Flow
Prompt Layering (the corrected model)
personality, systemTask, userGuidelines, entryUserPrompt go into the system prompt — set via pipe.setSystemPrompt(prompt) where prompt is the composed system prompt string. The pipe’s applySystemPrompt() then layers:
- rawSystemPrompt (set by setSystemPrompt())
- JSON input/output requirements (if not native JSON)
- PCP merged mode
- PCP-only mode
- P2P agent descriptors
- Path descriptor protocol (if enableHarnessMode() is called; PumpStation dispatch integration)
- Context instructions
- Todo list (if injectTodoList is set on the pipe)
- footerPrompt
- Semantic decompression prelude
The PumpStation’s contribution: set rawSystemPrompt to the composed system prompt, call enableHarnessMode() on the dispatch pipe, optionally enable injectTodoList, set the footerPrompt.
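To make the layer ordering concrete, here is a minimal sketch that joins whichever layers are active in the order listed above. It is not the pipe's applySystemPrompt(): PromptLayersSketch and composeSystemPromptSketch are hypothetical names, each layer is reduced to an optional string, and the blank-line separator is an assumption.

```kotlin
// Layer order copied from the list above; a field is null when its layer is inactive.
data class PromptLayersSketch(
    val rawSystemPrompt: String,
    val jsonRequirements: String? = null,
    val pcpMerged: String? = null,
    val pcpOnly: String? = null,
    val p2pAgentDescriptors: String? = null,
    val pathDescriptorProtocol: String? = null,   // present only after enableHarnessMode()
    val contextInstructions: String? = null,
    val todoList: String? = null,                 // present only when injectTodoList is set
    val footerPrompt: String? = null,
    val semanticDecompressionPrelude: String? = null
)

// Joins the active, non-blank layers in declaration order.
fun composeSystemPromptSketch(l: PromptLayersSketch): String =
    listOfNotNull(
        l.rawSystemPrompt, l.jsonRequirements, l.pcpMerged, l.pcpOnly,
        l.p2pAgentDescriptors, l.pathDescriptorProtocol, l.contextInstructions,
        l.todoList, l.footerPrompt, l.semanticDecompressionPrelude
    ).filter { it.isNotBlank() }.joinToString("\n\n")
```

For a dispatch pipe, PumpStation would effectively supply rawSystemPrompt, pathDescriptorProtocol, optionally todoList, and footerPrompt, leaving the remaining layers to the pipe's own configuration.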
buildTurnContent() (user-message content)
private fun buildTurnContent(): MultimodalContent {
    return MultimodalContent(
        text = buildUserMessageForTurn(), // turnSummary + role-specific question
        binaryContent = taskState.latestContent?.binaryContent ?: mutableListOf(),
        context = ContextWindow(
            loreBookKeys = contextWindow.loreBookKeys.toMutableMap(),
            contextElements = contextWindow.contextElements.toMutableList(),
            converseHistory = turnHistory, // curated, structured
            version = contextWindow.version
        ),
        miniBankContext = miniBank,
        tools = taskState.latestContent?.tools ?: PcPRequest(),
        metadata = mutableMapOf<Any, Any>(
            "taskState" to taskState, "phase" to taskState.phase,
            "turnIndex" to taskState.turnIndex, "runId" to taskState.runId,
            "isInitialTurn" to (taskState.turnIndex == 0),
            "visiblePaths" to getVisiblePathNames()
        )
    )
}
buildUserMessageForTurn() returns just the user-facing text:
- Judge: turnSummary + “Is the task complete?”
- Dispatch: turnSummary + “Select the next path.”
- HealthCheck: serialized HealthContext JSON
- Goal: turnSummary + judge verdict + path call log + “Verify the work was done.”
- Path: pathRequest.inputData
- Foreground/Background: turnSummary + role-specific question
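The per-role mapping above fits naturally into a `when` expression. The sketch below is illustrative only: TurnRole and TurnTextInputs are hypothetical types, the function takes its inputs as parameters instead of reading taskState, and the blank-line joiner between parts is an assumption.

```kotlin
// Hypothetical role enum and input bundle for the sketch.
enum class TurnRole { Judge, Dispatch, HealthCheck, Goal, Path, Foreground, Background }

data class TurnTextInputs(
    val turnSummary: String,
    val healthContextJson: String = "{}",
    val judgeVerdict: String = "",
    val pathCallLog: String = "",
    val pathInputData: String = "",
    val roleQuestion: String = ""
)

// Returns only the user-facing text for the given role.
fun buildUserMessageSketch(role: TurnRole, inputs: TurnTextInputs): String = when (role) {
    TurnRole.Judge -> "${inputs.turnSummary}\n\nIs the task complete?"
    TurnRole.Dispatch -> "${inputs.turnSummary}\n\nSelect the next path."
    TurnRole.HealthCheck -> inputs.healthContextJson
    TurnRole.Goal ->
        listOf(inputs.turnSummary, inputs.judgeVerdict, inputs.pathCallLog, "Verify the work was done.")
            .filter { it.isNotBlank() }
            .joinToString("\n\n")
    TurnRole.Path -> inputs.pathInputData
    TurnRole.Foreground, TurnRole.Background -> "${inputs.turnSummary}\n\n${inputs.roleQuestion}"
}
```

Because `when` over an enum is exhaustive, adding a new role forces the compiler to flag this function until a message format is chosen for it.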
Two-Tier History
| | turnHistory (Optimized) | rawTurnHistory (Full State) |
|---|---|---|
| Audience | Judge, Dispatch, agents | Goal agent, DITL hooks, developers |
| Size bound | maxTurnHistorySize | maxRawTurnHistorySize (or unbounded) |
| Compaction | Yes | No (append-only) |
| LLM-facing | Yes (via context.converseHistory) | No (only via buildGoalContent metadata) |
Invariant: rawTurnHistory is a strict superset of turnHistory. Both are updated in lockstep in invokePath().
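A minimal sketch of the lockstep update and the superset invariant follows. It simplifies heavily: entries are strings rather than ConverseData, and the curated tier is bounded by dropping the oldest entries, where the design would summarize them; TwoTierHistorySketch is a hypothetical name.

```kotlin
// Hypothetical two-tier store: one append feeds both tiers, only the curated tier is pruned.
class TwoTierHistorySketch(private val maxTurnHistorySize: Int) {
    val turnHistory = mutableListOf<String>()      // curated, bounded, agent-facing
    val rawTurnHistory = mutableListOf<String>()   // append-only in this sketch

    fun append(entry: String) {
        rawTurnHistory += entry
        turnHistory += entry
        // Simplification: drop the oldest entries instead of summarizing them.
        while (turnHistory.size > maxTurnHistorySize) turnHistory.removeAt(0)
    }

    // True while every curated entry still exists in the raw log.
    fun invariantHolds(): Boolean = rawTurnHistory.containsAll(turnHistory)
}
```

Note that once compaction replaces entries with a generated summary, the summary itself is not in the raw log, so an implementation would either append summaries to rawTurnHistory as well or read the invariant as applying to original entries only. The source does not specify which.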
Event Queue
backgroundEventQueue: Channel<PumpStationEvent>(UNLIMITED) is the single source of truth. All phases emit events via emit(event) helper. Drained at finalization.
Background Job Lifecycle
backgroundJobs: MutableList<Job> tracks async work. Jobs are spawned by runBackgroundAgentsPhase and runMemoryUpdatePhase, and joined at compaction and at finalization. Failures are isolated: they are caught and logged, and do not halt the loop.
Stash Lifecycle
stash: MutableMap<String, ConverseData> and stashManifest: MutableList<StashEntry>. Populated by detectAndHandleContextBlowout(). Read by paths (via new getStashContent(stashId) on PathObject) and the goal agent (via buildGoalContent() metadata).
Kill Switch & Pause
- Kill switch propagated via setParentInterface(this) to all child agents
- Checked at every checkPauseGuards() call
- pauseAt(phases...) → resume() via Channel<Unit> rendezvous
- forceHalt(reason) for emergency exit from paused state
Builder Function Lifecycle
Invoked at the start of every turn via refreshAgentInstances(). For paths, invoked per-path-call (already implemented in PathObject.execute()).
Loop Control State Machine
NotStarted → P2PInitInternal() → Running + PreInit
Running + PreInit → runPreInitPhase() → Running + Judge
Running + Judge → runJudgePhase()
├── isComplete → Running + GoalValidation → Running + Exit
│ ├── goalPass: Completed
│ └── goalFail: back to Running + Judge (next loop iteration)
└── !isComplete → Running + Dispatch → ... → back to Judge
Suspended (during pause), Terminated (kill switch), Failed (max turns / error)
Section 4: Error Handling
Three-Tier Error Model
| Tier | Loop behavior | Example |
|---|---|---|
| Warning | Continue, emit warning event | Background job error |
| Recoverable | Continue, attempt recovery | Dispatch JSON parse failure, path not found, context blowout |
| Halt | Exit, emit HarnessFailed | Kill switch, max turns, terminatePipeline |
Per-Phase Error Handling
- Judge: catch exceptions, treat unparseable as isComplete=false, halt on terminatePipeline, treat passPipeline as isComplete=true
- Dispatch: catch exceptions, apply repair flow (up to maxDispatchRepairAttempts), halt on terminatePipeline or unparseable + stopHarnessOnInvalidPathRequest
- Path: invokePath() already has try/catch (PumpStation.kt:1804-1820). Errors emit PathFailed, set taskState.lastError, return input. Now also replaces taskState.latestContent with LLM-targeted error message.
- Memory: background job failures are isolated, not fatal
- Compaction: failures caught, taskState.lastError = MemoryBlowout
- Goal: a terminatePipeline result counts as a failed validation; it increments goalFailCount and halts once maxGoalFailAttempts is exceeded
Dispatch JSON Repair Flow
parseDispatchOutput(content) fails
├── failurePolicy.repairInvalidDispatchJson == false: skip repair
├── maxDispatchRepairAttempts attempts to re-prompt dispatch with repair instructions
├── All attempts fail:
│ ├── stopHarnessOnInvalidPathRequest: halt
│ └── else: skip this turn (return null)
The repair prompt is a fresh MultimodalContent asking the dispatch to fix its output. It includes the schema and the previous malformed output.
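The decision tree above reduces to a bounded parse-and-retry loop. The following sketch shows one way to express it, assuming the initial output is always parsed and that each repair counts against maxDispatchRepairAttempts. RepairPolicySketch and parseWithRepair are hypothetical names, and the parser and re-prompt step are passed in as lambdas so the sketch stays independent of PathRequest.

```kotlin
// Hypothetical subset of the failure policy; the default values here are illustrative.
data class RepairPolicySketch(
    val repairInvalidDispatchJson: Boolean = true,
    val maxDispatchRepairAttempts: Int = 2
)

// Returns the parsed value (or null if repair is disabled or exhausted)
// together with the number of repair attempts that were used.
fun <T : Any> parseWithRepair(
    firstOutput: String,
    policy: RepairPolicySketch,
    parse: (String) -> T?,
    rePrompt: (malformed: String) -> String
): Pair<T?, Int> {
    var output = firstOutput
    var repairAttempts = 0
    while (true) {
        val parsed = parse(output)
        if (parsed != null) return parsed to repairAttempts
        if (!policy.repairInvalidDispatchJson || repairAttempts >= policy.maxDispatchRepairAttempts) {
            return null to repairAttempts
        }
        repairAttempts++
        output = rePrompt(output)   // would carry the schema plus the malformed output
    }
}
```

A null result corresponds to the "all attempts fail" branch; whether the harness then halts or skips the turn is decided by stopHarnessOnInvalidPathRequest, outside this helper.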
Context Blowout Recovery
contextFillRatio() + contentSize > blowoutThreshold
├── stashOversizedOutputs: stash to stash[stashId], replace latestContent with placeholder, emit StashCreated
├── preCompactionFunction(content, overflowTurn, history, this) ← DITL can intervene
├── runCompactionPhase() ← unconditional
├── postCompactionFunction(content, newHistory, this)
└── onContextTruncated(true, remainingFreeSpace) ← developer callback
Recovery exhaustion halts with HarnessFailed(MemoryBlowout) after maxBlowoutRecoveries (new config, default 3) attempts.
LLM-Targeted Error Messages
When the harness replaces taskState.latestContent with an error message, the message must be LLM-targeted natural language with:
- What the LLM did
- Why it’s wrong
- What to do instead
- Concrete corrected example
- Available paths (if applicable)
Standard format:
[Harness Notice] Your previous action had an issue: {what}.
What you did: {concrete action}
Why it's a problem: {explanation}
What to do instead: {concrete correction}
Example of a correct call:
{JSON example}
Helper: buildLlmErrorMessage(error: PumpStationError, details: Map<String, Any>): String with sub-builders per error type.
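As an illustration of the standard format, here is a minimal formatter for one notice. It does not follow the planned buildLlmErrorMessage(error, details) signature: HarnessNoticeSketch and formatHarnessNotice are hypothetical, and the trailing "Available paths" line is an assumed rendering of the optional fifth element.

```kotlin
// Hypothetical typed bundle for the fields the standard format needs.
data class HarnessNoticeSketch(
    val what: String,
    val action: String,
    val why: String,
    val correction: String,
    val exampleJson: String,
    val availablePaths: List<String> = emptyList()
)

// Renders the notice line by line in the order given by the standard format.
fun formatHarnessNotice(n: HarnessNoticeSketch): String = buildString {
    appendLine("[Harness Notice] Your previous action had an issue: ${n.what}.")
    appendLine("What you did: ${n.action}")
    appendLine("Why it's a problem: ${n.why}")
    appendLine("What to do instead: ${n.correction}")
    appendLine("Example of a correct call:")
    append(n.exampleJson)
    if (n.availablePaths.isNotEmpty()) {
        appendLine()
        append("Available paths: ${n.availablePaths.joinToString(", ")}")   // assumed wording
    }
}
```

For an unknown-path error, a caller might fill `what` with "the requested path does not exist" and `availablePaths` with the result of getVisiblePathNames(); the JSON shape of the example call depends on the PathRequest schema, which is not shown here.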
Kill Switch Cascade
The kill switch is the only forcible halt outside normal completion. Checked at every checkPauseGuards(). Trips on:
- Harness itself (after blowout recovery exhaustion)
- Path execution (via station.killSwitch?.trip())
- DITL hook (if developer accesses harness.killSwitch)
- External observer (via tripKillSwitch())
Pause-Then-Halt Pattern
pauseAt(phases...) suspends the loop at the named boundaries, and resume() wakes it. forceHalt(reason) is the emergency exit. The developer is responsible for resuming; otherwise the harness hangs (by design).
New Configuration Fields
- memoryUpdateTimeoutMs: Long = 30_000
- maxBlowoutRecoveries: Int = 3
- maxRepairPromptTokens: Int = 500
- maxGoalFailAttempts: Int = 3 (already in v5 design)
Section 5: Testing
Strategy
- Unit tests for each phase method with mock agents
- Integration tests for the full loop with mocked LLM agents
- Capture-and-assert tests for event emission
- Hook-call-recording tests for DITL hook firing
- Fault-injection tests for error scenarios
- Snapshot tests for prompt composition
Test Fixtures (PumpStationTestFixtures.kt)
- MockP2PAgent: records calls, returns scripted content
- MockPipeline: wraps MockP2PAgent
- judgeScriptedResponse(), dispatchScriptedResponse(), testPath(): builders
- setEventObserver(consumer): new method for testability
Test Files (new)
src/test/kotlin/Pipeline/
├── PumpStationTestFixtures.kt
├── runHarnessLoopTest.kt
├── runJudgePhaseTest.kt
├── runDispatchPhaseTest.kt
├── runPathFlowTest.kt
├── runMemoryUpdatePhaseTest.kt
├── runCompactionPhaseTest.kt
├── runExitFlowTest.kt
├── detectAndHandleContextBlowoutTest.kt
├── refreshPipelinesPromptsTest.kt
├── runHealthCheckPhaseTest.kt
├── buildLlmErrorMessageTest.kt
├── PumpStationEndToEndTest.kt
├── PumpStationPauseResumeTest.kt
├── PumpStationSnapshotTest.kt
├── PumpStationDslTest.kt
└── (existing tests extended)
Coverage Targets
| Area | Target |
|---|---|
| Phase methods | 100% |
| Helper methods (build*, parse*) | 90% |
| DITL hook firing | 100% |
| Error recovery | 100% |
| Event emission | 100% |
| Flag-based control | 100% |
| Loop guard interactions | 100% |
| Memory management | 80% |
| Pause/Resume | 100% |
| Snapshot/Restore | 90% |
Estimated: ~25 new test files, ~150-200 new test cases.
Missing Scaffolding Summary
New Data Classes
- JudgeVerdict(isComplete, shouldTerminate, shouldHalt, reason): typed parser output
- TurnResult sealed class: Continue | Halt(reason)
- MemorySnapshot: captured in-progress state of memory agents (used by saveSnapshot() to record lorebook and summary mid-flight values, so a rollback can restore without losing work)
- DispatchOutput(pathRequest, repairAttempts, parseError)
- HarnessAgentSlot(agent, concurrency, builderFunction): replaces additionalHarnessAgents: MutableList<P2PInterface>
- FlagCheckResult(shouldHalt, shouldPass, shouldInterrupt, haltReason): for checkMultimodalFlags()
New Configuration Fields
- maxGoalFailAttempts: Int = 3
- maxRawTurnHistorySize: Int? = null
- blowoutThreshold: Double (default compactionThreshold + 0.1)
- memoryUpdateTimeoutMs: Long = 30_000
- maxBlowoutRecoveries: Int = 3
- maxRepairPromptTokens: Int = 500
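The derived default for blowoutThreshold can be expressed directly with Kotlin default parameters. This is a sketch only: NewConfigSketch is a hypothetical holder (the fields would live on PumpStation itself), and the compactionThreshold default of 0.8 is an assumed value that the source does not give.

```kotlin
// Hypothetical holder for the new fields; defaults follow the list above.
data class NewConfigSketch(
    val compactionThreshold: Double = 0.8,                      // assumed value, not from the source
    val maxGoalFailAttempts: Int = 3,
    val maxRawTurnHistorySize: Int? = null,                     // null means unbounded
    val blowoutThreshold: Double = compactionThreshold + 0.1,   // derived unless overridden
    val memoryUpdateTimeoutMs: Long = 30_000,
    val maxBlowoutRecoveries: Int = 3,
    val maxRepairPromptTokens: Int = 500
)
```

Because a default parameter may reference an earlier parameter, overriding compactionThreshold alone moves blowoutThreshold with it, while an explicit blowoutThreshold takes precedence.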
Default Prompts (auto-injected when developer doesn’t supply)
- DEFAULT_JUDGE_PROMPT: judge role framing
- DEFAULT_DISPATCH_PROMPT: dispatch role framing
- DEFAULT_GOAL_PROMPT: goal validation framing
- Default judge footer / dispatch footer
Modified Configuration Fields
- additionalHarnessAgents: MutableList<P2PInterface> → MutableList<HarnessAgentSlot>
- additionalHarnessAgentBuilderFuncList → same wrapper type
Bug Fixes
- getPaths() (PumpStation.kt:1257-1261): should serialize getVisiblePathDescriptorsInternal(), not pathList
New Event Types
- HealthCheckStarted (only HealthCheckCompleted exists currently)
New Phase Methods (the loop body)
runHarnessLoop(), runPreInitPhase(), runHealthCheckPhase(), runJudgePhase(), runDispatchPhase(), runPathFlow(), runForegroundAgentsPhase(), runBackgroundAgentsPhase(), runMemoryUpdatePhase(), runCompactionPhase(), runExitFlow(), runFinalizationPhase(), checkPauseGuards(), withDitlWrap(), refreshAgentInstances(), refreshPipelinesPrompts(), refreshSettingsPropagation(), applyPromptsToPipeline(), checkMultimodalFlags()
New Helper Methods
buildTurnContent(), buildGoalContent(), buildPathInput(), buildJudgeSystemPrompt(), buildDispatchSystemPrompt(), buildGoalSystemPrompt(), buildJudgeFooter(), buildDispatchFooter(), defaultPromptFor(pipe), defaultFooterFor(pipe), buildUserMessageForTurn(), parseJudgeVerdict(), parseDispatchOutput(), parseHealthReport(), resolvePath(), contextFillRatio(), computeErrorRatio(), queueBackgroundMemoryAgents(), awaitBackgroundMemoryAgents(), drainBackgroundEventQueue(), pruneTurnHistory(), pruneRawTurnHistory(), summarizePoppedEntries(), compactWhole(), compactChunked(), compactHybrid(), detectAndHandleContextBlowout(), buildLlmErrorMessage(), buildInvalidPathRequestMessage(), buildUnknownPathMessage(), buildRepairFailedMessage(), buildPathExecutionExceptionMessage(), estimateContentSize(), estimateHistorySize(), computeBlowoutThreshold(), computeRemainingFreeSpace(), buildStashPlaceholder(), generateStashId(), buildHealthContext(), buildRepairPrompt(), tripKillSwitch(), forceHalt(), setEventObserver(), emitCompleted(), emitFailed(), awaitResumeSignal()
New Methods on PathObject
getStashContent(stashId: String): ConverseData?
TaskState Modifications
- taskState.goalFailCount: Int = 0 (new field)
- taskState.latestContent: MultimodalContent? already exists; expand how it’s set
Implementation Order (suggested)
When implementation begins (via writing-plans skill), the recommended order:
- Data classes first (lowest risk, no dependencies)
- Configuration fields (extend the class)
- Helper methods (pure functions of state)
- Per-phase methods (each independently testable)
- Per-turn refresh (R.1, R.2, R.3)
- Main loop (integrates all the above)
- Finalization (last)
- Test infrastructure (alongside each step)
- Bug fixes (e.g., getPaths())
- DSL extensions (if needed for new fields)
Verification
When implementation is complete, verify by:
- ./gradlew test: all unit and integration tests pass
- Manual end-to-end test: build a PumpStation with a real LLM (Bedrock or Ollama), supply a multi-turn task, and observe:
  - Judge/Dispatch cycle
  - Path execution via enableHarnessMode()
  - Memory management (if agents configured)
  - Goal validation (if goal agent configured)
  - Pause/resume at phase boundaries
  - Snapshot/restore
  - Kill switch behavior
  - Dispatch JSON repair on malformed output
  - Context blowout detection with stash
  - LLM-targeted error messages on path failure