Core Concepts

This page covers the runtime model of the Dory SDK: the entry point, the application lifecycle, the BaseProcessor contract, the execution context, the stateful descriptor, and the function-based dory.simple API.

DoryApp — the entry point

DoryApp owns the lifecycle. You instantiate it, then hand it your processor class:

from dory import DoryApp, BaseProcessor

# MyProcessor is your own BaseProcessor subclass; pass the class, not an instance.
DoryApp().run(MyProcessor)

Constructor and run:

DoryApp(config_file: str | None = None, log_level: str | None = None)
DoryApp.run(processor_class) -> None   # wraps asyncio.run(...)

run takes the class, not an instance — the SDK instantiates it after building the ExecutionContext.

Application lifecycle

DoryApp.run drives these phases in order:

  1. Initialize: load config, set up logging, build ExecutionContext.from_environment(), create the StateManager, MetricsCollector, RestartDetector, and RecoveryDecisionMaker, detect prior restarts, then instantiate processor_class(context).
  2. Health server: start the HealthServer on config.health_port (default 8080).
  3. Lifecycle:
     a. Probe for saved state.
     b. RecoveryDecisionMaker.decide(...) chooses a recovery strategy.
     c. Run startup() (bounded by config.startup_timeout_sec, default 30).
     d. Connect the publisher if one exists.
     e. If state is present, call restore_state(state). On failure, on_state_restore_failed(error) runs; if it returns False, a DoryStateError is raised.
     f. mark_ready().
     g. Run the main loop run() (a CancelledError is caught on shutdown).
  4. Cleanup: shutdown and final state save.

graph LR
  A[initialize] --> B[health server :8080]
  B --> C[startup]
  C --> D{state present?}
  D -- yes --> E[restore_state]
  D -- no --> F[mark_ready]
  E --> F
  F --> G[run main loop]
  G --> H[shutdown + save_state]

See State Management for recovery strategy details and Configuration for timeout settings.
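
The ordering of the lifecycle and cleanup phases can be illustrated with a toy driver. This is a minimal sketch, not the SDK's code: ToyProcessor, drive, and the calls list are hypothetical stand-ins, and only the order of calls and the startup timeout follow the description above.

```python
import asyncio
from typing import Optional


class ToyProcessor:
    """Hypothetical stand-in for a BaseProcessor subclass; records call order."""

    def __init__(self) -> None:
        self.calls = []

    async def startup(self) -> None:
        self.calls.append("startup")

    async def restore_state(self, state: dict) -> None:
        self.calls.append("restore_state")

    async def run(self) -> None:
        self.calls.append("run")

    async def shutdown(self) -> None:
        self.calls.append("shutdown")

    def get_state(self) -> dict:
        self.calls.append("get_state")
        return {}


async def drive(proc: ToyProcessor, saved_state: Optional[dict],
                startup_timeout_sec: float = 30.0) -> list:
    """Toy version of the lifecycle and cleanup phases (not DoryApp itself)."""
    # startup() is bounded by a timeout, as described above.
    await asyncio.wait_for(proc.startup(), timeout=startup_timeout_sec)
    if saved_state is not None:
        await proc.restore_state(saved_state)
    proc.calls.append("mark_ready")
    try:
        await proc.run()
    except asyncio.CancelledError:
        pass  # cancellation of the main loop is expected on shutdown
    await proc.shutdown()
    proc.get_state()  # a real final save would persist this dict
    return proc.calls


fresh = asyncio.run(drive(ToyProcessor(), saved_state=None))
restored = asyncio.run(drive(ToyProcessor(), saved_state={"counter": 3}))
```

With no saved state the restore step is skipped; with saved state it runs between startup and mark_ready, matching the diagram.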

BaseProcessor

Subclass BaseProcessor and implement at least the abstract run(). The constructor is __init__(self, context=None); when a context is supplied, the SDK auto-initializes a set of helper components (each best-effort — an ImportError is swallowed):

| Attribute | What it is |
| --- | --- |
| error_classifier | Error classification for instrumented handlers |
| circuit_breakers | dict keyed "database", "external_api", "cache"; each CircuitBreaker(failure_threshold=5, success_threshold=2, timeout_seconds=30.0, half_open_max_calls=3) |
| otel | OpenTelemetryManager(service_name="dory-app") |
| request_tracker, request_id_middleware, connection_tracker | Request and connection instrumentation |
| publisher | RabbitMQ publisher; only when DORY_RABBITMQ_OAUTH2_TOKEN_URL is set, else None |

See Resilience for circuit breakers and Observability for OpenTelemetry.

There is also a class attribute state_schema: dict[str, type] | None = None you can set for optional state validation.
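
This page does not specify how the SDK applies state_schema. As a rough illustration of what a dict[str, type] schema can express, the sketch below checks a state dict against one. validate_state is a hypothetical helper, not an SDK function, and the isinstance-based check is an assumption.

```python
from typing import Optional


def validate_state(state: dict, schema: Optional[dict]) -> list:
    """Return a list of problems; an empty list means the state matches."""
    if schema is None:
        return []  # no schema declared: nothing to check
    problems = []
    for key, expected in schema.items():
        if key not in state:
            problems.append("missing key: " + key)
        elif not isinstance(state[key], expected):
            problems.append(key + ": expected " + expected.__name__)
    return problems


schema = {"counter": int, "sessions": dict}
ok = validate_state({"counter": 1, "sessions": {}}, schema)
bad = validate_state({"counter": "1"}, schema)
```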

Hooks

| Hook | Signature | Default behavior |
| --- | --- | --- |
| run | async run(self) | abstract; your main loop |
| startup | async startup(self) | no-op |
| shutdown | async shutdown(self) | no-op |
| get_state | get_state(self) -> dict | returns stateful vars |
| restore_state | async restore_state(self, state) | restores stateful vars |
| on_state_restore_failed | async on_state_restore_failed(self, error) -> bool | returns True |
| on_rapid_restart_detected | async on_rapid_restart_detected(self, restart_count) -> bool | returns True |
| on_health_check_failed | async on_health_check_failed(self, error) -> bool | returns False |
| reset_caches | reset_caches(self) | clears caches (golden reset) |

Return values gate behavior

on_state_restore_failed returning False causes startup to raise DoryStateError. on_health_check_failed defaults to False — override and return True only if a failed health check should be treated as recoverable.
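
The gating on on_state_restore_failed can be sketched as follows. This is an illustration only: StateError stands in for the SDK's DoryStateError, and restore_or_raise and both toy processor classes are hypothetical.

```python
import asyncio


class StateError(Exception):
    """Hypothetical stand-in for the SDK's DoryStateError."""


class StrictProcessor:
    """Toy processor whose restore always fails and which refuses to continue."""

    async def restore_state(self, state: dict) -> None:
        raise ValueError("incompatible state")

    async def on_state_restore_failed(self, error: Exception) -> bool:
        return False  # False: startup should fail


class LenientProcessor(StrictProcessor):
    async def on_state_restore_failed(self, error: Exception) -> bool:
        return True  # True (the documented default): startup may proceed


async def restore_or_raise(proc, state: dict) -> str:
    """Toy version of the gating described above (not the SDK's code)."""
    try:
        await proc.restore_state(state)
        return "restored"
    except Exception as error:
        if not await proc.on_state_restore_failed(error):
            raise StateError("state restore failed") from error
        return "continued without state"


lenient_result = asyncio.run(restore_or_raise(LenientProcessor(), {"v": 1}))

try:
    asyncio.run(restore_or_raise(StrictProcessor(), {"v": 1}))
    strict_raised = False
except StateError:
    strict_raised = True
```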

run_loop — the shutdown-aware iterator

async def run(self) -> None:
    async for i in self.run_loop(interval=1.0):
        ...  # body runs once per `interval`

run_loop(self, interval: float = 1.0, check_migration: bool = True) -> AsyncIterator[int]

  • Loops while not context.is_shutdown_requested(), yielding the iteration index i.
  • Yields before sleeping, so the first iteration runs immediately, then it sleeps interval seconds between iterations.
  • When check_migration is true and a migration is imminent, it logs only — it does not break the loop.

run_loop does not exit on migration

run_loop never auto-exits when a migration is signaled; it only logs. Shutdown happens via signals (below), which set is_shutdown_requested() and end the loop. Do not rely on run_loop to stop your processor for a migration.
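
The yield-then-sleep behavior can be sketched with a plain async generator. toy_run_loop is a hypothetical stand-in written for illustration; it takes the shutdown check as a callable instead of reading it from a context, and it omits the migration logging.

```python
import asyncio
from typing import AsyncIterator, Callable


async def toy_run_loop(is_shutdown_requested: Callable[[], bool],
                       interval: float = 1.0) -> AsyncIterator[int]:
    """Toy shutdown-aware iterator: yields first, then sleeps."""
    i = 0
    while not is_shutdown_requested():
        yield i                        # the loop body runs before any sleep
        await asyncio.sleep(interval)
        i += 1


async def demo() -> list:
    seen = []
    # Pretend a shutdown is requested once three iterations have run.
    async for i in toy_run_loop(lambda: len(seen) >= 3, interval=0.01):
        seen.append(i)
    return seen


indices = asyncio.run(demo())
```

Because the shutdown check happens at the top of each iteration, a request made during the sleep takes effect only after that sleep finishes in this sketch.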

Other helpers: is_shutting_down() -> bool, and async publish(event_type, location, payload, *, headers=None, exchange=None) which requires a publisher (raises RuntimeError if publisher is None).

Instrumented handler methods

The BaseProcessor metaclass auto-wraps async methods named handle_* or _handle_* with instrumentation (request id, request tracking, tracing span, error classification). Lifecycle methods are excluded.

async def handle_frame(self, frame):   # automatically instrumented
    ...

Opt into instrumentation by naming

Name a method handle_* to get request-level instrumentation for free. Methods named otherwise are not wrapped.
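
Name-based wrapping of this kind is typically done with a metaclass. The sketch below shows the general technique with hypothetical names (AutoInstrument, instrument, Demo); the toy wrapper only records the method name, whereas the SDK is described as adding request ids, tracking, tracing, and error classification.

```python
import asyncio
import functools
import inspect


def instrument(func):
    """Toy wrapper: records the method name before awaiting the original."""
    @functools.wraps(func)
    async def wrapper(self, *args, **kwargs):
        self.trace.append(func.__name__)
        return await func(self, *args, **kwargs)
    return wrapper


class AutoInstrument(type):
    """Hypothetical metaclass that wraps async handle_* / _handle_* methods."""

    def __new__(mcls, name, bases, namespace):
        for attr, value in list(namespace.items()):
            named_handler = attr.startswith(("handle_", "_handle_"))
            if named_handler and inspect.iscoroutinefunction(value):
                namespace[attr] = instrument(value)
        return super().__new__(mcls, name, bases, namespace)


class Demo(metaclass=AutoInstrument):
    def __init__(self):
        self.trace = []

    async def handle_frame(self, frame):
        return frame * 2

    async def process(self, frame):    # not named handle_*, so not wrapped
        return frame


async def main():
    d = Demo()
    doubled = await d.handle_frame(21)
    await d.process(1)
    return doubled, d.trace


doubled, trace = asyncio.run(main())
```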

ExecutionContext

A dataclass passed to your processor. Fields: pod_name, pod_namespace, processor_id, attempt_number (default 1), is_migrating (default False), previous_pod_name. There is no node_name field.

These are methods, not properties

config(), logger(), is_shutdown_requested(), is_migration_imminent(), get_env(key, default=None), request_shutdown(), signal_migration(), and set_attempt_number(n) are all methods. Call them with ().

| Method | Returns or role |
| --- | --- |
| config() | app-only env vars (those not prefixed DORY_/KUBERNETES_) |
| logger() | a logging.Logger |
| get_env(key, default=None) | raw env lookup |
| is_shutdown_requested() | bool |
| is_migration_imminent() | bool |
| request_shutdown() / signal_migration() / set_attempt_number(n) | control methods |

from_environment() reads pod identity from DORY_POD_NAME/POD_NAME, DORY_POD_NAMESPACE/POD_NAMESPACE (default "default"), DORY_PROCESSOR_ID/PROCESSOR_ID (else derived from the pod name), plus DORY_IS_MIGRATING and DORY_MIGRATED_FROM. See Configuration.
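
The lookup order and the config() filter can be sketched over a plain mapping. This is an illustration under two assumptions: the DORY_-prefixed name wins when both variables are set, and the processor id falls back to the unchanged pod name (the real derivation is not described on this page). first_set, read_identity, and app_config are hypothetical helpers.

```python
from typing import Mapping, Optional


def first_set(env: Mapping[str, str], *keys: str) -> Optional[str]:
    """Return the value of the first key that is present and non-empty."""
    for key in keys:
        value = env.get(key)
        if value:
            return value
    return None


def read_identity(env: Mapping[str, str]) -> dict:
    """Toy lookup following the documented variable pairs (not the SDK's code)."""
    pod_name = first_set(env, "DORY_POD_NAME", "POD_NAME")
    namespace = first_set(env, "DORY_POD_NAMESPACE", "POD_NAMESPACE") or "default"
    # Assumption: placeholder fallback; the SDK derives the id from the pod
    # name in a way this page does not specify.
    processor_id = first_set(env, "DORY_PROCESSOR_ID", "PROCESSOR_ID") or pod_name
    return {"pod_name": pod_name, "pod_namespace": namespace,
            "processor_id": processor_id}


def app_config(env: Mapping[str, str]) -> dict:
    """Keep only variables not prefixed DORY_ or KUBERNETES_."""
    return {k: v for k, v in env.items()
            if not k.startswith(("DORY_", "KUBERNETES_"))}


env = {"DORY_POD_NAME": "dory-worker-0", "POD_NAME": "worker-0",
       "KUBERNETES_PORT": "443", "BATCH_SIZE": "32"}
identity = read_identity(env)
config = app_config(env)
```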

stateful / StatefulVar

Declare persistent state as class attributes with the stateful(default=None) descriptor:

from dory import BaseProcessor
from dory.decorators import stateful

class P(BaseProcessor):
    counter = stateful(0)
    sessions = stateful(dict)   # use a factory for mutable defaults

Each stateful declaration is collected per class and round-tripped automatically by get_state/restore_state. The descriptor type is StatefulVar (exported from dory).

Use factories for mutable defaults

Pass dict or list (the callable, not {}/[]) for mutable defaults so each instance gets its own object.
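
Why a factory matters can be seen with a small descriptor. ToyStateful below is a hypothetical stand-in, not the SDK's StatefulVar; it assumes callable defaults are invoked once per instance, which is the behavior the advice above implies.

```python
class ToyStateful:
    """Hypothetical descriptor; callable defaults are treated as factories."""

    def __init__(self, default=None):
        self.default = default

    def __set_name__(self, owner, name):
        self.slot = "_state_" + name

    def _initial(self):
        return self.default() if callable(self.default) else self.default

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        if self.slot not in instance.__dict__:
            instance.__dict__[self.slot] = self._initial()  # per-instance value
        return instance.__dict__[self.slot]

    def __set__(self, instance, value):
        instance.__dict__[self.slot] = value


class Counter:
    count = ToyStateful(0)
    sessions = ToyStateful(dict)   # factory: each instance gets its own dict

    def get_state(self) -> dict:
        names = [n for n, v in vars(type(self)).items()
                 if isinstance(v, ToyStateful)]
        return {n: getattr(self, n) for n in names}


a, b = Counter(), Counter()
a.count += 1
a.sessions["u1"] = "open"
state_a, state_b = a.get_state(), b.get_state()
```

Mutating a.sessions leaves b.sessions empty because each instance received its own dict from the factory.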

There is no @processor decorator in dory.decorators

stateful is the only public decorator-style API in dory.decorators. The @processor decorator lives in dory.simple (below).

The dory.simple function API

For simple processors, skip the class. Import directly — dory.simple is not re-exported at the top level:

from dory.simple import processor, state

| Symbol | Purpose |
| --- | --- |
| state(default=None) -> StateVar | module-level state; access via .value (lazy init) |
| @processor | used bare or as @processor(config_file=..., log_level=...); collects module-level StateVar globals into a processor; if __name__ == "__main__", immediately runs DoryApp().run(...) |
| run_processor(func, *, config_file=None, log_level=None) | run a function-based processor explicitly |
| ContextWrapper | wraps ExecutionContext, adds run_loop(...) |

from dory.simple import processor, state

counter = state(0)

@processor
async def main(ctx):
    async for i in ctx.run_loop(interval=1.0):
        counter.value += 1

ContextWrapper uses properties, not methods

Inside a @processor function, ctx is a ContextWrapper. Its shutdown_requested and migration_imminent are boolean properties — note this differs from the method form (is_shutdown_requested()) on the raw ExecutionContext. The @processor decorator must be at module top level.
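
The method-versus-property difference is easy to trip over, so here is a minimal sketch of the pattern. ToyContext and ToyWrapper are hypothetical stand-ins for ExecutionContext and ContextWrapper; only the call shapes follow this page.

```python
class ToyContext:
    """Stand-in for ExecutionContext: the flag is read through a method."""

    def __init__(self) -> None:
        self._shutdown = False

    def request_shutdown(self) -> None:
        self._shutdown = True

    def is_shutdown_requested(self) -> bool:
        return self._shutdown


class ToyWrapper:
    """Stand-in for ContextWrapper: the same flag is exposed as a property."""

    def __init__(self, context: ToyContext) -> None:
        self._context = context

    @property
    def shutdown_requested(self) -> bool:
        return self._context.is_shutdown_requested()


raw = ToyContext()
ctx = ToyWrapper(raw)
before = ctx.shutdown_requested             # property: no parentheses
raw.request_shutdown()
after = ctx.shutdown_requested
method_form = raw.is_shutdown_requested()   # method: parentheses required
```

Writing ctx.shutdown_requested() on a wrapper like this would try to call a bool and raise TypeError, while omitting the parentheses on the raw context yields a bound method, which is always truthy.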

Signals, shutdown, and state save

DoryApp installs signal handlers:

| Signal / trigger | Effect |
| --- | --- |
| SIGTERM / SIGINT | request_shutdown() → mark_not_ready() → sleep 0.5s → shutdown() (timeout config.shutdown_timeout_sec or 30) → close publisher → get_state() + state_manager.save_state(processor_id, state) |
| GET /prestop (PreStop hook) | request_shutdown() + mark_not_ready() + immediate state save |
| SIGUSR1 | snapshot save |

This sequence is what preserves in-memory stateful data across migrations and restarts. See State Management for how the saved state is keyed, serialized, and restored.
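
The SIGTERM / SIGINT sequence can be sketched as a single coroutine. This is a toy under stated assumptions, not DoryApp's code: graceful_shutdown and its parameters are hypothetical, the steps list only records the order, and continuing to the state save after a shutdown timeout is an assumption this page does not confirm.

```python
import asyncio


async def graceful_shutdown(steps: list, shutdown_coro, get_state, save_state,
                            pause_sec: float = 0.5,
                            shutdown_timeout_sec: float = 30.0) -> None:
    """Toy version of the SIGTERM/SIGINT sequence described above."""
    steps.append("request_shutdown")
    steps.append("mark_not_ready")
    await asyncio.sleep(pause_sec)              # the documented 0.5 s pause
    try:
        await asyncio.wait_for(shutdown_coro(), timeout=shutdown_timeout_sec)
        steps.append("shutdown")
    except asyncio.TimeoutError:
        steps.append("shutdown_timed_out")      # assumption: keep going
    steps.append("close_publisher")
    save_state(get_state())                     # final state save
    steps.append("save_state")


async def quick_shutdown() -> None:
    pass


async def slow_shutdown() -> None:
    await asyncio.sleep(1.0)


saved = {}
ok_steps = []
asyncio.run(graceful_shutdown(ok_steps, quick_shutdown, lambda: {"counter": 7},
                              saved.update, pause_sec=0.0))

slow_steps = []
asyncio.run(graceful_shutdown(slow_steps, slow_shutdown, lambda: {},
                              saved.update, pause_sec=0.0,
                              shutdown_timeout_sec=0.05))
```

In a real asyncio program, a coroutine like this would usually be scheduled from a handler registered with loop.add_signal_handler; that wiring is left out here to keep the example self-contained.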