DistributionGrid

`DistributionGrid` is TPipe's remote grid-harness container.

💡 Tip: DistributionGrid is TPipe’s distributed grid harness. Think of it as a cluster of worker nodes where each node has a router (decides where work goes) and a worker (actually does the work). Tasks hop between nodes over P2P until complete.

What DistributionGrid Is

DistributionGrid is a distributed task routing system. Each grid node is a single TPipe agent instance (running as a coroutine within the TPipe process) with:

  • A router pipeline: receives the task content, decides what to do next, and writes a DistributionGridDirective back into the content metadata
  • A worker pipeline: receives the task content, does the actual work, and returns the result
  • Optional peers: other nodes this node can send work to

The grid handles:

  • Task routing: the router decides whether to run locally, send to a peer, or forward to a remote node
  • Remote handoff: tasks are framed as grid RPC over P2P
  • Session reuse: P2P sessions are cached for repeated peer communication
  • Retry logic: the routing policy controls whether to retry the same peer or try another
  • Registry discovery: an optional registry for finding downstream nodes
  • Durable checkpoints: save/resume for long-running or interruptible tasks

When to Use It

Use DistributionGrid when:

  • You have TPipe pipelines on different machines that need to coordinate
  • You want to distribute LLM work across a cluster of agents
  • You need reliability via retry and checkpoint mechanisms
  • You’re building a multi-agent system where one agent’s output feeds into another across machines

Don’t use it for simple single-machine fan-out; use Splitter instead.

How It Works: The Big Picture

End-to-End Flow

Caller                           Grid Node A                         Grid Node B
  │                                   │                                   │
  │  grid.execute(content)            │                                   │
  ├──────────────────────────────────►│                                   │
  │                                   │  1. Wrap content in               │
  │                                   │     DistributionGridEnvelope      │
  │                                   │                                   │
  │                                   │  2. Run router pipeline           │
  │                                   │     content = router.execute(content)
  │                                   │                                   │
  │                                   │  3. Read directive from           │
  │                                   │     content.metadata["distributionGridDirective"]
  │                                   │     → RUN_LOCAL_WORKER            │
  │                                   │     → HAND_OFF_TO_PEER            │
  │                                   │     → RETURN_TO_SENDER            │
  │                                   │     → TERMINATE                   │
  │                                   │                                   │
  │                                   │  4a. RUN_LOCAL_WORKER:            │
  │                                   │      Run worker pipeline          │
  │                                   │      → finalize as result         │
  │                                   │                                   │
  │                                   │  4b. HAND_OFF_TO_PEER:            │
  │                                   │      Serialize envelope as        │
  │                                   │      P2P request, send to peer    │
  │                                   ├ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─►│
  │                                   │                  5. Peer node     │
  │                                   │                     receives P2P  │
  │                                   │                     request       │
  │                                   │                     runs router   │
  │                                   │                     runs worker   │
  │                                   │                     returns result│
  │                                   │◄ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┤
  │                                   │  6. Receive response, finalize    │
  │                                   │     as terminal content           │
  │◄──────────────────────────────────┤                                   │
  │  result                           │                                   │
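
The flow above can be condensed into a small route-then-dispatch loop. The sketch below is self-contained and uses simplified stand-in types (`Kind`, `Directive`, `Node`) rather than TPipe's actual classes. It assumes that a peer node runs the same route-then-work sequence, and it replaces the P2P request with a direct method call.

```kotlin
// Simplified stand-ins for illustration; these are NOT TPipe classes.
enum class Kind { RUN_LOCAL_WORKER, HAND_OFF_TO_PEER, TERMINATE }

data class Directive(val kind: Kind, val targetPeerId: String = "")

class Node(
    val id: String,
    val router: (String) -> Directive,   // decides where the task goes
    val worker: (String) -> String,      // does the actual work
    val peers: MutableMap<String, Node> = mutableMapOf()
) {
    // Route first, then act on the directive. maxHops guards against routing loops.
    fun execute(task: String, hop: Int = 0, maxHops: Int = 8): String {
        check(hop <= maxHops) { "hop limit exceeded at node $id" }
        val directive = router(task)
        return when (directive.kind) {
            Kind.RUN_LOCAL_WORKER -> worker(task)
            Kind.HAND_OFF_TO_PEER -> {
                val peer = peers[directive.targetPeerId]
                    ?: error("unknown peer '${directive.targetPeerId}'")
                // Direct call standing in for the serialized P2P request.
                peer.execute(task, hop + 1, maxHops)
            }
            Kind.TERMINATE -> "terminated"
        }
    }
}
```

Wiring node A's router to always hand off to a peer keyed "B", and node B's router to always run locally, reproduces steps 4b through 6 of the diagram: the caller gets back whatever B's worker produced.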

Node Identity

Each node has:

  • A node ID: stable identifier for this node (used in originNodeId, currentNodeId)
  • A transport: how to reach this node (address + method)
  • A P2P descriptor: outward-facing identity published for discovery

Agent Contract

Understanding the input/output contract between the grid harness and your router/worker pipelines is critical for writing conforming components.

The Router Contract

The router is a pipeline that receives MultimodalContent and must write a DistributionGridDirective into the content’s metadata before returning.

What the Router Receives

The router receives envelope.content, which is a MultimodalContent. At origin, this is the caller’s input. At downstream nodes, it contains the accumulated task state.

The router also has access to the full DistributionGridEnvelope through the hook context, which includes:

  • envelope.taskId β€” Stable task identifier
  • envelope.originNodeId β€” Node that created the task
  • envelope.hopHistory β€” Audit trail of all prior hops
  • envelope.currentObjective β€” Current task text (may differ from original at downstream nodes)
  • envelope.attributes β€” Extensible metadata map

What the Router Must Output

The router must set a DistributionGridDirective via the helper method setDistributionGridDirective(). The grid reads this after your pipeline returns.

Minimal working router that always runs locally:

val routerPipeline = Pipeline().apply {
    pipelineName = "grid-router"
    add(BedrockPipe().apply {
        setPipeName("router")
        setJsonOutput(DistributionGridDirective())
        setSystemPrompt("""
            You are a DistributionGrid router. Your job is to return a routing directive.

            Return this JSON: {"kind": "RUN_LOCAL_WORKER", "notes": "default routing"}

            Do not change the kind; always route to the local worker for now.
        """.trimIndent())
        setJsonTransformer { json ->
            val directive = deserialize<DistributionGridDirective>(json)
            content.setDistributionGridDirective(directive)
            content
        }
    })
}

Router that can dispatch to peers:

val routerPipeline = Pipeline().apply {
    pipelineName = "grid-router"
    add(BedrockPipe().apply {
        setPipeName("router")
        setJsonOutput(DistributionGridDirective())
        setSystemPrompt("""
            You are a DistributionGrid router.

            Analyze the task and choose a directive:
            - {"kind": "RUN_LOCAL_WORKER", "notes": "..."} → run on local worker
            - {"kind": "HAND_OFF_TO_PEER", "targetPeerId": "peer-key", "notes": "..."} → send to peer

            If the task is simple and self-contained, run locally.
            If the task requires specialized capabilities from another peer, dispatch there.
        """.trimIndent())
        setJsonTransformer { json ->
            val directive = deserialize<DistributionGridDirective>(json)
            content.setDistributionGridDirective(directive)
            content
        }
    })
}

Directive Resolution

The grid reads the directive from the content’s metadata. The setDistributionGridDirective() helper abstracts the magic string "distributionGridDirective" so you don’t need to remember it.

To read a directive back (e.g., in a hook):

hooks {
    beforePeerDispatch { envelope ->
        val directive = envelope.content.getDistributionGridDirective()
        // ...
        envelope
    }
}

If the router doesn’t write a directive, the grid falls back to DistributionGridDirective(kind = RUN_LOCAL_WORKER).
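
The fallback rule can be pictured as a map lookup with a default. This sketch models metadata as a plain `Map<String, Any>` and uses a stand-in `Directive` type. Both are assumptions for illustration and not the grid's internal representation. `resolveDirective` is a hypothetical helper name.

```kotlin
// Stand-in types for illustration only; TPipe's real classes differ.
enum class Kind { RUN_LOCAL_WORKER, HAND_OFF_TO_PEER }

data class Directive(
    val kind: Kind = Kind.RUN_LOCAL_WORKER,
    val targetPeerId: String = ""
)

// The metadata key named in this section.
val directiveKey = "distributionGridDirective"

// Returns the directive the router wrote, or the RUN_LOCAL_WORKER default when
// the key is missing or holds something that is not a Directive.
fun resolveDirective(metadata: Map<String, Any>): Directive =
    (metadata[directiveKey] as? Directive) ?: Directive()
```

Treating a malformed value the same as a missing one is a choice made for this sketch; the source only states the behavior for a missing directive.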

The Worker Contract

The worker is a pipeline that receives MultimodalContent and returns the work result as MultimodalContent.

What the Worker Receives

The worker receives envelope.content: the same MultimodalContent that the router received and potentially modified. The worker should not need to understand the envelope structure.

What the Worker Must Output

The worker should return its result as MultimodalContent (with text containing the output). The grid wraps this in a DistributionGridOutcome and returns it to the caller.

Minimal working worker:

val workerPipeline = Pipeline().apply {
    pipelineName = "grid-worker"
    add(BedrockPipe().apply {
        setPipeName("worker")
        setSystemPrompt("""
            You are a DistributionGrid worker. Execute the task and return your result.
            Return the best answer you can produce.
        """.trimIndent())
    })
}

DSL Settings That Affect the Contract

| Setting | Effect on Contract |
| --- | --- |
| routing { maxHopCount(n) } | Limits how many times a task can hop. Workers don’t need to track this; the grid enforces it. |
| routing { allowRetrySamePeer(true/false) } | Whether a failed peer dispatch retries the same peer. |
| routing { allowRemotePcpForwarding(true/false) } | Whether PCP payloads are forwarded to remote nodes. Affects what workers see. |
| memory { outboundTokenBudget(n) } | Shapes outbound memory. The router receives truncated context if the budget is tight. |
| memory { summaryBudget(n) } | Budget for memory summarization if enabled. |
| hooks { beforeRoute { } } | Intercepts before router runs. Can modify content or add attributes. |
| hooks { beforeLocalWorker { } } | Intercepts after router returns RUN_LOCAL_WORKER. |
| hooks { afterLocalWorker { } } | Intercepts after worker completes. Can add execution notes. |
| hooks { beforePeerDispatch { } } | Intercepts before peer handoff. Can set PCP forwarding flag. |
| hooks { afterPeerResponse { } } | Intercepts after peer response. |
| killSwitch(input, output, onTripped) | Halts execution if token limits are exceeded. |
| concurrencyMode(ISOLATED) | Required for P2P exposure. Each request gets a fresh grid state. |

Envelope Lifecycle Contract

The DistributionGridEnvelope flows through the grid:

  1. Created at origin: taskId (UUID), originNodeId, and originTransport are set at creation
  2. Wrapped at each hop: senderNodeId and senderTransport are updated before dispatch
  3. Worker receives only content: the worker sees MultimodalContent, not the envelope
  4. Results wrapped in outcome: the grid produces a DistributionGridOutcome with finalContent, hopCount, and completionNotes
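
Steps 1 and 2 amount to stamping identity fields once at creation and rewriting the sender fields on every hop. The sketch below uses a trimmed stand-in envelope (`MiniEnvelope`) with only a few of the fields named above. The helpers `createAtOrigin` and `hop`, and the use of plain strings for hop history, are hypothetical.

```kotlin
import java.util.UUID

// Trimmed stand-in for DistributionGridEnvelope; not the real class.
data class MiniEnvelope(
    val taskId: String,
    val originNodeId: String,
    var senderNodeId: String,
    var currentNodeId: String,
    val hopHistory: MutableList<String> = mutableListOf()
)

// Step 1: identity is fixed once, at the origin node.
fun createAtOrigin(nodeId: String) = MiniEnvelope(
    taskId = UUID.randomUUID().toString(),
    originNodeId = nodeId,
    senderNodeId = nodeId,
    currentNodeId = nodeId
)

// Step 2: before dispatch, the current node becomes the sender and the
// hop is appended to the audit trail.
fun hop(envelope: MiniEnvelope, targetNodeId: String): MiniEnvelope {
    envelope.hopHistory.add("${envelope.currentNodeId} -> $targetNodeId")
    envelope.senderNodeId = envelope.currentNodeId
    envelope.currentNodeId = targetNodeId
    return envelope
}
```

After two hops from A to B to C, `originNodeId` is still A while `senderNodeId` is B, which is the distinction a router relies on when choosing between returning to the sender and returning to the origin.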

Failure Handling Contract

Workers should return well-formed output. When something goes wrong, the grid records a DistributionGridFailure (abridged here; the full field list appears under Internal Data Structures):

data class DistributionGridFailure(
    var kind: DistributionGridFailureKind = DistributionGridFailureKind.UNKNOWN,
    var sourceNodeId: String = "",
    var targetNodeId: String = "",
    var reason: String = "",
    var retryable: Boolean = false
)

Failure kinds: HANDSHAKE_REJECTED, SESSION_REJECTED, TRUST_REJECTED, POLICY_REJECTED, ROUTING_FAILURE, WORKER_FAILURE, TRANSPORT_FAILURE, VALIDATION_FAILURE, DURABILITY_FAILURE, UNKNOWN.

The grid can retry retryable = true failures based on routingPolicy. Non-retryable failures terminate the task.
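
One way to read the retry rule: a failure is eligible for retry only when `retryable` is true, and the routing policy then chooses between the same peer and an alternate. The decision function below is a hypothetical sketch over stand-in types. The order of the checks is an assumption, since the grid's actual ordering is not documented here.

```kotlin
// Stand-in types; names mirror the policy settings but are not TPipe classes.
data class Failure(val reason: String, val retryable: Boolean)

data class RetryPolicy(
    val allowRetrySamePeer: Boolean = true,
    val maxRetryCount: Int = 1
)

enum class NextStep { RETRY_SAME_PEER, TRY_ALTERNATE_PEER, TERMINATE }

fun decideAfterFailure(
    failure: Failure,
    policy: RetryPolicy,
    attemptsOnPeer: Int,             // retries already spent on the failing peer
    alternatePeerIds: List<String>   // fallback peers not yet tried
): NextStep = when {
    // Non-retryable failures end the task regardless of policy.
    !failure.retryable -> NextStep.TERMINATE
    policy.allowRetrySamePeer && attemptsOnPeer < policy.maxRetryCount ->
        NextStep.RETRY_SAME_PEER
    alternatePeerIds.isNotEmpty() -> NextStep.TRY_ALTERNATE_PEER
    else -> NextStep.TERMINATE
}
```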

Checkpoint Contract

If setDurableStore(...) is configured, the grid checkpoints at:

  • before-peer-dispatch: before sending to a peer
  • after-local-worker: after the local worker completes
  • after-peer-response: after receiving a peer response

Your durable store implementation must handle checkpointState(envelope, reason) and resumeState(taskId). On resume, the grid reconstructs the envelope and continues from the checkpointed point.

Internal Data Structures

DistributionGridEnvelope

The envelope is the core data structure that flows through the grid. It’s a @Serializable data class:

data class DistributionGridEnvelope(
    var taskId: String = "",                    // Stable task ID (UUID generated at origin)
    var originNodeId: String = "",              // Node that originally created this task
    var originTransport: P2PTransport = P2PTransport(),
    var senderNodeId: String = "",             // Node that sent this hop
    var senderTransport: P2PTransport = P2PTransport(),
    var currentNodeId: String = "",             // Node currently processing this envelope
    var currentTransport: P2PTransport = P2PTransport(),
    var content: MultimodalContent = MultimodalContent(),  // The actual task content
    var taskIntent: String = "",                // Original task text (set at origin)
    var currentObjective: String = "",           // Current task text (may change at each hop)
    var routingPolicy: DistributionGridRoutingPolicy = DistributionGridRoutingPolicy(),
    var tracePolicy: DistributionGridTracePolicy = DistributionGridTracePolicy(),
    var credentialPolicy: DistributionGridCredentialPolicy = DistributionGridCredentialPolicy(),
    var executionNotes: MutableList<String> = mutableListOf(),  // Human-readable log
    var hopHistory: MutableList<DistributionGridHopRecord> = mutableListOf(),  // Audit trail
    var completed: Boolean = false,             // Terminal flag
    var latestOutcome: DistributionGridOutcome? = null,  // Final result
    var latestFailure: DistributionGridFailure? = null,      // Failure record
    var durableStateKey: String = "",          // Checkpoint key for durability
    var sessionRef: DistributionGridSessionRef? = null,      // Negotiated session
    var attributes: MutableMap<String, String> = mutableMapOf()  // Extensible metadata
)

The envelope is not what you send to the router or worker. Instead, the grid extracts envelope.content and passes that as MultimodalContent to your pipelines. After your pipeline runs, the grid reads back the result from content.text and content.metadata.

DistributionGridDirective

The router writes its decision into content.metadata["distributionGridDirective"] as a DistributionGridDirective:

data class DistributionGridDirective(
    var kind: DistributionGridDirectiveKind = DistributionGridDirectiveKind.RUN_LOCAL_WORKER,
    var targetNodeId: String = "",              // For HAND_OFF_TO_PEER: target node ID
    var targetPeerId: String = "",              // For HAND_OFF_TO_PEER: peer key to dispatch to
    var targetTransport: P2PTransport? = null,  // Optional explicit transport target
    var notes: String = "",                     // Human-readable router notes
    var alternatePeerIds: MutableList<String> = mutableListOf(),  // Fallback peers
    var rejectReason: String = ""               // For REJECT/TERMINATE: why
)

Directive kinds:

| Kind | Meaning |
| --- | --- |
| RUN_LOCAL_WORKER | Run the task on the local worker |
| HAND_OFF_TO_PEER | Send to a peer (set targetPeerId or targetNodeId) |
| RETURN_TO_SENDER | Return to the immediate sender |
| RETURN_TO_ORIGIN | Return all the way to the origin node |
| RETURN_TO_TRANSPORT | Return to a specific transport address |
| RETRY_SAME_PEER | Retry the same peer (after failure) |
| TRY_ALTERNATE_PEER | Try the next peer in alternatePeerIds |
| REJECT | Reject the task (policy violation) |
| TERMINATE | Terminate the task (unrecoverable) |
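
Because some kinds only make sense with certain fields filled in, a router transformer may want to sanity-check a directive before writing it. The validator below is a hypothetical helper over stand-in types that cover a subset of the kinds. The rule that REJECT and TERMINATE should carry a `rejectReason` is an assumption drawn from the field comment, not a documented requirement.

```kotlin
// Stand-in types for illustration; only a subset of the real kinds is modeled.
enum class Kind { RUN_LOCAL_WORKER, HAND_OFF_TO_PEER, TRY_ALTERNATE_PEER, REJECT, TERMINATE }

data class Directive(
    val kind: Kind,
    val targetNodeId: String = "",
    val targetPeerId: String = "",
    val alternatePeerIds: List<String> = emptyList(),
    val rejectReason: String = ""
)

// Returns a list of problems; an empty list means the directive looks usable.
fun validate(d: Directive): List<String> {
    val problems = mutableListOf<String>()
    when (d.kind) {
        Kind.HAND_OFF_TO_PEER -> {
            if (d.targetPeerId.isBlank() && d.targetNodeId.isBlank()) {
                problems.add("HAND_OFF_TO_PEER needs targetPeerId or targetNodeId")
            }
        }
        Kind.TRY_ALTERNATE_PEER -> {
            if (d.alternatePeerIds.isEmpty()) {
                problems.add("TRY_ALTERNATE_PEER needs at least one alternatePeerIds entry")
            }
        }
        Kind.REJECT, Kind.TERMINATE -> {
            if (d.rejectReason.isBlank()) {
                problems.add("${d.kind} should explain why in rejectReason")
            }
        }
        Kind.RUN_LOCAL_WORKER -> {}
    }
    return problems
}
```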

DistributionGridOutcome

When the grid reaches a terminal state, it produces a DistributionGridOutcome:

data class DistributionGridOutcome(
    var status: DistributionGridOutcomeStatus = DistributionGridOutcomeStatus.SUCCESS,
    var returnMode: DistributionGridReturnMode = DistributionGridReturnMode.RETURN_TO_SENDER,
    var taskId: String = "",
    var finalContent: MultimodalContent = MultimodalContent(),  // The terminal result
    var completionNotes: String = "",
    var hopCount: Int = 0,
    var finalNodeId: String = "",
    var terminalFailure: DistributionGridFailure? = null
)

DistributionGridFailure

Failures are recorded as:

data class DistributionGridFailure(
    var kind: DistributionGridFailureKind = DistributionGridFailureKind.UNKNOWN,
    var sourceNodeId: String = "",
    var targetNodeId: String = "",
    var transportMethod: Transport = Transport.Tpipe,
    var transportAddress: String = "",
    var reason: String = "",
    var policyCause: String = "",
    var retryable: Boolean = false
)

Failure kinds: HANDSHAKE_REJECTED, SESSION_REJECTED, TRUST_REJECTED, POLICY_REJECTED, ROUTING_FAILURE, WORKER_FAILURE, TRANSPORT_FAILURE, VALIDATION_FAILURE, DURABILITY_FAILURE, UNKNOWN.

Hook Points Reference

Hooks let you intercept execution at specific points. Most hooks receive a DistributionGridEnvelope and return a (possibly modified) envelope; outcomeTransformation also receives the final content and returns content instead.

beforeRoute

Fires before the router runs.

hooks {
    beforeRoute { envelope ->
        // Modify content, add attributes, or log before routing decision
        envelope.attributes["custom-flag"] = "value"
        envelope
    }
}

beforeLocalWorker

Fires before the local worker runs (after router returned RUN_LOCAL_WORKER).

hooks {
    beforeLocalWorker { envelope ->
        // Inspect or modify content before worker execution
        envelope
    }
}

afterLocalWorker

Fires after the local worker completes successfully.

hooks {
    afterLocalWorker { envelope ->
        // Modify result content, add execution notes
        envelope.executionNotes.add("Worker completed at ${System.currentTimeMillis()}")
        envelope
    }
}

beforePeerDispatch

Fires before sending to a peer (after router returned HAND_OFF_TO_PEER).

hooks {
    beforePeerDispatch { envelope ->
        // Can set PCP forwarding flag here
        // envelope.attributes["distributionGridAllowRemotePcpForwarding"] = "true"
        envelope
    }
}

afterPeerResponse

Fires after receiving a response from a peer.

hooks {
    afterPeerResponse { envelope ->
        // Inspect or modify the peer response before finalization
        envelope
    }
}

outboundMemory

Fires before outbound memory shaping (if memory policy is configured).

hooks {
    outboundMemory { envelope ->
        // Custom memory shaping logic
        envelope
    }
}

failure

Fires when a failure is recorded.

hooks {
    failure { envelope ->
        // Log or annotate the failure
        envelope.executionNotes.add("Failure: ${envelope.latestFailure?.reason ?: "unknown"}")
        envelope
    }
}

outcomeTransformation

Fires when producing the final terminal content.

hooks {
    outcomeTransformation { content, envelope ->
        // Transform final output
        content
    }
}

DSL Builder

The Kotlin DSL is the preferred way to assemble a grid:

import com.TTT.Pipeline.distributionGrid
import com.TTT.P2P.P2PConcurrencyMode

val grid = distributionGrid {
    // P2P identity for this node
    p2p {
        agentName("my-grid-node")
        transportAddress("my-grid-node")
        transportMethod(Transport.Tpipe)
    }

    // Router and worker (required)
    router(routerPipeline)
    worker(workerPipeline)

    // Routing policy
    routing {
        allowRetrySamePeer(true)
        maxRetryCount(1)
        maxHopCount(8)
        allowRemotePcpForwarding(false)
    }

    // Memory policy
    memory {
        outboundTokenBudget(4096)
        summaryBudget(512)
    }

    // Tracing
    tracing {
        enabled()
    }

    // Orchestration hooks
    hooks {
        beforeRoute { envelope -> envelope }
        afterLocalWorker { envelope -> envelope }
    }

    // P2P concurrency
    concurrencyMode(P2PConcurrencyMode.ISOLATED)

    // Kill switch
    killSwitch(inputTokenLimit = 100000, outputTokenLimit = 10000)
}

The DSL returns an initialized grid ready for execute().

Manual Assembly

For manual assembly without DSL:

import com.TTT.Pipeline.distributionGridBuilder

val grid = distributionGridBuilder()
    .router(routerPipeline)
    .worker(workerPipeline)
    .setRoutingPolicy(DistributionGridRoutingPolicy().apply {
        maxHopCount = 8
    })
    .setMemoryPolicy(DistributionGridMemoryPolicy().apply {
        outboundTokenBudget = 4096
    })
    .enableTracing()
    .concurrencyMode(P2PConcurrencyMode.ISOLATED)
    .killSwitch(inputTokenLimit = 50000, outputTokenLimit = 5000)
    .build()

Lifecycle Methods

| Method | Description |
| --- | --- |
| init() | Validate bindings and initialize the grid |
| pause() / resume() | Pause or resume execution |
| isPaused() | Check pause state |
| clearRuntimeState() | Clear session, pause flags, discovered state |
| clearTrace() | Clear trace data |
| resumeTask(taskId) | Resume a checkpointed task |
| getTraceReport() | Get formatted trace output |
| getFailureAnalysis() | Get structured failure report |

Peer and Registry Discovery

Adding a Local Peer

grid.addPeer(localPeerPipeline)

// With custom descriptor
grid.addPeer(localPeerPipeline, myDescriptor, myRequirements)

Adding an External Peer Descriptor

grid.addPeerDescriptor(externalPeerDescriptor)

Registry-based Discovery

// Add trusted bootstrap registry
grid.addBootstrapRegistry(registryAdvertisement)

// Register with registry and maintain lease
grid.registerWithRegistry(registryId, leaseRequest)
grid.renewRegistryLease(registryId, leaseId)
grid.tickRegistryMemberships()

// Query registry for nodes
grid.queryRegistries(registryQuery)

Durable Checkpoints

For long-running tasks that may be interrupted:

// Configure durable store
grid.setDurableStore(myDurableStore)

// Resume interrupted task
val result = grid.resumeTask(taskId)

The durable store interface:

interface DistributionGridDurableStore {
    suspend fun checkpointState(envelope: DistributionGridEnvelope, reason: String)
    suspend fun resumeState(taskId: String): DistributionGridDurableState?
}

Checkpoint reasons: before-peer-dispatch, after-local-worker, after-peer-response.
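
For local testing, an in-memory store is often enough. The sketch below mirrors the shape of the interface with simplified stand-ins (`Envelope`, `DurableState`, `DurableStore`). The real interface uses suspend functions and TPipe's own types, and the fields of `DistributionGridDurableState` are not shown in this document, so the ones used here are assumptions.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Stand-ins for illustration. The fields of DurableState are assumed.
data class Envelope(val taskId: String, val text: String)
data class DurableState(val envelope: Envelope, val reason: String)

// Non-suspending stand-in for DistributionGridDurableStore.
interface DurableStore {
    fun checkpointState(envelope: Envelope, reason: String)
    fun resumeState(taskId: String): DurableState?
}

// Keeps only the latest checkpoint per task; nothing survives a process restart.
class InMemoryDurableStore : DurableStore {
    private val latest = ConcurrentHashMap<String, DurableState>()

    override fun checkpointState(envelope: Envelope, reason: String) {
        // copy() is shallow; a real store would serialize the envelope instead.
        latest[envelope.taskId] = DurableState(envelope.copy(), reason)
    }

    override fun resumeState(taskId: String): DurableState? = latest[taskId]
}
```

Keeping only the most recent checkpoint per task matches the resume-by-taskId lookup; a store that needs an audit trail would append rather than overwrite.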

PCP Forwarding Policy

By default, PCP payloads are stripped before remote handoff. Enable via routing policy:

routing {
    allowRemotePcpForwarding(true)
}

Or per-dispatch via hook:

hooks {
    beforePeerDispatch { envelope ->
        envelope.attributes["distributionGridAllowRemotePcpForwarding"] = "true"
        envelope
    }
}
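
How the two switches combine is not spelled out here. A plausible reading, shown as a hypothetical sketch, is that forwarding is allowed when either the routing policy enables it or the per-dispatch attribute is set to "true". The helper names and the representation of PCP payloads as a list of strings are assumptions.

```kotlin
// Attribute key used by the hook example in this section.
val pcpFlagKey = "distributionGridAllowRemotePcpForwarding"

// Assumed rule: policy OR per-dispatch attribute enables forwarding.
fun pcpForwardingAllowed(policyAllows: Boolean, attributes: Map<String, String>): Boolean =
    policyAllows || attributes[pcpFlagKey] == "true"

// Strips PCP payloads (modeled as plain strings) unless forwarding is allowed.
fun outboundPcp(
    payloads: List<String>,
    policyAllows: Boolean,
    attributes: Map<String, String>
): List<String> =
    if (pcpForwardingAllowed(policyAllows, attributes)) payloads else emptyList()
```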

P2P Concurrency

DistributionGrid is stateful, so register it with P2PConcurrencyMode.ISOLATED:

grid.concurrencyMode(P2PConcurrencyMode.ISOLATED)
P2PRegistry.register(grid)

See P2P Registry and Routing for details.

Common Startup Failures

DistributionGrid requires a router before init().

No router bound. Call setRouter(routerPipeline) or use router(routerPipeline) in the DSL.

DistributionGrid requires a worker before init().

No worker bound. Call setWorker(workerPipeline) or use worker(workerPipeline) in the DSL.

Peer 'X' is already registered locally.

Duplicate peer key. Call removePeer("X") first or use replacePeer("X", newPeer).

Router did not write a directive.

The router didn’t set content.metadata["distributionGridDirective"], so the grid falls back to RUN_LOCAL_WORKER. Ensure your router pipeline outputs a DistributionGridDirective JSON via setJsonOutput(DistributionGridDirective()) and writes it with setDistributionGridDirective().

Best Practices

  • Router must output a directive: if your router pipeline doesn’t write to content.metadata["distributionGridDirective"], the grid defaults to RUN_LOCAL_WORKER
  • Use the DSL: it handles validation and initialization in one place
  • Worker overflow protection: configure token budgeting or truncation on worker pipes
  • Use P2PConcurrencyMode.ISOLATED when registering via P2P
  • Configure a durable store for tasks that take more than a few seconds
  • Set maxHopCount appropriately: too low and tasks won’t reach their destination, too high and resources are wasted
  • Enable tracing during development to see routing decisions and hop paths
