Pipe is the core TPipe class for single-step AI inference with model configuration and context management.
Pipe Class API
Overview
The Pipe class is the core abstraction for AI model interactions in TPipe.
abstract class Pipe : P2PInterface, ProviderInterface
Public Properties
Configuration
pipeName
var pipeName: String = ""
Optional name for debugging, tracing, and identification.
jsonInput
var jsonInput: String = ""
JSON schema for expected input structure. Used for validation and prompt injection.
jsonOutput
var jsonOutput: String = ""
JSON schema for expected output structure. Used for validation and response formatting.
saveSnapshot
var saveSnapshot: Boolean = false
If true, automatically saves a snapshot to the content object’s snapshot system at pipe startup. Useful when you can’t control the content object directly or need guaranteed snapshot preservation.
enablePipeTimeout
var enablePipeTimeout: Boolean = false
Enables timeout tracking for this pipe. When true, pipe execution is monitored and can be aborted if duration exceeds pipeTimeout.
pipeTimeout
var pipeTimeout: Long = 300000L
Timeout duration in milliseconds (default 5 minutes). Pipe aborts if execution exceeds this duration.
timeoutStrategy
var timeoutStrategy: PipeTimeoutStrategy = PipeTimeoutStrategy.Fail
Strategy for handling timeouts: Fail (terminate), Retry (automatic retry with snapshot), or CustomLogic (developer-defined).
maxRetryAttempts
var maxRetryAttempts: Int = 5
Maximum number of retry attempts when timeoutStrategy = Retry. Pipe fails after exhausting attempts.
applyTimeoutRecursively
var applyTimeoutRecursively: Boolean = true
If true, timeout and retry settings propagate to child pipes (validator, branch, transformation, reasoning).
Function Hooks
preInitFunction
var preInitFunction: (suspend (MultimodalContent) -> Unit)? = null
Called before context loading. Allows input preprocessing.
preValidationFunction
var preValidationFunction: (suspend (ContextWindow, MultimodalContent?) -> ContextWindow)? = null
Called after context loading, before prompt injection. Allows context adjustments.
preValidationMiniBankFunction
var preValidationMiniBankFunction: (suspend (MiniBank, MultimodalContent?) -> MiniBank)? = null
Alternative to preValidationFunction for multi-page context scenarios.
preInvokeFunction
var preInvokeFunction: (suspend (MultimodalContent) -> Boolean)? = null
Determines if pipe should be skipped. Returns true to skip, false to continue.
postGenerateFunction
var postGenerateFunction: (suspend (MultimodalContent) -> Unit)? = null
Called immediately after AI generates content, before validation. Useful for caching or logging raw output.
exceptionFunction
var exceptionFunction: (suspend (MultimodalContent, Throwable) -> Unit)? = null
Called when exceptions occur during AI execution. Provides content state and exception for debugging.
Integration
containerPtr
var containerPtr: P2PInterface? = null
Container object to execute instead of this pipe. Enables delegation to complex orchestration.
currentPipelineId
var currentPipelineId: String? = null
ID of currently executing pipeline. Used for tracking and coordination.
parentInterface
var parentInterface: P2PInterface? = null
Parent P2PInterface reference for hierarchical traversal. Set automatically when this pipe is embedded in a container or assigned to a pipeline. Used for recursive operations and context propagation.
Public Functions
Configuration
init(): Pipe
Abstract initialization function that must be implemented by provider-specific pipe classes.
Behavior: Called to initialize provider-specific configurations, authentication, and model settings. Each provider implementation (BedrockPipe, OllamaPipe, etc.) must implement this method to set up their specific requirements. Returns the pipe instance for method chaining.
setPipeName(name: String): Pipe
Sets the pipe name for debugging and identification.
setPipelineRef(pipeline: Pipeline?): Pipe
Sets reference to parent pipeline.
setProvider(provider: ProviderName): Pipe
Sets the AI provider (AWS, OpenAI, etc.).
setModel(modelName: String): Pipe
Sets the AI model name.
setPromptMode(promptMode: PromptMode): Pipe
Sets how prompts are handled by the model.
Behavior: Affects context management and prompt construction. singlePrompt disables automatic context handling, chat relies on provider-side context, internalContext enables TPipe’s context system.
wrapContentWithConverse(role: ConverseRole = ConverseRole.agent): Pipe
Enables automatic conversation history wrapping for pipeline chaining.
Behavior: When enabled, the pipe automatically detects if input content is in ConverseHistory format and continues building the conversation. The pipe’s output is wrapped with the specified role and added to the conversation history. Essential for multi-turn conversations and agent-based systems. All pipes in a conversation chain should have this enabled.
allowEmptyUserPrompt(): Pipe
Disables safety guardrail that prevents empty user prompts.
Behavior: By default, TPipe blocks empty user prompts as they cause destructive bugs in 99% of cases. Calling this method is a contractual promise that you’ve designed the pipe to handle empty prompts safely and that doing so won’t cause catastrophic pipeline damage.
allowEmptyContentObject(): Pipe
Disables safety guardrail that prevents completely empty content objects.
Behavior: By default, TPipe blocks content objects with no user prompt, text input, binary content, or context data as they’re footguns 99.99% of the time. This method is a contractual promise that this edge case is safe and has been properly handled.
forceSaveSnapshot(): Pipe
Forces the pipe to save a snapshot to the content object at startup.
Behavior: Sets saveSnapshot = true to ensure the content object is automatically snapshotted when the pipe begins execution. Useful when you can’t control the content object directly or need guaranteed snapshot preservation for branch failure recovery.
enablePipeTimeout(applyRecursively: Boolean = true, duration: Long = 300000, autoRetry: Boolean = false, retryLimit: Int = 5, customLogic: (suspend(pipe: Pipe, content: MultimodalContent) -> Boolean)? = null): Pipe
Enables timeout tracking and configures retry behavior for this pipe.
Parameters:
- applyRecursively: If true, propagates settings to child pipes (validator, branch, transformation, reasoning)
- duration: Timeout duration in milliseconds (default 300000 = 5 minutes)
- autoRetry: If true, sets timeoutStrategy = Retry for automatic retry
- retryLimit: Maximum retry attempts (default 5)
- customLogic: Optional custom retry function for the CustomLogic strategy
Behavior: Starts timeout tracking when pipe executes. On timeout, behavior depends on strategy:
- Fail: Terminates immediately (default if autoRetry = false and customLogic = null)
- Retry: Automatically restores snapshot and retries up to retryLimit times
- CustomLogic: Invokes the customLogic function to determine retry action
⚠️ Warning: Retry re-executes ALL pre-execution DITL functions (preInitFunction, preValidationFunction, etc.). These functions MUST be read-only. Writing to ContextBank or program memory will cause duplicate writes on retry. See Timeout and Retry for details.
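The duplicate-write risk in the warning can be illustrated with a small self-contained sketch. It does not use TPipe: `runWithRetry`, `simulateWritingHook`, and their parameters are hypothetical stand-ins that only model a retry loop re-running a pre-execution hook on every attempt. It also assumes the retry limit counts retries after the first attempt.

```kotlin
// Hypothetical model of a retry loop; not TPipe's implementation.
// Assumption: retryLimit counts retries after the initial attempt.
fun runWithRetry(retryLimit: Int, preHook: () -> Unit, attempt: (Int) -> Boolean): Boolean {
    for (attemptNumber in 0..retryLimit) {
        preHook() // pre-execution hooks run again on every attempt
        if (attempt(attemptNumber)) return true
    }
    return false
}

// Simulates two timeouts followed by a success, with a hook that writes to shared state.
fun simulateWritingHook(): List<String> {
    val auditLog = mutableListOf<String>()
    runWithRetry(
        retryLimit = 5,
        preHook = { auditLog.add("request received") },
        attempt = { attemptNumber -> attemptNumber == 2 }
    )
    return auditLog
}

fun main() {
    println(simulateWritingHook().size) // 3
}
```

One logical request produced three log entries because the hook ran once per attempt. A read-only hook would have no such side effect.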
setRetryFunction(func: (suspend (pipe: Pipe, content: MultimodalContent) -> Boolean)?): Pipe
Sets custom retry logic function for CustomLogic timeout strategy.
Parameters:
func: Function receiving pipe reference and content, returns true to retry
Behavior: Called when timeout occurs and timeoutStrategy = CustomLogic. Function can analyze failure, repair content, or make external checks before deciding to retry. Return true to retry, false to fail.
Prompt Management
setSystemPrompt(prompt: String): Pipe
Sets the system prompt for the AI model.
Behavior: Stores as rawSystemPrompt and rebuilds with injections when applySystemPrompt() is called. JSON schemas, PCP context, and P2P agents are automatically injected during system prompt application.
setUserPrompt(prompt: String): Pipe
Sets the user prompt prefix.
enableSemanticCompression(): Pipe
Enables automatic semantic compression for the user prompt.
Behavior: Turns on the TokenBudgetSettings.compressUserPrompt path, creating or updating the pipe’s
token-budget configuration without overwriting the other budget fields. Use this when you want TPipe to apply
legend-backed prompt reduction before truncation.
enableSemanticDecompression(): Pipe
Reserves the semantic decompression hook for system-prompt injection.
Behavior: Flips the internal flag that applySystemPrompt() checks before prepending the decompression
prelude. When semantic compression is also enabled, TPipe inserts a short instruction block at the very top of
the rebuilt system prompt. The block tells the model that the user prompt was compressed using TPipe Semantic
Compression and that the compressed text should be reconstructed as closely as possible to the original
intent and data. It explains the Legend: / code: phrase format and the blank-line boundary, then instructs the
model to read the legend first, expand repeated proper-noun codes, restore omitted glue words and syntax as
faithfully as possible, treat ¶ as a paragraph break, preserve quoted spans, and then continue with the rest
of the prompt.
compressPrompt(prompt: String, settings: SemanticCompressionSettings = SemanticCompressionSettings()): SemanticCompressionResult
Compresses a prompt string using TPipe’s semantic compression rules.
Behavior: Returns a compressed prompt body plus a legend that maps the short codes back to the original
repeated proper nouns. Quoted spans are preserved verbatim, the legend codes advance as AA, AB, AC, and
the proper-noun thresholds follow the semantic-compression spec: single tokens never code, 2-token names need 6
repeats, 3-token names need 4, 4- and 5-token names need 3, and 6+ token names need 2. The helper does not
mutate pipe state. This is the public opt-in wrapper around the same compressor used by
TokenBudgetSettings.compressUserPrompt.
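The threshold rules and the legend code sequence can be written down as two small functions. This is a minimal sketch, not the TPipe compressor: `requiredRepeats` and `legendCode` are hypothetical names, and the continuation of the code sequence past AZ (to BA, BB, and so on) is an assumption.

```kotlin
// Repeat thresholds as described above; returns null when a name is never coded.
fun requiredRepeats(tokenCount: Int): Int? = when {
    tokenCount <= 1 -> null // single tokens never code
    tokenCount == 2 -> 6
    tokenCount == 3 -> 4
    tokenCount <= 5 -> 3    // 4- and 5-token names
    else -> 2               // 6+ token names
}

// Legend codes in sequence: AA, AB, AC, ...
// Assumption: the sequence continues AZ, BA, BB, ... and stays two letters long.
fun legendCode(index: Int): String {
    require(index in 0 until 26 * 26) { "index out of range for two-letter codes" }
    val first = 'A' + index / 26
    val second = 'A' + index % 26
    return "$first$second"
}

fun main() {
    println((0..2).map(::legendCode)) // [AA, AB, AC]
    println(requiredRepeats(2))       // 6
}
```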
setMiddlePrompt(prompt: String): Pipe
Sets prompt injected between input and output JSON schemas in the system prompt.
Behavior: The middle prompt is inserted after JSON input schema injection but before JSON output schema injection during applySystemPrompt(). Useful for providing context-specific instructions that should appear between the input/output format specifications.
setFooterPrompt(prompt: String): Pipe
Sets prompt added to the end of the system prompt after all other injections.
Behavior: The footer prompt is appended as the final element when applySystemPrompt() is called, appearing after JSON schemas, PCP context, P2P agents, and all other automatic injections. Ideal for final instructions or formatting requirements.
copySystemPromptToUserPrompt(): Pipe
Copies system prompt to user prompt for models that handle user prompts better.
Behavior: Creates a ConverseHistory with system prompt as developer role and user prompt as user role. Useful for models that don’t properly respect system prompts. The system prompt is cleared after copying to prevent duplication.
applySystemPrompt(): Pipe
Rebuilds system prompt with all injections and configurations.
Behavior: Critical function that reconstructs the system prompt by injecting the semantic decompression prelude, JSON schemas, PCP context, P2P agents, context instructions, todo lists, and custom footer text in a stable order. Must be called after changing JSON schemas or protocol settings to take effect.
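The injection order can be pictured as assembling optional sections in a fixed sequence. The sketch below is illustrative only: `PromptParts` and `assembleSystemPrompt` are hypothetical, the position of the raw system prompt directly after the prelude is an assumption, and so is the blank-line separator.

```kotlin
// Hypothetical container for the pieces of a rebuilt system prompt; not a TPipe type.
data class PromptParts(
    val decompressionPrelude: String = "",
    val rawSystemPrompt: String = "",
    val jsonInputSchema: String = "",
    val middlePrompt: String = "",
    val jsonOutputSchema: String = "",
    val pcpContext: String = "",
    val p2pAgents: String = "",
    val contextInstructions: String = "",
    val todoList: String = "",
    val footerPrompt: String = ""
)

// Joins the non-empty sections in a stable order.
fun assembleSystemPrompt(parts: PromptParts): String =
    listOf(
        parts.decompressionPrelude, // very top of the rebuilt prompt
        parts.rawSystemPrompt,      // assumption: directly after the prelude
        parts.jsonInputSchema,
        parts.middlePrompt,         // between input and output schemas
        parts.jsonOutputSchema,
        parts.pcpContext,
        parts.p2pAgents,
        parts.contextInstructions,
        parts.todoList,
        parts.footerPrompt          // always last
    ).filter { it.isNotBlank() }.joinToString("\n\n")

fun main() {
    val prompt = assembleSystemPrompt(
        PromptParts(rawSystemPrompt = "SYS", jsonInputSchema = "IN", middlePrompt = "MID",
            jsonOutputSchema = "OUT", footerPrompt = "END")
    )
    println(prompt)
}
```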
Input/Output
setMultimodalInput(content: MultimodalContent): Pipe
Sets multimodal input content (text and binary data).
setJsonInput(json: String): Pipe
Sets JSON input schema as string.
Behavior: Automatically disables native JSON mode and injects the schema into the system prompt when applySystemPrompt() is called.
setJsonInput<T>(json: T, senddefaults: Boolean = true): Pipe
Sets JSON input schema from Kotlin object.
Behavior: Uses reflection to generate schema. If senddefaults is false, optional fields are excluded from the schema.
setJsonInput(kclass: KClass<*>): Pipe
Sets JSON input schema from KClass directly.
Behavior: Useful for primitives (e.g., String::class) or classes with private constructors that cannot be instantiated for the generic version.
setJsonInputInstructions(instructions: String): Pipe
Sets custom instructions for JSON input handling that override the default input format explanation.
Behavior: Replaces the default JSON input instructions that are injected into the system prompt when applySystemPrompt() is called. This helper also disables native JSON mode so the input schema is injected as prompt text. Use this to provide model-specific or domain-specific guidance on how to interpret the JSON input schema. If empty, uses TPipe’s default input instructions.
setJsonOutput(json: String): Pipe
Sets JSON output schema as string.
Behavior: Automatically disables native JSON mode and injects the schema into the system prompt when applySystemPrompt() is called.
setJsonOutput<T>(json: T): Pipe
Sets JSON output schema from Kotlin object.
setJsonOutput(kclass: KClass<*>): Pipe
Sets JSON output schema from KClass directly.
Behavior: Useful for primitives (e.g., String::class) or classes with private constructors that cannot be instantiated for the generic version.
setJsonOutputInstructions(instructions: String): Pipe
Sets custom instructions for JSON output handling that override the default output format explanation.
Behavior: Replaces the default JSON output instructions that are injected into the system prompt when applySystemPrompt() is called. This helper also disables native JSON mode so the output schema is injected as prompt text. Use this to provide specific formatting requirements, validation rules, or output constraints beyond the basic schema. If empty, uses TPipe’s default output instructions.
requireJsonPromptInjection(stripExternalText: Boolean = false): Pipe
Forces JSON prompt injection for models without native JSON support.
Behavior: Explicitly sets supportsNativeJson = false and optionally stripNonJson = true. JSON injector helpers call this behavior automatically for the common schema-driven path, but this method is still the way to opt into response stripping.
TodoList Integration
setTodoListPageKey(key: String): Pipe
Links this pipe to a TodoList stored in ContextBank.
Behavior: Sets the page key used to retrieve a TodoList from ContextBank. When applySystemPrompt() is called, the pipe automatically:
- Retrieves the TodoList using this key
- Adds instructions explaining the task format
- Serializes the TodoList to JSON
- Appends it to the system prompt
The AI receives the complete task list and can work on tasks accordingly. If the content object has metadata["todoTaskNumber"] set, that specific task is highlighted with additional focus instructions.
Example:
val pipe = BedrockPipe()
.setSystemPrompt("You are a task executor.")
.setTodoListPageKey("my-tasks")
.applySystemPrompt() // TodoList injected here
See TodoList API for complete TodoList documentation.
setTodoListInstructions(instructions: String): Pipe
Overrides the default instructions for TodoList injection.
Behavior: Replaces the default explanation of how to interpret the TodoList with custom instructions. Use this to guide the AI’s behavior when working with tasks.
Default instructions:
You will be provided with a todo list that has a list of tasks you have been
asked to complete. Each element on the list will contain a description of the
task, the requirements to verify it has been completed, and whether it has been
completed or not.
Example:
pipe.setTodoListInstructions("""
Below is your task checklist. Work through each item in order.
For each task, provide detailed findings and mark completion clearly.
""".trimIndent())
Properties:
todoPageKey
var todoPageKey: String = ""
The ContextBank page key for retrieving the TodoList. Set via setTodoListPageKey().
todoListInstructions
var todoListInstructions: String = ""
Custom instructions for TodoList interpretation. Set via setTodoListInstructions().
injectTodoList
var injectTodoList: Boolean = false
Internal flag indicating whether TodoList injection is enabled. Automatically set to true when setTodoListPageKey() is called.
Model Parameters
setTemperature(temp: Double): Pipe
Sets randomness/creativity level (0.0 to 2.0).
setTopP(top: Double): Pipe
Sets nucleus sampling probability threshold.
setTopK(top: Int): Pipe
Sets number of top tokens to consider.
setMaxTokens(max: Int): Pipe
Sets maximum output tokens.
enableMaxTokenOverflow(): Pipe
Allows incomplete output when hitting token limit.
Behavior: When enabled, pipes won’t fail if output is truncated due to token limits. Useful for generation tasks where partial output is acceptable.
setRepetitionPenalty(penalty: Double): Pipe
Sets penalty for repetitive text generation.
setStopSequences(seqs: List<String>): Pipe
Sets sequences that stop text generation.
Context Management
setContextWindowSize(window: Int): Pipe
Sets maximum input tokens for context.
Behavior: Affects token budgeting and context truncation. When combined with autoTruncateContext(), context is automatically trimmed to fit within this limit.
setContextWindowSettings(windowSettings: ContextWindowSettings): Pipe
Sets context truncation method.
Behavior: Controls how context is truncated: TruncateTop removes oldest entries, TruncateBottom removes newest, TruncateBoth removes from both ends.
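The three truncation directions can be sketched on a plain list ordered oldest first. This is a simplified model: `TruncationMode` and `truncate` are hypothetical stand-ins, and the sketch counts entries where a real implementation would presumably count tokens.

```kotlin
// Hypothetical stand-in for the truncation modes named above; not the TPipe type.
enum class TruncationMode { TruncateTop, TruncateBottom, TruncateBoth }

// Drops whole entries until at most maxEntries remain. Entries are ordered oldest first.
// Assumption: entry counts replace token budgets to keep the sketch short.
fun truncate(entries: List<String>, maxEntries: Int, mode: TruncationMode): List<String> {
    val excess = (entries.size - maxEntries).coerceAtLeast(0)
    return when (mode) {
        TruncationMode.TruncateTop -> entries.drop(excess)        // remove oldest
        TruncationMode.TruncateBottom -> entries.dropLast(excess) // remove newest
        TruncationMode.TruncateBoth -> {
            val fromTop = excess / 2
            entries.drop(fromTop).dropLast(excess - fromTop)      // trim both ends
        }
    }
}

fun main() {
    val entries = listOf("e1", "e2", "e3", "e4", "e5")
    println(truncate(entries, 3, TruncationMode.TruncateTop))    // [e3, e4, e5]
    println(truncate(entries, 3, TruncationMode.TruncateBottom)) // [e1, e2, e3]
    println(truncate(entries, 3, TruncationMode.TruncateBoth))   // [e2, e3, e4]
}
```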
setTokenBudget(budget: TokenBudgetSettings): Pipe
Sets advanced token budgeting configuration with support for dynamic user prompt allocation.
Behavior: Enables sophisticated token management with separate budgets for user prompt, system prompt, reasoning, and output. Automatically truncates content to fit within specified limits. Overrides simple contextWindowSize when set.
Dynamic User Prompt Allocation: When TokenBudgetSettings.userPromptSize is set to null, TPipe automatically calculates the required space based on the actual token count of the user prompt. This enables optimal space utilization by allocating exactly what’s needed for the user input and maximizing remaining space for context.
Semantic Compression: When TokenBudgetSettings.compressUserPrompt is true, TPipe will attempt semantic compression on the natural-language user prompt before truncation. This is designed for human language prompts only; structured payloads such as JSON, code, XML, or schema fragments are left to the standard budget and truncation path.
The default compressor lexicon is resource-backed, with stop-word and phrase tables loaded from
src/main/resources/semantic-compression/ and merged with any caller-provided additions.
Common contractions are expanded before function-word stripping, paragraph breaks survive as pilcrow markers
(¶) including tab-indented lines that start after a line break, and the audit helper can be used to surface
recurring prompt boilerplate that should be added to the lexicon next.
Fluent Builders: Call enableSemanticCompression() to turn this path on from the Pipe API, and call
enableSemanticDecompression() to prepend the decompression prelude that explains how the compressed prompt
should be expanded again before the rest of the system prompt is processed, including how to read ¶ as a
paragraph boundary.
Dynamic Allocation Process:
- Automatic Sizing: TPipe counts tokens in the actual user prompt and allocates that exact amount
- Overflow Handling: If the calculated size exceeds available space, TPipe automatically reduces the allocation and truncates the user prompt to fit (when allowUserPromptTruncation = true)
- Space Optimization: Remaining space after user prompt allocation is available for context data
- Cleanup: The userPromptSize is reset to null after processing to prevent issues in subsequent calls
Explicit vs Dynamic Allocation:
- Explicit (userPromptSize = 12000): Reserves exactly 12,000 tokens for user input, predictable but may waste space
- Dynamic (userPromptSize = null): Allocates based on actual content size, optimal space usage but variable allocation
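The difference between the two modes comes down to simple arithmetic. The following sketch is not TPipe code: `Allocation` and `allocateUserPrompt` are hypothetical, and it assumes a single input budget shared by the user prompt and the context.

```kotlin
// Hypothetical result type for this sketch; not part of TPipe.
data class Allocation(val userPromptTokens: Int, val contextTokens: Int, val promptTruncated: Boolean)

// userPromptSize = null selects dynamic allocation.
// Assumption: availableTokens is the input budget shared by user prompt and context.
fun allocateUserPrompt(userPromptSize: Int?, actualPromptTokens: Int, availableTokens: Int): Allocation {
    val requested = userPromptSize ?: actualPromptTokens  // dynamic mode uses the real size
    val granted = requested.coerceAtMost(availableTokens) // overflow handling
    return Allocation(
        userPromptTokens = granted,
        contextTokens = availableTokens - granted,        // the rest goes to context
        promptTruncated = actualPromptTokens > granted
    )
}

fun main() {
    println(allocateUserPrompt(null, 3000, 16000))  // dynamic: 3000 for the prompt, 13000 for context
    println(allocateUserPrompt(12000, 3000, 16000)) // explicit: 12000 reserved, 4000 for context
}
```

With a 3,000-token prompt, the explicit setting leaves 9,000 reserved tokens unused, which is the wasted space the comparison above refers to.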
Tip: Set preserveTextMatches = true inside TokenBudgetSettings (or call enableTextMatchingPreservation()) to keep context elements and conversation history entries that match the user prompt before the rest of the truncation budget is applied.
setTokenCountingBias(value: Double): Pipe
Sets multiplicative adjustment for all token counts.
Behavior: Applies bias to token counting: adjustedTokens = round(rawTokens * (1.0 + bias)). Use positive values (0.1-0.3) to add safety margin when TPipe underestimates, negative values (-0.05 to -0.1) to maximize context when TPipe overestimates. Default is 0.0 (no adjustment).
Examples:
pipe.setTokenCountingBias(0.1) // Add 10% safety margin
pipe.setTokenCountingBias(0.2) // Add 20% safety margin
pipe.setTokenCountingBias(-0.05) // Reduce by 5% to maximize usage
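To make the formula concrete, here is the same arithmetic as a standalone function. `applyTokenBias` is a hypothetical helper written for illustration and is not part of the Pipe API.

```kotlin
import kotlin.math.roundToInt

// adjustedTokens = round(rawTokens * (1.0 + bias)), as stated above.
fun applyTokenBias(rawTokens: Int, bias: Double): Int =
    (rawTokens * (1.0 + bias)).roundToInt()

fun main() {
    println(applyTokenBias(1000, 0.1))   // 1100
    println(applyTokenBias(1000, 0.2))   // 1200
    println(applyTokenBias(1000, -0.05)) // 950
}
```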
pullGlobalContext(): Pipe
Enables pulling context from global context bank.
Behavior: Context is loaded from ContextBank using the configured pageKey. Loaded before pipeline context if both are enabled.
pullPipelineContext(): Pipe
Enables pulling context from parent pipeline.
Behavior: Overrides pullGlobalContext() if both are enabled. Context is merged from the parent pipeline’s context window.
pullBankedContext(): Pipe
Enables pulling banked context regardless of page key configuration.
Behavior: Forces the pipe to pull the default banked context from ContextBank even when page keys are configured. Useful when you need both the general banked context and specific page-keyed context simultaneously. Sets pullFromBankedContext = true internally.
updatePipelineContextOnExit(): Pipe
Updates pipeline context when pipe completes.
Behavior: Merges this pipe’s context back into the parent pipeline after execution. Uses emplaceLorebook setting to determine merge strategy.
autoInjectContext(instruction: String): Pipe
Automatically injects context into user prompt.
Behavior: Context is injected as text into the user prompt during execution. The instruction parameter explains how to interpret the context.
autoTruncateContext(fillMode: Boolean = false, fillAndSplitMode: Boolean = false): Pipe
Enables automatic context truncation with optional fill or fill-and-split selection.
Parameters:
- fillMode: If true, enables select-and-fill lorebook selection during context truncation. When active, split budgets are applied after priority lorebook selection has filled with top-weighted entries.
- fillAndSplitMode: If true, enables fill mode and reserves a split budget for the rest of the top-level context. This keeps lorebook selection weighted first, then redistributes any unused lorebook budget to the rest of the context window.
Behavior: Context is automatically truncated during execution based on contextWindowSize and contextWindowTruncation settings. Essential for preventing token overflow. When fillMode is true, lorebook entries are prioritized and filled first before remaining budget is split between other context components. When fillAndSplitMode is true, the top-level context window reserves half of the budget for lorebook entries and half for the remaining context, but any unused lorebook space is reclaimed and handed to the rest of the context window while still preserving the existing context-elements vs conversation-history split.
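The half-and-half reservation with reclaimed lorebook space can be sketched as follows. `SplitBudget` and `fillAndSplit` are hypothetical names, and the further split between context elements and conversation history is left out.

```kotlin
// Hypothetical result type; not part of TPipe.
data class SplitBudget(val loreBookTokens: Int, val remainingContextTokens: Int)

// Reserves half of the budget for lorebook entries, then hands unused lorebook
// space back to the rest of the context window.
fun fillAndSplit(totalBudget: Int, loreBookDemand: Int): SplitBudget {
    require(totalBudget >= 0) { "totalBudget must not be negative" }
    val loreBookReserve = totalBudget / 2
    val loreBookTokens = loreBookDemand.coerceIn(0, loreBookReserve)
    return SplitBudget(loreBookTokens, totalBudget - loreBookTokens)
}

fun main() {
    println(fillAndSplit(1000, 200)) // lorebook uses 200, the other 800 go to the rest
    println(fillAndSplit(1000, 900)) // lorebook is capped at 500, the rest keeps 500
}
```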
enableTextMatchingPreservation(): Pipe
Ensures context elements and conversation history entries containing words from the latest prompt survive truncation before other content is considered.
Behavior: Sets TokenBudgetSettings.preserveTextMatches = true. When budgets are tight, text-matching entries get reserved tokens before the remaining content is truncated.
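A rough way to picture this ordering is to move matching entries to the front of the queue before any budget is spent. The sketch below is an assumption-heavy illustration: `wordsOf` and `orderForTruncation` are hypothetical, and "matching" is taken to mean sharing at least one case-insensitive word with the prompt, which may differ from TPipe's actual matching rule.

```kotlin
// Splits text into lowercase words. Assumption: word-level, case-insensitive matching.
fun wordsOf(text: String): Set<String> =
    text.lowercase().split(Regex("\\W+")).filter { it.isNotEmpty() }.toSet()

// Places entries that share a word with the prompt ahead of all other entries,
// keeping the original relative order inside each group.
fun orderForTruncation(entries: List<String>, userPrompt: String): List<String> {
    val promptWords = wordsOf(userPrompt)
    val (matching, others) = entries.partition { entry -> wordsOf(entry).any { it in promptWords } }
    return matching + others
}

fun main() {
    val entries = listOf("weather report", "dragon lore", "tax notes")
    println(orderForTruncation(entries, "Tell me about the dragon"))
    // [dragon lore, weather report, tax notes]
}
```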
disableTextMatchingPreservation(): Pipe
Reverts to standard truncation ordering so every context element and conversation entry competes equally for budget.
Behavior: Sets TokenBudgetSettings.preserveTextMatches = false.
setPageKey(key: String): Pipe
Sets context bank page key for context isolation.
Behavior: If key contains commas, splits into multiple keys stored in pageKeyList. Multiple keys enable pulling context from different domains simultaneously.
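The comma handling amounts to a split of the key string. `parsePageKeys` below is a hypothetical helper; trimming whitespace and dropping empty segments are assumptions about edge cases the text does not cover.

```kotlin
// Splits a comma-separated page key into individual keys.
// Assumption: surrounding whitespace and empty segments are discarded.
fun parsePageKeys(key: String): List<String> =
    key.split(",").map { it.trim() }.filter { it.isNotEmpty() }

fun main() {
    println(parsePageKeys("critical,normal,background")) // [critical, normal, background]
    println(parsePageKeys("solo"))                       // [solo]
}
```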
enableDynamicFill(): Pipe
Enables dynamic budget redistribution for multi-page token budgeting. This is now the default strategy.
Behavior: Uses an iterative redistribution algorithm that:
- Initial Allocation: Starts with priority fill estimates
- Usage Simulation: Predicts actual token usage per page
- Dynamic Redistribution: Sends unused budget to high-need pages
- Iterative Optimization: Converges within a few passes
Benefits:
- Eliminates token waste when some pages cannot fill their allocations
- Maximizes context utilization across all page keys
- Adaptive redistribution based on actual content size
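The redistribution idea can be sketched in a few lines. This is not TPipe's algorithm: `redistribute` is hypothetical, it starts from an equal split instead of priority fill estimates, and it treats a known per-page need as the usage simulation.

```kotlin
// Hypothetical redistribution loop; not TPipe's algorithm.
// needs maps each page key to the tokens its content could actually use.
fun redistribute(totalBudget: Int, needs: Map<String, Int>, maxPasses: Int = 5): Map<String, Int> {
    if (needs.isEmpty()) return emptyMap()
    // Assumption: start from an equal split rather than priority fill estimates.
    val allocation = needs.keys.associateWith { totalBudget / needs.size }.toMutableMap()
    repeat(maxPasses) {
        var spare = 0
        for ((page, need) in needs) {
            val current = allocation.getValue(page)
            if (current > need) { // page cannot fill its allocation
                spare += current - need
                allocation[page] = need
            }
        }
        val hungry = needs.filter { (page, need) -> allocation.getValue(page) < need }.keys
        if (spare == 0 || hungry.isEmpty()) return allocation
        val share = spare / hungry.size // integer division; any remainder is dropped
        for (page in hungry) allocation[page] = allocation.getValue(page) + share
    }
    return allocation
}

fun main() {
    val needs = mapOf("a" to 100, "b" to 350, "c" to 1000)
    println(redistribute(900, needs)) // {a=100, b=350, c=450}
}
```

In this run the equal split gives each page 300 tokens. Page a returns 200, which b and c share. Page b then returns the 50 it cannot use, and c ends with 450.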
setMultiPageBudgetStrategy(strategy: MultiPageBudgetStrategy): Pipe
Sets the multi-page budget allocation strategy for advanced token budgeting.
Available strategies:
- DYNAMIC_FILL: Iterative redistribution for optimal token utilization (default)
- EQUAL_SPLIT: Equal budget distribution across all pages
- WEIGHTED_SPLIT: Budget distribution based on page weights
- PRIORITY_FILL: Sequential allocation until pages are full
setPageWeights(weights: Map<String, Double>): Pipe
Sets page weight overrides for WEIGHTED_SPLIT strategy.
Usage:
pipe.setPageWeights(mapOf(
"critical" to 2.0, // Gets 2x normal allocation
"normal" to 1.0, // Gets 1x normal allocation
"background" to 0.5 // Gets 0.5x normal allocation
))
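Assuming weights are applied proportionally, the resulting budgets can be worked out as below. `weightedSplit` is a hypothetical helper, not a TPipe function, and truncating fractional tokens is an assumption.

```kotlin
// Splits a budget in proportion to page weights.
// Assumption: fractional tokens are truncated.
fun weightedSplit(totalBudget: Int, weights: Map<String, Double>): Map<String, Int> {
    val totalWeight = weights.values.sum()
    require(totalWeight > 0.0) { "weights must sum to a positive value" }
    return weights.mapValues { (_, weight) -> (totalBudget * weight / totalWeight).toInt() }
}

fun main() {
    val weights = mapOf("critical" to 2.0, "normal" to 1.0, "background" to 0.5)
    println(weightedSplit(7000, weights)) // {critical=4000, normal=2000, background=1000}
}
```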
Usage Example:
val pipe = BedrockPipe()
.setPageKey("critical,normal,background")
.enableDynamicFill()
.autoTruncateContext()
Comparison to other strategies:
- EQUAL_SPLIT: Static equal budget distribution (simple, can waste tokens)
- WEIGHTED_SPLIT: Static weighted distribution (proportionate, no redistribution)
- PRIORITY_FILL: Static priority-based allocation (good, but no reuse)
- DYNAMIC_FILL: Dynamic redistribution (adaptive, maximal utilization)
Token Counting
enableComprehensiveTokenTracking(): Pipe
Enables comprehensive token usage tracking for this pipe and its child pipes.
Behavior: When enabled, tracks detailed token consumption including input tokens, output tokens, and aggregated usage from all nested child pipes (validator, transformation, branch, and reasoning pipes). Provides comprehensive visibility into token usage across complex pipe hierarchies. Automatically initialized when tracing is enabled.
disableComprehensiveTokenTracking(): Pipe
Disables comprehensive token usage tracking and clears stored usage data.
Behavior: Turns off detailed token tracking and resets all stored usage data to free memory and disable the tracking overhead. Returns to basic token counting only.
getTokenUsage(): TokenUsage
Returns comprehensive usage data for this pipe and its children.
Behavior: Provides access to detailed token usage information when comprehensive tracking is enabled, or returns an empty TokenUsage object when tracking is disabled. Includes input/output tokens and child pipe usage.
TokenUsage Methods:
getUsageBreakdown(): String: Returns formatted breakdown of token usage for debugging purposes, showing parent pipe usage, child pipe usage, and totals in a readable format.
Usage Example:
pipe.enableComprehensiveTokenTracking()
val result = pipe.execute("Your prompt")
val usage = pipe.getTokenUsage()
println(usage.getUsageBreakdown())
// Output:
// Parent Pipe: 150 input, 75 output
// Child Pipes:
// validator: 25 input, 10 output
// transformer: 30 input, 15 output
// Total: 205 input, 100 output
getTotalInputTokens(): Int
Returns total input tokens consumed by this pipe and nested pipes when tracking is enabled.
Behavior: Includes input tokens from this pipe plus the recursive totals from all child pipes when comprehensive tracking is active. Returns 0 if tracking is disabled.
getTotalOutputTokens(): Int
Returns total output tokens consumed by this pipe and nested pipes when tracking is enabled.
Behavior: Includes output tokens from this pipe plus the recursive totals from all child pipes when comprehensive tracking is active. Returns 0 if tracking is disabled.
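The recursive totals can be modeled with a small tree. `UsageNode` is a hypothetical type used only for illustration; it is not TPipe's TokenUsage class. The numbers are taken from the usage breakdown example above.

```kotlin
// Hypothetical usage tree; not TPipe's TokenUsage class.
data class UsageNode(
    val name: String,
    val inputTokens: Int,
    val outputTokens: Int,
    val children: List<UsageNode> = emptyList()
) {
    // Own tokens plus the recursive totals of every child.
    fun totalInput(): Int = inputTokens + children.sumOf { it.totalInput() }
    fun totalOutput(): Int = outputTokens + children.sumOf { it.totalOutput() }
}

fun main() {
    val parent = UsageNode(
        "parent", 150, 75,
        listOf(UsageNode("validator", 25, 10), UsageNode("transformer", 30, 15))
    )
    println(parent.totalInput())  // 205
    println(parent.totalOutput()) // 100
}
```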
isComprehensiveTokenTrackingEnabled(): Boolean
Indicates whether comprehensive token tracking is enabled on this pipe.
Behavior: Allows external code to check if detailed token usage data is being collected and is available through the token usage methods.
setMultiplyWindowSizeBy(value: Int): Pipe
Sets token counting multiplier.
Behavior: Affects how Dictionary token counting estimates tokens. Higher values provide more conservative estimates.
setCountSubWordsInFirstWord(value: Boolean): Pipe
Controls subword counting in first word.
setFavorWholeWords(value: Boolean): Pipe
Prioritizes whole words in token counting.
setCountOnlyFirstWordFound(value: Boolean): Pipe
Counts only first occurrence of words.
setSplitForNonWordChar(value: Boolean): Pipe
Enables splitting on non-word characters.
setAlwaysSplitIfWholeWordExists(value: Boolean): Pipe
Forces splitting when whole words exist.
setCountSubWordsIfSplit(value: Boolean): Pipe
Counts subwords when splitting occurs.
setNonWordSplitCount(value: Int): Pipe
Sets count for non-word character splits.
truncateContextAsString(): Pipe
Enables string-based context truncation.
Behavior: Context is converted to a single string before truncation rather than truncating individual entries. More aggressive but can also chop content into a potentially incomplete state.
getTruncationSettings(): TruncationSettings
Returns the current truncation settings bundled with any multi-page configuration that has been applied to the pipe.
Behavior:
- Includes all token counting flags (multipliers, split rules, etc.)
- Mirrors the current loreBookFillMode state through the returned fillMode flag
- Mirrors the current fill-and-split state through the returned fillAndSplitMode flag
- Surfaces any multi-page budget strategy and page weights pulled from tokenBudgetSettings
Integration: Useful for passing a single settings object into the new helper functions such as selectAndFillLoreBookContextWithSettings() or string-based truncation helpers.
LoreBook
enableImmutableLoreBook(): Pipe
Prevents lorebook modifications during execution.
Behavior: Sets emplaceLorebook = false. Lorebook entries cannot be updated or replaced during context merging operations.
enableAppendLoreBookScheme(): Pipe
Enables append-only lorebook updates.
Behavior: Sets appendLoreBook = true. New information is added to existing lorebook entries rather than replacing them. Can lead to contradictory information but preserves historical data.
enableLoreBookFillMode(): Pipe
Enables the select-and-fill strategy for LoreBook selection used during context truncation.
Behavior: Sets loreBookFillMode = true. When enabled, selectAndTruncateContext() first runs selectAndFillLoreBookContext() using the full budget, then truncates context elements and conversation history with any remaining tokens, so that lorebook entries never consume more than the requested token allocation.
Model Reasoning
setReasoning(): Pipe
Enables model reasoning/thinking mode.
Behavior: Enables basic reasoning mode. Behavior depends on model support - some models have native reasoning, others use TPipe’s reasoning pipe system.
setReasoning(tokens: Int): Pipe
Enables reasoning with token allocation.
Behavior: Sets modelReasoningSettingsV2 with specific token budget for reasoning. Reasoning output is truncated if it exceeds this limit.
setReasoning(custom: String): Pipe
Enables reasoning with custom settings.
Behavior: Sets modelReasoningSettingsV3 for vendor-specific reasoning configuration. Format depends on the AI provider.
Streaming
obtainStreamingCallbackManager(): StreamingCallbackManager
Gets or creates the streaming callback manager for this pipe.
Behavior: Lazy-initializes the manager on first access. Returns the manager instance for direct callback manipulation. Use this for dynamic callback management (adding/removing callbacks at runtime).
Example:
val manager = pipe.obtainStreamingCallbackManager()
manager.addCallback { chunk -> print(chunk) }
manager.removeCallback(someCallback)
streamingCallbacks(builder: StreamingCallbackBuilder.() -> Unit): Pipe (BedrockPipe)
Configures multiple streaming callbacks using builder pattern.
Behavior: Registers multiple independent callbacks to receive streaming chunks. Each callback can perform different operations (UI updates, logging, metrics) without interfering with each other. Supports configurable execution mode (sequential or concurrent) and automatic error isolation. Automatically enables streaming mode.
Example:
pipe.streamingCallbacks {
add { chunk -> print(chunk) }
add { chunk -> logToFile(chunk) }
add { chunk -> updateMetrics(chunk) }
concurrent() // or sequential()
onError { e, chunk -> println("Error: ${e.message}") }
}
Parameters:
builder: Lambda that configures the StreamingCallbackBuilder
Returns: This pipe instance for method chaining
See Also: Streaming Callbacks Guide
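The error isolation described for streamingCallbacks can be sketched without TPipe. CallbackFanOut is a hypothetical stand-in, not the real StreamingCallbackBuilder or StreamingCallbackManager. It models sequential dispatch with non-suspending callbacks only, and it assumes that a failing callback is reported to the error handler while the remaining callbacks still run.

```kotlin
// Hypothetical stand-in; models sequential dispatch only.
class CallbackFanOut {
    private val callbacks = mutableListOf<(String) -> Unit>()
    private var errorHandler: (Throwable, String) -> Unit = { _, _ -> }

    // Registers one more independent callback.
    fun add(callback: (String) -> Unit): CallbackFanOut {
        callbacks.add(callback)
        return this
    }

    // Registers the handler that receives any callback failure.
    fun onError(handler: (Throwable, String) -> Unit): CallbackFanOut {
        errorHandler = handler
        return this
    }

    // Delivers a chunk to every callback; one failure does not stop the others.
    fun emit(chunk: String) {
        for (callback in callbacks) {
            try {
                callback(chunk)
            } catch (e: Exception) {
                errorHandler(e, chunk)
            }
        }
    }
}
```

In this sketch, a callback that throws between two well-behaved callbacks triggers the error handler once per chunk, and both neighbours still receive every chunk.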
enableStreaming(callback: (suspend (String) -> Unit)? = null, showReasoning: Boolean = false): Pipe (BedrockPipe)
Enables streaming mode with optional callback.
Behavior: Switches to streaming API calls where tokens arrive incrementally. If callback is provided, it’s invoked for each chunk. If showReasoning is true, propagates streaming to reasoning pipes recursively.
Example:
pipe.enableStreaming { chunk -> print(chunk) }
setStreamingCallback(callback: suspend (String) -> Unit): Pipe (BedrockPipe)
Sets a suspending callback for streaming chunks.
Behavior: Registers a single callback that receives each text chunk as it arrives. Automatically enables streaming mode. Use this for async operations within the callback (database writes, network calls, etc.).
Example:
pipe.setStreamingCallback { chunk ->
delay(10)
logToDatabase(chunk)
}
setStreamingCallback(callback: (String) -> Unit): Pipe (BedrockPipe)
Sets a non-suspending callback for streaming chunks.
Behavior: Convenience overload for simple synchronous callbacks. Automatically wraps the callback in a suspending lambda. Use this for simple operations like printing or basic text accumulation.
Example:
pipe.setStreamingCallback { chunk -> print(chunk) }
disableStreaming(): Pipe (BedrockPipe)
Disables streaming mode and clears all callbacks.
Behavior: Switches back to standard (non-streaming) API calls. Clears both the legacy single callback and every callback registered with the multi-callback manager to prevent memory leaks.
ℹ️ Note: Provider-specific methods (BedrockPipe) are available in provider implementations. The base Pipe class provides obtainStreamingCallbackManager() and emitStreamingChunk() for all providers.
AWS Bedrock Guardrails (BedrockPipe)
AWS Bedrock Guardrails provide content moderation and safety controls. These methods are specific to BedrockPipe.
setGuardrail(identifier: String, version: String = "DRAFT", enableTrace: Boolean = false): BedrockPipe
Configures AWS Bedrock Guardrail for content filtering.
Parameters:
identifier: Guardrail ID or ARN from AWS Bedrock Console
version: Guardrail version number or “DRAFT” (default: “DRAFT”)
enableTrace: Enable guardrail tracing for debugging (default: false)
Behavior: Guardrails automatically evaluate both user inputs and model responses against configured policies including content filters, denied topics, sensitive information filters, and word filters. Requires bedrock:ApplyGuardrail IAM permission.
Example:
val pipe = BedrockPipe()
.setRegion("us-east-1")
.setModel("anthropic.claude-3-sonnet-20240229-v1:0")
.setGuardrail("abc123def456", "1", enableTrace = true)
enableFullGuardrailTrace(): BedrockPipe
Enables comprehensive guardrail tracing including non-detected content.
Behavior: Provides enhanced debugging for content filters, denied topics, PII detection, and contextual grounding policies. Sets trace mode to “enabled_full”.
Example:
val pipe = BedrockPipe()
.setGuardrail("abc123def456", "DRAFT")
.enableFullGuardrailTrace()
clearGuardrail(): BedrockPipe
Removes guardrail configuration and disables content filtering.
Example:
pipe.clearGuardrail()
suspend fun applyGuardrailStandalone(content: String, source: String = "INPUT", fullOutput: Boolean = false): ApplyGuardrailResponse?
Evaluates content against configured guardrail without invoking foundation models.
Parameters:
content: Text content to evaluate
source: “INPUT” for user input or “OUTPUT” for model output (default: “INPUT”)
fullOutput: Return full assessment including non-detected content (default: false)
Returns: ApplyGuardrailResponse containing action taken and detailed assessments, or null on failure.
Throws:
IllegalStateException if guardrail is not configured
IllegalArgumentException if client is not initialized
Behavior: Allows independent content validation at any stage of application flow. Useful for pre-validating user input, checking content at multiple pipeline stages, or implementing custom content moderation workflows. Requires prior guardrail configuration via setGuardrail().
Example:
runBlocking {
pipe.init()
val assessment = pipe.applyGuardrailStandalone(
content = userInput,
source = "INPUT"
)
when (assessment?.action) {
"GUARDRAIL_INTERVENED" -> println("Content blocked")
"NONE" -> println("Content passed checks")
}
}
See Also: AWS Bedrock Guardrails Guide for comprehensive documentation.
Function Hooks
infix fun setValidatorFunction(func: suspend (MultimodalContent) -> Boolean): Pipe
Sets function to validate AI output.
Behavior: Called after AI generation but before transformation. If it returns false, failure handling is triggered (branch pipe or onFailure function). Validation occurs before any post-processing. Can be called using infix notation: pipe setValidatorFunction { ... }.
setStringValidatorFunction(func: (String) -> Boolean): Pipe
Sets string-based validator function.
setTransformationFunction(func: suspend (MultimodalContent) -> MultimodalContent): Pipe
Sets function to transform AI output.
Behavior: Called after successful validation. Can modify the output content before it’s returned. If transformation fails, triggers failure handling.
setStringTransformationFunction(func: (String) -> String): Pipe
Sets string-based transformation function.
setPreInitFunction(func: suspend (MultimodalContent) -> Unit): Pipe
Sets function called before context loading.
Behavior: First function called during execution. Allows preprocessing input before any context merging or prompt construction occurs.
setPreValidationFunction(func: suspend (ContextWindow, MultimodalContent?) -> ContextWindow): Pipe
Sets function for context adjustment before validation.
Behavior: Called after context loading but before prompt injection. Last chance to modify context before AI execution. Overrides preValidationMiniBankFunction if both are set.
setPreValidationMiniBankFunction(func: suspend (MiniBank, MultimodalContent?) -> MiniBank): Pipe
Sets function for MiniBank context adjustment.
Behavior: Alternative to preValidationFunction when using multiple page keys. Only called if preValidationFunction is not set.
setPreInvokeFunction(func: suspend (MultimodalContent) -> Boolean): Pipe
Sets function to determine if pipe should be skipped.
Behavior: Called just before AI invocation. If it returns true, pipe execution is skipped entirely and content passes through unchanged. Useful for conditional logic.
setPostGenerateFunction(func: suspend (MultimodalContent) -> Unit): Pipe
Sets function to execute immediately after AI generates content.
Behavior: Called right after AI generation, before any validation or transformation. Useful for caching raw output, logging, or capturing content before validation steps modify it.
setOnFailure(func: suspend (MultimodalContent, MultimodalContent) -> MultimodalContent): Pipe
Sets function to handle validation failures.
Behavior: Called when validation fails and no branch pipe is configured. Can attempt to repair the output or return alternative content. If it returns content with terminatePipeline = true, the pipeline stops.
setStringOnFailure(func: (String, String) -> Boolean): Pipe
Sets string-based failure handler.
setExceptionFunction(func: suspend (MultimodalContent, Throwable) -> Unit): Pipe
Sets function to handle exceptions during AI execution.
Parameters:
func: Function that receives the content state and exception when errors occur
Behavior: Called whenever an exception is thrown during generateContent() execution. Provides access to both the content object state and the exception for debugging purposes. Useful for logging, error recovery, or custom error handling logic.
Usage Example:
pipe.setExceptionFunction { content, exception ->
println("Exception in pipe: ${exception.message}")
println("Content state: ${content.text}")
// Custom error handling logic
}
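The ordering of the main hooks in this section can be sketched with plain strings. HookSketch is a hypothetical stand-in, not TPipe's Pipe class: it omits the pre-init and pre-validation hooks, uses non-suspending lambdas, and assumes that the failure handler receives the input and the generated output and that its result is returned as-is. The source does not specify either of those two details.

```kotlin
// Hypothetical, string-based stand-in for a pipe's hook slots.
class HookSketch(
    val generate: (String) -> String,
    val preInvoke: (String) -> Boolean = { false },
    val postGenerate: (String) -> Unit = {},
    val validator: (String) -> Boolean = { true },
    val transform: (String) -> String = { it },
    val onFailure: (String, String) -> String = { _, generated -> generated }
) {
    fun execute(input: String): String {
        // Pre-invoke returning true skips the pipe; input passes through unchanged.
        if (preInvoke(input)) return input
        val generated = generate(input)
        // Post-generate sees the raw output before validation or transformation.
        postGenerate(generated)
        if (!validator(generated)) {
            // Assumption: the failure handler's result is returned without transformation.
            return onFailure(input, generated)
        }
        // Transformation runs only after successful validation.
        return transform(generated)
    }
}
```

A pipe built this way with a validator that always returns false never reaches its transform lambda, which mirrors the rule that transformation follows successful validation.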
Pipe Chaining
setValidatorPipe(pipe: Pipe): Pipe
Sets pipe to validate output using AI-based analysis.
Parameters:
pipe: The validator pipe to use for validation
Behavior: Validator pipe is executed with the AI-generated output. The validator pipe’s output is checked for termination status only - the actual content output is discarded. The original generated content flows to validatorFunction and all downstream operations. This allows AI-based validation analysis without modifying the content that continues through the pipeline.
⚠️ Important: The validator pipe’s text output and modifications are not passed forward. Only its termination flag (shouldTerminate()) is respected.
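A minimal sketch of that rule, using hypothetical stand-ins rather than TPipe types: SketchContent plays the role of MultimodalContent, and applyValidatorPipe is an illustrative helper, not a TPipe function. How TPipe records the termination status internally is not described here, so copying the flag onto the original content is an assumption.

```kotlin
// Hypothetical stand-in for MultimodalContent: text plus a termination flag.
data class SketchContent(val text: String, val terminate: Boolean = false)

// Runs the validator on the generated content and keeps only its termination flag.
fun applyValidatorPipe(
    generated: SketchContent,
    validatorPipe: (SketchContent) -> SketchContent
): SketchContent {
    val verdict = validatorPipe(generated)
    // The validator's text is discarded; the original text continues downstream.
    return generated.copy(terminate = generated.terminate || verdict.terminate)
}
```

Whatever text the validator produces, the value returned here always carries the originally generated text.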
setTransformationPipe(pipe: Pipe): Pipe
Sets pipe to transform output instead of transformation function.
Behavior: Transformation pipe is executed with the AI output after successful validation. Takes precedence over transformationFunction.
infix fun setBranchPipe(pipe: Pipe): Pipe
Sets pipe to handle validation failures.
Behavior: Executed when validation fails. Takes precedence over onFailure function. Branch pipe’s output is passed through transformation pipe and transformation function (if set) before becoming the final result. Can be called using infix notation: pipe setBranchPipe failurePipe.
setReasoningPipe(pipe: Pipe): Pipe
Sets pipe for chain-of-thought reasoning.
Behavior: Executed before the main AI call to generate reasoning content. Reasoning output is injected into the main prompt. Enables chain-of-thought for models without native reasoning support.
Protocols
setPcPContext(context: PcpContext): Pipe
Sets Pipe Context Protocol configuration.
Behavior: PCP tools are automatically injected into system prompt when applySystemPrompt() is called. Tools become available to the AI during execution.
setPcPDescription(description: String): Pipe
Sets custom PCP description that overrides the default tool explanation injected into the system prompt.
Behavior: Replaces TPipe’s default PCP tool description with custom instructions when applySystemPrompt() is called. Use this to provide model-specific guidance on how to use PCP tools or to customize the tool invocation format. If empty, uses TPipe’s default PCP instructions.
processPcpResponse(llmResponse: String): PcpExecutionResult
Processes PCP requests from LLM response.
Behavior: Parses AI output for PCP tool calls and executes them. Returns execution results that can be fed back to the AI or used by the application.
setP2PAgentList(agentList: MutableList<AgentDescriptor>): Pipe
Sets list of available P2P agents.
Behavior: Agent list is automatically injected into system prompt when applySystemPrompt() is called. Agents become available for the AI to call during execution.
setP2PDescription(description: String): Pipe
Sets custom P2P agent description that overrides the default agent explanation injected into the system prompt.
Behavior: Replaces TPipe’s default P2P agent description with custom instructions when applySystemPrompt() is called. Use this to provide model-specific guidance on how to interact with P2P agents or to customize the agent invocation format. If empty, uses TPipe’s default P2P instructions.
setContainerPtr(ptr: P2PInterface): Pipe
Sets container to execute instead of this pipe.
Behavior: When set, execute() calls delegate to the container’s executeLocal() method instead of running the pipe’s own logic. Enables complex orchestration patterns.
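The delegation can be pictured with a small stand-alone model. LocalExecutor and DelegatingPipe are hypothetical stand-ins for P2PInterface and Pipe, reduced to string input and output; only the method names setContainerPtr, execute, and executeLocal are taken from the description above.

```kotlin
// Hypothetical stand-in for the container side of P2PInterface.
interface LocalExecutor {
    fun executeLocal(input: String): String
}

// Hypothetical stand-in for a pipe that may hand execution to a container.
class DelegatingPipe(private val name: String) {
    private var container: LocalExecutor? = null

    fun setContainerPtr(ptr: LocalExecutor): DelegatingPipe {
        container = ptr
        return this
    }

    // With a container set, execute() forwards to it; otherwise the pipe runs its own logic.
    fun execute(input: String): String {
        val target = container
        return if (target != null) target.executeLocal(input) else "$name handled: $input"
    }
}
```

In this model, calling execute() on a pipe with a container never reaches the pipe's own branch, which is what lets an orchestrator sit in front of an existing pipe without changing call sites.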
Tracing
enableTracing(config: TraceConfig = TraceConfig(enabled = true)): Pipe
Enables execution tracing with configuration and comprehensive token tracking.
Behavior: Emits trace events throughout pipe execution and automatically enables comprehensive token usage tracking to provide detailed token consumption data in traces. Trace detail level affects performance and memory usage. Events are stored globally and can be retrieved for debugging.
disableTracing(): Pipe
Disables execution tracing.
Multi-Stream Tracing
TPipe supports broadcasting trace events to multiple trace IDs simultaneously. This is useful for complex orchestration (like Splitters) where a pipe’s events should appear in both its individual trace and the orchestrator’s trace.
addTraceId(id: String)
Adds a trace ID to the active set. Events will be broadcast to this ID in addition to the pipe’s own ID.
removeTraceId(id: String)
Removes a trace ID from the active set. Events will no longer be broadcast to this ID.
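The broadcast behavior can be modeled in a few lines. TraceBroadcaster is a hypothetical stand-in with an in-memory store and string events; TPipe's real trace storage and event types are not shown in this section, so treat this as an illustration of the add/remove semantics only.

```kotlin
// Hypothetical stand-in: records string events per trace ID in memory.
class TraceBroadcaster(private val ownId: String) {
    private val extraIds = linkedSetOf<String>()
    private val store = mutableMapOf<String, MutableList<String>>()

    fun addTraceId(id: String) {
        extraIds.add(id)
    }

    fun removeTraceId(id: String) {
        extraIds.remove(id)
    }

    // Writes the event to the pipe's own trace and to every additional active ID.
    fun emit(event: String) {
        for (id in listOf(ownId) + extraIds) {
            store.getOrPut(id) { mutableListOf() }.add(event)
        }
    }

    // Returns the events recorded for one trace ID (empty if none).
    fun events(id: String): List<String> = store[id].orEmpty()
}
```

Events emitted while an extra ID is active appear in both traces; events emitted after removeTraceId appear only in the pipe's own trace.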
Getters
getProviderEnum(): ProviderName
Returns the configured AI provider.
getModelName(): String
Returns the configured model name.
getP2PAgentList(): List<AgentDescriptor>?
Returns list of available P2P agents.
getTruncationSettings(): TruncationSettings
Returns current token truncation settings.
Behavior: Builds settings from current token counting configuration. Used internally for consistent token counting across the pipe.
getReasoningContent(): String
Returns reasoning content from last execution.
selectGlobalContextMode(): String
Returns selected global context mode.
countBinaryTokens(content: MultimodalContent, truncationSettings: TruncationSettings): Int
Counts tokens in binary content.
countTokens(input: Boolean, content: MultimodalContent): Int
Counts tokens in content for input or output estimation.
toPipeSettings(): PipeSettings
Converts pipe configuration to PipeSettings object.
Error Handling
hasError(): Boolean
Checks if pipe has captured an error during execution.
Returns: true if an error is stored, false otherwise.
Example:
val result = pipe.execute("input")
if (pipe.hasError()) {
println("Pipe failed: ${pipe.getErrorMessage()}")
}
getErrorMessage(): String
Gets the error message from the last captured error.
Returns: Error message string, or empty string if no error.
getErrorType(): TraceEventType?
Gets the type of error that occurred.
Returns: TraceEventType (PIPE_FAILURE, API_CALL_FAILURE, VALIDATION_FAILURE, TRANSFORMATION_FAILURE), or null if no error.
clearError()
Clears the stored error information.
Usage: Call before reusing a pipe to reset error state.
lastError: PipeError?
Direct access to the complete error object containing exception, event type, phase, pipe name, pipe ID, and timestamp.
ℹ️ Note: Errors are automatically captured when trace events with failure types are logged. The error persists until explicitly cleared or the pipe executes successfully.
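The error lifecycle described in the note can be sketched as a small state holder. ErrorStateSketch, CapturedError, FailureKind, and recordRun are hypothetical stand-ins; only the accessor names and the four failure type names come from this section. The real PipeError carries more fields than the two modeled here.

```kotlin
// Mirrors the failure types listed for getErrorType(); a stand-in for TraceEventType.
enum class FailureKind { PIPE_FAILURE, API_CALL_FAILURE, VALIDATION_FAILURE, TRANSFORMATION_FAILURE }

// Reduced stand-in for PipeError.
data class CapturedError(val kind: FailureKind, val message: String)

class ErrorStateSketch {
    var lastError: CapturedError? = null
        private set

    fun hasError(): Boolean = lastError != null
    fun getErrorMessage(): String = lastError?.message ?: ""
    fun getErrorType(): FailureKind? = lastError?.kind
    fun clearError() {
        lastError = null
    }

    // Stand-in for one execution: a failure stores an error, a success (null) resets it.
    fun recordRun(failure: CapturedError?) {
        lastError = failure
    }
}
```

The stored error survives until either clearError() is called or a later run completes without a failure.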
Pipe State Inspection
These methods allow you to read the current configuration state of a pipe without modifying it. They are useful for validation logic, DSL builders, and debugging.
copyTokenBudgetSettings(): TokenBudgetSettings?
Returns a detached copy of the current token budget settings, or null if token budgeting is not configured.
Behavior: The returned copy is safe to inspect or reuse without affecting the pipe’s internal state. Map values (like pageWeights) are also copied.
val budget = pipe.copyTokenBudgetSettings()
if (budget != null)
{
println("Context window: ${budget.contextWindowSize}")
println("Max tokens: ${budget.maxTokens}")
}
isAutoTruncateContextEnabled(): Boolean
Returns true when legacy automatic truncation is enabled for this pipe.
if (pipe.isAutoTruncateContextEnabled())
{
println("Legacy auto-truncation is active")
}
hasContextOverflowProtectionConfigured(): Boolean
Returns true when the pipe has any overflow protection path configured: either token budgeting or legacy auto truncation.
Behavior: This is the method the Manifold DSL uses to validate that worker pipes have overflow protection before startup.
if (!pipe.hasContextOverflowProtectionConfigured())
{
throw IllegalStateException("Pipe needs overflow protection before use in a manifold")
}
getConfiguredContextWindowSize(): Int
Returns the current context window size in tokens.
val windowSize = pipe.getConfiguredContextWindowSize()
println("Context window: $windowSize tokens")
getConfiguredMaxTokens(): Int
Returns the current maximum output token count.
val maxTokens = pipe.getConfiguredMaxTokens()
println("Max output tokens: $maxTokens")
getSystemPromptText(): String
Returns the current system prompt text bound to this pipe.
val prompt = pipe.getSystemPromptText()
println("System prompt length: ${prompt.length} chars")
truncateModuleContextSuspend(): Pipe
Suspend-safe truncation entry point. Subclasses that need remote-aware lorebook selection (e.g., loading lorebook data from a remote MemoryServer) should override this instead of the synchronous truncateModuleContext().
Behavior: The default implementation delegates to truncateModuleContext(). Provider modules can override this to perform async operations during truncation.
// Called internally during pipe execution when autoTruncateContext is enabled
// Override in custom provider modules:
override suspend fun truncateModuleContextSuspend(): Pipe
{
// Perform async lorebook loading, then truncate
loadRemoteLorebook()
return truncateModuleContext()
}
Execution
execute(promptResult: String = ""): String
Executes pipe with string input, returns string output.
Behavior: Wraps input in MultimodalContent, calls multimodal execution, returns text result. Legacy method for backward compatibility.
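The wrap-delegate-unwrap pattern behind the string overload looks roughly like this. TextContent and OverloadSketch are hypothetical stand-ins for MultimodalContent and Pipe, and trimming whitespace stands in for the real execution logic.

```kotlin
// Hypothetical stand-in for MultimodalContent, reduced to its text.
data class TextContent(val text: String)

class OverloadSketch {
    // Stand-in for the main content-based execution path.
    fun execute(content: TextContent): TextContent = TextContent(content.text.trim())

    // String overload: wrap the input, delegate to the content overload, return the text.
    fun execute(promptResult: String = ""): String = execute(TextContent(promptResult)).text
}
```

Both overloads therefore share one execution path, and the string form adds nothing beyond the wrapping and unwrapping.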
execute(content: MultimodalContent): MultimodalContent
Executes pipe with multimodal content, returns multimodal result.
Behavior: Main execution method. Follows complete execution pipeline: pre-init → context loading → pre-validation → pre-invoke check → reasoning → AI generation → validation → transformation → branch handling. Execution can be short-circuited at multiple points based on configuration and validation results.
Error Handling: On failure, returns empty MultimodalContent with pipeError field populated. Check pipe.hasError() or result.hasError() to detect failures.
Next Steps
- Pipeline Class API - Continue into pipeline orchestration.