Core Concepts
This page covers the runtime model of the Dory SDK: the entry point, the application lifecycle, the BaseProcessor contract, the execution context, the stateful descriptor, and the function-based dory.simple API.
DoryApp: the entry point
DoryApp owns the lifecycle. You instantiate it, then hand it your processor class. The constructor and run signatures:

```python
DoryApp(config_file: str | None = None, log_level: str | None = None)
DoryApp.run(processor_class) -> None  # wraps asyncio.run(...)
```
run takes the class, not an instance — the SDK instantiates it after building the ExecutionContext.
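With the real SDK the call is simply DoryApp().run(MyProcessor). The ordering matters because the context must exist before the processor does. The stand-alone model below does not import dory: FakeContext, MyProcessor, and mini_run are hypothetical stand-ins, and the real DoryApp.run also loads config, starts the health server, and handles state recovery.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass
class FakeContext:
    """Hypothetical stand-in for ExecutionContext."""
    pod_name: str
    processor_id: str


class MyProcessor:
    """Hypothetical processor; like BaseProcessor, it receives the context in __init__."""

    def __init__(self, context: FakeContext | None = None) -> None:
        self.context = context
        self.ran = False

    async def run(self) -> None:
        self.ran = True


def mini_run(processor_class: type[MyProcessor]) -> MyProcessor:
    # 1. Build the context first (DoryApp uses ExecutionContext.from_environment()).
    context = FakeContext(pod_name="demo-pod-0", processor_id="demo")
    # 2. Only now instantiate the class, handing it that context.
    processor = processor_class(context)
    # 3. Drive the async entry point, as DoryApp.run wraps asyncio.run(...).
    asyncio.run(processor.run())
    return processor


p = mini_run(MyProcessor)  # pass the class itself, not MyProcessor()
print(p.context.pod_name, p.ran)  # demo-pod-0 True
```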
Application lifecycle
DoryApp.run drives these phases in order:
- Initialize: load config, set up logging, build ExecutionContext.from_environment(), create the StateManager, MetricsCollector, RestartDetector, and RecoveryDecisionMaker, detect prior restarts, then instantiate processor_class(context).
- Health server: start the HealthServer on config.health_port (default 8080).
- Lifecycle:
  - Probe for saved state.
  - RecoveryDecisionMaker.decide(...) chooses a recovery strategy.
  - Run startup() (bounded by config.startup_timeout_sec, default 30 seconds).
  - Connect the publisher if one exists.
  - If state is present, call restore_state(state). On failure, on_state_restore_failed(error) runs; if it returns False, a DoryStateError is raised.
  - Call mark_ready().
  - Run the main loop, run() (a CancelledError is caught on shutdown).
- Cleanup: shutdown and final state save.
```mermaid
graph LR
    A[initialize] --> B[health server :8080]
    B --> C[startup]
    C --> D{state present?}
    D -- yes --> E[restore_state]
    D -- no --> F[mark_ready]
    E --> F
    F --> G[run main loop]
    G --> H[shutdown + save_state]
```
See State Management for recovery strategy details and Configuration for timeout settings.
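A minimal, stand-alone model of the ordering and the restore-failure rule, assuming nothing beyond what the lifecycle list states. DemoProcessor, lifecycle, and the local DoryStateError are hypothetical stand-ins; the model skips the health server, recovery strategy, and publisher steps, and it is not the SDK's implementation.

```python
from __future__ import annotations

import asyncio


class DoryStateError(Exception):
    """Local stand-in for the SDK's DoryStateError."""


class DemoProcessor:
    """Hypothetical processor that records which lifecycle steps ran."""

    def __init__(self, fail_restore: bool = False, allow_fallback: bool = True) -> None:
        self.calls: list[str] = []
        self.restored: dict | None = None
        self.fail_restore = fail_restore
        self.allow_fallback = allow_fallback

    async def startup(self) -> None:
        self.calls.append("startup")

    async def restore_state(self, state: dict) -> None:
        if self.fail_restore:
            raise ValueError("corrupt state")
        self.restored = state
        self.calls.append("restore_state")

    async def on_state_restore_failed(self, error: Exception) -> bool:
        self.calls.append("on_state_restore_failed")
        return self.allow_fallback  # the SDK default returns True

    async def run(self) -> None:
        self.calls.append("run")


async def lifecycle(proc: DemoProcessor, saved_state: dict | None,
                    startup_timeout_sec: float = 30.0) -> None:
    # startup() is bounded by a timeout (config.startup_timeout_sec in the SDK).
    await asyncio.wait_for(proc.startup(), timeout=startup_timeout_sec)
    if saved_state is not None:
        try:
            await proc.restore_state(saved_state)
        except Exception as error:
            # Returning False from the hook turns the failure into a hard error.
            if not await proc.on_state_restore_failed(error):
                raise DoryStateError("state restore failed") from error
    proc.calls.append("mark_ready")
    try:
        await proc.run()
    except asyncio.CancelledError:
        pass  # cancellation on shutdown is swallowed
    proc.calls.append("shutdown + save_state")


ok = DemoProcessor()
asyncio.run(lifecycle(ok, {"counter": 3}))
print(ok.calls)  # ['startup', 'restore_state', 'mark_ready', 'run', 'shutdown + save_state']

strict = DemoProcessor(fail_restore=True, allow_fallback=False)
try:
    asyncio.run(lifecycle(strict, {"counter": 3}))
except DoryStateError as exc:
    print("startup aborted:", exc)  # startup aborted: state restore failed
print(strict.calls)  # ['startup', 'on_state_restore_failed']
```

Because the hook's default is True, a restore failure is non-fatal unless you override on_state_restore_failed to return False.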
BaseProcessor
Subclass BaseProcessor and implement at least the abstract run(). The constructor is __init__(self, context=None); when a context is supplied, the SDK auto-initializes a set of helper components (each best-effort — an ImportError is swallowed):
| Attribute | What it is |
|---|---|
| error_classifier | Error classification for instrumented handlers |
| circuit_breakers | dict keyed "database", "external_api", "cache"; each is CircuitBreaker(failure_threshold=5, success_threshold=2, timeout_seconds=30.0, half_open_max_calls=3) |
| otel | OpenTelemetryManager(service_name="dory-app") |
| request_tracker, request_id_middleware, connection_tracker | Request and connection instrumentation |
| publisher | RabbitMQ publisher; created only when DORY_RABBITMQ_OAUTH2_TOKEN_URL is set, otherwise None |
See Resilience for circuit breakers and Observability for OpenTelemetry.
There is also a class attribute state_schema: dict[str, type] | None = None you can set for optional state validation.
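The "best-effort" rule for helper components and the publisher condition can be modeled in a few lines. This is a sketch of the pattern, not the SDK's code: best_effort, maybe_publisher, FakePublisher, and missing_dependency are hypothetical names, and treating a present-but-empty DORY_RABBITMQ_OAUTH2_TOKEN_URL as "set" is an assumption.

```python
from __future__ import annotations

import importlib
from collections.abc import Callable, Mapping
from typing import TypeVar

T = TypeVar("T")


def best_effort(factory: Callable[[], T]) -> T | None:
    """Build a helper; if its dependency is missing (ImportError), use None instead."""
    try:
        return factory()
    except ImportError:
        return None  # only ImportError is swallowed in this sketch


def maybe_publisher(env: Mapping[str, str], factory: Callable[[], T]) -> T | None:
    """Create the publisher only when the OAuth2 token URL variable is set."""
    if "DORY_RABBITMQ_OAUTH2_TOKEN_URL" not in env:
        return None
    return best_effort(factory)


class FakePublisher:
    """Hypothetical stand-in for the RabbitMQ publisher."""


def missing_dependency() -> object:
    # Simulates a helper whose optional package is not installed.
    return importlib.import_module("surely_not_installed_dep_1234")


otel = best_effort(missing_dependency)        # None: ImportError swallowed
tracker = best_effort(dict)                   # {}: construction succeeded
no_pub = maybe_publisher({}, FakePublisher)   # None: variable not set
pub = maybe_publisher(
    {"DORY_RABBITMQ_OAUTH2_TOKEN_URL": "https://auth.example/token"}, FakePublisher
)                                             # a FakePublisher instance
# In a real process, env would be os.environ.
```

The practical consequence is that any of these attributes may be None, so code that uses them should check first.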
Hooks
| Hook | Signature | Default behavior |
|---|---|---|
| run | async run(self) | abstract; your main loop |
| startup | async startup(self) | no-op |
| shutdown | async shutdown(self) | no-op |
| get_state | get_state(self) -> dict | returns stateful vars |
| restore_state | async restore_state(self, state) | restores stateful vars |
| on_state_restore_failed | async on_state_restore_failed(self, error) -> bool | returns True |
| on_rapid_restart_detected | async on_rapid_restart_detected(self, restart_count) -> bool | returns True |
| on_health_check_failed | async on_health_check_failed(self, error) -> bool | returns False |
| reset_caches | reset_caches(self) | clears caches (golden reset) |
**Return values gate behavior.** If on_state_restore_failed returns False, startup aborts with a DoryStateError. on_health_check_failed defaults to False; override it to return True only if a failed health check should be treated as recoverable.
run_loop: the shutdown-aware iterator
```python
class MyProcessor(BaseProcessor):
    async def run(self) -> None:
        async for i in self.run_loop(interval=1.0):
            ...  # body runs once per iteration; `interval` seconds between iterations
```
- Loops while not context.is_shutdown_requested(), yielding the iteration index i.
- Yields before sleeping, so the first iteration runs immediately; it then sleeps interval seconds between iterations.
- When check_migration is true and a migration is imminent, it logs only; it does not break the loop.
**run_loop does not exit on migration.** run_loop never auto-exits when a migration is signaled; it only logs. Shutdown comes from the signal handlers (below), which make is_shutdown_requested() return True and so end the loop. Do not rely on run_loop to stop your processor for a migration.
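A stand-alone approximation of the documented run_loop behavior, written as a free async generator so it runs without dory. The callable parameters standing in for the context methods are hypothetical, and where the SDK performs its migration check relative to the yield is an assumption.

```python
from __future__ import annotations

import asyncio
import logging
from collections.abc import AsyncIterator, Callable

log = logging.getLogger("run_loop_model")


async def run_loop(
    is_shutdown_requested: Callable[[], bool],
    interval: float = 1.0,
    check_migration: bool = False,
    is_migration_imminent: Callable[[], bool] = lambda: False,
) -> AsyncIterator[int]:
    """Yield 0, 1, 2, ... until shutdown is requested, sleeping between iterations."""
    i = 0
    while not is_shutdown_requested():
        if check_migration and is_migration_imminent():
            log.info("migration imminent")  # logged only; the loop keeps going
        yield i                             # yield first: iteration 0 runs immediately
        i += 1
        await asyncio.sleep(interval)       # then wait `interval` seconds


async def demo() -> list[int]:
    shutdown = False
    seen: list[int] = []
    async for i in run_loop(lambda: shutdown, interval=0.01,
                            check_migration=True,
                            is_migration_imminent=lambda: True):
        seen.append(i)
        if i == 2:
            shutdown = True  # what request_shutdown() does in a signal handler
    return seen


print(asyncio.run(demo()))  # [0, 1, 2]
```

Even though a migration is reported as imminent on every pass, only the shutdown flag ends the loop.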
Other helpers: is_shutting_down() -> bool, and async publish(event_type, location, payload, *, headers=None, exchange=None) which requires a publisher (raises RuntimeError if publisher is None).
Instrumented handler methods
The BaseProcessor metaclass auto-wraps async methods named handle_* or _handle_* with instrumentation (request id, request tracking, tracing span, error classification). Lifecycle methods are excluded.
**Opt into instrumentation by naming.** Name a method handle_* (or _handle_*) to get request-level instrumentation for free. Methods named otherwise are not wrapped.
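The wrapping rule can be approximated with an ordinary metaclass. This sketch only mimics the naming rule: InstrumentedMeta, instrument, HandlerDemo, and the trace list are hypothetical; the request id, tracking, span, and error-classification steps are reduced to trace entries; and LIFECYCLE_METHODS is an assumed guard built from the hook names, not the SDK's actual exclusion list.

```python
from __future__ import annotations

import asyncio
import functools
import inspect
import uuid

PREFIXES = ("handle_", "_handle_")
# Assumed exclusion set, taken from the hook names on this page.
LIFECYCLE_METHODS = frozenset({
    "run", "startup", "shutdown", "get_state", "restore_state",
    "on_state_restore_failed", "on_rapid_restart_detected",
    "on_health_check_failed", "reset_caches",
})


def instrument(func):
    @functools.wraps(func)
    async def wrapper(self, *args, **kwargs):
        request_id = uuid.uuid4().hex  # stands in for request-id assignment
        self.trace.append((func.__name__, "start", request_id))
        try:
            return await func(self, *args, **kwargs)
        except Exception as exc:
            # Stands in for error classification; the error is re-raised.
            self.trace.append((func.__name__, "error:" + type(exc).__name__, request_id))
            raise
        finally:
            self.trace.append((func.__name__, "end", request_id))

    wrapper.instrumented = True
    return wrapper


class InstrumentedMeta(type):
    def __new__(mcls, name, bases, namespace):
        for attr, value in list(namespace.items()):
            if (attr.startswith(PREFIXES)
                    and attr not in LIFECYCLE_METHODS
                    and inspect.iscoroutinefunction(value)):
                namespace[attr] = instrument(value)
        return super().__new__(mcls, name, bases, namespace)


class HandlerDemo(metaclass=InstrumentedMeta):
    def __init__(self) -> None:
        self.trace: list[tuple[str, str, str]] = []

    async def handle_order(self, order_id: int) -> str:
        return f"processed {order_id}"

    async def _handle_retry(self) -> None:
        raise ValueError("upstream down")

    async def process(self) -> str:  # wrong prefix: not wrapped
        return "plain"

    def handle_sync(self) -> str:    # not async: not wrapped
        return "plain"


p = HandlerDemo()
print(asyncio.run(p.handle_order(7)))                          # processed 7
print([step for _, step, _ in p.trace])                        # ['start', 'end']
print(getattr(HandlerDemo.process, "instrumented", False))     # False
```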
ExecutionContext
A dataclass passed to your processor. Fields: pod_name, pod_namespace, processor_id, attempt_number (default 1), is_migrating (default False), previous_pod_name. There is no node_name field.
**These are methods, not properties.** config(), logger(), is_shutdown_requested(), is_migration_imminent(), get_env(key, default=None), request_shutdown(), signal_migration(), and set_attempt_number(n) are all methods. Call them with ().
| Method | Returns |
|---|---|
| config() | app-only env vars (those not prefixed DORY_ or KUBERNETES_) |
| logger() | a logging.Logger |
| get_env(key, default=None) | raw env lookup |
| is_shutdown_requested() | bool |
| is_migration_imminent() | bool |
| request_shutdown() / signal_migration() / set_attempt_number(n) | control methods |
from_environment() reads pod identity from DORY_POD_NAME/POD_NAME, DORY_POD_NAMESPACE/POD_NAMESPACE (default "default"), DORY_PROCESSOR_ID/PROCESSOR_ID (else derived from the pod name), plus DORY_IS_MIGRATING and DORY_MIGRATED_FROM. See Configuration.
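The lookup order and the config() filter can be modeled over a plain dict, which also makes them easy to test. This is a sketch under assumptions: first_set, identity_from_env, and app_config are hypothetical helpers; the DORY_-prefixed name is assumed to win when both are set; and because this page does not say how the processor ID is derived from the pod name, the sketch simply reuses the pod name. The migration variables are left out.

```python
from __future__ import annotations

from collections.abc import Mapping


def first_set(env: Mapping[str, str], *keys: str, default: str | None = None) -> str | None:
    """Return the value of the first key present in env, else default."""
    for key in keys:
        if key in env:
            return env[key]
    return default


def identity_from_env(env: Mapping[str, str]) -> dict[str, str | None]:
    pod_name = first_set(env, "DORY_POD_NAME", "POD_NAME")
    return {
        "pod_name": pod_name,
        "pod_namespace": first_set(env, "DORY_POD_NAMESPACE", "POD_NAMESPACE", default="default"),
        # HYPOTHETICAL fallback: the SDK derives the id from the pod name, but how
        # is not documented here, so this sketch reuses the pod name unchanged.
        "processor_id": first_set(env, "DORY_PROCESSOR_ID", "PROCESSOR_ID", default=pod_name),
    }


def app_config(env: Mapping[str, str]) -> dict[str, str]:
    """Model of config(): drop anything prefixed DORY_ or KUBERNETES_."""
    return {k: v for k, v in env.items() if not k.startswith(("DORY_", "KUBERNETES_"))}


env = {
    "POD_NAME": "orders-0",
    "DORY_POD_NAMESPACE": "prod",
    "KUBERNETES_SERVICE_HOST": "10.0.0.1",
    "BATCH_SIZE": "50",
}
print(identity_from_env(env))
# {'pod_name': 'orders-0', 'pod_namespace': 'prod', 'processor_id': 'orders-0'}
print(app_config(env))
# {'POD_NAME': 'orders-0', 'BATCH_SIZE': '50'}
```

Note that POD_NAME survives the config() filter: only the DORY_ and KUBERNETES_ prefixes are excluded.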
stateful / StatefulVar
Declare persistent state as class attributes with the stateful(default=None) descriptor:
```python
class P(BaseProcessor):
    counter = stateful(0)
    sessions = stateful(dict)  # use a factory for mutable defaults
```
Each stateful declaration is collected per class and round-tripped automatically by get_state/restore_state. The descriptor type is StatefulVar (exported from dory).
**Use factories for mutable defaults.** Pass dict or list (the callable, not {} or []) for mutable defaults so each instance gets its own object.
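To make the per-class collection and the factory rule concrete, here is a small descriptor that behaves the way this section describes. StatefulModel and ProcessorModel are hypothetical stand-ins for StatefulVar and BaseProcessor; the real descriptor may store and collect values differently.

```python
from __future__ import annotations

import asyncio
from typing import Any


class StatefulModel:
    """Hypothetical stand-in for the stateful descriptor (StatefulVar in the SDK)."""

    def __init__(self, default: Any = None) -> None:
        self.default = default
        self.name = ""

    def __set_name__(self, owner: type, name: str) -> None:
        self.name = name
        # Collect declarations per class; inherited names come first.
        names = list(getattr(owner, "_stateful_names", ()))
        if name not in names:
            names.append(name)
        owner._stateful_names = tuple(names)

    def __get__(self, instance: Any, owner: type | None = None) -> Any:
        if instance is None:
            return self
        if self.name not in instance.__dict__:
            # Call factories (dict, list, ...) so each instance gets its own object.
            d = self.default
            instance.__dict__[self.name] = d() if callable(d) else d
        return instance.__dict__[self.name]

    def __set__(self, instance: Any, value: Any) -> None:
        instance.__dict__[self.name] = value


class ProcessorModel:
    def get_state(self) -> dict[str, Any]:
        return {n: getattr(self, n) for n in getattr(type(self), "_stateful_names", ())}

    async def restore_state(self, state: dict[str, Any]) -> None:
        for n in getattr(type(self), "_stateful_names", ()):
            if n in state:
                setattr(self, n, state[n])


class P(ProcessorModel):
    counter = StatefulModel(0)
    sessions = StatefulModel(dict)


a, b = P(), P()
a.counter += 1
a.sessions["u1"] = "active"
print(a.get_state())  # {'counter': 1, 'sessions': {'u1': 'active'}}
print(b.sessions)     # {}: a separate dict, thanks to the factory

c = P()
asyncio.run(c.restore_state(a.get_state()))
print(c.counter)      # 1
```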
**There is no @processor decorator in dory.decorators.** stateful is the only public decorator-style API in dory.decorators. The @processor decorator lives in dory.simple (below).
The dory.simple function API
For simple processors, skip the class. Import directly — dory.simple is not re-exported at the top level:
| Symbol | Purpose |
|---|---|
| state(default=None) -> StateVar | module-level state; access via .value (lazily initialized) |
| @processor | used bare or as @processor(config_file=..., log_level=...); collects module-level StateVar globals into a processor; when the module runs as a script (if __name__ == "__main__"), it immediately runs DoryApp().run(...) |
| run_processor(func, *, config_file=None, log_level=None) | runs a function-based processor explicitly |
| ContextWrapper | wraps ExecutionContext and adds run_loop(...) |
```python
from dory.simple import processor, state

counter = state(0)


@processor
async def main(ctx):
    async for i in ctx.run_loop(interval=1.0):
        counter.value += 1
```
**ContextWrapper uses properties, not methods.** Inside a @processor function, ctx is a ContextWrapper. Its shutdown_requested and migration_imminent are boolean properties; this differs from the method form (is_shutdown_requested()) on the raw ExecutionContext. The @processor decorator must be applied at module top level.
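The property/method difference is easy to trip over, so this model reproduces both access styles with hypothetical RawContextModel and ContextWrapperModel classes (not the SDK types). Calling a property raises TypeError, while forgetting the parentheses on a method silently yields a truthy bound method.

```python
from __future__ import annotations


class RawContextModel:
    """Method-style API, like ExecutionContext (hypothetical stand-in)."""

    def __init__(self) -> None:
        self._shutdown = False
        self._migrating = False

    def is_shutdown_requested(self) -> bool:
        return self._shutdown

    def is_migration_imminent(self) -> bool:
        return self._migrating

    def request_shutdown(self) -> None:
        self._shutdown = True

    def signal_migration(self) -> None:
        self._migrating = True


class ContextWrapperModel:
    """Property-style API, like dory.simple.ContextWrapper (hypothetical stand-in)."""

    def __init__(self, context: RawContextModel) -> None:
        self._context = context

    @property
    def shutdown_requested(self) -> bool:
        return self._context.is_shutdown_requested()

    @property
    def migration_imminent(self) -> bool:
        return self._context.is_migration_imminent()


raw = RawContextModel()
ctx = ContextWrapperModel(raw)
print(ctx.shutdown_requested)           # False: property, no parentheses
print(raw.is_shutdown_requested())      # False: method, parentheses required
print(bool(raw.is_shutdown_requested))  # True: a bound method is always truthy
try:
    ctx.shutdown_requested()            # calling the bool the property returned
except TypeError as exc:
    print(exc)                          # 'bool' object is not callable
raw.request_shutdown()
print(ctx.shutdown_requested)           # True
```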
Signals, shutdown, and state save
DoryApp installs signal handlers:
| Signal / trigger | Effect |
|---|---|
| SIGTERM / SIGINT | request_shutdown() → mark_not_ready() → sleep 0.5 s → shutdown() (timeout config.shutdown_timeout_sec, default 30 s) → close publisher → get_state() + state_manager.save_state(processor_id, state) |
| GET /prestop (PreStop hook) | request_shutdown() + mark_not_ready() + immediate state save |
| SIGUSR1 | snapshot save |
This sequence is what preserves in-memory stateful data across migrations and restarts. See State Management for how the saved state is keyed, serialized, and restored.
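The SIGTERM/SIGINT row can be read as an ordered coroutine. The model below follows that order with a hypothetical AppModel that just records each step. What the SDK does when shutdown() exceeds its timeout is not described on this page, so continuing to the state save afterward is an assumption of the sketch.

```python
from __future__ import annotations

import asyncio
from typing import Any


class AppModel:
    """Records each step so the order can be checked; every name here is hypothetical."""

    def __init__(self, processor_id: str, shutdown_duration: float = 0.0) -> None:
        self.processor_id = processor_id
        self.shutdown_duration = shutdown_duration
        self.events: list[str] = []
        self.saved: dict[str, dict[str, Any]] = {}
        self.counter = 41  # stands in for a stateful variable

    def request_shutdown(self) -> None:
        self.events.append("request_shutdown")

    def mark_not_ready(self) -> None:
        self.events.append("mark_not_ready")

    async def shutdown(self) -> None:
        await asyncio.sleep(self.shutdown_duration)  # the processor's own cleanup
        self.events.append("shutdown")

    def close_publisher(self) -> None:
        self.events.append("close_publisher")

    def get_state(self) -> dict[str, Any]:
        return {"counter": self.counter}

    def save_state(self, processor_id: str, state: dict[str, Any]) -> None:
        self.events.append("save_state")
        self.saved[processor_id] = state


async def on_sigterm(app: AppModel, grace: float = 0.5, shutdown_timeout: float = 30.0) -> None:
    app.request_shutdown()  # in the SDK, this is what ends run_loop
    app.mark_not_ready()    # health server reports not ready
    await asyncio.sleep(grace)
    try:
        await asyncio.wait_for(app.shutdown(), timeout=shutdown_timeout)
    except asyncio.TimeoutError:
        # Assumption: carry on so state is still saved.
        app.events.append("shutdown timed out")
    app.close_publisher()
    app.save_state(app.processor_id, app.get_state())


app = AppModel("orders")
asyncio.run(on_sigterm(app, grace=0.01))
print(app.events)
# ['request_shutdown', 'mark_not_ready', 'shutdown', 'close_publisher', 'save_state']
print(app.saved)  # {'orders': {'counter': 41}}
```

Saving last, after shutdown() and the publisher close, means the snapshot reflects whatever the processor's own cleanup left in its stateful variables.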