
Output & Events

A Dory SDK processor publishes results as events onto a RabbitMQ topic exchange. Every event is wrapped in a versioned envelope and routed by a key derived from the processor's identity, the event type, and (optionally) a geohash. This page covers the publisher side end-to-end and the subscriber-side validator briefly.

The message envelope

All published data is wrapped in a MessageEnvelope (dory.output.envelope). The current schema is ENVELOPE_SCHEMA_VERSION = "0.1".

{
  "schema_version": "0.1",
  "message_id": "<uuid4>",
  "timestamp": "<ISO-8601 UTC>",
  "payload": { "...": "your data" }
}

MessageEnvelope carries four fields: schema_version (default "0.1"), message_id (a UUID4), timestamp, and payload. It converts to and from plain dictionaries with to_dict() / from_dict().
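To make the envelope shape concrete, here is a minimal standard-library sketch of a class with the same four fields and a dictionary round trip. The name EnvelopeSketch and the validation in from_dict are assumptions made for illustration; this is not the SDK's MessageEnvelope implementation.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any

SCHEMA_VERSION = "0.1"  # mirrors the documented ENVELOPE_SCHEMA_VERSION


@dataclass
class EnvelopeSketch:
    """Hypothetical stand-in for MessageEnvelope, for illustration only."""

    payload: dict[str, Any]
    schema_version: str = SCHEMA_VERSION
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_dict(self) -> dict[str, Any]:
        return {
            "schema_version": self.schema_version,
            "message_id": self.message_id,
            "timestamp": self.timestamp,
            "payload": self.payload,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "EnvelopeSketch":
        # Reject dictionaries that lack any of the four envelope fields.
        required = {"schema_version", "message_id", "timestamp", "payload"}
        missing = required - data.keys()
        if missing:
            raise ValueError(f"missing envelope fields: {sorted(missing)}")
        return cls(
            payload=data["payload"],
            schema_version=data["schema_version"],
            message_id=data["message_id"],
            timestamp=data["timestamp"],
        )


envelope = EnvelopeSketch({"count": 3})
restored = EnvelopeSketch.from_dict(envelope.to_dict())
```

The round trip preserves every field, so restored compares equal to envelope.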

The schema version follows semantic-versioning rules:

| Bump | Meaning | Subscriber impact |
|-------|---------|-------------------|
| MINOR | Additive, backward-compatible field. | Existing subscribers keep working. |
| MAJOR | Breaking change. | Subscribers match on major version and must opt in. |

Formatters

EnvelopeFormatter(formatter=None, schema_version="0.1") wraps and serializes: wrap(data) -> MessageEnvelope, format(data) -> bytes. The default JSONFormatter(indent=None, sort_keys=False, max_size_bytes=0) emits application/json and serializes datetime, UUID, Enum, and dataclasses transparently.
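The transparent serialization described above can be approximated with a `default` hook for the standard `json` module. This is a sketch under assumptions, not JSONFormatter itself: the helper names `to_json_bytes` and `_default` and the example types are hypothetical, and the exact output format of the SDK (for example, how it renders timestamps) is not confirmed here.

```python
import dataclasses
import json
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Any


def _default(obj: Any) -> Any:
    """Called by json.dumps for objects it cannot serialize natively."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, uuid.UUID):
        return str(obj)
    if isinstance(obj, Enum):
        return obj.value
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        # Nested datetime/UUID/Enum values come back through this hook.
        return dataclasses.asdict(obj)
    raise TypeError(f"cannot serialize {type(obj).__name__}")


def to_json_bytes(data: Any, indent=None, sort_keys: bool = False) -> bytes:
    text = json.dumps(data, default=_default, indent=indent, sort_keys=sort_keys)
    return text.encode("utf-8")


# Hypothetical example types, used only to exercise the hook.
class Kind(Enum):
    CAR = "car"


@dataclasses.dataclass
class Detection:
    kind: Kind
    seen_at: datetime
    track_id: uuid.UUID


detection = Detection(
    kind=Kind.CAR,
    seen_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
    track_id=uuid.UUID("12345678-1234-5678-1234-567812345678"),
)
body = to_json_bytes(detection, sort_keys=True)
```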

Routing keys

Routing keys follow the pattern <processor_id>.<event_type>.<geohash_segments>, letting subscribers bind to specific processors, event types, or geographic regions via topic wildcards.

from dory.output.routing import build_routing_key, build_routing_key_from_geo

key = build_routing_key("detection", segment_length=1)
# e.g. "proc-1.detection.9"

key = build_routing_key_from_geo("detection", 40.71, -74.00,
                                 precision=9, segment_length=1)

| Source | Resolution |
|--------|------------|
| processor_id | Env PROCESSOR_ID. |
| Geohash | Env DORY_GEOHASH, else computed from DORY_LATITUDE + DORY_LONGITUDE. |

segment_length controls how many geohash characters are appended (coarser vs. finer geographic routing). See Configuration for the env vars.
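The sketch below illustrates two ideas from this section using only the standard library: computing a geohash from a latitude/longitude pair, and how AMQP topic wildcards select keys (`*` matches exactly one dot-separated word, `#` matches zero or more). It implements the standard geohash algorithm; whether the SDK computes it identically is an assumption. The key layout used here, with the geohash split into dot-separated one-character words, is also hypothetical and chosen only so that wildcards can match a geographic prefix. The functions `geohash_encode` and `topic_matches` are not part of the SDK.

```python
_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"


def geohash_encode(lat: float, lon: float, precision: int = 9) -> str:
    """Standard geohash: interleave longitude/latitude bisection bits."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    chars, bits, bit_count, use_lon = [], 0, 0, True
    while len(chars) < precision:
        if use_lon:
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                bits, lon_lo = (bits << 1) | 1, mid
            else:
                bits, lon_hi = bits << 1, mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                bits, lat_lo = (bits << 1) | 1, mid
            else:
                bits, lat_hi = bits << 1, mid
        use_lon = not use_lon
        bit_count += 1
        if bit_count == 5:  # every 5 bits become one base-32 character
            chars.append(_BASE32[bits])
            bits, bit_count = 0, 0
    return "".join(chars)


def topic_matches(pattern: str, key: str) -> bool:
    """AMQP topic matching: '*' is one word, '#' is zero or more words."""

    def match(p: list, k: list) -> bool:
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # Let '#' absorb 0..len(k) words and try the remainder each time.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), key.split("."))


gh = geohash_encode(40.71, -74.00, precision=3)  # "dr5"
key = "proc-1.detection." + ".".join(gh)         # hypothetical key layout
```

With this layout, a binding such as `proc-1.detection.d.r.#` would select every detection from proc-1 inside the `dr` cell, while `*.detection.#` would select detections from all processors.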

RabbitMQ publisher

RabbitMQPublisher (dory.output.rabbitmq, requires aio_pika) publishes to a durable topic exchange. By default it wraps each payload through an EnvelopeFormatter (pass raw=True to publish() to skip wrapping), guards the broker with an internal circuit breaker, and buffers messages in memory when the broker is unreachable.

from dory.output.rabbitmq import RabbitMQPublisher, PublisherConfig

pub = RabbitMQPublisher(PublisherConfig())
await pub.connect()
await pub.publish("proc-1.detection.9", {"count": 3})
await pub.flush_buffer()
await pub.close()

PublisherConfig highlights:

| Field | Default | Purpose |
|-------|---------|---------|
| url | amqp://guest:guest@localhost:5672/ | Broker URL (overridden by OAuth2, below). |
| exchange | dory.output | Target exchange. |
| exchange_type | topic | Routing model. |
| durable | True | Survives broker restart. |
| connection_timeout | 10.0 | Connect timeout. |
| heartbeat | 60 | AMQP heartbeat seconds. |
| buffer_enabled | True | Buffer on failure. |
| buffer_max_size | 10000 | Max buffered messages (oldest evicted). |
| buffer_max_bytes | 100 MB | Max buffered bytes. |
| retry_max_attempts | 3 | Publish retries. |
| retry_initial_delay / retry_max_delay | 1.0 / 30.0 | Backoff bounds. |
| circuit_breaker_failure_threshold | 5 | Failures before the breaker opens. |
| circuit_breaker_timeout | 60.0 | Breaker open duration. |
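As a rough illustration of how the retry fields might interact, the sketch below computes a delay schedule that starts at the initial delay and is capped at the maximum delay. The doubling factor is an assumption: this page only states that the two delay fields bound the backoff, not which growth curve the publisher uses. The function name `backoff_delays` is hypothetical.

```python
def backoff_delays(
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 30.0,
    factor: float = 2.0,  # assumed growth factor, not confirmed by the SDK
) -> list:
    """Return the wait before each retry, capped at max_delay."""
    delays = []
    delay = initial_delay
    for _ in range(max_attempts):
        delays.append(min(delay, max_delay))
        delay *= factor
    return delays


default_schedule = backoff_delays()
long_schedule = backoff_delays(max_attempts=7)
```

Under these assumptions the default settings wait 1, 2, and 4 seconds, and a longer schedule would flatten out at the 30-second cap.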

The breaker is the same primitive described in Resilience. When it is open or the broker is down, messages accumulate in the in-memory buffer (oldest evicted past the limits) and are replayed on reconnect.
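The eviction behavior described above can be sketched as a queue bounded by both message count and total bytes, dropping the oldest entries first. This is an illustrative model only; the class name BoundedBuffer and its methods are hypothetical and do not reflect the publisher's internal buffer.

```python
from collections import deque


class BoundedBuffer:
    """Hypothetical FIFO buffer limited by message count and total bytes."""

    def __init__(self, max_size: int, max_bytes: int) -> None:
        self.max_size = max_size
        self.max_bytes = max_bytes
        self._items = deque()
        self._bytes = 0

    def push(self, routing_key: str, body: bytes) -> int:
        """Append a message, evict the oldest until within limits.

        Returns the number of evicted messages. A single message larger
        than max_bytes ends up evicting itself.
        """
        self._items.append((routing_key, body))
        self._bytes += len(body)
        evicted = 0
        while self._items and (
            len(self._items) > self.max_size or self._bytes > self.max_bytes
        ):
            _, old_body = self._items.popleft()
            self._bytes -= len(old_body)
            evicted += 1
        return evicted

    def drain(self):
        """Yield buffered messages oldest-first, as a replay would."""
        while self._items:
            routing_key, body = self._items.popleft()
            self._bytes -= len(body)
            yield routing_key, body

    def __len__(self) -> int:
        return len(self._items)


buf = BoundedBuffer(max_size=2, max_bytes=100)
evictions = [buf.push(f"k{i}", b"x" * 10) for i in (1, 2, 3)]
replayed = list(buf.drain())
```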

Async surface: connect(), publish(routing_key, data, exchange=None, headers=None, raw=False), flush_buffer(), close(), is_connected(), get_buffer_size(). Pass raw=True to skip envelope wrapping.

Warning

The in-memory buffer is not durable. If the process exits while the broker is unreachable, buffered messages are lost. Size buffer_max_size / buffer_max_bytes for your tolerable outage window.
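A quick back-of-the-envelope calculation helps with sizing. The sketch below estimates how many messages and bytes accumulate during an outage; the publish rate, average message size, and outage durations are made-up example figures, and `buffer_needs` is a hypothetical helper.

```python
import math


def buffer_needs(msgs_per_sec: float, avg_msg_bytes: int, outage_sec: float):
    """Return (messages, bytes) accumulated over an outage window."""
    messages = math.ceil(msgs_per_sec * outage_sec)
    return messages, messages * avg_msg_bytes


# Example figures only: 50 msg/s at roughly 2 KiB each.
two_minutes = buffer_needs(50, 2048, 120)
five_minutes = buffer_needs(50, 2048, 300)
```

At that rate a two-minute outage needs 6000 messages (about 12.3 MB), which fits within the default buffer_max_size of 10000. A five-minute outage needs 15000 messages, so with the defaults the oldest 5000 would be evicted.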

OAuth2 token auth

In production the publisher authenticates to RabbitMQ with an OAuth2 client-credentials token rather than a static password. OAuth2TokenProvider (dory.auth.oauth2) handles this:

from dory.auth.oauth2 import OAuth2TokenProvider

provider = OAuth2TokenProvider(token_url, client_id, client_secret)
amqp_url = await provider.build_amqp_url("rabbit.example.com")
# -> amqps://:<token>@rabbit.example.com:5671//

OAuth2TokenProvider(token_url, client_id, client_secret, scopes=None, refresh_buffer_sec=60):

  • get_token() caches the token until it is near expiry, then POSTs grant_type=client_credentials with HTTP Basic auth.
  • build_amqp_url(host, port=5671, vhost="/", tls=True) produces amqps://:<token>@host:port/vhost — empty username, token as password, per the RabbitMQ OAuth2 convention.

The TokenProvider Protocol lets you swap in Vault, IAM, or another source. The publisher transparently reconnects with a fresh token when a JWT expires.
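The caching rule and URL layout described above can be modeled in a few lines. The sketch below is synchronous and uses an injected fetch function instead of a real HTTP call, so it is only an approximation: TokenSource, CachedTokenSource, and the standalone build_amqp_url function are hypothetical names, and the real TokenProvider Protocol may declare different (likely async) methods.

```python
import time
from typing import Callable, Optional, Protocol, Tuple
from urllib.parse import quote


class TokenSource(Protocol):
    """Assumed minimal shape of a pluggable token source."""

    def get_token(self) -> str: ...


class CachedTokenSource:
    """Caches a token and refetches once it is within the refresh buffer."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],  # returns (token, lifetime_sec)
        refresh_buffer_sec: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._refresh_buffer = refresh_buffer_sec
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get_token(self) -> str:
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._refresh_buffer:
            token, lifetime = self._fetch()
            self._token = token
            self._expires_at = now + lifetime
        return self._token


def build_amqp_url(
    token: str, host: str, port: int = 5671, vhost: str = "/", tls: bool = True
) -> str:
    """Empty username, token as password, following the documented layout."""
    scheme = "amqps" if tls else "amqp"
    return f"{scheme}://:{quote(token, safe='')}@{host}:{port}/{vhost}"


# Fake clock and fetcher so the caching rule is observable without a network.
now = [0.0]
fetch_count = [0]


def fake_fetch() -> Tuple[str, float]:
    fetch_count[0] += 1
    return f"tok{fetch_count[0]}", 300.0


source: TokenSource = CachedTokenSource(fake_fetch, clock=lambda: now[0])
first = source.get_token()
url = build_amqp_url(first, "rabbit.example.com")
```

Because the consumer depends only on get_token(), a Vault- or IAM-backed source could be substituted without changing the URL-building code.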

The publish() helper

BaseProcessor.publish() is the one call a processor normally makes. It builds the geohash routing key from a GeoPoint and publishes through the processor's auto-created publisher.

from dory.geo.geolocalizer import GeoPoint

await self.publish("detection", GeoPoint(40.71, -74.00),
                   {"label": "car", "confidence": 0.93})

BaseProcessor.publish(event_type, location: GeoPoint, payload, *, headers=None, exchange=None).

Note

The publisher is auto-created only when env DORY_RABBITMQ_OAUTH2_TOKEN_URL is set. If self.publisher is None, publish() raises RuntimeError. See Configuration for the DORY_RABBITMQ_* variables.

Subscriber side

Consumers validate envelopes and dispatch on version with EnvelopeValidator (dory.output.validator).

from dory.output.validator import EnvelopeValidator

v = EnvelopeValidator(strict_version=False)
v.register("0.1", handle_v0_1, is_async=True)
await v.handle(incoming_dict)

  • register(version, handler, is_async=True): bind a handler to a schema version.
  • validate(data) -> MessageEnvelope: parse and verify the envelope.
  • resolve_handler(version): exact match, falling back to major-version match.
  • handle(data) (async) / can_handle(version).

With strict_version=False, an unknown minor version falls back to the matching major handler; a missing major handler raises UnsupportedVersionError. This is how additive (MINOR) changes stay backward-compatible while breaking (MAJOR) changes are rejected explicitly.
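The non-strict resolution rule can be sketched as a lookup that tries an exact version first and then any handler sharing the major version. This is a standalone model, not EnvelopeValidator: the local UnsupportedVersion exception stands in for the SDK's UnsupportedVersionError, and picking the highest registered minor when several share a major is an assumption made for the example.

```python
from typing import Callable, Dict


class UnsupportedVersion(Exception):
    """Local stand-in for the SDK's UnsupportedVersionError."""


def _parts(version: str) -> tuple:
    return tuple(int(p) for p in version.split("."))


def resolve_handler(handlers: Dict[str, Callable], version: str) -> Callable:
    # 1. An exact match always wins.
    if version in handlers:
        return handlers[version]
    # 2. Otherwise fall back to a handler with the same major version.
    major = _parts(version)[0]
    candidates = [v for v in handlers if _parts(v)[0] == major]
    if not candidates:
        raise UnsupportedVersion(f"no handler for schema version {version}")
    # Assumed tie-break: prefer the highest registered minor.
    return handlers[max(candidates, key=_parts)]


def handle_v0_1(envelope: dict) -> str:
    return "v0.1 handler"


def handle_v1_0(envelope: dict) -> str:
    return "v1.0 handler"


handlers = {"0.1": handle_v0_1, "1.0": handle_v1_0}
minor_bump = resolve_handler(handlers, "0.2")  # falls back to the 0.x handler
```

An envelope stamped "0.2" is still served by the "0.1" handler, while an envelope stamped "2.0" is rejected because no handler is registered for major version 2.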