Output & Events
A Dory SDK processor publishes results as events onto a RabbitMQ topic exchange. Every event is wrapped in a versioned envelope and routed by a key derived from the processor's identity, the event type, and (optionally) a geohash. This page covers the publisher side end-to-end and the subscriber-side validator briefly.
The message envelope
All published data is wrapped in a MessageEnvelope (dory.output.envelope). The current schema is ENVELOPE_SCHEMA_VERSION = "0.1".
```json
{
  "schema_version": "0.1",
  "message_id": "<uuid4>",
  "timestamp": "<ISO-8601 UTC>",
  "payload": { "...": "your data" }
}
```
MessageEnvelope carries four fields: schema_version (default "0.1"), message_id (a UUID4), timestamp, and payload. It converts to and from plain dictionaries with to_dict() / from_dict().
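As a rough illustration of the envelope's shape, the sketch below models it as a dataclass. EnvelopeSketch is a hypothetical stand-in, not the SDK's MessageEnvelope; only the four field names and the to_dict() / from_dict() pair come from the description above, and storing message_id and timestamp as strings is an assumption.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass
class EnvelopeSketch:
    """Hypothetical stand-in for MessageEnvelope; not the SDK class."""

    payload: dict
    schema_version: str = "0.1"
    # Assumption: identifiers and timestamps are kept as strings.
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_dict(self) -> dict:
        """Return the wire-format dictionary shown in the JSON example."""
        return {
            "schema_version": self.schema_version,
            "message_id": self.message_id,
            "timestamp": self.timestamp,
            "payload": self.payload,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "EnvelopeSketch":
        """Rebuild an envelope from a decoded dictionary."""
        return cls(
            payload=data["payload"],
            schema_version=data["schema_version"],
            message_id=data["message_id"],
            timestamp=data["timestamp"],
        )


envelope = EnvelopeSketch(payload={"count": 3})
restored = EnvelopeSketch.from_dict(envelope.to_dict())
```

A round trip through to_dict() and from_dict() yields an equal envelope, which is the property a subscriber relies on when it decodes a message.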
The schema version follows semantic versioning:
| Bump | Meaning | Subscriber impact |
|---|---|---|
| MINOR | Additive, backward-compatible field. | Existing subscribers keep working. |
| MAJOR | Breaking change. | Subscribers match on major version and must opt in. |
Formatters
EnvelopeFormatter(formatter=None, schema_version="0.1") wraps and serializes: wrap(data) -> MessageEnvelope, format(data) -> bytes. The default JSONFormatter(indent=None, sort_keys=False, max_size_bytes=0) emits application/json and serializes datetime, UUID, Enum, and dataclasses transparently.
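The transparent serialization described above can be approximated with a default hook for the standard json module. This is a sketch under assumptions, not JSONFormatter's actual code: to_jsonable, Kind, and Detection are hypothetical names, and serializing an Enum by its value (rather than its name) is a guess.

```python
import dataclasses
import json
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Any


def to_jsonable(value: Any) -> Any:
    """Fallback for json.dumps: convert types the json module rejects."""
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, uuid.UUID):
        return str(value)
    if isinstance(value, Enum):
        return value.value  # assumption: emit the value, not the name
    if dataclasses.is_dataclass(value) and not isinstance(value, type):
        return dataclasses.asdict(value)
    raise TypeError(f"Cannot serialize {type(value).__name__}")


class Kind(Enum):  # hypothetical example enum
    CAR = "car"


@dataclasses.dataclass
class Detection:  # hypothetical example payload
    kind: Kind
    seen_at: datetime


# json.dumps calls to_jsonable for the dataclass, then again for the
# Enum and datetime values inside the resulting dictionary.
body = json.dumps(
    Detection(Kind.CAR, datetime(2024, 1, 1, tzinfo=timezone.utc)),
    default=to_jsonable,
).encode("utf-8")
```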
Routing keys
Routing keys follow the pattern <processor_id>.<event_type>.<geohash_segments>, letting subscribers bind to specific processors, event types, or geographic regions via topic wildcards.
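```python
from dory.output.routing import build_routing_key, build_routing_key_from_geo

# processor_id and geohash are resolved from the environment (see the table below).
key = build_routing_key("detection", segment_length=1)
# e.g. "proc-1.detection.9"

# Geohash computed from explicit coordinates.
key = build_routing_key_from_geo("detection", 40.71, -74.00,
                                 precision=9, segment_length=1)
```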
| Source | Resolution |
|---|---|
| processor_id | Env PROCESSOR_ID. |
| Geohash | Env DORY_GEOHASH, else computed from DORY_LATITUDE + DORY_LONGITUDE. |
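The geohash fallback in the table can be sketched as follows. encode_geohash implements the standard public geohash algorithm; whether Dory uses this exact implementation, the default precision of 9, and returning None when nothing is configured are all assumptions, and resolve_geohash is a hypothetical helper name.

```python
import os
from typing import Mapping, Optional

_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"


def encode_geohash(lat: float, lon: float, precision: int = 9) -> str:
    """Standard geohash: interleave longitude/latitude bisection bits."""
    lat_range, lon_range = [-90.0, 90.0], [-180.0, 180.0]
    chars, bits, value, use_lon = [], 0, 0, True
    while len(chars) < precision:
        rng, coord = (lon_range, lon) if use_lon else (lat_range, lat)
        mid = (rng[0] + rng[1]) / 2
        if coord >= mid:
            value = (value << 1) | 1
            rng[0] = mid  # keep the upper half
        else:
            value <<= 1
            rng[1] = mid  # keep the lower half
        use_lon = not use_lon
        bits += 1
        if bits == 5:  # every 5 bits become one base-32 character
            chars.append(_BASE32[value])
            bits, value = 0, 0
    return "".join(chars)


def resolve_geohash(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Prefer DORY_GEOHASH; otherwise derive it from the coordinates."""
    explicit = env.get("DORY_GEOHASH")
    if explicit:
        return explicit
    lat, lon = env.get("DORY_LATITUDE"), env.get("DORY_LONGITUDE")
    if lat is None or lon is None:
        return None  # assumption: no geohash when nothing is configured
    return encode_geohash(float(lat), float(lon))
```

Longer geohashes share a prefix with the coarser cells that contain them, which is what makes prefix-based geographic routing possible.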
segment_length controls how many geohash characters are appended (coarser vs. finer geographic routing). See Configuration for the env vars.
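To see how subscribers can select processors, event types, or regions, the sketch below reimplements the topic-exchange matching rules locally: words are separated by dots, * matches exactly one word, and # matches zero or more words. In a real deployment the broker does this matching; topic_matches is a hypothetical helper for reasoning about binding patterns.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP topic binding pattern matches a routing key."""

    def match(p: list, k: list) -> bool:
        if not p:
            return not k
        if p[0] == "#":
            # '#' absorbs zero or more words.
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        # '*' stands for exactly one word; anything else must match literally.
        return (p[0] == "*" or p[0] == k[0]) and match(p[1:], k[1:])

    return match(pattern.split("."), routing_key.split("."))


key = "proc-1.detection.9"
everything_from_proc_1 = topic_matches("proc-1.#", key)
detections_in_cell_9 = topic_matches("*.detection.9", key)
other_processor = topic_matches("proc-2.#", key)
```

With the example key, the first two patterns match and the third does not.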
RabbitMQ publisher
RabbitMQPublisher (dory.output.rabbitmq, requires aio_pika) publishes to a durable topic exchange. By default it wraps the payload through an EnvelopeFormatter, guards the broker with an internal circuit breaker, and buffers in memory when the broker is unreachable.
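```python
import asyncio

from dory.output.rabbitmq import RabbitMQPublisher, PublisherConfig


async def main() -> None:
    pub = RabbitMQPublisher(PublisherConfig())
    await pub.connect()
    await pub.publish("proc-1.detection.9", {"count": 3})
    await pub.flush_buffer()  # send anything still held in the buffer
    await pub.close()


asyncio.run(main())
```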
PublisherConfig highlights:
| Field | Default | Purpose |
|---|---|---|
| url | amqp://guest:guest@localhost:5672/ | Broker URL (overridden by OAuth2, below). |
| exchange | dory.output | Target exchange. |
| exchange_type | topic | Routing model. |
| durable | True | Survives broker restart. |
| connection_timeout | 10.0 | Connect timeout. |
| heartbeat | 60 | AMQP heartbeat seconds. |
| buffer_enabled | True | Buffer on failure. |
| buffer_max_size | 10000 | Max buffered messages (oldest evicted). |
| buffer_max_bytes | 100 MB | Max buffered bytes. |
| retry_max_attempts | 3 | Publish retries. |
| retry_initial_delay / retry_max_delay | 1.0 / 30.0 | Backoff bounds. |
| circuit_breaker_failure_threshold | 5 | Failures before the breaker opens. |
| circuit_breaker_timeout | 60.0 | Breaker open duration. |
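To get a feel for the retry settings, the sketch below computes one plausible delay schedule from retry_max_attempts, retry_initial_delay, and retry_max_delay. Doubling the delay on each attempt is an assumption; the table only states the bounds, and the publisher may use a different growth factor or add jitter. backoff_delays is a hypothetical helper.

```python
def backoff_delays(max_attempts: int = 3,
                   initial_delay: float = 1.0,
                   max_delay: float = 30.0) -> list:
    """Delay before each retry, doubling from initial_delay up to max_delay."""
    return [min(initial_delay * 2 ** attempt, max_delay)
            for attempt in range(max_attempts)]


default_schedule = backoff_delays()               # uses the table defaults
capped_schedule = backoff_delays(max_attempts=7)  # shows the max_delay cap
```

Under the doubling assumption, the default schedule is 1, 2, and 4 seconds, and a longer schedule levels off at 30 seconds.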
The breaker is the same primitive described in Resilience. When it is open or the broker is down, messages accumulate in the in-memory buffer (oldest evicted past the limits) and are replayed on reconnect.
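The eviction behavior described above can be modeled with a small FIFO buffer capped by both message count and total bytes. BoundedBuffer is a hypothetical sketch, not the publisher's internal class, and reading "100 MB" as 100 * 1024 * 1024 bytes is an assumption.

```python
from collections import deque
from typing import Deque, List, Tuple


class BoundedBuffer:
    """Hypothetical FIFO buffer capped by message count and total bytes."""

    def __init__(self, max_size: int = 10_000,
                 max_bytes: int = 100 * 1024 * 1024) -> None:  # assumed MiB
        self.max_size = max_size
        self.max_bytes = max_bytes
        self._items: Deque[Tuple[str, bytes]] = deque()
        self._bytes = 0

    def push(self, routing_key: str, body: bytes) -> None:
        self._items.append((routing_key, body))
        self._bytes += len(body)
        # Evict from the oldest end until both limits hold again.
        while self._items and (len(self._items) > self.max_size
                               or self._bytes > self.max_bytes):
            _, dropped = self._items.popleft()
            self._bytes -= len(dropped)

    def drain(self) -> List[Tuple[str, bytes]]:
        """Return buffered messages oldest-first and empty the buffer."""
        items = list(self._items)
        self._items.clear()
        self._bytes = 0
        return items

    def __len__(self) -> int:
        return len(self._items)


buf = BoundedBuffer(max_size=2)
for n in range(3):
    buf.push("proc-1.detection.9", f"msg-{n}".encode())
# msg-0 was evicted; msg-1 and msg-2 remain, ready to replay in order.
```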
Async surface: connect(), publish(routing_key, data, exchange=None, headers=None, raw=False), flush_buffer(), close(), is_connected(), get_buffer_size(). Pass raw=True to skip envelope wrapping.
Warning
The in-memory buffer is not durable. If the process exits while the broker is unreachable, buffered messages are lost. Size buffer_max_size / buffer_max_bytes for your tolerable outage window.
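A back-of-the-envelope calculation helps with sizing. The helper below is hypothetical and the workload numbers are illustrative, not measurements: it estimates how many messages and bytes accumulate during an outage of a given length.

```python
def buffer_requirements(msgs_per_sec: float, avg_msg_bytes: int,
                        outage_sec: float) -> tuple:
    """Messages and bytes that accumulate during an outage of outage_sec."""
    messages = int(msgs_per_sec * outage_sec)
    return messages, messages * avg_msg_bytes


# Illustrative workload: 20 msg/s of 2 KiB each, broker down for 5 minutes.
messages, total_bytes = buffer_requirements(20, 2048, 300)
```

That workload needs room for 6,000 messages and about 12.3 MB, which fits the defaults. At 50 messages per second the same outage produces 15,000 messages, which exceeds the default buffer_max_size of 10000, so the oldest 5,000 would be evicted.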
OAuth2 token auth
In production the publisher authenticates to RabbitMQ with an OAuth2 client-credentials token rather than a static password. OAuth2TokenProvider (dory.auth.oauth2) handles this:
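```python
from dory.auth.oauth2 import OAuth2TokenProvider

# token_url, client_id, and client_secret come from your identity provider.
provider = OAuth2TokenProvider(token_url, client_id, client_secret)
# build_amqp_url is awaited, so call it from within a coroutine.
amqp_url = await provider.build_amqp_url("rabbit.example.com")
# -> amqps://:<token>@rabbit.example.com:5671//
```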
OAuth2TokenProvider(token_url, client_id, client_secret, scopes=None, refresh_buffer_sec=60):
- get_token() caches the token until it is near expiry, then POSTs grant_type=client_credentials with HTTP Basic auth.
- build_amqp_url(host, port=5671, vhost="/", tls=True) produces amqps://:<token>@host:port/vhost, with an empty username and the token as the password, per the RabbitMQ OAuth2 convention.
The TokenProvider Protocol lets you swap in Vault, IAM, or another source. The publisher transparently reconnects with a fresh token when a JWT expires.
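The caching and URL-building behavior can be sketched without any network calls. Everything here is hypothetical: TokenSource only guesses at the shape of the TokenProvider Protocol (an async get_token() is an assumption), CachingTokenSource takes an injected fetch function in place of the real HTTP POST, and percent-encoding the token is a defensive choice rather than documented behavior.

```python
import asyncio
import time
from typing import Callable, Optional, Protocol, Tuple
from urllib.parse import quote


class TokenSource(Protocol):
    """Hypothetical shape of a pluggable token source."""

    async def get_token(self) -> str: ...


class CachingTokenSource:
    """Caches a token and refetches it shortly before it expires."""

    def __init__(self, fetch: Callable[[], Tuple[str, float]],
                 refresh_buffer_sec: float = 60,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch = fetch  # returns (token, lifetime_seconds)
        self._refresh_buffer_sec = refresh_buffer_sec
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    async def get_token(self) -> str:
        now = self._clock()
        refresh_at = self._expires_at - self._refresh_buffer_sec
        if self._token is None or now >= refresh_at:
            self._token, lifetime = self._fetch()
            self._expires_at = now + lifetime
        return self._token


def amqp_url(token: str, host: str, port: int = 5671,
             vhost: str = "/", tls: bool = True) -> str:
    scheme = "amqps" if tls else "amqp"
    # Empty username, token as password; percent-encode to keep the URL valid.
    return f"{scheme}://:{quote(token, safe='')}@{host}:{port}/{vhost}"


async def demo() -> str:
    source: TokenSource = CachingTokenSource(lambda: ("example-token", 3600.0))
    return amqp_url(await source.get_token(), "rabbit.example.com")


url = asyncio.run(demo())
```

Any object with a matching get_token() satisfies the protocol in this sketch, which is the property that would let a Vault- or IAM-backed source be swapped in.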
The publish() helper
BaseProcessor.publish() is the one call a processor normally makes. It builds the geohash routing key from a GeoPoint and publishes through the processor's auto-created publisher.
```python
from dory.geo.geolocalizer import GeoPoint

# Inside a BaseProcessor method:
await self.publish("detection", GeoPoint(40.71, -74.00),
                   {"label": "car", "confidence": 0.93})
```
Full signature: BaseProcessor.publish(event_type, location: GeoPoint, payload, *, headers=None, exchange=None).
Note
The publisher is auto-created only when env DORY_RABBITMQ_OAUTH2_TOKEN_URL is set. If self.publisher is None, publish() raises RuntimeError. See Configuration for the DORY_RABBITMQ_* variables.
Subscriber side
Consumers validate envelopes and dispatch on version with EnvelopeValidator (dory.output.validator).
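```python
from dory.output.validator import EnvelopeValidator

v = EnvelopeValidator(strict_version=False)
# handle_v0_1 is your own async handler for schema version 0.1.
v.register("0.1", handle_v0_1, is_async=True)
# incoming_dict is the decoded message body; call from within a coroutine.
await v.handle(incoming_dict)
```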
- register(version, handler, is_async=True): bind a handler to a schema version.
- validate(data) -> MessageEnvelope: parse and verify the envelope.
- resolve_handler(version): exact match, falling back to major-version match.
- handle(data) (async) / can_handle(version).
With strict_version=False, an unknown minor version falls back to the matching major handler; a missing major handler raises UnsupportedVersionError. This is how additive (MINOR) changes stay backward-compatible while breaking (MAJOR) changes are rejected explicitly.
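The resolution order can be sketched as a plain function over a handler registry. This is an illustration, not EnvelopeValidator's code: UnsupportedVersion stands in for UnsupportedVersionError, treating strict_version=True as exact-match-only is an assumption, and so is picking the first registered handler when several share a major version.

```python
from typing import Callable, Dict

Handler = Callable[[dict], object]


class UnsupportedVersion(Exception):
    """Hypothetical stand-in for the SDK's UnsupportedVersionError."""


def resolve_handler(handlers: Dict[str, Handler], version: str,
                    strict_version: bool = False) -> Handler:
    """Exact match first; otherwise any handler sharing the major version."""
    if version in handlers:
        return handlers[version]
    if not strict_version:
        major = version.split(".")[0]
        for registered, handler in handlers.items():
            if registered.split(".")[0] == major:
                return handler
    raise UnsupportedVersion(version)


def handle_v0(envelope: dict) -> object:  # hypothetical handler
    return envelope["payload"]


handlers = {"0.1": handle_v0}
exact = resolve_handler(handlers, "0.1")
minor_fallback = resolve_handler(handlers, "0.2")  # same major, newer minor
```

A message with schema version "1.0" finds no handler with major version 1 in this registry, so resolve_handler raises instead of guessing.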