
Observability

The Dory SDK exposes the signals operators need to run a processor in production: HTTP health and readiness endpoints, Prometheus metrics, structured JSON logs, optional OpenTelemetry tracing, and request/connection middleware. Most of this is wired automatically by DoryApp and BaseProcessor (see Core Concepts); this page describes what each surface does and how to extend it.

Health endpoints

HealthServer is an aiohttp server (default port 8080; port=0 auto-finds a free port in the 8080–9000 range). It exposes:

Method  Path      Purpose
GET     /health   Liveness probe.
GET     /ready    Readiness probe.
GET     /metrics  Prometheus text exposition.
GET     /state    Capture state for migration.
POST    /state    Restore state for migration.
GET     /prestop  Pre-termination hook.
GET     /         Root info.
Constructor signature:

HealthServer(port=8080, metrics_collector=None, state_getter=None,
             state_restorer=None, prestop_handler=None,
             state_token=None, transfer_config=None)
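The port=0 behaviour can be approximated with a bind-and-probe loop. The sketch below is a hypothetical helper (find_free_port is not an SDK name) and assumes the 8080–9000 range is inclusive and that the server binds on 0.0.0.0; it illustrates the idea rather than reproducing HealthServer's code.

```python
import socket


def find_free_port(start: int = 8080, end: int = 9000, host: str = "0.0.0.0") -> int:
    """Return the first port in [start, end] that can currently be bound on host.

    Hypothetical helper sketching the port=0 behaviour; not the SDK's code.
    """
    for port in range(start, end + 1):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind((host, port))
            except OSError:
                continue  # in use or not permitted: try the next port
            return port  # the probe socket is closed when the with-block exits
    raise RuntimeError(f"no free port in range {start}-{end}")
```

Any probe of this kind is racy: another process can claim the port between the check and the real bind, so the actual server bind still has to handle failure.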

/health — liveness

Runs LivenessProbe.check() and returns 200 when healthy or 503 otherwise. It always adds an X-Dory-SDK-Version response header, which the orchestrator uses to detect that a pod is running the Dory SDK. The default liveness probe always reports healthy; add custom conditions with LivenessProbe.add_check().
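The liveness semantics above (healthy unless a registered condition fails, and healthy by default when nothing is registered) can be sketched as a probe that aggregates named check callables. SketchLivenessProbe is a hypothetical stand-in; the (name, callable) argument shape of add_check is an assumption, not the SDK's documented signature.

```python
from typing import Callable, Dict, Tuple


class SketchLivenessProbe:
    """Hypothetical stand-in for a liveness probe; not the SDK's LivenessProbe."""

    def __init__(self) -> None:
        self._checks: Dict[str, Callable[[], bool]] = {}

    def add_check(self, name: str, check: Callable[[], bool]) -> None:
        # Assumed shape: a name plus a zero-argument callable returning a bool.
        self._checks[name] = check

    def check(self) -> Tuple[int, Dict[str, bool]]:
        results: Dict[str, bool] = {}
        for name, fn in self._checks.items():
            try:
                results[name] = bool(fn())
            except Exception:
                results[name] = False  # a check that raises counts as a failure
        # all() of an empty collection is True, so no checks means healthy.
        status = 200 if all(results.values()) else 503
        return status, results
```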

/ready — readiness gating

Runs ReadinessProbe.check() and returns 200 when ready or 503 otherwise.

Warning

A processor is not ready until mark_ready() is called. Until then /ready returns 503, so Kubernetes will not route traffic to the pod. Call mark_ready() once startup and state restore are complete; call mark_not_ready() to drain.

ReadinessProbe exposes mark_ready(), mark_not_ready(), and is_ready(). A StartupProbe is also available.
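The gating rule in the warning above (503 until mark_ready(), back to 503 after mark_not_ready()) fits in a few lines. This is a hypothetical sketch built on threading.Event, not the SDK's ReadinessProbe; status_code() is an illustrative helper standing in for what a /ready handler would return.

```python
import threading


class SketchReadinessProbe:
    """Hypothetical readiness gate: not ready until mark_ready() is called."""

    def __init__(self) -> None:
        self._ready = threading.Event()  # starts cleared, i.e. not ready

    def mark_ready(self) -> None:
        self._ready.set()

    def mark_not_ready(self) -> None:
        self._ready.clear()

    def is_ready(self) -> bool:
        return self._ready.is_set()

    def status_code(self) -> int:
        # What a /ready handler would return for the current state.
        return 200 if self.is_ready() else 503
```

In a processor the expected order is: finish startup and state restore, call mark_ready(), and call mark_not_ready() when draining.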

/metrics

Returns metrics_collector.export_prometheus() (content type text/plain; version=0.0.4), or a stub when no collector is attached. See Metrics.

GET and POST /state

These endpoints power live migration: GET captures state and POST restores it. Both authenticate requests with a bearer token taken from the DORY_STATE_TOKEN environment variable and compared in constant time.

  • GET /state runs state_getter in a thread pool with a timeout. Responses: 200 on success, 504 on timeout, 413 if the state is too large, 401 on auth failure, 503 if no getter is configured.
  • POST /state validates the payload size, parses JSON, and runs the async state_restorer under a restore timeout.

Migration mechanics are covered in State Management.

/prestop

Calls mark_not_ready() and then the configured prestop_handler. Wire this to a Kubernetes preStop lifecycle hook so the pod stops receiving traffic and flushes state before termination.

Metrics

MetricsCollector(prefix="dory") is a home-grown Prometheus text exporter, served at /metrics by HealthServer.

Note

Because the exporter is built in, you do not install or import prometheus_client. export_prometheus() produces the exposition text directly.

Standard metrics

All standard metrics are prefixed with dory_:

Metric                            Type     Description
dory_startup_duration_seconds     gauge    Time to start up.
dory_shutdown_duration_seconds    gauge    Time to shut down.
dory_state_save_duration_seconds  gauge    Time to save state.
dory_state_load_duration_seconds  gauge    Time to load state.
dory_state_size_bytes             gauge    Serialized state size.
dory_restart_count                counter  Process restarts.
dory_golden_image_resets_total    counter  Golden-image resets.
dory_health_check_failures_total  counter  Failed health checks.
dory_processor_info               gauge    Info metric, labels processor_id / version / pod.

Convenience recorders set these for you: record_startup_started(), record_startup_completed(), record_state_save(duration, size_bytes=0), and similar.

Custom metrics

Custom metrics need no explicit registration; each one is registered automatically the first time you update it:

collector.set_gauge("queue_depth", 42, labels={"queue": "ingest"})
collector.inc_counter("messages_processed_total", 1.0, labels={"topic": "events"})

set_gauge(name, value, labels=None) sets a gauge; inc_counter(name, value=1.0, labels=None) increments a counter.

Structured logging

setup_logging(level="INFO") configures JSON structured logging: a single StreamHandler to stdout, with kubernetes, urllib3, and aiohttp quieted to WARNING. The only knob is the log level. DoryApp calls it with config.log_level (see Configuration).

from dory.logging import get_logger

log = get_logger(__name__, extra={"component": "ingest"})
log.info("processed batch", extra={"count": 100})

Use get_logger(name, extra=None) to obtain a logger. An alternative request-id-aware text logger is available via configure_logging_with_request_id(), which injects a request_id field onto every record.
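A rough, stdlib-only picture of what setup_logging() is described as configuring: one StreamHandler, JSON lines, and kubernetes, urllib3, and aiohttp raised to WARNING. The names setup_json_logging and JsonFormatter, the JSON field names, and the stream parameter (added so output can be captured) are assumptions; the SDK's actual record schema may differ.

```python
import json
import logging
import sys
from typing import IO, Optional

# Attribute names every LogRecord has; anything else came from extra={...}.
_RESERVED = set(vars(logging.LogRecord("x", logging.INFO, "", 0, "", (), None))) | {"message", "asctime"}


class JsonFormatter(logging.Formatter):
    """Hypothetical formatter emitting one JSON object per log record."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Merge caller-supplied extra fields such as {"count": 100}.
        payload.update({k: v for k, v in vars(record).items() if k not in _RESERVED})
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload, default=str)


def setup_json_logging(level: str = "INFO", stream: Optional[IO[str]] = None) -> None:
    """Hypothetical analogue of setup_logging(); stream defaults to stdout."""
    handler = logging.StreamHandler(stream if stream is not None else sys.stdout)
    handler.setFormatter(JsonFormatter())
    root = logging.getLogger()
    root.handlers[:] = [handler]  # exactly one handler on the root logger
    root.setLevel(level)
    for noisy in ("kubernetes", "urllib3", "aiohttp"):
        logging.getLogger(noisy).setLevel(logging.WARNING)
```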

OpenTelemetry

OpenTelemetry is an optional dependency. The module imports and no-ops cleanly when it is absent (guarded by an OTEL_AVAILABLE flag), so tracing never crashes a processor that lacks the OTel libraries.
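The optional-import guard can be sketched as a try/except around the opentelemetry import plus a span helper that does nothing when the import failed. maybe_span is a hypothetical name, not the SDK's create_span; when only opentelemetry-api is installed with no SDK configured, the tracer it obtains is itself a no-op, so the helper is safe in every case.

```python
import contextlib
from typing import Any, Dict, Iterator, Optional

try:
    from opentelemetry import trace  # optional dependency
    OTEL_AVAILABLE = True
except ImportError:  # library not installed: tracing becomes a no-op
    trace = None
    OTEL_AVAILABLE = False


@contextlib.contextmanager
def maybe_span(name: str, attributes: Optional[Dict[str, Any]] = None) -> Iterator[Any]:
    """Hypothetical helper: yield a span if OpenTelemetry is importable, else None."""
    if not OTEL_AVAILABLE:
        yield None
        return
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span(name, attributes=attributes) as span:
        yield span
```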

BaseProcessor initializes self.otel:

self.otel = OpenTelemetryManager(
    service_name="dory-app",
    service_version=os.getenv("DORY_APP_VERSION", "1.0.0"),
    environment="production",
    console_export=True,
).initialize()

Note

Nothing is traced automatically. Tracing is opt-in: you create spans where you want them.

Create spans with the context manager or the decorator (both no-op when OTel is not initialized):

with self.otel.create_span("fetch_batch", attributes={"size": 100}):
    rows = fetch()

@self.otel.trace(name="process_item")
async def process_item(item):
    ...

OpenTelemetryManager also provides get_tracer(), add_span_attributes(), record_exception(), W3C context propagation via inject_context(headers) / extract_context(headers), and shutdown(). Module globals initialize_otel / get_global_otel and no-op-safe create_span / trace_function are available for use outside a processor.
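W3C context propagation centres on the traceparent header defined by the W3C Trace Context specification. The hand-rolled sketch below shows what crosses the wire for version 00 (version, trace id, parent span id, flags); it is for understanding only, and it assumes lowercase header keys. Whether inject_context / extract_context use exactly this path internally is not stated here; in practice an OpenTelemetry propagator does this work.

```python
import re
import secrets
from typing import Dict, Optional, Tuple

# Version-00 traceparent: version-traceid-parentid-flags, all lowercase hex.
_TRACEPARENT = re.compile(r"00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})")


def new_ids() -> Tuple[str, str]:
    """Random 16-byte trace id and 8-byte span id, hex-encoded."""
    return secrets.token_hex(16), secrets.token_hex(8)


def inject_traceparent(headers: Dict[str, str], trace_id: str, span_id: str, sampled: bool = True) -> Dict[str, str]:
    headers["traceparent"] = f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"
    return headers


def extract_traceparent(headers: Dict[str, str]) -> Optional[Tuple[str, str, bool]]:
    match = _TRACEPARENT.fullmatch(headers.get("traceparent", ""))
    if match is None:
        return None
    trace_id, span_id, flags = match.groups()
    if trace_id == "0" * 32 or span_id == "0" * 16:
        return None  # all-zero ids are invalid
    return trace_id, span_id, bool(int(flags, 16) & 0x01)  # bit 0 is "sampled"
```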

Middleware

BaseProcessor automatically wires three middleware components, on a best-effort basis, for tracking in-flight work.

Request tracker

self.request_tracker (RequestTracker(max_history=1000, enable_history=True)) records each unit of work. The track async context manager yields a request id and auto-completes the request as SUCCESS, FAILED, TIMEOUT, or CANCELLED:

async with self.request_tracker.track("ingest", metadata={"topic": "events"}) as rid:
    await do_work()

Inspect aggregates with get_metrics() / get_stats().
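The auto-completion behaviour of track can be sketched as an async context manager that classifies how its body exits. SketchRequestTracker, Outcome, and stats() are hypothetical names; mapping asyncio.TimeoutError to TIMEOUT and asyncio.CancelledError to CANCELLED is an assumption about how the SDK classifies outcomes. The bounded deque mirrors the max_history parameter.

```python
import asyncio
import contextlib
import uuid
from collections import deque
from enum import Enum
from typing import AsyncIterator, Deque, Dict, Tuple


class Outcome(Enum):
    SUCCESS = "success"
    FAILED = "failed"
    TIMEOUT = "timeout"
    CANCELLED = "cancelled"


class SketchRequestTracker:
    """Hypothetical tracker; not the SDK's RequestTracker."""

    def __init__(self, max_history: int = 1000) -> None:
        # Bounded history: the oldest entries are dropped once max_history is reached.
        self.history: Deque[Tuple[str, str, Outcome]] = deque(maxlen=max_history)

    @contextlib.asynccontextmanager
    async def track(self, name: str) -> AsyncIterator[str]:
        rid = uuid.uuid4().hex
        outcome = Outcome.SUCCESS
        try:
            yield rid
        except asyncio.CancelledError:  # a BaseException since Python 3.8
            outcome = Outcome.CANCELLED
            raise
        except asyncio.TimeoutError:  # must precede the generic Exception branch
            outcome = Outcome.TIMEOUT
            raise
        except Exception:
            outcome = Outcome.FAILED
            raise
        finally:
            self.history.append((rid, name, outcome))

    def stats(self) -> Dict[str, int]:
        counts = {o.value: 0 for o in Outcome}
        for _, _, outcome in self.history:
            counts[outcome.value] += 1
        return counts
```

Exceptions are always re-raised, so tracking never swallows an error; it only records how the unit of work ended.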

Request-id middleware

self.request_id_middleware (RequestIdMiddleware) is ContextVar-based and propagates a request id (header X-Request-ID by default). Use with_request_id(request_id=None) to scope an id, and extract_request_id(headers) / inject_request_id(headers) to cross service boundaries. Module helpers: generate_request_id() (uuid4), get_current_request_id(), set_request_id().
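The ContextVar approach can be sketched in a few functions: a scoped setter that restores the previous value on exit, plus header helpers. All names here (scoped_request_id, current_request_id, inject, extract) are hypothetical stand-ins for the SDK's with_request_id, get_current_request_id, inject_request_id, and extract_request_id; only the X-Request-ID header name and uuid4 ids come from the description above.

```python
import contextlib
import contextvars
import uuid
from typing import Dict, Iterator, Optional

HEADER = "X-Request-ID"
_request_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar("request_id", default=None)


@contextlib.contextmanager
def scoped_request_id(request_id: Optional[str] = None) -> Iterator[str]:
    """Hypothetical scope: set an id (generating a uuid4 if none) for the block."""
    rid = request_id or str(uuid.uuid4())
    token = _request_id.set(rid)
    try:
        yield rid
    finally:
        _request_id.reset(token)  # restore the previous id (or None)


def current_request_id() -> Optional[str]:
    return _request_id.get()


def inject(headers: Dict[str, str]) -> Dict[str, str]:
    rid = _request_id.get()
    if rid is not None:
        headers[HEADER] = rid
    return headers


def extract(headers: Dict[str, str]) -> Optional[str]:
    # HTTP header names are case-insensitive, so compare lowercased keys.
    for key, value in headers.items():
        if key.lower() == HEADER.lower():
            return value
    return None
```

Because each asyncio task runs in a copy of the current context, an id set inside one task does not leak into concurrently running tasks.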

Connection tracker

self.connection_tracker (ConnectionTracker) registers long-lived connections and runs a background health-check loop that auto-closes idle ones (defaults: health_check_interval=60.0, idle_timeout=300.0, auto_close_on_idle=True):

conn_id = await self.connection_tracker.register_connection(
    conn, name="db", close_func=conn.close)
async with self.connection_tracker.use_connection(conn_id):
    await conn.execute(sql)
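The idle auto-close behaviour can be sketched as a registry that records when each connection was last used and a sweep that closes anything idle for at least idle_timeout while not in use. SketchConnectionTracker, its method names, the >= comparison, the injectable clock, and accepting both sync and async close functions are all assumptions for illustration, not the SDK's ConnectionTracker.

```python
import asyncio
import contextlib
import inspect
import time
from typing import Any, AsyncIterator, Callable, Dict, List


class SketchConnectionTracker:
    """Hypothetical idle-connection reaper; not the SDK's ConnectionTracker."""

    def __init__(self, idle_timeout: float = 300.0, clock: Callable[[], float] = time.monotonic) -> None:
        self.idle_timeout = idle_timeout
        self._clock = clock  # injectable so tests can fake time
        self._conns: Dict[str, Dict[str, Any]] = {}
        self._counter = 0

    def register(self, conn: Any, name: str, close_func: Callable[[], Any]) -> str:
        self._counter += 1
        conn_id = f"{name}-{self._counter}"
        self._conns[conn_id] = {"conn": conn, "close": close_func, "last_used": self._clock(), "in_use": 0}
        return conn_id

    @contextlib.asynccontextmanager
    async def use(self, conn_id: str) -> AsyncIterator[Any]:
        entry = self._conns[conn_id]
        entry["in_use"] += 1  # a connection in use is never closed as idle
        try:
            yield entry["conn"]
        finally:
            entry["in_use"] -= 1
            entry["last_used"] = self._clock()

    async def sweep_idle(self) -> List[str]:
        now = self._clock()
        closed = []
        for conn_id, entry in list(self._conns.items()):
            if entry["in_use"] == 0 and now - entry["last_used"] >= self.idle_timeout:
                result = entry["close"]()
                if inspect.isawaitable(result):  # accept sync or async close functions
                    await result
                del self._conns[conn_id]
                closed.append(conn_id)
        return closed

    async def run_health_loop(self, interval: float = 60.0) -> None:
        while True:
            await asyncio.sleep(interval)
            await self.sweep_idle()
```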

See API Reference for complete signatures.