Streaming API Reference¶
Added in v0.5.1. Source-of-truth: the OCaml type Types.llm_response_chunk in lib/core/types.ml.
Phase C.1 design contract; Phases C.2 (FFI bridge) and C.3 (Python generator) implement this document.
This page is the API contract for streaming LLM output from PAR's Python binding. It locks the shape of invoke_stream, the Event tagged union, the backpressure strategy, and the threading model. If you are writing Python code that consumes tokens as they arrive, read the Usage Examples section. If you are implementing the FFI bridge, skip to the Implementation Notes at the end.
Overview¶
Streaming lets a caller consume an LLM's output token by token instead of waiting for the full response. For interactive UIs this cuts perceived latency from "wait 8 seconds, then dump 500 words" to "first token in 200 ms, then a steady drip." For long-running tool-call flows it also means the caller can cancel early once it sees enough.
PAR adds streaming in v0.5.1 by exposing the existing ?on_chunk parameter on Runtime.invoke through a new Python method, invoke_stream. The OCaml side has supported chunked output since v0.4.0 (see lib/core/types.ml line 509 for the llm_response_chunk ADT), but the FFI surface and Python wrapper have not yet shipped. This document defines what they should look like before any implementation code is written.
Provider support varies and is documented in the Provider Support section below. OpenAI, Anthropic, and Mock all stream text deltas and tool-call deltas; only Anthropic and Mock also emit usage updates.
Three alternatives considered¶
Three shapes were on the table when designing the Python surface. They are documented here so the choice is auditable, and so future maintainers don't relitigate it without context.
Option 1: Generator¶
The runtime exposes a method that returns a lazy iterator. Each next() call yields one Event. The caller drives consumption with a for loop.
def invoke_stream(self, agent_id: str, message: str) -> Iterator[Event]: ...

for event in rt.invoke_stream("agent", "hello"):
    ...
Pros
- Matches the OpenAI Python SDK convention (stream=True returns an iterator of ChatCompletionChunk). Developers who have streamed from OpenAI before will write idiomatic PAR code on the first try.
- Composes with the rest of Python: list(stream), itertools.islice(stream, n), loop.run_in_executor wrappers, generator-based pipelines.
- Backpressure is free. The OCaml side only produces the next chunk when the generator resumes, so a slow consumer cannot flood the queue.
- Resource cleanup maps cleanly onto generator.close() and with blocks. A finally clause in the generator body can cancel the underlying OCaml fiber.
- Cancellation is just break. Python's iterator protocol already handles GeneratorExit propagation.
Cons
- The caller must consume the iterator. If they call invoke_stream and throw the result away, the OCaml side may keep running until its next chunk attempt blocks forever. Mitigation: a finally clause in the generator that cancels the fiber, plus a __del__ warning.
- Error surface is split. Some failures raise from next() (chunk-level errors), others from the initial invoke_stream call (parameter validation). This is the same as OpenAI's SDK, but worth noting.
- Cross-thread handoff is required. The OCaml runtime invokes the C callback on its own fiber; the generator runs on the caller's thread. A queue must bridge them. This is unavoidable for any streaming shape that lets the caller consume on its own thread.
- Harder to layer additional callback-style hooks later (logging, metrics). Each layer has to be a generator wrapper rather than a function.
When it's the right choice. When the caller is Pythonic, wants natural for loops, and doesn't need to fan events out to multiple subscribers.
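The cleanup and cancellation points above can be sketched with a plain generator. This is an illustration only, not PAR code: fake_stream and cancel_fiber are hypothetical stand-ins for invoke_stream and the FFI cancel hook. One detail worth seeing in code: break alone leaves the generator suspended at its yield. CPython usually finalizes it soon after the last reference is dropped, but an explicit close() makes the cleanup deterministic.

```python
from typing import Iterator, List

cleanup_log: List[str] = []  # records cleanup calls so the behaviour is observable


def cancel_fiber() -> None:
    """Hypothetical stand-in for cancelling the underlying OCaml fiber."""
    cleanup_log.append("cancelled")


def fake_stream(tokens: List[str]) -> Iterator[str]:
    """Yield tokens one at a time and always run cleanup when the stream ends."""
    try:
        for token in tokens:
            yield token
    finally:
        # Runs on normal exhaustion and when close() raises GeneratorExit
        # at the suspended yield.
        cancel_fiber()


received: List[str] = []
stream = fake_stream(["a", "b", "c", "d"])
for token in stream:
    received.append(token)
    if token == "b":
        break  # stop early; the generator is still suspended here

stream.close()  # deterministic cleanup: triggers the finally block exactly once
```

Wrapping the iterator in contextlib.closing gives the same guarantee without the explicit close() call.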
Option 2: Callback¶
The runtime exposes invoke with an on_event keyword that fires for each chunk. The caller passes a callable.
def invoke(self, agent_id: str, message: str,
           on_event: Callable[[Event], None]) -> None: ...

rt.invoke("agent", "hello", on_event=print)
Pros
- Matches the OCaml-side shape directly. ?on_chunk at Runtime.invoke (lib/core/runtime.ml line 336) is already a callback parameter; the FFI can hand it straight through with no queue.
- Simpler FFI. No iterator state machine on the Python side, no sentinel, no _DONE protocol. One callback pointer, one C entrypoint.
- Easy to layer. A logger or metrics hook is just another callable composed via a small wrapper.
- Familiar to JavaScript and Java refugees who expect event-driven APIs.
Cons
- Un-Pythonic. Python developers reach for iterators first; callbacks feel like 2012-era tornado.gen.engine.
- Hard to cancel mid-stream. The callback cannot tell the runtime to stop without a side channel (an exception, a flag the runtime has to check). Exception-based cancellation is fragile because the callback might be running on the OCaml fiber's stack.
- No backpressure. If the callback is slow, the OCaml side blocks, but the caller has no way to apply backpressure upstream because they don't control the loop.
- Hard to collect results. The caller has to maintain their own buffer in a closure-captured list, which is ugly when the same callback is reused.
- Composition with for loops, list comprehensions, and asyncio requires the caller to wrap the callback in their own queue-and-generator adapter. They will end up rebuilding Option 1.
When it's the right choice. When the caller is a non-Pythonic environment that already speaks in callbacks (an Electron host, a Java bridge), or when FFI simplicity matters more than caller ergonomics.
Option 3: Both¶
Expose both surfaces. The generator wraps the callback internally.
def invoke(self, ..., on_event: Optional[Callable[[Event], None]] = None) -> None: ...
def invoke_stream(self, ...) -> Iterator[Event]: ...
Pros
- Familiar to anyone who has used the OpenAI SDK (stream=True on chat.completions.create) and to anyone who has used Anthropic's SDK (client.messages.stream() returns a context manager).
- No wrong door. Either style works; callers pick what fits.
Cons
- Two APIs to test, document, and keep in sync. The v0.5.1 surface is small; doubling it for stylistic preference is not justified yet.
- The callback variant has the cancellation and backpressure problems noted under Option 2. Shipping it endorses those problems.
- Versioning hazard. If the generator evolves (per-chunk metadata, async variant), the callback has to evolve in lockstep or grow a second parameter set.
When it's the right choice. When the project is large enough that two distinct caller populations exist (Python application developers plus a non-Python host bridge), and the maintainer budget covers both.
Recommendation: generator (Option 1)¶
PAR's primary Python audience is backend engineers writing agent-powered services. They expect iterators, they reach for for event in ... by default, and they have already used the OpenAI SDK's stream=True. Option 1 matches that muscle memory.
Option 2 is rejected as the primary surface because its cancellation and backpressure problems are real and the FFI simplicity gain is a one-time cost. Option 3 is rejected for v0.5.1 because the maintainer budget does not cover two surfaces, and nothing prevents adding a callback-style wrapper in v0.6 if a real caller asks for it. A generator can be wrapped in a callback adapter in five lines; the reverse requires the full queue-plus-sentinel machinery this document specifies.
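The asymmetry between the two adapters can be sketched as follows. This is a minimal illustration under stated assumptions, not the PAR implementation: drive_callback, callback_to_iterator, and produce are hypothetical names, events are plain strings, and error propagation from the producer thread is left out.

```python
import queue
import threading
from typing import Callable, Iterable, Iterator, List

_DONE = object()  # sentinel that marks the end of the stream


def drive_callback(events: Iterable[str], on_event: Callable[[str], None]) -> None:
    """Generator -> callback: a plain loop is all it takes."""
    for event in events:
        on_event(event)


def callback_to_iterator(
    run: Callable[[Callable[[str], None]], None], maxsize: int = 8
) -> Iterator[str]:
    """Callback -> generator: needs a thread, a bounded queue, and a sentinel."""
    q: "queue.Queue[object]" = queue.Queue(maxsize=maxsize)

    def worker() -> None:
        try:
            run(q.put)  # the producer blocks here whenever the queue is full
        finally:
            q.put(_DONE)  # always signal completion, even if run() raised

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = q.get()
        if item is _DONE:
            return
        yield item  # type: ignore[misc]


def produce(on_event: Callable[[str], None]) -> None:
    """Hypothetical callback-style producer."""
    for token in ["Hel", "lo", "!"]:
        on_event(token)


seen: List[str] = []
drive_callback(iter(["x", "y"]), seen.append)
rebuilt = list(callback_to_iterator(produce))
```

Note that if the consumer abandons callback_to_iterator early, the worker thread stays blocked on q.put; a production version would need a cancellation path for that case.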
The rest of this document specifies Option 1 in full.
Event type¶
Event is a frozen-dataclass union mirroring the OCaml llm_response_chunk ADT at lib/core/types.ml line 509. Each constructor maps to one Python class. Field names match the OCaml record labels exactly so JSON round-trips are predictable.
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class TextDelta:
    """A chunk of text from the LLM. Concatenate `text` across deltas."""
    text: str


@dataclass(frozen=True)
class ToolCallStart:
    """The LLM is beginning a tool call. Followed by zero or more ToolCallDelta."""
    tool_call_id: str
    name: str


@dataclass(frozen=True)
class ToolCallDelta:
    """A fragment of the tool call's JSON arguments. Concatenate `args_json`."""
    tool_call_id: str
    args_json: str


@dataclass(frozen=True)
class UsageUpdate:
    """Token usage so far. Emitted at most once per stream, near the end."""
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


@dataclass(frozen=True)
class Done:
    """The stream is complete. `finish_reason` is one of: stop, tool_calls, length, content_filter, max_iterations."""
    finish_reason: str


Event = Union[TextDelta, ToolCallStart, ToolCallDelta, UsageUpdate, Done]
Invariants:
- TextDelta events arrive in order. Concatenate text to reconstruct the full assistant message.
- A ToolCallStart is followed by zero or more ToolCallDelta events with the same tool_call_id. Concatenate args_json and parse the result as JSON to recover the tool call arguments.
- UsageUpdate is optional. OpenAI does not emit it; Anthropic and Mock do. Callers that display token usage must tolerate its absence.
- Done is always the last event. The generator exits after yielding it. If the stream ends without Done (network error, cancellation), the generator raises PARInvokeError from next().
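A consumer that relies on these invariants might fold a stream into a single result like this. The sketch is self-contained, so it redefines the event classes locally instead of importing par_runtime. StreamResult and collect are hypothetical helpers, and ValueError stands in for the PARInvokeError that the real generator is specified to raise.

```python
import json
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional, Union


# Local copies of the event classes so the sketch runs without par_runtime.
@dataclass(frozen=True)
class TextDelta:
    text: str


@dataclass(frozen=True)
class ToolCallStart:
    tool_call_id: str
    name: str


@dataclass(frozen=True)
class ToolCallDelta:
    tool_call_id: str
    args_json: str


@dataclass(frozen=True)
class UsageUpdate:
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


@dataclass(frozen=True)
class Done:
    finish_reason: str


Event = Union[TextDelta, ToolCallStart, ToolCallDelta, UsageUpdate, Done]


@dataclass
class StreamResult:  # hypothetical container, not part of the PAR API
    text: str = ""
    tool_calls: Dict[str, dict] = field(default_factory=dict)
    usage: Optional[UsageUpdate] = None
    finish_reason: Optional[str] = None


def collect(events: Iterable[Event]) -> StreamResult:
    """Fold a stream into one result, checking the invariants listed above."""
    result = StreamResult()
    text_parts: List[str] = []
    names: Dict[str, str] = {}
    arg_parts: Dict[str, List[str]] = {}
    for event in events:
        if isinstance(event, TextDelta):
            text_parts.append(event.text)
        elif isinstance(event, ToolCallStart):
            names[event.tool_call_id] = event.name
            arg_parts[event.tool_call_id] = []
        elif isinstance(event, ToolCallDelta):
            if event.tool_call_id not in names:
                raise ValueError(f"delta before start: {event.tool_call_id!r}")
            arg_parts[event.tool_call_id].append(event.args_json)
        elif isinstance(event, UsageUpdate):
            result.usage = event  # optional; stays None when the provider omits it
        elif isinstance(event, Done):
            result.finish_reason = event.finish_reason
            break  # Done is always the last event
    if result.finish_reason is None:
        raise ValueError("stream ended without Done")
    result.text = "".join(text_parts)
    for call_id, name in names.items():
        raw = "".join(arg_parts[call_id])
        # A start with zero deltas is treated as a call with no arguments.
        args = json.loads(raw) if raw else {}
        result.tool_calls[call_id] = {"name": name, "args": args}
    return result


result = collect([
    TextDelta("Checking "), TextDelta("weather."),
    ToolCallStart("call_1", "get_weather"),
    ToolCallStart("call_2", "get_time"),
    ToolCallDelta("call_1", '{"city": '),
    ToolCallDelta("call_1", '"Tokyo"}'),
    Done("tool_calls"),
])
```

The sample input interleaves two tool calls and omits UsageUpdate, which exercises the "zero or more deltas" and "usage is optional" cases together.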
API signature¶
from typing import Iterator

def invoke_stream(
    self,
    agent_id: str,
    message: str,
) -> Iterator[Event]: ...
Notes:
- The method name carries the streaming semantic. There is no stream=True flag on invoke; callers who want non-streaming behavior use invoke, callers who want streaming use invoke_stream. Two methods, two intents, no boolean trap.
- The return type is Iterator[Event], not List[Event]. The contract is a lazy iterator in which the OCaml side does not produce chunk N+1 until the caller asks for it via next(). v0.5.1-beta keeps the iterator interface but does not yet deliver that laziness; see the next two notes.
- The first next() call fetches all chunks from the OCaml work loop. If the invoke fails, PARInvokeError is raised on first access.
- All chunks arrive at once after the LLM completes (buffered, not incremental). This is a v0.5.1-beta limitation; true incremental streaming is deferred to v0.5.2.
- Keyword-only extensions (cancellation tokens, conversation IDs, RAG options) will be added in later versions under their own keyword arguments. The v0.5.1 signature is intentionally minimal.
Buffered chunk delivery¶
The OCaml work loop executes the full Runtime.invoke (including streaming) and buffers all chunks in a ref list. After the invoke completes, chunks are serialized as a JSON array and returned with the final result. Python parses the JSON and yields Events.
This means:
- No daemon thread, no queue, no ctypes callback — eliminating the domain lock crash that affected v0.5.1-beta's initial streaming implementation.
- All chunks are available immediately after the first __iter__ call.
- Memory usage scales with response length (all chunks in memory simultaneously).
- True incremental streaming (chunk-by-chunk delivery) is planned for v0.5.2 via a non-blocking start/poll/wait API.
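The Python half of buffered delivery could look roughly like the sketch below. The wire format is an assumption made for illustration: this document does not specify the JSON shape, so the "type" discriminator and the tag names (text_delta, done, and so on) are invented here. events_from_json is likewise a hypothetical helper name.

```python
import json
from dataclasses import dataclass
from typing import Iterator, Union


# Local copies of the event classes so the sketch runs without par_runtime.
@dataclass(frozen=True)
class TextDelta:
    text: str


@dataclass(frozen=True)
class ToolCallStart:
    tool_call_id: str
    name: str


@dataclass(frozen=True)
class ToolCallDelta:
    tool_call_id: str
    args_json: str


@dataclass(frozen=True)
class UsageUpdate:
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


@dataclass(frozen=True)
class Done:
    finish_reason: str


Event = Union[TextDelta, ToolCallStart, ToolCallDelta, UsageUpdate, Done]

# ASSUMPTION: these tag names are invented for the sketch, not taken from PAR.
_CONSTRUCTORS = {
    "text_delta": TextDelta,
    "tool_call_start": ToolCallStart,
    "tool_call_delta": ToolCallDelta,
    "usage_update": UsageUpdate,
    "done": Done,
}


def events_from_json(payload: str) -> Iterator[Event]:
    """Parse a buffered JSON array of chunks and yield one Event per element."""
    for raw in json.loads(payload):
        fields = dict(raw)  # copy so the parsed input is not mutated
        kind = fields.pop("type")
        try:
            constructor = _CONSTRUCTORS[kind]
        except KeyError:
            raise ValueError(f"unknown chunk type: {kind!r}") from None
        yield constructor(**fields)


payload = json.dumps([
    {"type": "text_delta", "text": "Hi"},
    {"type": "usage_update", "prompt_tokens": 3,
     "completion_tokens": 1, "total_tokens": 4},
    {"type": "done", "finish_reason": "stop"},
])
events = list(events_from_json(payload))
```

Because the whole array is parsed before the first event is yielded, memory use grows with response length, which matches the limitation listed above.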
Provider support¶
| Provider | Text streaming | Tool call streaming | Usage update | Notes |
|---|---|---|---|---|
| `Openai | Yes | Yes | No | OpenAI does not emit token counts during streaming; the UsageUpdate event will not appear. Callers that need usage must fall back to non-streaming invoke. |
| `Anthropic | Yes | Yes | Yes | Verify against lib/llm/anthropic_provider.ml when implementing C.2. Anthropic's stream messages include message_delta with usage blocks. |
| `Mock | Yes | Yes | Yes | The mock provider emits all five event types. Use it as the streaming test fixture. |
| `Ollama | Yes | Unknown | Unknown | Not validated for streaming in v0.5.1. Test before relying on it. |
If a provider does not support streaming natively, the runtime falls back to emitting a single TextDelta with the full response followed by Done. The caller should not assume chunks are small.
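A UI that wants bounded chunk sizes regardless of provider can re-split oversized deltas on the consumer side. The helper below is a hypothetical sketch, not part of PAR; it redefines only the two event classes it needs, and split_large_deltas and max_chars are illustrative names.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Union


# Local copies of the two event classes used here.
@dataclass(frozen=True)
class TextDelta:
    text: str


@dataclass(frozen=True)
class Done:
    finish_reason: str


Event = Union[TextDelta, Done]


def split_large_deltas(events: Iterable[Event], max_chars: int = 40) -> Iterator[Event]:
    """Re-emit the stream, slicing any TextDelta longer than max_chars."""
    if max_chars < 1:
        raise ValueError("max_chars must be at least 1")
    for event in events:
        if isinstance(event, TextDelta) and len(event.text) > max_chars:
            for start in range(0, len(event.text), max_chars):
                yield TextDelta(event.text[start:start + max_chars])
        else:
            yield event  # small deltas and non-text events pass through unchanged


# The fallback shape described above: one large TextDelta, then Done.
fallback = [TextDelta("abcdefghij"), Done("stop")]
pieces = list(split_large_deltas(fallback, max_chars=4))
```

Concatenating the text of the re-split deltas yields the same message as the original stream, so this wrapper is safe to place in front of any consumer.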
Usage examples¶
Three runnable examples covering the patterns you will actually need: a basic token stream, a tool-call stream that reconstructs the call arguments, and an error-handling wrapper that catches provider failures without leaking partial output.
Example 1: print tokens as they arrive¶
The most common shape. Iterate the generator, match on TextDelta to print each fragment as it lands, and stop when Done arrives. The flush=True matters for terminals and pipe-forwarded UIs; without it, Python buffers stdout and the streaming UX disappears.
from par_runtime import Runtime, TextDelta, Done

with Runtime(config_json) as rt:
    for event in rt.invoke_stream("agent", "Tell me a joke"):
        if isinstance(event, TextDelta):
            print(event.text, end="", flush=True)
        elif isinstance(event, Done):
            print()  # newline after the final token
            # event.finish_reason is one of: stop, tool_calls, length,
            # content_filter, max_iterations
If you just need the full message and do not care about latency, "".join(e.text for e in rt.invoke_stream(...) if isinstance(e, TextDelta)) reconstructs it. You lose the streaming benefit, but the API does not force you to consume incrementally.
Example 2: stream a tool call and reconstruct its arguments¶
LLM providers send tool calls as a ToolCallStart (the call id and tool name) followed by zero or more ToolCallDelta fragments whose args_json strings concatenate to the full JSON arguments. Buffer the fragments by tool_call_id, then parse the concatenation when the stream ends.
import json
from collections import defaultdict
from par_runtime import (
    Runtime, TextDelta, ToolCallStart, ToolCallDelta, Done,
)

with Runtime(config_json) as rt:
    text_parts = []
    tool_buffers = defaultdict(list)
    tool_names = {}
    for event in rt.invoke_stream("agent", "What's the weather in Tokyo?"):
        if isinstance(event, TextDelta):
            text_parts.append(event.text)
        elif isinstance(event, ToolCallStart):
            tool_names[event.tool_call_id] = event.name
        elif isinstance(event, ToolCallDelta):
            tool_buffers[event.tool_call_id].append(event.args_json)
        elif isinstance(event, Done):
            break

    # Iterate tool_names rather than tool_buffers so that a call which
    # received zero deltas is still reported (with empty arguments).
    for tool_call_id, name in tool_names.items():
        raw = "".join(tool_buffers[tool_call_id])
        args = json.loads(raw) if raw else {}
        print(f"Tool call: {name}({args})")
The same pattern works for parallel tool calls: each call has its own tool_call_id, so the buffer keyed by id keeps them separate without race conditions.
Example 3: handle errors and cancel cleanly¶
Wrap the iterator in try/except to catch provider failures (network, auth, content filter) and fiber errors. Breaking out of the loop or letting the with block exit runs the generator's finally clause, which joins the OCaml fiber and releases the queue. Never let an exception escape without closing the iterator.
from par_runtime import Runtime, TextDelta, PARError

try:
    with Runtime(config_json) as rt:
        try:
            for event in rt.invoke_stream("agent", "hello"):
                if isinstance(event, TextDelta):
                    print(event.text, end="", flush=True)
        except PARError as e:
            # Provider-side failure surfaced via the FFI: bad model name,
            # rate limit, content filter, etc. Partial output may already
            # have been printed; that is expected for streaming.
            print(f"\n[stream failed: {e}]")
except KeyboardInterrupt:
    # Ctrl-C during iteration. GeneratorExit fires, the finally
    # block cancels the OCaml fiber, and the runtime shuts down.
    print("\n[cancelled by user]")
    raise
finally:
    # rt.close() runs automatically when the `with` block exits.
    pass
The PARError covers every error path that crosses the FFI boundary: malformed config, unknown agent id, provider HTTP errors, and any exception raised inside the chunk callback. The KeyboardInterrupt branch is worth keeping explicit so user-initiated cancellation logs cleanly rather than printing a traceback.
Limitations¶
- No async/await support in v0.5.1. The iterator is synchronous. An async for wrapper is a v0.5.2 candidate; it will likely be a thin asyncio adapter around the sync iterator rather than a separate code path.
- No nested event hierarchy. PAR does not emit LangChain-style parent_run_id or run_id metadata on streaming events. If you need to correlate streams with tool calls or sub-agent invocations, use the event bus (par_event_subscribe, wired up in C.2) for structured events.
- No invoke_with_rag_streaming. The RAG entrypoint (Runtime.invoke_with_rag, deferred to v0.5.2 per roadmap) will get its own streaming variant once the base streaming surface ships.
- Backpressure is blocking. If the consumer is much slower than the producer, the OCaml fiber blocks on queue.put. This is an acceptable tradeoff versus unbounded memory growth, but it does mean a hung consumer ties up an OCaml fiber until the stream completes or is cancelled.
- Single consumer only. The iterator is not broadcast. If multiple subscribers need the same stream, fan out at the application level (wrap the iterator in your own pub-sub).
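Until an official async wrapper ships, callers can approximate the thin asyncio adapter mentioned in the first limitation themselves. The sketch below is one possible shape, not the planned v0.5.2 API; aiter_events is a hypothetical name. It pulls each item from the blocking iterator on the default thread-pool executor so the event loop stays responsive.

```python
import asyncio
from typing import AsyncIterator, Iterator, List, TypeVar

T = TypeVar("T")
_END = object()  # sentinel returned by next() when the iterator is exhausted


async def aiter_events(sync_iter: Iterator[T]) -> AsyncIterator[T]:
    """Expose a blocking iterator as an async iterator."""
    loop = asyncio.get_running_loop()
    while True:
        # next(it, default) returns the sentinel instead of raising
        # StopIteration, which does not propagate cleanly through a Future.
        item = await loop.run_in_executor(None, next, sync_iter, _END)
        if item is _END:
            return
        yield item  # type: ignore[misc]


async def main() -> List[str]:
    # A plain iterator stands in for rt.invoke_stream(...) here.
    return [item async for item in aiter_events(iter(["a", "b", "c"]))]


collected = asyncio.run(main())
```

The adapter does not close the underlying iterator when the async consumer exits early; a caller that breaks out of the async for loop should close the sync iterator explicitly.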
Implementation notes (for C.2 and C.3 maintainers)¶
This section is informational. It does not define the public API; it records the hooks the FFI bridge should use.
- Reuse the existing ?on_chunk parameter. Runtime.invoke at lib/core/runtime.ml line 336 already accepts ?on_chunk : (Types.llm_response_chunk -> unit) option. Wire the C callback through this parameter; do not add a new code path on the OCaml side.
- Do not route chunks through the event bus. The event bus (Event_bus module) has no streaming event constructor and should not gain one. Streaming chunks are a synchronous callback, not a publish-subscribe event. Mixing the two would couple stream consumers to event-bus retention policy.
- Reference consumer implementation. bin/main.ml line 386 defines stream_print_chunk, used at lines 501 and 578 to stream par ask output to the terminal. It is the canonical example of a chunk consumer and shows the expected Text_delta / Tool_call_delta handling.
- New C entrypoint. Add par_invoke_stream(par_runtime_t* rt, const char* agent_id, const char* message, par_event_callback cb, void* user_data) to lib/ffi/par_ffi.h and lib/ffi/par_ffi.c. Model the signature on par_invoke and the existing par_event_callback typedef at lib/ffi/par_ffi.h line 64. The user_data pointer is forwarded untouched to the callback so the Python binding can pass its queue reference.
- Existing subscribe stub. par_event_subscribe is declared at lib/ffi/par_ffi.h line 64 and stubbed at lib/ffi/par_ffi.c line 336 (returns -1). It is unrelated to streaming but uses the same callback shape. Wiring it up is optional for C.2 and may slip to a later phase; the streaming entrypoint does not depend on it.
- Existing Python precedent. bindings/python/par_runtime/_ffi.py line 62 defines _PYTHON_TOOL_CALLBACK = CFUNCTYPE(c_char_p, c_int, c_char_p). Mirror this pattern for the streaming callback: define _STREAM_CALLBACK = CFUNCTYPE(None, c_char_p, c_char_p) (event_type, event_json), keep the closure on self._cb_keepalive for the runtime's lifetime, parse the JSON inside the callback, and push a constructed Event onto the queue.
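The Python-side pattern from the last note might be sketched as follows for the callback-based design. Everything beyond the _STREAM_CALLBACK signature and the _cb_keepalive attribute is an assumption: StreamBridge is a hypothetical class, the "done" tag and JSON payload shapes are invented, and the queue carries (event_type, payload) tuples rather than constructed Event objects to keep the sketch short. The callback is invoked directly from Python here; in the real binding the C side would call it.

```python
import json
import queue
from ctypes import CFUNCTYPE, c_char_p
from typing import Iterator, Tuple

# Signature from the note above: (event_type, event_json) -> None.
_STREAM_CALLBACK = CFUNCTYPE(None, c_char_p, c_char_p)
_DONE = object()  # sentinel that ends iteration


class StreamBridge:
    """Hypothetical bridge between a C callback and a Python iterator."""

    def __init__(self, maxsize: int = 64) -> None:
        self.queue: "queue.Queue[object]" = queue.Queue(maxsize=maxsize)
        # Hold a reference for as long as C may call the pointer; ctypes
        # callback objects must outlive their use from C code.
        self._cb_keepalive = _STREAM_CALLBACK(self._on_chunk)

    @property
    def c_callback(self):
        """The object to pass to the C entrypoint as the callback pointer."""
        return self._cb_keepalive

    def _on_chunk(self, event_type: bytes, event_json: bytes) -> None:
        # c_char_p arguments arrive in the Python callback as bytes.
        kind = event_type.decode("utf-8")
        payload = json.loads(event_json.decode("utf-8"))
        self.queue.put((kind, payload))  # blocks when full: backpressure
        if kind == "done":  # ASSUMPTION: tag name invented for this sketch
            self.queue.put(_DONE)

    def __iter__(self) -> Iterator[Tuple[str, dict]]:
        while True:
            item = self.queue.get()
            if item is _DONE:
                return
            yield item  # type: ignore[misc]


bridge = StreamBridge()
bridge.c_callback(b"text_delta", b'{"text": "Hi"}')
bridge.c_callback(b"done", b'{"finish_reason": "stop"}')
items = list(bridge)
```

Exceptions raised inside a ctypes callback are reported on stderr and otherwise ignored, so a real implementation would need to catch them in the callback and forward them through the queue for the iterator to re-raise.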
See also¶
- Agent API - Runtime.invoke, agent_config, the non-streaming entrypoint that invoke_stream mirrors
- Overview - SDK architecture and module map
- Workflow API - workflow orchestration; workflow steps do not yet support streaming
- Tools API - the 20 built-in tools, including type-safe bash