`DistributionGrid` is TPipe's remote grid-harness container.
DistributionGrid
💡 Tip:
DistributionGrid is TPipe's distributed grid harness. Think of it as a cluster of worker nodes where each node has a router (decides where work goes) and a worker (actually does the work). Tasks hop between nodes over P2P until complete.
Table of Contents
- What DistributionGrid Is
- When to Use It
- How It Works: The Big Picture
- Internal Data Structures
- The Router Contract
- The Worker Contract
- Hook Points Reference
- DSL Builder
- Manual Assembly
- Peer and Registry Discovery
- Durable Checkpoints
- PCP Forwarding Policy
- P2P Concurrency
- Common Startup Failures
- Best Practices
What DistributionGrid Is
DistributionGrid is a distributed task routing system. Each grid node is a single TPipe agent instance (running as a coroutine within the TPipe process) with:
- A router pipeline: receives the task content, decides what to do next, and writes a DistributionGridDirective back into the content metadata
- A worker pipeline: receives the task content, does the actual work, and returns the result
- Optional peers: other nodes this node can send work to
The grid handles:
- Task routing: the router decides whether to run locally, send to a peer, or forward to a remote node
- Remote handoff: framing tasks as grid RPC over P2P
- Session reuse: cached P2P sessions for repeated peer communication
- Retry logic: routing policy controls whether to retry the same peer or try another
- Registry discovery: optional registry for finding downstream nodes
- Durable checkpoints: save/resume for long-running or interruptible tasks
When to Use It
Use DistributionGrid when:
- You have TPipe pipelines on different machines that need to coordinate
- You want to distribute LLM work across a cluster of agents
- You need reliability via retry and checkpoint mechanisms
- You're building a multi-agent system where one agent's output feeds into another across machines
Don't use it for simple single-machine fan-out; use Splitter instead.
How It Works: The Big Picture
End-to-End Flow
Participants: Caller, Grid Node A, Grid Node B
Caller -> Grid Node A: grid.execute(content)
Grid Node A:
1. Wrap content in a DistributionGridEnvelope
2. Run the router pipeline: content = router.execute(content)
3. Read the directive from content.metadata["distributionGridDirective"]: RUN_LOCAL_WORKER, HAND_OFF_TO_PEER, RETURN_TO_SENDER, or TERMINATE
4a. RUN_LOCAL_WORKER: run the worker pipeline, finalize as result
4b. HAND_OFF_TO_PEER: serialize the envelope as a P2P request and send it to the peer (Grid Node A -> Grid Node B)
Grid Node B:
5. The peer node receives the P2P request, runs its router, runs its worker, and returns the result (Grid Node B -> Grid Node A)
Grid Node A:
6. Receive the response, finalize as terminal content
Grid Node A -> Caller: result
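The flow above can be modeled in a few lines of Kotlin. This is a simplified sketch, not TPipe's implementation: `Kind`, `Directive`, `Node`, and `execute` are hypothetical stand-ins, tasks are plain strings, handoff is a direct function call rather than P2P, and only two directive kinds are modeled.

```kotlin
// Hypothetical stand-ins for illustration; none of these are TPipe types.
enum class Kind { RUN_LOCAL_WORKER, HAND_OFF_TO_PEER }

data class Directive(val kind: Kind, val targetPeerId: String = "")

class Node(
    val id: String,
    val router: (String) -> Directive,   // decides where the task goes
    val worker: (String) -> String,      // does the actual work
    val peers: MutableMap<String, Node> = mutableMapOf()
)

// Runs the router on the current node, then either runs the local worker or
// moves to the named peer. Fails if no worker has run after maxHopCount handoffs.
fun execute(node: Node, task: String, maxHopCount: Int = 8): String {
    var current = node
    for (hop in 0..maxHopCount) {
        val directive = current.router(task)
        when (directive.kind) {
            Kind.RUN_LOCAL_WORKER -> return current.worker(task)
            Kind.HAND_OFF_TO_PEER -> {
                current = current.peers[directive.targetPeerId]
                    ?: error("unknown peer '${directive.targetPeerId}'")
            }
        }
    }
    error("hop limit $maxHopCount exceeded")
}

fun main() {
    val nodeB = Node("B", { Directive(Kind.RUN_LOCAL_WORKER) }, { "B handled: $it" })
    val nodeA = Node("A", { Directive(Kind.HAND_OFF_TO_PEER, "B") }, { "A handled: $it" })
    nodeA.peers["B"] = nodeB
    println(execute(nodeA, "summarize report")) // prints "B handled: summarize report"
}
```

Two routers that keep handing the task to each other would hit the hop limit and fail instead of looping forever, which is the role `maxHopCount` plays in the routing policy.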
Node Identity
Each node has:
- A node ID: stable identifier for this node (used in originNodeId, currentNodeId)
- A transport: how to reach this node (address + method)
- A P2P descriptor: outward-facing identity published for discovery
Agent Contract
Understanding the input/output contract between the grid harness and your router/worker pipelines is critical for writing conforming components.
The Router Contract
The router is a pipeline that receives MultimodalContent and must write a DistributionGridDirective into the content's metadata before returning.
What the Router Receives
The router receives envelope.content, which is MultimodalContent. At origin, this is the caller's input. At downstream nodes, it contains the accumulated task state.
The router also has access to the full DistributionGridEnvelope through the hook context, which includes:
- envelope.taskId: Stable task identifier
- envelope.originNodeId: Node that created the task
- envelope.hopHistory: Audit trail of all prior hops
- envelope.currentObjective: Current task text (may differ from original at downstream nodes)
- envelope.attributes: Extensible metadata map
What the Router Must Output
The router must set a DistributionGridDirective via the helper method setDistributionGridDirective(). The grid reads this after your pipeline returns.
Minimal working router that always runs locally:
val routerPipeline = Pipeline().apply {
    pipelineName = "grid-router"
    add(BedrockPipe().apply {
        setPipeName("router")
        // Constrain the model output to the directive schema
        setJsonOutput(DistributionGridDirective())
        setSystemPrompt("""
            You are a DistributionGrid router. Your job is to return a routing directive.
            Return this JSON: {"kind": "RUN_LOCAL_WORKER", "notes": "default routing"}
            Do not change the kind; always route to local worker for now.
        """.trimIndent())
        // Parse the JSON and store the directive in the content metadata
        setJsonTransformer { json ->
            val directive = deserialize<DistributionGridDirective>(json)
            content.setDistributionGridDirective(directive)
            content
        }
    })
}
Router that can dispatch to peers:
val routerPipeline = Pipeline().apply {
    pipelineName = "grid-router"
    add(BedrockPipe().apply {
        setPipeName("router")
        // Constrain the model output to the directive schema
        setJsonOutput(DistributionGridDirective())
        setSystemPrompt("""
            You are a DistributionGrid router.
            Analyze the task and choose a directive:
            - {"kind": "RUN_LOCAL_WORKER", "notes": "..."} : run on local worker
            - {"kind": "HAND_OFF_TO_PEER", "targetPeerId": "peer-key", "notes": "..."} : send to peer
            If the task is simple and self-contained, run locally.
            If the task requires specialized capabilities from another peer, dispatch there.
        """.trimIndent())
        // Parse the JSON and store the directive in the content metadata
        setJsonTransformer { json ->
            val directive = deserialize<DistributionGridDirective>(json)
            content.setDistributionGridDirective(directive)
            content
        }
    })
}
Directive Resolution
The grid reads the directive from the content's metadata. The setDistributionGridDirective() helper abstracts the magic string "distributionGridDirective" so you don't need to remember it.
To read a directive back (e.g., in a hook):
hooks {
    beforePeerDispatch { envelope ->
        val directive = envelope.content.getDistributionGridDirective()
        // ...
        envelope
    }
}
If the router doesn't write a directive, the grid falls back to DistributionGridDirective(kind = RUN_LOCAL_WORKER).
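The fallback rule can be pictured as a lookup with a default. The sketch below is illustrative only: `Directive`, `DirectiveKind`, and `resolveDirective` are hypothetical stand-ins, metadata is modeled as a plain map, and treating a value of the wrong type like a missing one is an assumption rather than documented behavior.

```kotlin
// Hypothetical stand-ins; TPipe's real types carry more fields.
enum class DirectiveKind { RUN_LOCAL_WORKER, HAND_OFF_TO_PEER, TERMINATE }

data class Directive(
    val kind: DirectiveKind = DirectiveKind.RUN_LOCAL_WORKER,
    val notes: String = ""
)

// The metadata key named in the text above.
const val DIRECTIVE_KEY = "distributionGridDirective"

// Returns the stored directive, or the RUN_LOCAL_WORKER fallback when the
// key is absent (or, as an assumption of this sketch, holds another type).
fun resolveDirective(metadata: Map<String, Any>): Directive =
    metadata[DIRECTIVE_KEY] as? Directive
        ?: Directive(kind = DirectiveKind.RUN_LOCAL_WORKER)

fun main() {
    println(resolveDirective(emptyMap()).kind) // prints RUN_LOCAL_WORKER
    val stored = mapOf(DIRECTIVE_KEY to Directive(DirectiveKind.TERMINATE))
    println(resolveDirective(stored).kind)     // prints TERMINATE
}
```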
The Worker Contract
The worker is a pipeline that receives MultimodalContent and returns the work result as MultimodalContent.
What the Worker Receives
The worker receives envelope.content, the same MultimodalContent that the router received and potentially modified. The worker should not need to understand the envelope structure.
What the Worker Must Output
The worker should return its result as MultimodalContent (with text containing the output). The grid wraps this in a DistributionGridOutcome and returns it to the caller.
Minimal working worker:
val workerPipeline = Pipeline().apply {
    pipelineName = "grid-worker"
    add(BedrockPipe().apply {
        setPipeName("worker")
        setSystemPrompt("""
            You are a DistributionGrid worker. Execute the task and return your result.
            Return the best answer you can produce.
        """.trimIndent())
    })
}
DSL Settings That Affect the Contract
| Setting | Effect on Contract |
|---|---|
| `routing { maxHopCount(n) }` | Limits how many times a task can hop. Workers don't need to track this; the grid enforces it. |
| `routing { allowRetrySamePeer(true/false) }` | Whether a failed peer dispatch retries the same peer. |
| `routing { allowRemotePcpForwarding(true/false) }` | Whether PCP payloads are forwarded to remote nodes. Affects what workers see. |
| `memory { outboundTokenBudget(n) }` | Shapes outbound memory. The router receives truncated context if the budget is tight. |
| `memory { summaryBudget(n) }` | Budget for memory summarization if enabled. |
| `hooks { beforeRoute { } }` | Intercepts before router runs. Can modify content or add attributes. |
| `hooks { beforeLocalWorker { } }` | Intercepts after router returns RUN_LOCAL_WORKER. |
| `hooks { afterLocalWorker { } }` | Intercepts after worker completes. Can add execution notes. |
| `hooks { beforePeerDispatch { } }` | Intercepts before peer handoff. Can set PCP forwarding flag. |
| `hooks { afterPeerResponse { } }` | Intercepts after peer response. |
| `killSwitch(input, output, onTripped)` | Halts execution if token limits are exceeded. |
| `concurrencyMode(ISOLATED)` | Required for P2P exposure. Each request gets a fresh grid state. |
Envelope Lifecycle Contract
The DistributionGridEnvelope flows through the grid:
- Created at origin: taskId (UUID), originNodeId, originTransport set at creation
- Wrapped at each hop: senderNodeId, senderTransport updated before dispatch
- Worker receives only content: Worker sees MultimodalContent, not the envelope
- Results wrapped in outcome: Grid produces DistributionGridOutcome with finalContent, hopCount, completionNotes
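The first two lifecycle steps can be sketched with an immutable, trimmed-down envelope. Everything here is hypothetical: `Envelope`, `createAtOrigin`, and `prepareHop` are illustration-only names, transports are omitted, hop records are plain strings, and updating `currentNodeId` on the sending side is an assumption of the sketch.

```kotlin
import java.util.UUID

// Hypothetical, trimmed-down envelope; the real DistributionGridEnvelope
// has many more fields (transports, policies, outcome, failure, ...).
data class Envelope(
    val taskId: String,
    val originNodeId: String,
    val senderNodeId: String,
    val currentNodeId: String,
    val content: String,
    val hopHistory: List<String> = emptyList()
)

// Created at origin: the task ID and origin are fixed here.
fun createAtOrigin(originNodeId: String, content: String): Envelope =
    Envelope(UUID.randomUUID().toString(), originNodeId, originNodeId, originNodeId, content)

// Wrapped at each hop: the dispatching node becomes the sender and the hop
// is appended to the audit trail. taskId and originNodeId are left untouched.
fun prepareHop(envelope: Envelope, targetNodeId: String): Envelope =
    envelope.copy(
        senderNodeId = envelope.currentNodeId,
        currentNodeId = targetNodeId,
        hopHistory = envelope.hopHistory + "${envelope.currentNodeId}->$targetNodeId"
    )

fun main() {
    val atOrigin = createAtOrigin("node-a", "translate this document")
    val atC = prepareHop(prepareHop(atOrigin, "node-b"), "node-c")
    println(atC.originNodeId) // prints node-a
    println(atC.senderNodeId) // prints node-b
    println(atC.hopHistory)   // prints [node-a->node-b, node-b->node-c]
}
```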
Failure Handling Contract
Workers should return well-formed output. Failures are tracked in DistributionGridFailure (abbreviated here; the full field list appears under Internal Data Structures):
data class DistributionGridFailure(
    var kind: DistributionGridFailureKind = DistributionGridFailureKind.UNKNOWN,
    var sourceNodeId: String = "",
    var targetNodeId: String = "",
    var reason: String = "",
    var retryable: Boolean = false
)
Failure kinds: HANDSHAKE_REJECTED, SESSION_REJECTED, TRUST_REJECTED, POLICY_REJECTED, ROUTING_FAILURE, WORKER_FAILURE, TRANSPORT_FAILURE, VALIDATION_FAILURE, DURABILITY_FAILURE, UNKNOWN.
The grid can retry retryable = true failures based on routingPolicy. Non-retryable failures terminate the task.
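One way to picture the retry rule is as a small decision function. The sketch below is an assumption-laden illustration, not the grid's actual algorithm: `RoutingPolicy`, `Failure`, `NextStep`, and `nextStep` are hypothetical names, and the order in which the conditions are checked is a guess.

```kotlin
// Hypothetical simplifications of the policy and failure records described above.
data class RoutingPolicy(val allowRetrySamePeer: Boolean = true, val maxRetryCount: Int = 1)
data class Failure(val reason: String, val retryable: Boolean)

sealed interface NextStep
data class RetrySamePeer(val peerId: String) : NextStep
data class TryAlternatePeer(val peerId: String) : NextStep
data class Terminate(val reason: String) : NextStep

// Decides what to do after a failed peer dispatch. The check order
// (retryable, retry budget, same peer, alternates) is an assumption.
fun nextStep(
    failure: Failure,
    policy: RoutingPolicy,
    failedPeerId: String,
    retriesSoFar: Int,
    alternatePeerIds: List<String>
): NextStep = when {
    !failure.retryable -> Terminate(failure.reason)
    retriesSoFar >= policy.maxRetryCount -> Terminate("retry budget exhausted: ${failure.reason}")
    policy.allowRetrySamePeer -> RetrySamePeer(failedPeerId)
    alternatePeerIds.isNotEmpty() -> TryAlternatePeer(alternatePeerIds.first())
    else -> Terminate("no peer left to try: ${failure.reason}")
}

fun main() {
    val timeout = Failure("peer timed out", retryable = true)
    println(nextStep(timeout, RoutingPolicy(), "peer-1", 0, listOf("peer-2")))
    // prints RetrySamePeer(peerId=peer-1)
    println(nextStep(timeout, RoutingPolicy(allowRetrySamePeer = false), "peer-1", 0, listOf("peer-2")))
    // prints TryAlternatePeer(peerId=peer-2)
}
```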
Checkpoint Contract
If setDurableStore(...) is configured, the grid checkpoints at:
- before-peer-dispatch: Before sending to a peer
- after-local-worker: After local worker completes
- after-peer-response: After receiving peer response
Your durable store implementation must handle checkpointState(envelope, reason) and resumeState(taskId). On resume, the grid reconstructs the envelope and continues from the checkpointed point.
Internal Data Structures
DistributionGridEnvelope
The envelope is the core data structure that flows through the grid. It's a @Serializable data class:
data class DistributionGridEnvelope(
    var taskId: String = "",                 // Stable task ID (UUID generated at origin)
    var originNodeId: String = "",           // Node that originally created this task
    var originTransport: P2PTransport = P2PTransport(),
    var senderNodeId: String = "",           // Node that sent this hop
    var senderTransport: P2PTransport = P2PTransport(),
    var currentNodeId: String = "",          // Node currently processing this envelope
    var currentTransport: P2PTransport = P2PTransport(),
    var content: MultimodalContent = MultimodalContent(), // The actual task content
    var taskIntent: String = "",             // Original task text (set at origin)
    var currentObjective: String = "",       // Current task text (may change at each hop)
    var routingPolicy: DistributionGridRoutingPolicy = DistributionGridRoutingPolicy(),
    var tracePolicy: DistributionGridTracePolicy = DistributionGridTracePolicy(),
    var credentialPolicy: DistributionGridCredentialPolicy = DistributionGridCredentialPolicy(),
    var executionNotes: MutableList<String> = mutableListOf(), // Human-readable log
    var hopHistory: MutableList<DistributionGridHopRecord> = mutableListOf(), // Audit trail
    var completed: Boolean = false,          // Terminal flag
    var latestOutcome: DistributionGridOutcome? = null, // Final result
    var latestFailure: DistributionGridFailure? = null, // Failure record
    var durableStateKey: String = "",        // Checkpoint key for durability
    var sessionRef: DistributionGridSessionRef? = null, // Negotiated session
    var attributes: MutableMap<String, String> = mutableMapOf() // Extensible metadata
)
The envelope is not what you send to the router or worker. Instead, the grid extracts envelope.content and passes that as MultimodalContent to your pipelines. After your pipeline runs, the grid reads back the result from content.text and content.metadata.
DistributionGridDirective
The router writes its decision into content.metadata["distributionGridDirective"] as a DistributionGridDirective:
data class DistributionGridDirective(
    var kind: DistributionGridDirectiveKind = DistributionGridDirectiveKind.RUN_LOCAL_WORKER,
    var targetNodeId: String = "",             // For HAND_OFF_TO_PEER: target node ID
    var targetPeerId: String = "",             // For HAND_OFF_TO_PEER: peer key to dispatch to
    var targetTransport: P2PTransport? = null, // Optional explicit transport target
    var notes: String = "",                    // Human-readable router notes
    var alternatePeerIds: MutableList<String> = mutableListOf(), // Fallback peers
    var rejectReason: String = ""              // For REJECT/TERMINATE: why
)
Directive kinds:
| Kind | Meaning |
|---|---|
| `RUN_LOCAL_WORKER` | Run the task on the local worker |
| `HAND_OFF_TO_PEER` | Send to a peer (set targetPeerId or targetNodeId) |
| `RETURN_TO_SENDER` | Return to the immediate sender |
| `RETURN_TO_ORIGIN` | Return all the way to the origin node |
| `RETURN_TO_TRANSPORT` | Return to a specific transport address |
| `RETRY_SAME_PEER` | Retry the same peer (after failure) |
| `TRY_ALTERNATE_PEER` | Try the next peer in alternatePeerIds |
| `REJECT` | Reject the task (policy violation) |
| `TERMINATE` | Terminate the task (unrecoverable) |
DistributionGridOutcome
When the grid reaches a terminal state, it produces a DistributionGridOutcome:
data class DistributionGridOutcome(
    var status: DistributionGridOutcomeStatus = DistributionGridOutcomeStatus.SUCCESS,
    var returnMode: DistributionGridReturnMode = DistributionGridReturnMode.RETURN_TO_SENDER,
    var taskId: String = "",
    var finalContent: MultimodalContent = MultimodalContent(), // The terminal result
    var completionNotes: String = "",
    var hopCount: Int = 0,
    var finalNodeId: String = "",
    var terminalFailure: DistributionGridFailure? = null
)
DistributionGridFailure
Failures are recorded as:
data class DistributionGridFailure(
    var kind: DistributionGridFailureKind = DistributionGridFailureKind.UNKNOWN,
    var sourceNodeId: String = "",
    var targetNodeId: String = "",
    var transportMethod: Transport = Transport.Tpipe,
    var transportAddress: String = "",
    var reason: String = "",
    var policyCause: String = "",
    var retryable: Boolean = false
)
Failure kinds: HANDSHAKE_REJECTED, SESSION_REJECTED, TRUST_REJECTED, POLICY_REJECTED, ROUTING_FAILURE, WORKER_FAILURE, TRANSPORT_FAILURE, VALIDATION_FAILURE, DURABILITY_FAILURE, UNKNOWN.
Hook Points Reference
Hooks let you intercept the execution at specific points. All hooks receive a DistributionGridEnvelope and return a (possibly modified) envelope.
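The receive-and-return shape means hooks compose naturally: each one takes the envelope produced by the previous one. The sketch below illustrates that idea with hypothetical types (`Envelope`, `Hook`, `runHooks`). Whether TPipe accepts more than one hook per hook point is not stated here, so the chaining itself is an assumption.

```kotlin
// Hypothetical minimal envelope with just the two fields the hooks below touch.
data class Envelope(
    val attributes: MutableMap<String, String> = mutableMapOf(),
    val executionNotes: MutableList<String> = mutableListOf()
)

// A hook receives an envelope and returns a (possibly modified) envelope.
typealias Hook = (Envelope) -> Envelope

// Applies hooks in order, feeding each hook the result of the previous one.
fun runHooks(hooks: List<Hook>, envelope: Envelope): Envelope =
    hooks.fold(envelope) { current, hook -> hook(current) }

fun main() {
    val tagRequest: Hook = { env -> env.attributes["custom-flag"] = "value"; env }
    val logRoute: Hook = { env -> env.executionNotes.add("routing with ${env.attributes}"); env }
    val result = runHooks(listOf(tagRequest, logRoute), Envelope())
    println(result.executionNotes) // prints [routing with {custom-flag=value}]
}
```

A hook that forgets to return the envelope would not type-check against `Hook`, which is one reason the final `envelope` expression appears in every example in this section.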
beforeRoute
Fires before the router runs.
hooks {
    beforeRoute { envelope ->
        // Modify content, add attributes, or log before routing decision
        envelope.attributes["custom-flag"] = "value"
        envelope
    }
}
beforeLocalWorker
Fires before the local worker runs (after router returned RUN_LOCAL_WORKER).
hooks {
    beforeLocalWorker { envelope ->
        // Inspect or modify content before worker execution
        envelope
    }
}
afterLocalWorker
Fires after the local worker completes successfully.
hooks {
    afterLocalWorker { envelope ->
        // Modify result content, add execution notes
        envelope.executionNotes.add("Worker completed at ${System.currentTimeMillis()}")
        envelope
    }
}
beforePeerDispatch
Fires before sending to a peer (after router returned HAND_OFF_TO_PEER).
hooks {
    beforePeerDispatch { envelope ->
        // Can set PCP forwarding flag here
        // envelope.attributes["distributionGridAllowRemotePcpForwarding"] = "true"
        envelope
    }
}
afterPeerResponse
Fires after receiving a response from a peer.
hooks {
    afterPeerResponse { envelope ->
        // Inspect or modify the peer response before finalization
        envelope
    }
}
outboundMemory
Fires before outbound memory shaping (if memory policy is configured).
hooks {
    outboundMemory { envelope ->
        // Custom memory shaping logic
        envelope
    }
}
failure
Fires when a failure is recorded.
hooks {
    failure { envelope ->
        // Log or annotate the failure
        envelope.executionNotes.add("Failure at ${envelope.latestFailure?.reason}")
        envelope
    }
}
outcomeTransformation
Fires when producing the final terminal content.
hooks {
    outcomeTransformation { content, envelope ->
        // Transform final output
        content
    }
}
DSL Builder
The Kotlin DSL is the preferred way to assemble a grid:
import com.TTT.Pipeline.distributionGrid
import com.TTT.P2P.P2PConcurrencyMode

val grid = distributionGrid {
    // P2P identity for this node
    p2p {
        agentName("my-grid-node")
        transportAddress("my-grid-node")
        transportMethod(Transport.Tpipe)
    }
    // Router and worker (required)
    router(routerPipeline)
    worker(workerPipeline)
    // Routing policy
    routing {
        allowRetrySamePeer(true)
        maxRetryCount(1)
        maxHopCount(8)
        allowRemotePcpForwarding(false)
    }
    // Memory policy
    memory {
        outboundTokenBudget(4096)
        summaryBudget(512)
    }
    // Tracing
    tracing {
        enabled()
    }
    // Orchestration hooks
    hooks {
        beforeRoute { envelope -> envelope }
        afterLocalWorker { envelope -> envelope }
    }
    // P2P concurrency
    concurrencyMode(P2PConcurrencyMode.ISOLATED)
    // Kill switch
    killSwitch(inputTokenLimit = 100000, outputTokenLimit = 10000)
}
The DSL returns an initialized grid ready for execute().
Manual Assembly
For manual assembly without DSL:
import com.TTT.Pipeline.distributionGridBuilder

val grid = distributionGridBuilder()
    .router(routerPipeline)
    .worker(workerPipeline)
    .setRoutingPolicy(DistributionGridRoutingPolicy().apply {
        maxHopCount = 8
    })
    .setMemoryPolicy(DistributionGridMemoryPolicy().apply {
        outboundTokenBudget = 4096
    })
    .enableTracing()
    .concurrencyMode(P2PConcurrencyMode.ISOLATED)
    .killSwitch(inputTokenLimit = 50000, outputTokenLimit = 5000)
    .build()
Lifecycle Methods
| Method | Description |
|---|---|
| `init()` | Validate bindings and initialize the grid |
| `pause()` / `resume()` | Pause or resume execution |
| `isPaused()` | Check pause state |
| `clearRuntimeState()` | Clear session, pause flags, discovered state |
| `clearTrace()` | Clear trace data |
| `resumeTask(taskId)` | Resume a checkpointed task |
| `getTraceReport()` | Get formatted trace output |
| `getFailureAnalysis()` | Get structured failure report |
Peer and Registry Discovery
Adding a Local Peer
grid.addPeer(localPeerPipeline)
// With custom descriptor
grid.addPeer(localPeerPipeline, myDescriptor, myRequirements)
Adding an External Peer Descriptor
grid.addPeerDescriptor(externalPeerDescriptor)
Registry-based Discovery
// Add trusted bootstrap registry
grid.addBootstrapRegistry(registryAdvertisement)
// Register with registry and maintain lease
grid.registerWithRegistry(registryId, leaseRequest)
grid.renewRegistryLease(registryId, leaseId)
grid.tickRegistryMemberships()
// Query registry for nodes
grid.queryRegistries(registryQuery)
Durable Checkpoints
For long-running tasks that may be interrupted:
// Configure durable store
grid.setDurableStore(myDurableStore)
// Resume interrupted task
val result = grid.resumeTask(taskId)
The durable store interface:
interface DistributionGridDurableStore {
    suspend fun checkpointState(envelope: DistributionGridEnvelope, reason: String)
    suspend fun resumeState(taskId: String): DistributionGridDurableState?
}
Checkpoint reasons: before-peer-dispatch, after-local-worker, after-peer-response.
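To make the store contract concrete, here is a minimal in-memory sketch. It is not a TPipe class: `Envelope`, `DurableState`, `DurableStore`, and `InMemoryDurableStore` are hypothetical stand-ins, the fields of `DurableState` are guessed, and the methods are plain functions rather than `suspend` functions so the example can be called from ordinary code. A real store would persist to disk or a database so checkpoints survive a restart.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified stand-ins. The real interface uses suspend functions
// and the full DistributionGridEnvelope / DistributionGridDurableState types.
data class Envelope(val taskId: String, val currentNodeId: String, val text: String)
data class DurableState(val envelope: Envelope, val reason: String)

interface DurableStore {
    fun checkpointState(envelope: Envelope, reason: String)
    fun resumeState(taskId: String): DurableState?
}

// Keeps only the most recent checkpoint per task, in memory.
class InMemoryDurableStore : DurableStore {
    private val latest = ConcurrentHashMap<String, DurableState>()

    override fun checkpointState(envelope: Envelope, reason: String) {
        // A later checkpoint for the same task replaces the earlier one.
        latest[envelope.taskId] = DurableState(envelope, reason)
    }

    // Returns null when no checkpoint exists for the task.
    override fun resumeState(taskId: String): DurableState? = latest[taskId]
}

fun main() {
    val store = InMemoryDurableStore()
    store.checkpointState(Envelope("task-1", "node-a", "draft"), "before-peer-dispatch")
    store.checkpointState(Envelope("task-1", "node-a", "final"), "after-peer-response")
    println(store.resumeState("task-1")?.reason)  // prints after-peer-response
    println(store.resumeState("missing-task"))    // prints null
}
```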
PCP Forwarding Policy
By default, PCP payloads are stripped before remote handoff. Enable via routing policy:
routing {
    allowRemotePcpForwarding(true)
}
Or per-dispatch via hook:
hooks {
    beforePeerDispatch { envelope ->
        envelope.attributes["distributionGridAllowRemotePcpForwarding"] = "true"
        envelope
    }
}
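The two switches can be read as "forward if either one allows it". The sketch below encodes that reading. It is an assumption, not confirmed grid behavior: `OutboundRequest`, `shouldForwardPcp`, and `prepareForRemote` are hypothetical names, and the PCP payload is modeled as a nullable string.

```kotlin
// The attribute key used in the hook example above.
const val PCP_FORWARDING_FLAG = "distributionGridAllowRemotePcpForwarding"

// Hypothetical outbound request: task text plus an optional PCP payload.
data class OutboundRequest(val text: String, val pcpPayload: String?)

// Forward when the routing policy allows it or the per-dispatch attribute is "true".
// Combining the two with OR is an assumption of this sketch.
fun shouldForwardPcp(policyAllows: Boolean, attributes: Map<String, String>): Boolean =
    policyAllows || attributes[PCP_FORWARDING_FLAG] == "true"

// Strips the PCP payload before remote handoff unless forwarding is enabled.
fun prepareForRemote(
    request: OutboundRequest,
    policyAllows: Boolean,
    attributes: Map<String, String>
): OutboundRequest =
    if (shouldForwardPcp(policyAllows, attributes)) request else request.copy(pcpPayload = null)

fun main() {
    val request = OutboundRequest("run the report", pcpPayload = "tool-context")
    println(prepareForRemote(request, false, emptyMap()).pcpPayload)
    // prints null (stripped by default)
    println(prepareForRemote(request, false, mapOf(PCP_FORWARDING_FLAG to "true")).pcpPayload)
    // prints tool-context (enabled for this dispatch only)
}
```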
P2P Concurrency
DistributionGrid is stateful, so register it with P2PConcurrencyMode.ISOLATED:
grid.concurrencyMode(P2PConcurrencyMode.ISOLATED)
P2PRegistry.register(grid)
See P2P Registry and Routing for details.
Common Startup Failures
DistributionGrid requires a router before init().
No router bound. Call setRouter(routerPipeline) or use router { } in DSL.
DistributionGrid requires a worker before init().
No worker bound. Call setWorker(workerPipeline) or use worker { } in DSL.
Peer 'X' is already registered locally.
Duplicate peer key. Call removePeer("X") first or use replacePeer("X", newPeer).
Router did not write a directive.
Router didn't set content.metadata["distributionGridDirective"]. Ensure your router pipeline outputs a DistributionGridDirective JSON via setJsonOutput(DistributionGridDirective()).
Best Practices
- Router must output a directive: if your router pipeline doesn't write to content.metadata["distributionGridDirective"], the grid defaults to RUN_LOCAL_WORKER
- Use the DSL: it handles validation and initialization in one place
- Worker overflow protection: configure token budgeting or truncation on worker pipes
- Use P2PConcurrencyMode.ISOLATED when registering via P2P
- Configure durable store for tasks that take more than a few seconds
- Set maxHopCount appropriately: too low and tasks won't reach their destination, too high wastes resources
- Enable tracing during development to see routing decisions and hop paths
Next Steps
- Cross-Cutting Topics - Continue to shared container concepts.