
Edge & Failover

The Dory SDK runs processors on edge nodes that may lose connectivity, get replaced, or temporarily run two copies of the same logical processor. To keep a single writer at any moment — and hand off cleanly when a node fails — the SDK provides a small set of composable edge primitives:

| Primitive | Responsibility |
|---|---|
| FencingManager | Epoch-based fencing token; the cluster-wide lock that prevents split-brain. |
| RoleManager | PRIMARY/STANDBY state machine built on top of fencing. |
| HeartbeatManager | Periodic liveness report to the orchestrator. |
| EdgeHealthReporter | Rolls connectivity, role, and fencing state into a health verdict. |
| AdaptiveProcessor | Wires all of the above together from detected workload context. |
| CameraGeolocalizer | Pixel-to-geo conversion for tagging results with a location. |

These give processor-level safety. Node-level failover (detecting a dead node, draining, and starting a replacement) is driven by the orchestrator — see Orchestrator edge failover.

Note

Edge primitives are exported from the top-level dory package. See API Reference for the full export list.

Fencing and epochs

Fencing solves the split-brain problem: when a network partition or a slow shutdown leaves two instances both believing they are the active processor, you must guarantee that only one of them can keep writing.

The fence is a monotonically increasing per-processor epoch:

  1. When an instance acquires the processor, the epoch is atomically incremented and returned inside a FencingToken.
  2. Every write is gated on the token still being valid for the current epoch.
  3. When a newer instance acquires, the epoch bumps again. The older token's epoch is now behind the current epoch, so validate() returns False — the stale instance must stop writing.

Because the epoch only moves forward and every acquisition receives a new, larger value, only the most recently issued token validates. Two writers can therefore never both hold a valid token.
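The three steps above can be modelled in a few lines of plain Python. This is a hypothetical, single-process sketch: `InMemoryEpochFence` and `Token` are illustrative names, not Dory classes, and the epochs live in a dict with no TTL or persistence, whereas the production `FencingManager` (with the `"redis"` backend) keeps them in Redis.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class Token:
    # Hypothetical, trimmed-down analogue of FencingToken.
    processor_id: str
    node_id: str
    epoch: int


class InMemoryEpochFence:
    """Toy model of epoch fencing: one process, no TTL, no persistence."""

    def __init__(self) -> None:
        self._epochs: Dict[str, int] = {}

    def acquire(self, processor_id: str, node_id: str) -> Token:
        # Every acquisition bumps the epoch, so the newest holder wins.
        epoch = self._epochs.get(processor_id, 0) + 1
        self._epochs[processor_id] = epoch
        return Token(processor_id, node_id, epoch)

    def validate(self, token: Token) -> bool:
        # Valid only if nobody has acquired since this token was issued.
        return token.epoch >= self._epochs.get(token.processor_id, 0)


fence = InMemoryEpochFence()
old = fence.acquire("proc-1", "node-a")
new = fence.acquire("proc-1", "node-b")  # a second instance takes over
print(fence.validate(old), fence.validate(new))  # False True
```

The comparison in `validate()` mirrors `FencingToken.is_valid()`: since no token can carry an epoch ahead of the stored one, `>=` effectively means "issued by the latest acquisition".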

FencingManager

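import asyncio
from dory.edge import FencingManager, FencingConfig

async def main():
    mgr = FencingManager(FencingConfig(backend="redis"))
    token = await mgr.acquire("proc-1", node_id="node-a")
    try:
        if await mgr.validate(token):
            ...  # safe to write
    finally:
        await mgr.release(token)   # always release, even if the write fails
        await mgr.close()

asyncio.run(main())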

FencingConfig fields:

| Field | Default | Notes |
|---|---|---|
| acquire_timeout_sec | 10.0 | Max wait to acquire the lock. |
| lock_ttl_sec | 60.0 | Lock expiry; auto-refreshed in the background. |
| refresh_interval_sec | 15.0 | Re-EXPIRE cadence. Must be < lock_ttl_sec. |
| backend | "redis" | "redis" (production) or "memory" (testing). |
| redis_url | None | Falls back to env DORY_REDIS_URL, then redis://localhost:6379. |
| key_prefix | "dory:fencing" | Redis key namespace. |
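The `redis_url` precedence and the `refresh_interval_sec < lock_ttl_sec` constraint can be expressed as a small validation sketch. `FencingSettings` is a hypothetical stand-in, not Dory's `FencingConfig`, and whether Dory performs these checks at construction time is an assumption here.

```python
import os
from dataclasses import dataclass
from typing import Optional


@dataclass
class FencingSettings:
    # Hypothetical mirror of the FencingConfig fields listed above.
    acquire_timeout_sec: float = 10.0
    lock_ttl_sec: float = 60.0
    refresh_interval_sec: float = 15.0
    backend: str = "redis"
    redis_url: Optional[str] = None
    key_prefix: str = "dory:fencing"

    def resolved_redis_url(self) -> str:
        # Explicit value first, then DORY_REDIS_URL, then the local default.
        return self.redis_url or os.environ.get(
            "DORY_REDIS_URL", "redis://localhost:6379"
        )

    def check(self) -> None:
        if self.backend not in ("redis", "memory"):
            raise ValueError(f"unknown backend: {self.backend!r}")
        # The lock must be refreshed before it can expire, or it silently lapses.
        if not self.refresh_interval_sec < self.lock_ttl_sec:
            raise ValueError("refresh_interval_sec must be < lock_ttl_sec")


FencingSettings().check()  # defaults: 15 s refresh against a 60 s TTL
print(FencingSettings().resolved_redis_url())
```

With the defaults, the lock is refreshed four times per TTL window, so a few failed refreshes can be tolerated before the lock expires.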

Async methods: acquire(processor_id, node_id, timeout_sec=None), validate(token), validate_or_raise(token) (raises StaleEpochError), release(token), get_current_epoch(processor_id), get_active_token(processor_id), close().

FencingToken(processor_id, node_id, epoch, acquired_at, token_id) exposes is_valid(current_epoch), which returns True when epoch >= current_epoch.

Warning

The "memory" backend is process-local and provides no cross-node protection. Use "redis" whenever more than one node could run the same processor.

RoleManager

RoleManager is the state machine that turns a fencing token into an operational role. It creates its own FencingManager internally, so you pass it a FencingConfig, not a manager.

import asyncio
from dory.edge import RoleManager, FencingConfig

async def main():
    roles = RoleManager("proc-1", node_id="node-a",
                        fencing_config=FencingConfig(backend="redis"))
    await roles.start()            # acquires token -> PRIMARY
    try:
        if roles.role.can_process():
            ...
    finally:
        await roles.stop("shutdown")

asyncio.run(main())

ProcessorRole values and capabilities:

| Role | can_process() | can_write_state() |
|---|---|---|
| INITIALIZING | no | no |
| PRIMARY | yes | yes |
| STANDBY | no | no |
| DRAINING | no | yes |
| FENCED | no | no |
| STOPPED | no | no |

Only PRIMARY may process. DRAINING may still write so it can flush state during a graceful handoff (see State Management).
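The capability table reduces to two predicates. The enum below is a hypothetical re-creation for illustration (`Role` and its string values are not Dory's `ProcessorRole`), which can be handy in tests that assert which roles may touch state.

```python
from enum import Enum


class Role(Enum):
    # Hypothetical mirror of ProcessorRole; the string values are illustrative.
    INITIALIZING = "initializing"
    PRIMARY = "primary"
    STANDBY = "standby"
    DRAINING = "draining"
    FENCED = "fenced"
    STOPPED = "stopped"

    def can_process(self) -> bool:
        # Only the current fencing-token holder consumes new work.
        return self is Role.PRIMARY

    def can_write_state(self) -> bool:
        # DRAINING keeps write access so it can flush during a handoff.
        return self in (Role.PRIMARY, Role.DRAINING)


for role in Role:
    print(f"{role.name:<12} process={role.can_process()} write={role.can_write_state()}")
```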

Async methods: start() (acquire → PRIMARY, or STANDBY if start_as_standby=True or acquisition fails), promote_to_primary(), demote_to_standby(reason), validate_fencing(), validate_fencing_or_fence() (a stale token transitions to FENCED), stop(reason). Properties: role, processor_id, node_id, fencing_token; get_status() is a method.

Note

A background task (_monitor_fencing) re-validates the token every 5 seconds and fences a PRIMARY whose epoch has gone stale, so an instance that missed a newer instance's acquisition stops processing within one check interval.
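A minimal asyncio sketch of that background check, assuming a shared "current epoch" value (a plain dict here, standing in for the Redis-backed fencing store). `MonitoredRole` is hypothetical and uses a 10 ms interval instead of 5 seconds so the demo finishes quickly.

```python
import asyncio


class MonitoredRole:
    """Hypothetical model of a PRIMARY that gets fenced when its epoch goes stale."""

    def __init__(self, fence_epoch: dict, my_epoch: int, interval_sec: float) -> None:
        self.fence_epoch = fence_epoch  # shared "current epoch" (stand-in for Redis)
        self.my_epoch = my_epoch
        self.interval_sec = interval_sec
        self.role = "PRIMARY"

    async def monitor(self) -> None:
        # Re-validate periodically; a newer acquisition moves the epoch past ours.
        while self.role == "PRIMARY":
            await asyncio.sleep(self.interval_sec)
            if self.my_epoch < self.fence_epoch["current"]:
                self.role = "FENCED"


async def demo() -> str:
    store = {"current": 1}
    primary = MonitoredRole(store, my_epoch=1, interval_sec=0.01)
    task = asyncio.create_task(primary.monitor())
    await asyncio.sleep(0.03)
    store["current"] = 2  # a newer instance acquires elsewhere
    await asyncio.wait_for(task, timeout=1.0)
    return primary.role


print(asyncio.run(demo()))  # FENCED
```

Between checks a stale PRIMARY can still believe it is authoritative, which is why writes should also be gated on validation rather than relying on the monitor alone.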

HeartbeatManager

HeartbeatManager reports liveness to the orchestrator. Identity is set separately so the manager can be constructed before the role is known.

from dory.edge import HeartbeatManager, HeartbeatConfig

hb = HeartbeatManager(HeartbeatConfig())
hb.set_processor_info("proc-1", node_id="node-a",
                      role="PRIMARY", fencing_token=token)
await hb.send_heartbeat()
print(hb.status)        # property -> ConnectivityStatus

HeartbeatConfig fields:

| Field | Default |
|---|---|
| orchestrator_url | None (env DORY_ORCHESTRATOR_URL, else http://dory-orchestrator:8080) |
| interval_sec | 10.0 |
| timeout_sec | 5.0 |
| missed_threshold | 3 |
| recovery_threshold | 2 |
| latency_threshold_ms | 500.0 |
| auto_demote_on_disconnect | True |
| demote_grace_period_sec | 30.0 |
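One plausible reading of how the threshold fields interact is a counter-based state machine. This is an assumption inferred from the field names, not Dory's documented algorithm: `ConnectivityTracker` and its three status strings are hypothetical, the real `ConnectivityStatus` values may differ, and auto-demotion (`auto_demote_on_disconnect`, `demote_grace_period_sec`) is not modelled.

```python
from dataclasses import dataclass


@dataclass
class ConnectivityTracker:
    # Hypothetical: threshold semantics are inferred from HeartbeatConfig field names.
    missed_threshold: int = 3
    recovery_threshold: int = 2
    latency_threshold_ms: float = 500.0
    status: str = "CONNECTED"
    consecutive_failures: int = 0
    consecutive_successes: int = 0

    def record(self, ok: bool, latency_ms: float = 0.0) -> str:
        if not ok:
            self.consecutive_failures += 1
            self.consecutive_successes = 0
            if self.consecutive_failures >= self.missed_threshold:
                self.status = "DISCONNECTED"
            return self.status
        self.consecutive_failures = 0
        self.consecutive_successes += 1
        # After a disconnect, require several good beats before trusting the link.
        if (self.status == "DISCONNECTED"
                and self.consecutive_successes < self.recovery_threshold):
            return self.status
        slow = latency_ms > self.latency_threshold_ms
        self.status = "DEGRADED" if slow else "CONNECTED"
        return self.status


t = ConnectivityTracker()
print([t.record(False) for _ in range(3)])         # ['CONNECTED', 'CONNECTED', 'DISCONNECTED']
print(t.record(True, 40.0), t.record(True, 40.0))  # DISCONNECTED CONNECTED
print(t.record(True, 900.0))                       # DEGRADED
```

Requiring consecutive failures before declaring a disconnect, and consecutive successes before recovering, keeps a single dropped or late heartbeat from flapping the node's status.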

send_heartbeat() POSTs to {orchestrator_url}/api/v1/edge/heartbeat and returns a HeartbeatResponse{acknowledged, orchestrator_time, directive, message}. .status and .metrics are properties; is_connected() / is_disconnected() are methods.

Warning

The orchestrator only emits "continue" or "shutdown" directives ("demote"/"promote" are parsed but not emitted). The heartbeat loop logs the directive; acting on a "shutdown" is the caller's responsibility.
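Because acting on a directive is left to the caller, a heartbeat consumer typically needs a small dispatch step. The sketch below is hypothetical: `Response` stands in for `HeartbeatResponse`, `handle_directive` is an illustrative name, and the `shutdown` callback is whatever your application uses to stop, for example a wrapper around `RoleManager.stop(reason)`.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class Response:
    # Hypothetical stand-in for HeartbeatResponse (orchestrator_time omitted).
    acknowledged: bool
    directive: Optional[str] = None
    message: Optional[str] = None


async def handle_directive(
    resp: Response, shutdown: Callable[[str], Awaitable[None]]
) -> bool:
    """Act on a heartbeat directive; return True if the processor should keep running."""
    if resp.directive == "shutdown":
        # The SDK only logs this directive, so stopping is the caller's job.
        await shutdown(resp.message or "orchestrator requested shutdown")
        return False
    # "continue", a missing directive, and the parsed-but-unused
    # "demote"/"promote" all leave the processor running.
    return True


async def main() -> None:
    async def stop(reason: str) -> None:
        print("stopping:", reason)

    keep_running = await handle_directive(Response(True, "shutdown", "node draining"), stop)
    print("keep running:", keep_running)


asyncio.run(main())
```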

EdgeHealthReporter

EdgeHealthReporter(heartbeat_manager=None, role_manager=None) aggregates edge state for health probes.

from dory.edge import EdgeHealthReporter

reporter = EdgeHealthReporter(heartbeat_manager=hb, role_manager=roles)
status = await reporter.check_health()   # healthy | degraded | unhealthy

get_health_status() returns a dict: is_edge, connectivity, is_connected, heartbeat (success_rate, avg_latency_ms, consecutive_failures), role, can_process, fencing_epoch. check_health() maps connectivity to healthy / degraded / unhealthy and feeds the SDK health server (see Observability).
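The connectivity-to-verdict mapping can be sketched as a lookup over a dict shaped like the `get_health_status()` output. The connectivity labels, the treatment of unknown values as unhealthy, and the example values are all assumptions for illustration; Dory's `ConnectivityStatus` values may differ.

```python
from typing import Any, Dict

# Hypothetical connectivity labels; Dory's ConnectivityStatus values may differ.
VERDICTS = {"CONNECTED": "healthy", "DEGRADED": "degraded"}


def verdict_from_status(status: Dict[str, Any]) -> str:
    """Map a get_health_status()-shaped dict to a probe verdict."""
    # Anything not explicitly connected or degraded (disconnected, unknown) fails.
    return VERDICTS.get(status.get("connectivity"), "unhealthy")


example = {  # illustrative values only
    "is_edge": True,
    "connectivity": "DEGRADED",
    "is_connected": True,
    "heartbeat": {"success_rate": 0.9, "avg_latency_ms": 640.0, "consecutive_failures": 0},
    "role": "PRIMARY",
    "can_process": True,
    "fencing_epoch": 7,
}
print(verdict_from_status(example))  # degraded
```

Defaulting to "unhealthy" for an unrecognised state errs on the side of failing a probe rather than reporting a node as healthy when its status cannot be interpreted.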

AdaptiveProcessor: putting it together

AdaptiveProcessor.start() assembles the primitives from detected workload context, so you rarely instantiate them by hand:

  1. WorkloadDetector.detect() → WorkloadContext (exposes node_name).
  2. Build FencingManager(fencing_config).
  3. Build RoleManager(processor_id, node_id=context.node_name, fencing_config=fencing_config).
  4. If an orchestrator_url is configured, build HeartbeatConfig + HeartbeatManager.
  5. role_manager.start(), then heartbeat.start().

It tracks an OperationMode: EDGE_CONNECTED, EDGE_OFFLINE, CLOUD_FAILOVER, CLOUD_NORMAL, DEGRADED.

process_with_fencing(operation) validates fencing before and after running the operation, raising FenceViolation if the token went stale mid-flight — closing the window where a demoted PRIMARY writes a result it computed while still authoritative.

from dory.edge.adaptive import AdaptiveProcessor, AdaptiveConfig

cfg = AdaptiveConfig(app_name="detector", processor_id="proc-1",
                     orchestrator_url="http://dory-orchestrator:8080")
proc = AdaptiveProcessor(cfg)
await proc.start()
result = await proc.process_with_fencing(do_work)

AdaptiveConfig(app_name, processor_id, orchestrator_url=None, edge_config=EdgeConfig(), fencing_config=None, custom_detector=None).
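The validate-before-and-after pattern that process_with_fencing applies can be sketched independently of the SDK. `run_fenced` and this `FenceViolation` class are hypothetical stand-ins, and the sketch assumes the operation is passed as a zero-argument coroutine function (this page does not say whether Dory expects a callable or a coroutine).

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class FenceViolation(Exception):
    """Hypothetical stand-in for Dory's FenceViolation."""


async def run_fenced(
    is_valid: Callable[[], Awaitable[bool]],
    operation: Callable[[], Awaitable[T]],
) -> T:
    # Refuse to start work without a valid token.
    if not await is_valid():
        raise FenceViolation("token stale before operation")
    result = await operation()
    # Re-check: a newer instance may have acquired while we were working.
    if not await is_valid():
        raise FenceViolation("token went stale during operation")
    return result


async def demo() -> None:
    epoch = {"current": 1}
    mine = 1

    async def valid() -> bool:
        return mine >= epoch["current"]

    async def work() -> str:
        epoch["current"] = 2  # simulate a takeover mid-flight
        return "result"

    try:
        await run_fenced(valid, work)
    except FenceViolation as exc:
        print("discarded:", exc)


asyncio.run(demo())
```

The post-check lets the caller discard a result computed under a stale token; it cannot undo a side effect that the operation already performed, so external writes inside the operation should still be gated on validation.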

Geolocalization

dory.geo.geolocalizer is pure Python (no native deps) and converts an image-space detection into a GeoPoint. It is commonly used to compute the geohash routing key for published results (see Output & Events).

from dory.geo.geolocalizer import (
    CameraGeolocalizer, GeoMethod, ReferencePoint, BoundingBox,
)

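# Calibration: pixel <-> geo correspondences, at least 4 for HOMOGRAPHY.
# Placeholder values; use surveyed points visible in your camera view.
refs = [
    ReferencePoint(120, 710, 37.77490, -122.41940),
    ReferencePoint(1790, 705, 37.77494, -122.41881),
    ReferencePoint(1480, 310, 37.77531, -122.41889),
    ReferencePoint(410, 300, 37.77529, -122.41946),
]
geo = CameraGeolocalizer(refs, method=GeoMethod.HOMOGRAPHY)
point = geo.estimate(BoundingBox(900, 420, 80, 160))  # (left, top, width, height) -> GeoPoint(lat, lng)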

| Type | Notes |
|---|---|
| GeoMethod | HOMOGRAPHY (≥4 refs) or TRILATERATION (≥3 refs). |
| GeoPoint(lat, lng) | Output coordinate. |
| ReferencePoint(image_x, image_y, lat, lng) | Calibration correspondence. |
| BoundingBox(left, top, width, height) | .ground_point is the bottom-centre of the box, the point used for estimation. |
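For intuition about what HOMOGRAPHY computes, the sketch below fits a planar homography exactly from four correspondences (the textbook direct linear transform with the last coefficient fixed to 1) and applies it to a box's bottom-centre ground point. It is not Dory's implementation: `Ref`, `fit_homography`, and `project` are hypothetical names, lat/lng are treated as planar coordinates (reasonable only over a small area), and more than four points would need a least-squares fit instead.

```python
from dataclasses import dataclass
from typing import List, Sequence, Tuple


@dataclass
class Ref:
    # Mirrors ReferencePoint(image_x, image_y, lat, lng) for illustration.
    image_x: float
    image_y: float
    lat: float
    lng: float


def ground_point(left: float, top: float, width: float, height: float) -> Tuple[float, float]:
    # Bottom-centre of the box: where the object meets the ground.
    return (left + width / 2.0, top + height)


def _solve(a: List[List[float]], b: List[float]) -> List[float]:
    # Gaussian elimination with partial pivoting on an n x n system.
    n = len(b)
    m = [row[:] + [b[i]] for i, row in enumerate(a)]
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        if abs(m[pivot][col]) < 1e-12:
            raise ValueError("degenerate reference points (e.g. three collinear)")
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, n):
            f = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= f * m[col][c]
    x = [0.0] * n
    for r in range(n - 1, -1, -1):
        x[r] = (m[r][n] - sum(m[r][c] * x[c] for c in range(r + 1, n))) / m[r][r]
    return x


def fit_homography(refs: Sequence[Ref]) -> List[float]:
    """Exact DLT fit from exactly four correspondences (h33 fixed to 1)."""
    if len(refs) != 4:
        raise ValueError("this sketch expects exactly 4 reference points")
    a: List[List[float]] = []
    b: List[float] = []
    for r in refs:
        x, y, u, v = r.image_x, r.image_y, r.lat, r.lng
        a.append([x, y, 1, 0, 0, 0, -u * x, -u * y])
        b.append(u)
        a.append([0, 0, 0, x, y, 1, -v * x, -v * y])
        b.append(v)
    return _solve(a, b) + [1.0]


def project(h: Sequence[float], x: float, y: float) -> Tuple[float, float]:
    # Apply the 3x3 homography (row-major) and divide by the projective scale.
    w = h[6] * x + h[7] * y + h[8]
    return ((h[0] * x + h[1] * y + h[2]) / w, (h[3] * x + h[4] * y + h[5]) / w)


refs = [Ref(0, 0, 0.0, 0.0), Ref(100, 0, 0.0, 1.0), Ref(100, 100, 1.0, 1.0), Ref(0, 100, 1.0, 0.0)]
h = fit_homography(refs)
gx, gy = ground_point(40, 20, 20, 30)  # (50.0, 50.0)
print(project(h, gx, gy))              # approximately (0.5, 0.5)
```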

CameraGeolocalizer methods: estimate(box), estimate_batch(boxes), reprojection_errors(), mean_reprojection_error(). Helpers estimate_location(box, reference_points, method=...) and estimate_locations_batch(...) skip the explicit object.

Tip

Check mean_reprojection_error() after calibration. A large value means your reference points are inconsistent and every estimate will be off.
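A reprojection error is typically the distance between where the fitted model maps each reference pixel and that reference's known location. The sketch below computes a mean in metres using an equirectangular approximation; `mean_reprojection_error_m` and its inputs are hypothetical, and the units Dory's `mean_reprojection_error()` reports are not stated on this page.

```python
import math
from typing import Callable, Iterable, Tuple

EARTH_RADIUS_M = 6_371_000.0
LatLng = Tuple[float, float]


def ground_distance_m(a: LatLng, b: LatLng) -> float:
    # Equirectangular approximation; adequate for distances within one camera view.
    lat1, lng1 = map(math.radians, a)
    lat2, lng2 = map(math.radians, b)
    x = (lng2 - lng1) * math.cos((lat1 + lat2) / 2.0)
    y = lat2 - lat1
    return EARTH_RADIUS_M * math.hypot(x, y)


def mean_reprojection_error_m(
    estimate: Callable[[float, float], LatLng],
    refs: Iterable[Tuple[float, float, float, float]],
) -> float:
    """Average distance between each reference's known location and the model's estimate."""
    errors = [ground_distance_m(estimate(x, y), (lat, lng)) for x, y, lat, lng in refs]
    return sum(errors) / len(errors)


# Illustrative (image_x, image_y, lat, lng) references and a candidate model.
refs = [(0, 0, 37.0, -122.0), (100, 0, 37.0, -121.9999), (0, 100, 37.0001, -122.0)]
model = lambda x, y: (37.0 + y * 1e-6, -122.0 + x * 1e-6)
print(mean_reprojection_error_m(model, refs))
```

With exactly four points, a homography passes through every reference, so its error is zero by construction; the check is only informative when you calibrate with more points than the minimum.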