Workflow Streams - Python SDK
Workflow Streams is a Temporal Python SDK contrib library that gives a Workflow a durable, offset-addressed event channel built on Temporal's basic message primitives: Signals, Updates, and Queries.
The library implements batching to amortize overheads, deduplication for exactly-once delivery, topics for stream filtering, and continue-as-new helpers to support long-running operations.
Workflow Streams are intended for keeping outside observers updated on the progress being made by a Workflow and its associated Activities. Example applications include updating a user interface to show the progress being made by an AI agent, or providing updates on in-flight payment or order processing work. Workflow Streams are not designed for ultra-low-latency applications such as real-time voice streaming.
A WorkflowStream instance is a many-to-many communication channel hosted inside a Workflow: any number of publishers (the Workflow itself, its Activities, other Workflows, etc.) append events, and any number of subscribers consume them. Temporal's Signals and Updates provide the underlying message passing primitives; the temporalio.contrib.workflow_streams module packages the batching, offset tracking, topic filtering, and continue-as-new hand-off that turn those primitives into a usable stream.
The temporalio.contrib.workflow_streams module is currently in
Public Preview. Refer to the
Temporal product release stages guide for more information.
Cross-language client support is on the roadmap. Only the Python client is covered for now.
The API may change before general availability.
For runnable end-to-end examples, see the Workflow Streams samples. The directory has four scenarios: basic publish/subscribe with heterogeneous topics, a reconnecting subscriber that demonstrates offset-based resume, an external (non-Activity) publisher pattern, and a long-running ticker that bounds its log via truncate().
Concepts
Three roles cover everything you need to use Workflow Streams:
- A streaming Workflow hosts the event stream. Constructing a
WorkflowStreaminstance from@workflow.initregisters the handlers that accept publishes and serve subscribers. Each Workflow has at most oneWorkflowStream; constructing more than one raisesRuntimeError. - Publishers append events. The Workflow itself can publish via the in-memory
WorkflowStream. Activities and external clients publish viaWorkflowStreamClient, which batches and ships events to the Workflow. - Subscribers consume events. They call
WorkflowStreamClient.subscribe(...)and iterate the returned async iterator.
Events are organized by topic — a string label set when publishing. Subscribers can filter to one or more topics or consume all of them. Topics are implicit; publishing to a topic creates it.
The deeper picture — where Signals and Updates fit, how batching works, how state is carried across continue-as-new — appears in Architecture at the end of this guide. You can skip ahead to it once the basics are wired up.
Usage patterns
Workflow Streams is built for the case where an observer can show up late — after a page refresh, server restart, or closed laptop — and still see what happened while they were away. Three patterns recur when applying the library.
End-to-end durability. The event log lives inside the Workflow. Any process that bridges events to the outside world, like an SSE proxy serving a browser, a WebSocket fan-out, a forwarding Activity, stays stateless. A reconnecting client supplies its from_offset, the bridge polls forward from there, and the Workflow is the single source of truth.
Don't put Redis or Kafka in front of the stream to "buffer" reconnects. Splitting authority between two stores creates state that drifts and has to be reconciled under retries. The Workflow already owns ordering, persistence, and replay.
Build consumer state from individual events. Publishers emit values in their own vocabulary (STATUS_CHANGE, TOOL_CALL, TEXT_DELTA, and so on) and consumers project those values into anything they want to render. The library assigns each event a stable offset and delivers it exactly once. Replaying from offset 0 always yields the same sequence, which is what makes reconnection correct.
The library does not deduplicate across Activity attempts: a retried Activity's events are appended to the log alongside the failed attempt's. Treat the stream as an append-only log of attempts and let an idempotent consumer reducer reconcile them — overwriting where appropriate (STATUS_CHANGE, TEXT_COMPLETE), or resetting an accumulator on a sentinel event (AGENT_START) before deltas resume. See Delivery semantics for the precise guarantees.
Workflow as conduit, not consumer. When the source of events is an Activity (for example, an Activity streaming tokens from an LLM), make the Activity publish events directly to the Workflow's stream. The Workflow publishes its own lifecycle events and processes only the complete result returned by each Activity. It doesn't read the token stream.
This separation keeps the Workflow's deterministic progress independent of side-channel events that may include partial output from an Activity attempt that ended up retrying. Surface retry transitions to consumers as their own event (e.g., a RETRY event published with force_flush=True) and let the consumer decide how to present them.
Install and import
The library ships as a contrib module with the Temporal Python SDK:
pip install temporalio
from temporalio.contrib.workflow_streams import WorkflowStream, WorkflowStreamClient, WorkflowStreamState
Enable streaming on a Workflow
Turn an existing Workflow into a streaming source by constructing a WorkflowStream from its @workflow.init method:
from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream
@workflow.defn
class OrderWorkflow:
@workflow.init
def __init__(self, input: OrderInput) -> None:
self.stream = WorkflowStream()
Constructing WorkflowStream does two things:
- It allocates the in-memory event log that holds published entries
- It dynamically registers the Workflow Streams Signal, Update, and Query handlers on the current Workflow. Once these are in place, the Workflow accepts publish Signals from external publishers and serves long-poll subscribe Updates with no further code on your part.
Construction must happen exactly once and must happen in @workflow.init. The publish Signal handler has to be registered before the first publish Signal is delivered and @workflow.init is the only point in the Workflow lifecycle that runs ahead of message delivery. Calling WorkflowStream(...) outside @workflow.init, or constructing more than one stream on the same Workflow, raises RuntimeError.
Support Continue-As-New (optional)
If your Workflow uses Continue-As-New, carry the stream's state across the boundary so subscribers see no gap when the run rolls over. Add a WorkflowStreamState | None field to your Workflow input and pass it to the constructor:
from dataclasses import dataclass
from temporalio.contrib.workflow_streams import WorkflowStream, WorkflowStreamState
@dataclass
class OrderInput:
order_id: str
stream_state: WorkflowStreamState | None = None
@workflow.defn
class OrderWorkflow:
@workflow.init
def __init__(self, input: OrderInput) -> None:
self.stream = WorkflowStream(prior_state=input.stream_state)
The | None on the field type is required: prior_state is None on a fresh start and a WorkflowStreamState instance after a Continue-As-New. Use the concrete type, not Any, so the default data converter can rebuild the dataclass on the new run.
Carrying the field is all that's needed to receive state across Continue-As-New see Continue-As-New below for how to invoke the rollover.
Publish from a Workflow
Bind a topic name to its event type once via self.stream.topic("name", type=Type), then call publish() on the returned handle to append events. The handle records the per-stream binding from topic name to value type so call sites don't have to repeat the type on every publish, and so subscribers reading the same handle decode to the matching type.
from dataclasses import dataclass
@dataclass
class StatusEvent:
state: str
progress: int = 0
detail: str = ""
@workflow.defn
class OrderWorkflow:
@workflow.init
def __init__(self, input: OrderInput) -> None:
self.stream = WorkflowStream(prior_state=input.stream_state)
self.status = self.stream.topic("status", type=StatusEvent)
@workflow.run
async def run(self, input: OrderInput) -> None:
self.status.publish(StatusEvent(state="validating", detail="checking inventory"))
await validate_order(input.order_id)
self.status.publish(StatusEvent(state="charging", progress=33, detail="authorizing payment"))
await charge_payment(input.order_id)
self.status.publish(StatusEvent(state="shipping", progress=66, detail="dispatching to warehouse"))
await dispatch_order(input.order_id)
self.status.publish(StatusEvent(state="completed", progress=100))
publish() runs the payload converter to encode each value. The codec chain (encryption, compression, and so on) runs once on the Signal or Update envelope that carries the batch — never per item — so encryption and compression are applied exactly once each direction.
The type= argument is optional and defaults to Any. Pass it when you want the binding recorded so re-binding the same name to an unequal type raises, or so subscribers can pick up the type from the same handle.
Publish from an Activity
Use WorkflowStreamClient.from_within_activity() inside an Activity. The Temporal client and target Workflow Id are taken from the Activity context. The async context manager batches publishes and flushes on exit. Bind a topic handle on the client and publish through it, the same way you would on the Workflow side.
from datetime import timedelta
from temporalio import activity
from temporalio.contrib.workflow_streams import WorkflowStreamClient
@activity.defn
async def stream_chunks(order_id: str) -> None:
client = WorkflowStreamClient.from_within_activity(batch_interval=timedelta(seconds=2))
async with client:
events = client.topic("events", type=Chunk)
for chunk in generate_chunks(order_id):
events.publish(chunk)
activity.heartbeat()
# Buffer is flushed on context manager exit.
To skip buffering for latency-sensitive events, pass force_flush=True to wake the background flusher so the buffer ships without waiting for the next batch interval. The call itself doesn't wait for delivery to the Workflow or to subscribers:
events.publish(chunk, force_flush=True)
publish() is non-blocking and applies no backpressure. It appends to an in-memory buffer and returns. If a publisher emits faster than batches can ship, the buffer grows. Subscriber slowness does not slow publishers down because subscribers pull from the Workflow's log on their own schedule.
If your producer can run unboundedly faster than its batches can drain, a tight loop generating events, for example, apply your own rate limiting upstream of publish(), or use truncate() on the Workflow side to bound log growth (see Architecture).
Use await client.flush() when you need a mid-stream barrier. This is proof that prior publications have reached the Temporal server before subsequent work runs, while the client stays open for more publishing. Exiting async with client already flushes on its way out, so the explicit call is only for barriers in the middle:
async with client:
events = client.topic("events", type=Chunk)
for chunk in first_phase():
events.publish(chunk)
await client.flush()
checkpoint_id = await record_phase_one_complete() # only safe once subscribers have seen phase one
for chunk in second_phase(checkpoint_id):
events.publish(chunk)
Subscribe
Use WorkflowStreamClient.create() to get a client bound to a Workflow, then iterate a topic handle's subscribe() which is the symmetric counterpart to publish(). The handle's bound type drives decoding, so each item.data arrives as T via the client's payload converter. The codec chain is applied once at the Update envelope, not per item.
from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient
async def watch_order(order_id: str) -> None:
temporal_client = await Client.connect("localhost:7233")
stream = WorkflowStreamClient.create(temporal_client, workflow_id=order_id)
status = stream.topic("status", type=StatusEvent)
async for item in status.subscribe():
evt = item.data
print(f"[{evt.progress:3d}%] {evt.state}: {evt.detail}")
if evt.state == "completed":
break
The iterator handles re-polling, server-imposed range limits, and Workflow-side log truncation transparently. There are no extra signals or exceptions for normal callers to handle. See the API reference if you need the underlying details.
WorkflowStreamClient.subscribe() is a long-poll loop and does not exit on its own when the host Workflow completes. Closing a streamed Workflow cleanly takes two pieces working together:
-
The Workflow publishes an in-band terminator that consumers recognize and break on. The
StatusEvent(state="completed")in the example above is the minimal form. The consumer'sif evt.state == "completed": breakis the matching half. Each subscription needs to know its own end-of-stream marker because thecontribdoesn't impose one. -
The Workflow holds its run open briefly after publishing the terminator so subscribers' next poll fetches it before the in-memory log is gone. Items published in the same Workflow task that returns from
@workflow.runare abandoned and the next subscriber poll lands on a completed Workflow. A shortawait workflow.sleep(timedelta(milliseconds=500))before returning is enough to let one more poll come through.self.status.publish(StatusEvent(state="completed", progress=100))
await workflow.sleep(timedelta(milliseconds=500))
return result
With both pieces in place, subscribers receive the terminator, break out of their async for, and stop polling. By the time the Workflow exits there are no in-flight poll handlers, so the SDK doesn't warn about unfinished handlers. As a fallback for runs that crashed before publishing a final event, call handle.describe() after the loop returns and inspect the Workflow's status.
Heterogeneous topics
A topic handle binds one name to one type, so it only fits a single-type subscription. To consume multiple topics whose payload types differ, call client.subscribe() directly with a list of names (or subscribe([]) for every topic) and pass result_type=temporalio.common.RawValue so each item arrives as the underlying Payload wrapped in a RawValue. Dispatch on item.topic and decode the wrapped payload with the client's payload converter:
from temporalio.common import RawValue
converter = temporal_client.data_converter.payload_converter
async for item in stream.subscribe(["status", "progress"], result_type=RawValue):
if item.topic == "status":
evt = converter.from_payload(item.data.payload, StatusEvent)
print(f"[status] {evt.state}: {evt.detail}")
elif item.topic == "progress":
evt = converter.from_payload(item.data.payload, ProgressEvent)
print(f"[progress] {evt.message}")
A single iterator over multiple topics also avoids the cancellation race that two concurrent subscribers would create. RawValue is also the right shape when you want to forward the bytes through to another system without decoding them.
Omitting result_type entirely or passing result_type=None decodes each item with the converter's default rules — for the stock JSON converter, a Python primitive, dict, or list. That works for fully homogeneous streams, but not for the dispatch-by-topic pattern above, where each topic has its own concrete dataclass.
Subscribers automatically follow Continue-As-New chains, so a long-running Workflow can roll over without disrupting active consumers. Under the hood the iterator re-resolves the Workflow handle to the new run and continues polling from the carried offset.
Continue-As-New
WorkflowStream.continue_as_new(build_args) drains waiting subscribers, waits for in-flight handlers to finish, then calls workflow.continue_as_new with the args produced by build_args(post_drain_state). Carry both your application state and the stream state across the boundary:
from dataclasses import dataclass, field
from temporalio import workflow
from temporalio.contrib.workflow_streams import WorkflowStream, WorkflowStreamState
@dataclass
class AppState:
items_processed: int = 0
@dataclass
class WorkflowInput:
app_state: AppState = field(default_factory=AppState)
stream_state: WorkflowStreamState | None = None
@workflow.defn
class LongRunningWorkflow:
@workflow.init
def __init__(self, input: WorkflowInput) -> None:
self.app_state = input.app_state
self.stream = WorkflowStream(prior_state=input.stream_state)
@workflow.run
async def run(self, input: WorkflowInput) -> None:
while True:
await do_one_iteration(self)
if workflow.info().is_continue_as_new_suggested():
await self.stream.continue_as_new(
lambda stream_state: [
WorkflowInput(
app_state=self.app_state,
stream_state=stream_state,
)
]
)
To pass other Continue-As-New parameters such as task_queue, retry_policy, run_timeout, use the explicit recipe instead:
self.stream.detach_pollers()
await workflow.wait_condition(workflow.all_handlers_finished)
workflow.continue_as_new(
args=[WorkflowInput(app_state=self.app_state, stream_state=self.stream.get_state())],
task_queue="other-tq",
)
Delivery semantics
Workflow Streams sits below the failure-free abstraction that durable execution presents to your Workflow code. Durable execution gives you a deterministic view of progress in which retries are hidden and only successful results are visible.
Streaming, by design, surfaces in-progress work to consumers (typically a UI) before it has been settled into that abstraction. Which is what makes the library useful for showing the user a model's tokens as they arrive, an Activity's progress as it runs, or a long pipeline's intermediate state. That trade is what shapes the guarantees below.
Exactly-once at the execution layer. Each (publisher_id, sequence) batch lands in the Workflow's event log at most once, even if the publisher's underlying Signal is retried by the SDK or the network. Once an event is in the log, every subscriber that polls past its offset will see it, and deduplicate state is carried across Continue-As-New so a retried publish that arrives after a rollover still lands at most once.
Ordering. The log imposes a single total order on all events, fixed once written: an event at offset N stays at offset N on every read. Within one publisher (one WorkflowStreamClient instance, or the Workflow itself), events appear in publish order. Across concurrent publishers, the interleaving is whatever the Workflow saw when serializing inbound Signals — stable once recorded, but not under application control. If event A must precede event B, publish them from the same publisher.
Not exactly-once at the application layer. The library does not deduplicate across Activity retries. When an Activity publishes events, fails, and is retried, both attempts' events appear in the stream. That's what gives you visibility into the failed attempt while it's happening.
Deduplicating, hiding, or labelling those events is the application's job, and is the trade you're making in exchange for streaming below the durable-execution boundary. The conventional pattern is for an Activity that detects it's on a retry attempt to publish a RETRY event with force_flush=True, and for the UI to clear or annotate prior-attempt output when it sees one. The Workflow as conduit, not consumer pattern keeps the Workflow's own state independent of these retried events.
Other failure modes. Events still in a publisher's in-memory client buffer are lost if the process crashes before they ship. Subscribers that handle an item and crash before persisting their next offset will reprocess that item on resume. Send consumer state from events with both in mind.
Two limits on the deduplication window are worth understanding:
-
publisher_ttl(default 15 minutes). Retention for the per-publisher deduplicate state. At each Continue-As-New, deduplicate entries older than this are dropped. A publisher that returns after a longer pause may produce a duplicate.Pass it as a
kwargtoWorkflowStream.continue_as_new(...)or toWorkflowStream.get_state(...)if you're driving the rollover with the explicit recipe. Tune upward if your publishers can be silent for extended windows. -
max_retry_duration(default 10 minutes). AWorkflowStreamClientretries a failed batch for up to this long. If the duration elapses with the batch still pending, for example, a sustained network partition, the client gives up, the pending batch is dropped, and aTimeoutErroris raised.The batch's actual delivery outcome is unknown, so subscribers might still see the events. The error surfaces from
await client.flush()or theasync withblock's__aexit__. If you don't flush or exit the context manager, a background timeout can pass unobserved. Keepmax_retry_duration < publisher_ttlto preserve deduplication across publisher retries.
Use await client.flush() when proof of delivery is required. force_flush=True wakes the background flusher immediately but is still fire-and-forget — use it for latency, not for confirmation.
Codec and payload encoding
Per-item values go through the sync payload converter to produce temporalio.api.common.v1.Payload objects.
The codec chain (the same one configured on your client) runs once on the Signal or Update envelope, not per item, so codec transforms apply symmetrically to Workflow-side and client-side publishes with no double-encryption.
For details on configuring custom converters and codecs, see Payload conversion.
Architecture
The user-facing API hides three pieces of machinery worth understanding when you tune throughput, debug delivery, or reason about history size.
Append-only log inside the Workflow. WorkflowStream keeps an in-memory list of (topic, data) entries inside the Workflow's state. Each entry has a monotonically increasing global offset. Subscribers maintain their own offset cursor and on each long-poll, they receive the next range past that cursor.
The log is part of the Workflow's state, so it persists for the life of the run and is carried across Continue-As-New through WorkflowStreamState. To bound log growth in long-lived Workflows, call self.stream.truncate(up_to_offset) on the Workflow side. Subscribers that fall behind a truncation jump forward to the new base offset and continue. Streams that carry large items risk exceeding Temporal's per-payload size limit — most acutely on the carried state at Continue-As-New, but also on inbound publish Signals and outbound subscribe Update responses. Offload the bytes via External Storage so each item is a small reference rather than the full payload.
Where Signals, Updates, and Queries fit. The three handlers registered when you construct a WorkflowStream map to Temporal's basic message primitives:
- Publishers send Signals.
WorkflowStreamClientbatches publishes inside its async context manager and ships each batch as a single__temporal_workflow_stream_publishSignal. The default batch interval is two seconds.force_flush=Trueskips that timer for an immediate send. - Subscribers send Updates.
subscribe()issues a long-poll__temporal_workflow_stream_pollUpdate that returns when entries past the requested offset are available or when the polling cooldown elapses with no progress. - Position is queryable. A Query (
__temporal_workflow_stream_offset) reports the current head offset for tools that need to bookmark a position without consuming entries.
Batching and deduplicating. Each publisher is identified by a (publisher_id, sequence) pair and the stream deduplicates on this pair. WorkflowStreamClient assigns a stable publisher_id and a monotonic sequence per publish, so a retried Signal — for example, after a transient network error — lands at most once in the log. Deduplicate state is part of the carried Workflow state, so the guarantee survives Continue-As-New (subject to publisher_ttl).
Caveats
-
One
WorkflowStreamper Workflow. The constructor raisesRuntimeErrorif called twice. -
Synchronous handlers can race published Signals. If you add a custom synchronous
@workflow.signalor@workflow.updatehandler that readsWorkflowStreamstate, and a client calls__temporal_workflow_stream_publishimmediately followed by your handler, the handler may observe pre-publish state when both land in the same activation.The fix is to make the handler
asyncandawait asyncio.sleep(0)before reading state. Thatyieldlets pending publishes apply with no Temporal timer or history events. Don't substituteworkflow.sleep(0), which schedules a timer. Most handlers are already safe like, anything thatawaits before reading state, the module's own poll handler, and Workflow-internal publishes. -
Response size cap. Poll responses are capped at roughly 1 MB. Subscribers repoll immediately when the server caps a response. No special handling is required, but high-throughput producers should expect a steady stream of small batches rather than occasional large ones.
See also
- Workflow Streams samples (samples-python) — four runnable scenarios covering basic publish/subscribe, reconnecting subscribers, external publishers, and bounded logs via
truncate(). temporalio.contrib.workflow_streamsAPI reference.- Workflow message passing — Signals, Updates, and Queries that Workflow Streams is built on.
- Payload conversion — converters and codecs.