Observability¶
The Dory SDK exposes the signals operators need to run a processor in production: HTTP health and readiness endpoints, Prometheus metrics, structured JSON logs, optional OpenTelemetry tracing, and request/connection middleware. Most of this is wired automatically by DoryApp and BaseProcessor (see Core Concepts); this page describes what each surface does and how to extend it.
Health endpoints¶
HealthServer is an aiohttp server (default port 8080; port=0 auto-finds a free port in the 8080–9000 range). It exposes:
| Method | Path | Purpose |
|---|---|---|
| GET | /health | Liveness probe. |
| GET | /ready | Readiness probe. |
| GET | /metrics | Prometheus text exposition. |
| GET | /state | Capture state for migration. |
| POST | /state | Restore state for migration. |
| GET | /prestop | Pre-termination hook. |
| GET | / | Root info. |
HealthServer(port=8080, metrics_collector=None, state_getter=None,
state_restorer=None, prestop_handler=None,
state_token=None, transfer_config=None)
/health — liveness¶
Runs LivenessProbe.check() and returns 200 when healthy or 503 otherwise. It always adds an X-Dory-SDK-Version response header, which the orchestrator uses to detect that a pod is running the Dory SDK. The default liveness probe always reports healthy; add custom conditions with LivenessProbe.add_check().
/ready — readiness gating¶
Runs ReadinessProbe.check() and returns 200 when the processor is ready or 503 otherwise.
Warning
A processor is not ready until mark_ready() is called. Until then /ready returns 503, so Kubernetes will not route traffic to the pod. Call mark_ready() once startup and state restore are complete; call mark_not_ready() to drain.
ReadinessProbe exposes mark_ready(), mark_not_ready(), and is_ready(). A StartupProbe is also available.
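The gating behaviour described above can be pictured with a small stand-in. This is only a sketch: `ReadinessFlag` and `ready_status` are hypothetical names, not the SDK's ReadinessProbe, and the real probe may run additional checks.

```python
class ReadinessFlag:
    """Hypothetical stand-in for a readiness probe: not ready until told so."""

    def __init__(self):
        self._ready = False  # a fresh processor starts out not ready

    def mark_ready(self):
        self._ready = True

    def mark_not_ready(self):
        self._ready = False

    def is_ready(self):
        return self._ready


def ready_status(flag):
    """Map the flag to the HTTP status a /ready handler would return."""
    return 200 if flag.is_ready() else 503
```

In this model the status is 503 at startup, 200 after `mark_ready()`, and 503 again after `mark_not_ready()`, which is the sequence Kubernetes observes across startup, serving, and drain.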
/metrics¶
Returns metrics_collector.export_prometheus() (content type text/plain; version=0.0.4), or a stub when no collector is attached. See Metrics.
GET / POST /state¶
These power live migration: GET captures state, POST restores it. Both authenticate with a bearer token (env DORY_STATE_TOKEN, compared in constant time).
GET /state runs state_getter in a thread pool with a timeout. Responses: 200 on success, 504 on timeout, 413 if the state is too large, 401 on auth failure, 503 if no getter is configured.
POST /state validates the payload size, parses JSON, and runs the async state_restorer under a restore timeout.
Migration mechanics are covered in State Management.
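For readers unfamiliar with constant-time token checks, the following is a minimal sketch of how a bearer token could be validated with the standard library. `is_authorized` is a hypothetical helper, not the SDK's code, and rejecting every request when no token is configured is an assumption of this sketch.

```python
import hmac


def is_authorized(authorization_header, expected_token):
    """Return True only if the header carries the expected bearer token."""
    # Assumption of this sketch: a missing token or header rejects the request.
    if not expected_token or not authorization_header:
        return False
    scheme, _, presented = authorization_header.partition(" ")
    if scheme.lower() != "bearer":
        return False
    # hmac.compare_digest is designed to avoid leaking, through timing,
    # where the two values first differ.
    return hmac.compare_digest(
        presented.strip().encode("utf-8"), expected_token.encode("utf-8")
    )
```

A plain `==` comparison can return as soon as it finds a mismatching character, which is the timing signal a constant-time comparison is meant to remove.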
/prestop¶
Calls mark_not_ready() and then the configured prestop_handler. Wire this to a Kubernetes preStop lifecycle hook so the pod stops receiving traffic and flushes state before termination.
Metrics¶
MetricsCollector(prefix="dory") is a home-grown Prometheus text exporter — there is no dependency on prometheus_client. It is served at /metrics by HealthServer.
Note
Because the exporter is built in, you do not install or import prometheus_client. export_prometheus() produces the exposition text directly.
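To give a sense of what producing exposition text directly involves, here is a minimal sketch of a text-format renderer. The helper names are hypothetical and this is not MetricsCollector's implementation; it only illustrates the Prometheus text format (a `# TYPE` line followed by one line per sample).

```python
def escape_label_value(value):
    # The text format requires backslash, double quote, and newline
    # to be escaped inside label values.
    return str(value).replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def format_sample(name, value, labels=None):
    """Render one sample line, with labels in sorted order if present."""
    if labels:
        pairs = ",".join(
            f'{key}="{escape_label_value(val)}"' for key, val in sorted(labels.items())
        )
        return f"{name}{{{pairs}}} {value}"
    return f"{name} {value}"


def render_metric(name, metric_type, samples):
    """Render a TYPE line plus one line per (labels, value) sample."""
    lines = [f"# TYPE {name} {metric_type}"]
    for labels, value in samples:
        lines.append(format_sample(name, value, labels))
    return "\n".join(lines) + "\n"
```

For example, `render_metric("dory_state_size_bytes", "gauge", [({}, 2048)])` yields a `# TYPE dory_state_size_bytes gauge` line followed by `dory_state_size_bytes 2048`.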
Standard metrics¶
All standard metrics are prefixed dory_:
| Metric | Type | Description |
|---|---|---|
| dory_startup_duration_seconds | gauge | Time to start up. |
| dory_shutdown_duration_seconds | gauge | Time to shut down. |
| dory_state_save_duration_seconds | gauge | Time to save state. |
| dory_state_load_duration_seconds | gauge | Time to load state. |
| dory_state_size_bytes | gauge | Serialized state size. |
| dory_restart_count | counter | Process restarts. |
| dory_golden_image_resets_total | counter | Golden-image resets. |
| dory_health_check_failures_total | counter | Failed health checks. |
| dory_processor_info | gauge | Info metric, labels processor_id / version / pod. |
Convenience recorders set these for you: record_startup_started(), record_startup_completed(), record_state_save(duration, size_bytes=0), and similar.
Custom metrics¶
Custom metrics need no separate registration step; they auto-register on first use:
collector.set_gauge("queue_depth", 42, labels={"queue": "ingest"})
collector.inc_counter("messages_processed_total", 1.0, labels={"topic": "events"})
set_gauge(name, value, labels=None) sets a gauge; inc_counter(name, value=1.0, labels=None) increments a counter.
Structured logging¶
setup_logging(level="INFO") configures JSON structured logging: a single StreamHandler to stdout, with kubernetes, urllib3, and aiohttp quieted to WARNING. The only knob is the log level. DoryApp calls it with config.log_level (see Configuration).
from dory.logging import get_logger
log = get_logger(__name__, extra={"component": "ingest"})
log.info("processed batch", extra={"count": 100})
Use get_logger(name, extra=None) to obtain a logger. An alternative request-id-aware text logger is available via configure_logging_with_request_id(), which injects a request_id field onto every record.
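As an illustration of what JSON structured logging to a single stream handler can look like, here is a sketch built only on the standard library. `JsonFormatter` and `build_logger` are hypothetical names and the field names are assumptions; the output format of setup_logging() itself is not documented above.

```python
import json
import logging

# Attribute names every LogRecord carries; anything else came in via extra=.
_RESERVED = set(logging.makeLogRecord({}).__dict__) | {"message", "asctime"}


class JsonFormatter(logging.Formatter):
    """Serialize each record as one JSON object per line."""

    def format(self, record):
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Copy fields passed through extra={...} into the JSON object.
        for key, value in record.__dict__.items():
            if key not in _RESERVED:
                payload[key] = value
        return json.dumps(payload, default=str)


def build_logger(name, stream, level="INFO"):
    """Attach a single JSON stream handler to the named logger."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.handlers = [handler]
    logger.setLevel(level)
    logger.propagate = False  # avoid duplicate output through the root logger
    return logger
```

Calling `build_logger("ingest", sys.stdout).info("processed batch", extra={"count": 100})` writes one JSON line that includes `"count": 100` alongside the standard fields.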
OpenTelemetry¶
OpenTelemetry is an optional dependency — the module imports and no-ops cleanly when it is absent (guarded by an OTEL_AVAILABLE flag), so tracing never crashes a processor that lacks the OTel libraries.
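The optional-dependency guard can be sketched as follows. This is an illustration of the pattern, not the SDK's module: the `create_span` helper here is a stand-in, and only the `OTEL_AVAILABLE` flag name comes from the description above.

```python
from contextlib import contextmanager

try:
    from opentelemetry import trace as otel_trace

    OTEL_AVAILABLE = True
except ImportError:
    # The OTel libraries are not installed; fall back to no-ops.
    otel_trace = None
    OTEL_AVAILABLE = False


@contextmanager
def create_span(name, attributes=None):
    """Yield a span when OTel is importable, otherwise yield None."""
    if not OTEL_AVAILABLE:
        yield None
        return
    tracer = otel_trace.get_tracer(__name__)
    with tracer.start_as_current_span(name, attributes=attributes) as span:
        yield span
```

Because the import failure is caught once at module load, calling code can use `with create_span("fetch_batch"):` unconditionally and never needs to check whether tracing is installed.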
BaseProcessor initializes self.otel:
self.otel = OpenTelemetryManager(
    service_name="dory-app",
    service_version=os.getenv("DORY_APP_VERSION", "1.0.0"),
    environment="production",
    console_export=True,
).initialize()
Note
Nothing is auto-traced. Tracing is opt-in — you create spans where you want them.
Create spans with the context manager or the decorator (both no-op when OTel is not initialized):
with self.otel.create_span("fetch_batch", attributes={"size": 100}):
    rows = fetch()

@self.otel.trace(name="process_item")
async def process_item(item):
    ...
OpenTelemetryManager also provides get_tracer(), add_span_attributes(), record_exception(), W3C context propagation via inject_context(headers) / extract_context(headers), and shutdown(). Module globals initialize_otel / get_global_otel and no-op-safe create_span / trace_function are available for use outside a processor.
Middleware¶
BaseProcessor auto-wires three middleware components (best-effort) for tracking work in flight.
Request tracker¶
self.request_tracker (RequestTracker(max_history=1000, enable_history=True)) records each unit of work. The track async context manager yields a request id and auto-completes the request as SUCCESS, FAILED, TIMEOUT, or CANCELLED:
async with self.request_tracker.track("ingest", metadata={"topic": "events"}) as rid:
    await do_work()
Inspect aggregates with get_metrics() / get_stats().
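How an async context manager can auto-complete a request with one of those four outcomes is sketched below. `MiniTracker` and `Outcome` are hypothetical names, and the mapping from exception type to status is an assumption about how such a tracker might classify results, not RequestTracker's actual logic.

```python
import asyncio
import uuid
from contextlib import asynccontextmanager
from enum import Enum


class Outcome(Enum):
    SUCCESS = "success"
    FAILED = "failed"
    TIMEOUT = "timeout"
    CANCELLED = "cancelled"


class MiniTracker:
    """Hypothetical tracker that records how each tracked block ended."""

    def __init__(self):
        self.outcomes = {}

    @asynccontextmanager
    async def track(self, name):
        request_id = f"{name}-{uuid.uuid4().hex[:8]}"
        try:
            yield request_id
        except asyncio.TimeoutError:
            self.outcomes[request_id] = Outcome.TIMEOUT
            raise
        except asyncio.CancelledError:
            self.outcomes[request_id] = Outcome.CANCELLED
            raise
        except Exception:
            self.outcomes[request_id] = Outcome.FAILED
            raise
        else:
            self.outcomes[request_id] = Outcome.SUCCESS
```

Each exception is re-raised after the outcome is recorded, so tracking never hides a failure from the caller.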
Request-id middleware¶
self.request_id_middleware (RequestIdMiddleware) is ContextVar-based and propagates a request id (header X-Request-ID by default). Use with_request_id(request_id=None) to scope an id, and extract_request_id(headers) / inject_request_id(headers) to cross service boundaries. Module helpers: generate_request_id() (uuid4), get_current_request_id(), set_request_id().
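ContextVar-based propagation can be sketched with the standard library alone. The helper names below (`request_scope`, `inject`, `extract`, `current_request_id`) are hypothetical and deliberately differ from the SDK's; only the `X-Request-ID` header name and the uuid4 default come from the description above.

```python
import contextvars
import uuid
from contextlib import contextmanager

HEADER = "X-Request-ID"
_request_id = contextvars.ContextVar("request_id", default=None)


def current_request_id():
    return _request_id.get()


@contextmanager
def request_scope(request_id=None):
    """Bind a request id for the duration of the block, then restore the old one."""
    token = _request_id.set(request_id or str(uuid.uuid4()))
    try:
        yield _request_id.get()
    finally:
        _request_id.reset(token)


def inject(headers):
    """Copy the current request id into outgoing headers, if one is bound."""
    request_id = _request_id.get()
    if request_id is not None:
        headers[HEADER] = request_id
    return headers


def extract(headers):
    """Read the request id from incoming headers (names are case-insensitive)."""
    for key, value in headers.items():
        if key.lower() == HEADER.lower():
            return value
    return None
```

Because a ContextVar is scoped to the current task context, concurrent asyncio tasks each see their own request id without passing it through every function call.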
Connection tracker¶
self.connection_tracker (ConnectionTracker) registers long-lived connections and runs a background health-check loop that auto-closes idle ones (defaults: health_check_interval=60.0, idle_timeout=300.0, auto_close_on_idle=True):
conn_id = await self.connection_tracker.register_connection(
    conn, name="db", close_func=conn.close)

async with self.connection_tracker.use_connection(conn_id):
    await conn.execute(sql)
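The idle auto-close behaviour can be pictured with a small sketch. `IdleCloser` is a hypothetical class, not ConnectionTracker; it assumes that idleness is measured from the last use and that close functions may be either sync or async.

```python
import asyncio
import inspect
import time


class IdleCloser:
    """Hypothetical sketch: close connections unused for idle_timeout seconds."""

    def __init__(self, idle_timeout=300.0, clock=time.monotonic):
        self.idle_timeout = idle_timeout
        self._clock = clock
        self._conns = {}  # conn_id -> [close_func, last_used]

    def register(self, conn_id, close_func):
        self._conns[conn_id] = [close_func, self._clock()]

    def touch(self, conn_id):
        # Record a use so the connection is not treated as idle.
        self._conns[conn_id][1] = self._clock()

    async def sweep(self):
        """Close and forget every idle connection; return their ids."""
        now = self._clock()
        idle = [
            conn_id
            for conn_id, (_, last_used) in self._conns.items()
            if now - last_used >= self.idle_timeout
        ]
        for conn_id in idle:
            close_func, _ = self._conns.pop(conn_id)
            result = close_func()
            if inspect.isawaitable(result):
                await result
        return idle

    async def run(self, interval=60.0):
        # Background loop: sweep once per interval until cancelled.
        while True:
            await asyncio.sleep(interval)
            await self.sweep()
```

Passing the clock in as a parameter keeps the sweep logic testable without waiting for real time to elapse.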
See API Reference for complete signatures.