The persevere.plumbing Python package provides a typed interface to the
plumbing runtime. It calls the compiled OCaml binaries (plumb,
plumb-check) as subprocesses and communicates via JSON over
stdin/stdout.
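The subprocess-plus-JSON pattern described above can be sketched with the standard library alone. This is not the package's implementation: the Python interpreter stands in for the plumb binary, the helper name call_json_subprocess is hypothetical, and the framing (one JSON value per line in each direction) is an assumption that the text above does not confirm.

```python
import json
import subprocess
import sys

# Stand-in child process: reads JSON lines from stdin and echoes each one
# back wrapped in an object. A real run would launch a plumb binary instead.
CHILD_SOURCE = (
    "import json, sys\n"
    "for line in sys.stdin:\n"
    "    value = json.loads(line)\n"
    "    print(json.dumps({'echo': value}))\n"
)


def call_json_subprocess(argv, value):
    """Send one JSON value on stdin and parse every stdout line as JSON."""
    completed = subprocess.run(
        argv,
        input=json.dumps(value) + "\n",
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError on a non-zero exit code
    )
    return [json.loads(line) for line in completed.stdout.splitlines() if line.strip()]


outputs = call_json_subprocess([sys.executable, "-c", CHILD_SOURCE], "hello")
print(outputs)
```

Keeping the transport this simple is what lets a typed Python layer sit on top of compiled binaries without any foreign-function bindings.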
From a source checkout, build the OCaml binaries first, then use
PYTHONPATH=python:
nix develop -c dune build
PYTHONPATH=python python3 -c 'from pathlib import Path; import persevere.plumbing as pb; print(pb.check(Path("examples/pydantic/pipeline.plumb")).bindings)'

The persevere-plumbing PyPI package and pre-built wheels exist from the
earlier releases. Future wheel releases should identify the exact public
source tag that produced them.
To install from a local staged wheel instead:
pip install dist/python/persevere_plumbing-*.whl
pip install "pydantic-ai>=0.1" # optional extra used by the pydantic integration

The package bundles the plumb and plumb-check binaries. If they
are not bundled (e.g. during development), the library searches the
dune build output and then PATH.
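The lookup order described above (bundled copy, then dune build output, then PATH) can be sketched as follows. The function name find_binary and both directory arguments are hypothetical, since the text does not say where the bundled or build directories live, and FileNotFoundError stands in for the library's own error type.

```python
import shutil
import tempfile
from pathlib import Path


def find_binary(name, bundled_dir, build_dir):
    """Return the first match: bundled copy, then build output, then PATH."""
    for directory in (bundled_dir, build_dir):
        candidate = Path(directory) / name
        if candidate.is_file():
            return candidate
    on_path = shutil.which(name)
    if on_path is not None:
        return Path(on_path)
    raise FileNotFoundError(f"{name} not found in bundle, build output, or PATH")


# Temporary directories stand in for the real (unspecified) locations.
with tempfile.TemporaryDirectory() as tmp:
    bundled = Path(tmp) / "bundled"
    build = Path(tmp) / "build"
    bundled.mkdir()
    build.mkdir()
    (build / "plumb-check").write_text("")  # only the build output has a copy
    found = find_binary("plumb-check", bundled, build)
    found_in_build = found.parent == build
    print(found_in_build)
```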
Validate a .plumb program and inspect its types, bindings, and
protocols.
import persevere.plumbing as pb
from pathlib import Path
result = pb.check(Path("pipeline.plumb"))
print(result.version) # language version
print(result.bindings) # list of Binding objects
print(result.types) # list of TypeDecl objects
print(result.protocols) # list of ProtocolDecl objects

Each Binding has:
- name: process name (e.g. "main")
- input, output: type strings
- input_type, output_type: type AST dicts (for map_type())
- impl: implementation kind ("agent", "pipeline", etc.)
- attrs: configuration dict
Raises CheckError with structured fields (code, category,
detail, line, column) on validation failure.
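The structured fields make it straightforward to produce editor-style diagnostics. The sketch below uses a stand-in exception class so that it runs without the package installed; only the five field names come from the text above, while the class name, the helper format_check_error, and the sample values are invented for illustration.

```python
class StandInCheckError(Exception):
    """Stand-in carrying the field names the text lists for CheckError."""

    def __init__(self, code, category, detail, line, column):
        super().__init__(detail)
        self.code = code
        self.category = category
        self.detail = detail
        self.line = line
        self.column = column


def format_check_error(err, path):
    """Render an error in the familiar path:line:column style."""
    return f"{path}:{err.line}:{err.column}: [{err.category}/{err.code}] {err.detail}"


try:
    # The values below are invented; real codes and categories may differ.
    raise StandInCheckError("E001", "type", "expected string, got int", 3, 14)
except StandInCheckError as err:
    message = format_check_error(err, "pipeline.plumb")
    print(message)
```

With the real package, the same formatter would presumably be called from a handler that catches the library's CheckError around pb.check().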
Send a single input and collect all outputs.
results = pb.call_sync(Path("pipeline.plumb"), "hello")
# results is a list of output values

call_sync() is a synchronous wrapper around the async call().
Use call() directly in async code:
results = await pb.call(Path("pipeline.plumb"), "hello")

Parameters:
- spec: path to the .plumb file
- input: any JSON-serialisable value
- auto_approve: bypass tool approval prompts (default False)
- search_path: additional module search paths
- binary: override binary path
- env: additional environment variables for the subprocess
Raises PipelineError if the pipeline exits with a non-zero code.
The error includes stderr diagnostics.
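A synchronous wrapper around a coroutine is commonly built on asyncio.run. The source does not say how call_sync() is implemented, so the sketch below only illustrates the general pattern, with a stand-in coroutine in place of the real call().

```python
import asyncio


async def call(value):
    """Stand-in coroutine; the real call() would drive a pipeline subprocess."""
    await asyncio.sleep(0)  # yield to the event loop once, as real I/O would
    return [value.upper()]


def call_sync(value):
    """Run the coroutine to completion on a fresh event loop."""
    return asyncio.run(call(value))


results = call_sync("hello")
print(results)
```

asyncio.run raises RuntimeError when called from a thread that already has a running event loop, which is one reason to await call() directly inside async code.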
Launch a long-lived pipeline for multi-turn conversations or streaming output.
import asyncio
import persevere.plumbing as pb
from pathlib import Path
async def main():
    async with await pb.run(Path("doctor.plumb")) as pipeline:
        await pipeline.send("I feel anxious")
        response = await pipeline.recv()
        print(response)
asyncio.run(main())

| Method | Returns | Description |
|---|---|---|
| send(message) | None | Send JSON to the pipeline's stdin |
| recv() | Any \| None | Next output value (skips telemetry and diagnostics) |
| recv_event() | Event \| None | Next output, telemetry, or diagnostic event |
| recv_telemetry() | Telemetry \| None | Next telemetry (skips output and diagnostics) |
| close_input() | None | Close stdin (idempotent) |
| shutdown(timeout) | int | Graceful shutdown, returns exit code |
The pipeline is also an async context manager (async with) and an
async iterator (async for event in pipeline).
- Output(data): primary output from the pipeline
- Telemetry(agent, payload): agent telemetry (thinking, usage, tool calls, etc.)
- Diagnostic(line): non-telemetry stderr line from the runtime or helpers

Event = Output | Telemetry | Diagnostic
Use recv_event() or async for event in pipeline when you need diagnostics.
The selective readers recv() and recv_telemetry() do not retain them.
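Handling the three event kinds usually comes down to an isinstance dispatch. The sketch below defines stand-in dataclasses with the constructor fields listed above so that it runs without the package; the field types and the helper split_events are assumptions, and a plain list stands in for the event stream.

```python
from dataclasses import dataclass
from typing import Any, Union


@dataclass
class Output:
    data: Any


@dataclass
class Telemetry:
    agent: str
    payload: Any


@dataclass
class Diagnostic:
    line: str


Event = Union[Output, Telemetry, Diagnostic]


def split_events(events):
    """Sort a mixed event stream into outputs, telemetry, and diagnostics."""
    outputs, telemetry, diagnostics = [], [], []
    for event in events:
        if isinstance(event, Output):
            outputs.append(event.data)
        elif isinstance(event, Telemetry):
            telemetry.append((event.agent, event.payload))
        else:
            diagnostics.append(event.line)
    return outputs, telemetry, diagnostics


# Invented sample stream; real payloads will look different.
stream = [
    Telemetry("composer", {"kind": "usage"}),
    Diagnostic("warning: slow tool"),
    Output("hello back"),
]
outputs, telemetry, diagnostics = split_events(stream)
print(outputs, telemetry, diagnostics)
```

With a live pipeline, the same loop body would sit inside async for event in pipeline, which is the route that keeps diagnostics visible.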
- spec: path to the .plumb file
- auto_approve: bypass tool approval prompts (default False)
- verbose: enable telemetry streaming (default True)
- search_path: additional module search paths
- binary: override binary path
- env: additional environment variables
PlumbingModel is an interop adapter that wraps a plumbing pipeline
as a Pydantic AI Model. It is not the primary interface — use
call_sync() or run() for direct pipeline access.
import asyncio
from pathlib import Path
from pydantic_ai import Agent
from persevere.plumbing.provider import PlumbingModel
async def main():
    async with PlumbingModel(Path("pipeline.plumb")) as model:
        agent = Agent(model)
        result = await agent.run("hello")
        print(result.output)
asyncio.run(main())

| Parameter | Type | Default | Description |
|---|---|---|---|
| program | str \| Path | required | Path to .plumb file |
| pass_history | bool | False | Send the full conversation (True) or only the last message (False) |
| auto_approve | bool | False | Bypass tool approval |
| search_path | list[Path] \| None | None | Module search paths |
| binary | str \| None | None | Override binary path |
| env | dict[str, str] \| None | None | Extra environment variables |
- result_type=str only. Structured output types inject output tools that PlumbingModel cannot handle. Use the bare API with type mapping (see below) for typed output.
- No system prompts. Do not set system_prompt or instructions on the Agent: they are silently discarded with pass_history=False, or contaminate the pipeline's prompt structure with pass_history=True.
- Stateful. The same pipeline process persists across turns. The pipeline maintains its own conversation history internally.
- Lifecycle. Always use async with to ensure the pipeline process is shut down cleanly.
With pass_history=False (the default), only the last user message
is sent to the pipeline on each turn. The pipeline maintains its own
state — prior messages are already in the agent's context.
async with PlumbingModel(Path("doctor.plumb")) as model:
    agent = Agent(model)
    r1 = await agent.run("I feel anxious")
    r2 = await agent.run("Tell me more", message_history=r1.all_messages())

The message_history is passed for Pydantic AI's bookkeeping, not
because the pipeline uses it.
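The effect of pass_history on what reaches the pipeline can be illustrated with a small selection function. This is a sketch of the behaviour described above, not the adapter's code: the helper select_payload is hypothetical, and plain dicts with role and content keys stand in for Pydantic AI's own message objects.

```python
def select_payload(messages, pass_history=False):
    """Pick what to forward: the whole history, or only the last user message."""
    if pass_history:
        return list(messages)
    for message in reversed(messages):
        if message["role"] == "user":
            return [message]
    return []


history = [
    {"role": "user", "content": "I feel anxious"},
    {"role": "assistant", "content": "What is on your mind?"},
    {"role": "user", "content": "Tell me more"},
]
last_only = select_payload(history)
everything = select_payload(history, pass_history=True)
print(last_only)
```

Forwarding only the newest user turn avoids duplicating context that the long-lived pipeline process already holds.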
The persevere.plumbing._types module maps plumbing type ASTs to Python types.
This is used internally by PlumbingModel and can be used directly
for typed validation.
| Plumbing type | Python type |
|---|---|
| string | str |
| int | int |
| number | float |
| bool | bool |
| json | Any |
| unit | NoneType |
| [T] | list[T] |
| { f: T, ... } | Pydantic BaseModel |
| A \| B | Union[A, B] |
| (A, B) | tuple[A, B] |
| !T | T (stream wrapper removed) |
from pathlib import Path
import persevere.plumbing as pb
from persevere.plumbing._types import input_type, output_type
result = pb.check(Path("pipeline.plumb"))
in_type = input_type(result) # Python type for main binding input
out_type = output_type(result) # Python type for main binding output

PlumbError (base)
├── CheckError — type-check failure (code, category, detail, line, column)
├── RunError — process launch failure (detail, returncode)
├── PipelineError — non-zero exit during execution (detail, returncode, stderr)
├── BinaryNotFoundError — plumb/plumb-check binary not found
└── PlumbingModelError — Pydantic AI integration error (detail, cause)
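Because every error derives from PlumbError, handlers can catch specific subclasses first and fall back to the base class. The sketch below re-declares the hierarchy as local stand-in classes so that it runs without the package; the classify helper and its messages are illustrative only.

```python
# Local stand-ins mirroring the hierarchy above; not the library's classes.
class PlumbError(Exception):
    pass


class CheckError(PlumbError):
    pass


class RunError(PlumbError):
    pass


class PipelineError(PlumbError):
    pass


class BinaryNotFoundError(PlumbError):
    pass


class PlumbingModelError(PlumbError):
    pass


def classify(exc):
    """Map an error to a coarse remedy, most specific handlers first."""
    try:
        raise exc
    except CheckError:
        return "fix the .plumb program"
    except BinaryNotFoundError:
        return "build or install the binaries"
    except PlumbError:
        return "inspect the runtime failure"


labels = [classify(CheckError()), classify(BinaryNotFoundError()), classify(PipelineError())]
print(labels)
```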
The persevere.plumbing.dsl subpackage provides Python combinators that
construct plumbing programs as an AST and render them as valid
.plumb source text. You write Python; the OCaml toolchain
type-checks and executes the result.
Current limitation: the Python DSL does not yet provide first-class
helpers for surfaced seeded channel declarations or the cons(x, xs)
expression intrinsic. Empty list literals may now appear in rendered
expressions where the surrounding plumbing type context makes them valid,
but seeded feedback-loop syntax itself is currently a handwritten .plumb
surface feature rather than a dedicated DSL combinator. The surfaced
narrow(T) binding form is also .plumb-only; the Python
DSL does not yet provide a dedicated Narrow(...) combinator.
from pydantic import BaseModel
from persevere.plumbing.dsl import Program, Agent, Filter, Map, F, PlumbBody, SumType, Record
class Verdict(BaseModel):
    verdict: bool
    commentary: str
    draft: str
prog = Program()
prog.let("composer", input=str, output=str,
         impl=Agent(model="claude-haiku-4-5", thinking_budget=1024))
prog.let("checker", input=str, output=Verdict,
         impl=Agent(model="claude-haiku-4-5", thinking_budget=1024))
body = PlumbBody("input", "output")
body.wire("input", "composer", "checker")
body.wire("checker", Filter(F.verdict == True).project("draft"), "output")
prog.let("main", input=str, output=str, impl=body)
source = prog.render() # .plumb source text
result = prog.check() # CheckResult (validates via plumb-check)

| DSL class | .plumb syntax | Reference |
|---|---|---|
| Agent(model=, ...) | agent { ... } | processes.md § Agent |
| Tool(process=) | tool { process: name } | processes.md § Tool |
| Filter(F.x >= 85) | filter(x >= 85) | processes.md § Filter |
| Map(F.x, F.y) | map({x: x, y: y}) | processes.md § Map |
| Id() / ID | id | processes.md § Identity |
| Copy() / COPY | copy | processes.md § Copy |
| Merge() / MERGE | merge | processes.md § Merge |
| Discard() / DISCARD | discard | processes.md § Discard |
| Barrier() / BARRIER | barrier | processes.md § Barrier |
| Pipe(a, b) / a >> b | a ; b | Sequential composition |
| Tensor(a, b) / a * b | (a * b) | Parallel composition |
| PlumbBody("in", "out") | plumb(in, out) { ... } | Wiring body |
| PortRef("p", "ctrl") | p@ctrl | Port addressing |
| Record([("f", str)]) | { f: string } | Record type |
| SumType([A, B]) | A \| B | Sum type |
| ProductType([A, B]) | (A, B) | Product type |
| StreamType(T) | !T | Stream type |
| ListType(T) | [T] | List type |
Python types are accepted wherever a PlumbType is expected:
| Python | .plumb |
|---|---|
| str | string |
| int | int |
| float | number |
| bool | bool |
| dict | json |
| None | unit |
| list[T] | [T] |
| BaseModel subclass | TypeName (auto-registered) |
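The scalar and list rows of this table amount to a small recursive translation. The sketch below reproduces those rows with the standard library only; to_plumb is a hypothetical name, this is not the DSL's own conversion code, and the BaseModel row is left out to avoid a pydantic dependency.

```python
from typing import get_args, get_origin

# Scalar rows from the table above; type(None) covers NoneType.
SCALARS = {
    str: "string",
    int: "int",
    float: "number",
    bool: "bool",
    dict: "json",
    type(None): "unit",
}


def to_plumb(py_type):
    """Render a Python type as .plumb type text (scalars and lists only)."""
    if py_type is None:
        return "unit"
    if py_type in SCALARS:
        return SCALARS[py_type]
    if get_origin(py_type) is list:
        (item,) = get_args(py_type)
        return f"[{to_plumb(item)}]"
    raise TypeError(f"no .plumb equivalent in this sketch for {py_type!r}")


print(to_plumb(list[float]))
```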
Port types on let() are automatically wrapped in StreamType —
almost every port is a stream. Use Bare(some_type) to suppress:
prog.let("tool_binding", input=Bare(str), output=Bare(int), impl=...)
# renders: let tool_binding : string -> int = ...

The F object creates current-value selectors for filter/map expressions.
For collection predicates, use the helper functions in_, subset,
and distinct.
from persevere.plumbing.dsl import F, distinct, in_, subset
Filter(F.score >= 85) # filter(score >= 85)
Filter(F.verdict == False) # filter(verdict = false)
Filter((F.a > 0) & (F.b < 10)) # filter(a > 0 && b < 10)
Filter(F[1].grounded == True) # filter(.[1].grounded = true)
Filter(F.pair[1].grounded == True) # filter(pair[1].grounded = true)
Filter(in_(F.evidence_type, ["direct"])) # filter(evidence_type in ["direct"])
Filter(subset(["PCE.md"], F.context_paths)) # filter(["PCE.md"] subset context_paths)
Filter(distinct(F.related_paths)) # filter(distinct(related_paths))
Map(F.score, F.review) # map({score: score, review: review})
Map(quality=F.score) # map({quality: score})
Map(grounded=F[1].grounded) # map({grounded: .[1].grounded})

Map(...) positional FieldRef arguments remain top-level identity
projections only. For selector expressions such as F[1].grounded, use
keyword arguments or a raw string expression.
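Selector objects of this kind are typically built with operator overloading: attribute and index access record a path, and comparison operators return an expression instead of a bool. The toy below is not the DSL's implementation; the Selector and Expr classes are hypothetical and only reproduce the renderings shown in the examples above.

```python
import json


class Expr:
    """A rendered boolean expression that can be combined with &."""

    def __init__(self, text):
        self.text = text

    def __and__(self, other):
        return Expr(f"{self.text} && {other.text}")

    def __str__(self):
        return self.text


class Selector:
    """Records attribute and index access as a path string."""

    def __init__(self, path=""):
        self._path = path

    def __getattr__(self, name):
        if name.startswith("_"):  # keep private lookups from recursing
            raise AttributeError(name)
        return Selector(f"{self._path}.{name}" if self._path else name)

    def __getitem__(self, index):
        return Selector(f"{self._path}[{index}]" if self._path else f".[{index}]")

    def _compare(self, op, value):
        # json.dumps renders True/False as true/false and quotes strings.
        return Expr(f"{self._path} {op} {json.dumps(value)}")

    def __ge__(self, value):
        return self._compare(">=", value)

    def __gt__(self, value):
        return self._compare(">", value)

    def __lt__(self, value):
        return self._compare("<", value)

    def __eq__(self, value):
        return self._compare("=", value)


F = Selector()
print(str((F.a > 0) & (F.b < 10)))
```

The parentheses around each comparison matter in Python, because & binds more tightly than > and <.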
# Type-check
result = prog.check()
# One-shot synchronous call
results = prog.call_sync("hello")
# Async streaming
async with await prog.run() as pipeline:
    await pipeline.send("hello")
    response = await pipeline.recv()

When check() raises CheckError, the error references line/column
in the generated .plumb source. To debug:
source = prog.render()
print(source) # inspect the generated source
prog.check() # error will reference lines in the above

Use Raw("verbatim text") for syntax the DSL does not model
(protocols, session types, complex builtins). PlumbBody.raw()
adds raw statements to a wiring body.
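When tracing a CheckError back to generated source, it can help to print the source with line numbers and a caret at the reported position. The helper below is a hypothetical convenience, not part of the package, and it assumes that line and column are both 1-based, which the text does not state. Placeholder text stands in for rendered .plumb source.

```python
def annotate(source, line, column):
    """Return source with line numbers and a caret under line:column (1-based)."""
    rendered = []
    for number, text in enumerate(source.splitlines(), start=1):
        rendered.append(f"{number:>4} | {text}")
        if number == line:
            # The "NNNN | " gutter is 7 characters wide.
            rendered.append(" " * (7 + column - 1) + "^")
    return "\n".join(rendered)


# Placeholder text; in practice this would be the string from prog.render().
source = "first line\nsecond line\nthird line"
annotated = annotate(source, line=2, column=8)
print(annotated)
```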
See examples/README.md for runnable scripts demonstrating each API pattern.