DAG Runtime Specification

Package: @robota-sdk/dag-runtime v0.1.0

Scope

dag-runtime owns the runtime orchestration layer for DAG execution. Its responsibilities are:

  • Creating DAG runs from published definitions with idempotent run-key semantics.
  • Resolving time semantics (logical date) based on trigger type via TimeSemanticsService from dag-core.
  • Transitioning DAG run and task run states through DagRunStateMachine and TaskRunStateMachine from dag-core.
  • Identifying entry nodes (nodes with empty dependsOn) and enqueuing them as initial task runs.
  • Querying DAG run status along with associated task runs.
  • Cancelling DAG runs through state machine transition validation.
  • Publishing execution progress events (STARTED, FAILED) via IRunProgressEventReporter.
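The entry-node rule above (a node with an empty dependsOn list is an entry node) can be illustrated with a minimal sketch. SketchNode and findEntryNodes are hypothetical names used only for illustration; the real definition types are owned by dag-core and may carry more fields.

```typescript
// Hypothetical, trimmed-down node shape; the real IDagDefinition lives in dag-core.
interface SketchNode {
  nodeId: string;
  dependsOn: string[];
}

// Entry nodes are the nodes that depend on nothing.
function findEntryNodes(nodes: SketchNode[]): SketchNode[] {
  return nodes.filter((node) => node.dependsOn.length === 0);
}

const nodes: SketchNode[] = [
  { nodeId: "fetch", dependsOn: [] },
  { nodeId: "parse", dependsOn: ["fetch"] },
  { nodeId: "seed", dependsOn: [] },
];

// Only "fetch" and "seed" qualify; "parse" waits on "fetch".
const entryNodeIds = findEntryNodes(nodes).map((node) => node.nodeId);
```

A definition for which this filter returns an empty list corresponds to the DAG_VALIDATION_NO_ENTRY_NODE condition.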

Boundaries

Depends on

  • @robota-sdk/dag-core -- all domain types, state machines, port interfaces, error builders, time semantics, and execution progress event constants.

Does not own

  • Worker execution loops -- owned by dag-worker.
  • Scheduler triggers -- owned by dag-scheduler.
  • Storage and queue implementations -- consumed through port interfaces (IStoragePort, IQueuePort, IClockPort) defined in dag-core.
  • API transport -- owned by dag-api.
  • DAG definition authoring or validation -- owned by dag-core and dag-designer.
  • Projection or read-model concerns -- owned by dag-projection.
  • Node definition or execution logic -- owned by dag-nodes and dag-worker.

Import direction

All imports flow toward dag-core. This package does not import from any sibling DAG package.

Architecture Overview

The package exposes three service classes, each with a single responsibility:

  1. RunOrchestratorService -- The primary orchestration service. Accepts port dependencies via constructor injection (IStoragePort, IQueuePort, IClockPort, optional IRunProgressEventReporter). Provides three public methods:
    • createRun -- Resolves the definition, validates publication status, resolves time semantics, generates an idempotent run key, and persists a new DagRun in created status. Handles race conditions by re-querying on storage conflict.
    • startCreatedRun -- Takes a dagRunId in created status, parses the definition snapshot, identifies entry nodes, transitions the run through created -> queued -> running, creates task runs for entry nodes, and enqueues queue messages. On enqueue failure, transitions the run to failed and all affected tasks to cancelled.
    • startRun -- Composes createRun and startCreatedRun into a single idempotent operation. If the run already exists and is past created, returns existing task run IDs.

Behavioral Contracts

Entry Task Enqueue Failure Recovery

When dispatchEntryTasks enqueues entry tasks and one enqueue fails mid-batch:

  1. Tasks already enqueued successfully remain in the queue (no rollback of successful enqueues).
  2. The failed task's TaskRun is transitioned to cancelled status.
  3. All previously created TaskRun records (including successfully enqueued ones) are transitioned to cancelled status. Workers that dequeue already-enqueued messages will encounter a cancelled task and skip execution via state machine rejection.
  4. The DAG run is transitioned to failed status.
  5. A FAILED execution progress event is published.

Entry nodes that the dispatch loop has not yet reached have no TaskRun records, so no cleanup is needed for them. As a result, storage may not contain a complete audit trail of all intended entry tasks: only the tasks created before the failure are recorded.
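The recovery steps above can be sketched as a small synchronous model. This is an illustration under stated assumptions, not the package's implementation: dispatchEntryTasksSketch, the "pending" placeholder status, and the task ID format are all hypothetical, and the real service is asynchronous and delegates transitions to the dag-core state machines.

```typescript
// "pending" is a placeholder for whatever non-terminal status a new TaskRun has.
type SketchTaskStatus = "pending" | "cancelled";

interface SketchOutcome {
  runStatus: "running" | "failed";
  taskStatuses: Map<string, SketchTaskStatus>;
  events: string[];
}

function dispatchEntryTasksSketch(
  entryNodeIds: string[],
  enqueue: (taskRunId: string) => void,
): SketchOutcome {
  const taskStatuses = new Map<string, SketchTaskStatus>();
  const events: string[] = [];
  for (const nodeId of entryNodeIds) {
    const taskRunId = `task:${nodeId}`; // hypothetical ID format
    taskStatuses.set(taskRunId, "pending"); // the record exists before the enqueue attempt
    try {
      enqueue(taskRunId);
    } catch {
      // Cancel every TaskRun created so far, including ones already enqueued.
      taskStatuses.forEach((_status, id) => taskStatuses.set(id, "cancelled"));
      events.push("FAILED");
      return { runStatus: "failed", taskStatuses, events };
    }
  }
  return { runStatus: "running", taskStatuses, events };
}

// Simulate a queue that fails on the second enqueue of three entry tasks.
const delivered: string[] = [];
let enqueueCalls = 0;
const outcome = dispatchEntryTasksSketch(["a", "b", "c"], (taskRunId) => {
  enqueueCalls += 1;
  if (enqueueCalls === 2) throw new Error("queue unavailable");
  delivered.push(taskRunId);
});
```

In this run, task:a stays in the queue but is marked cancelled, task:b is marked cancelled, and task:c never gets a record, which matches the incomplete audit trail described above.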

startCreatedRun Idempotency

startCreatedRun is safe to call on a run that is already past the created state. If the run is in queued, running, or any terminal state, the method returns the existing task run IDs without re-enqueuing or re-transitioning. This prevents duplicate task creation on retry.
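A minimal sketch of this guard follows, assuming an in-memory run record. SketchRun and startCreatedRunSketch are hypothetical names, and the single assignment to "running" stands in for the created -> queued -> running transitions that the real service performs through DagRunStateMachine.

```typescript
// Hypothetical run record; the real IDagRun is defined in dag-core.
interface SketchRun {
  dagRunId: string;
  status: string;
  taskRunIds: string[];
}

function startCreatedRunSketch(run: SketchRun, entryNodeIds: string[]): string[] {
  if (run.status !== "created") {
    // Already started or finished: return what exists, change nothing.
    return run.taskRunIds;
  }
  run.taskRunIds = entryNodeIds.map((nodeId) => `${run.dagRunId}:${nodeId}`);
  run.status = "running"; // stands in for created -> queued -> running
  return run.taskRunIds;
}

const sketchRun: SketchRun = { dagRunId: "run-1", status: "created", taskRunIds: [] };
const firstCall = startCreatedRunSketch(sketchRun, ["fetch", "seed"]);
const retryCall = startCreatedRunSketch(sketchRun, ["fetch", "seed"]);
```

The retry returns the same task run IDs as the first call and creates no additional tasks.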

The other two service classes from the Architecture Overview are:

  2. RunQueryService -- Read-only service. Accepts IStoragePort. Retrieves a DagRun and its associated ITaskRun[] by dagRunId.

  3. RunCancelService -- Accepts IStoragePort and IClockPort. Validates the cancel transition through DagRunStateMachine and updates run status to cancelled.

All services use the TResult<T, IDagError> pattern for error handling. There are no fallback paths or silent error swallowing.
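To show what result-based error handling looks like for a caller, here is a sketch built on an assumed discriminated-union shape. SketchResult, SketchDagError, and getRunSketch are hypothetical; the actual TResult and IDagError definitions live in dag-core and may differ in field names.

```typescript
// Assumed shapes for illustration only; the real types are defined in dag-core.
interface SketchDagError {
  code: string;
  message: string;
}
type SketchResult<T, E> = { ok: true; value: T } | { ok: false; error: E };

interface SketchRunView {
  dagRunId: string;
  status: string;
}

function getRunSketch(
  runs: Map<string, SketchRunView>,
  dagRunId: string,
): SketchResult<SketchRunView, SketchDagError> {
  const run = runs.get(dagRunId);
  if (run === undefined) {
    // The failure is returned as a value, never thrown or swallowed.
    return {
      ok: false,
      error: { code: "DAG_VALIDATION_DAG_RUN_NOT_FOUND", message: `No DagRun for ${dagRunId}` },
    };
  }
  return { ok: true, value: run };
}

const knownRuns = new Map<string, SketchRunView>([
  ["run-1", { dagRunId: "run-1", status: "running" }],
]);
const found = getRunSketch(knownRuns, "run-1");
const missing = getRunSketch(knownRuns, "run-404");

// Callers must branch on the result before touching the value.
const missingSummary = missing.ok ? missing.value.status : missing.error.code;
```

Because the error travels in the return value, the compiler forces every caller to handle the failure branch explicitly.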

Run Key Idempotency

Run keys follow the format {dagId}:{logicalDate} or {dagId}:{logicalDate}:rerun:{rerunKey}. Duplicate run key detection prevents redundant DAG runs. A storage-level race condition (concurrent create) is handled by re-querying the existing run.
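The key format and the duplicate handling can be sketched as follows. buildRunKey, SketchRunStore, and createRunSketch are hypothetical helpers, and the date string used here is only an example value; the spec does not state how the logical date is serialized.

```typescript
// Hypothetical helper following the documented key format.
function buildRunKey(dagId: string, logicalDate: string, rerunKey?: string): string {
  const base = `${dagId}:${logicalDate}`;
  return rerunKey === undefined ? base : `${base}:rerun:${rerunKey}`;
}

// Hypothetical storage facade keyed by run key; insert throws on conflict.
interface SketchRunStore {
  findByRunKey(runKey: string): string | undefined;
  insert(runKey: string, dagRunId: string): void;
}

function createRunSketch(store: SketchRunStore, runKey: string, newDagRunId: string): string {
  const existing = store.findByRunKey(runKey);
  if (existing !== undefined) return existing; // duplicate key: reuse the run
  try {
    store.insert(runKey, newDagRunId);
    return newDagRunId;
  } catch (error) {
    // A concurrent creator may have won the race: re-query before failing.
    const raced = store.findByRunKey(runKey);
    if (raced !== undefined) return raced;
    throw error;
  }
}

const runsByKey = new Map<string, string>();
const sketchStore: SketchRunStore = {
  findByRunKey: (runKey) => runsByKey.get(runKey),
  insert: (runKey, dagRunId) => {
    if (runsByKey.has(runKey)) throw new Error("run key conflict");
    runsByKey.set(runKey, dagRunId);
  },
};

const dailyKey = buildRunKey("daily-report", "2024-01-01");
const firstRunId = createRunSketch(sketchStore, dailyKey, "run-1");
const duplicateRunId = createRunSketch(sketchStore, dailyKey, "run-2");
const rerunId = createRunSketch(sketchStore, buildRunKey("daily-report", "2024-01-01", "fix-1"), "run-3");
```

The second call reuses run-1 because the key matches, while the rerun key produces a distinct key and therefore a distinct run.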

State Transitions

All state transitions are delegated to DagRunStateMachine and TaskRunStateMachine from dag-core. The orchestrator does not define or override transition rules.

Type Ownership

| Type | Owner | Role |
| --- | --- | --- |
| IStartRunInput | dag-runtime | Input contract for startRun and createRun |
| ICreateRunInput | dag-runtime | Alias of IStartRunInput for createRun |
| ICreateRunResult | dag-runtime | Return value of createRun (includes status) |
| IStartRunResult | dag-runtime | Return value of startRun (includes taskRunIds) |
| IRunQueryResult | dag-runtime | Return value of getRun (dagRun + taskRuns) |
| IRunCancelResult | dag-runtime | Return value of cancelRun |
| RunOrchestratorService | dag-runtime | Orchestration service class |
| RunQueryService | dag-runtime | Query service class |
| RunCancelService | dag-runtime | Cancel service class |
| DAG_RUNTIME_PACKAGE_NAME | dag-runtime | Package name constant |
| IDagDefinition | dag-core | Imported -- DAG definition structure |
| IDagRun | dag-core | Imported -- DAG run record |
| ITaskRun | dag-core | Imported -- Task run record |
| IDagError | dag-core | Imported -- Structured error type |
| TResult | dag-core | Imported -- Result monad |
| TDagRunStatus | dag-core | Imported -- Run status union |
| TDagTriggerType | dag-core | Imported -- Trigger type union |
| TPortPayload | dag-core | Imported -- Port payload type |
| IStoragePort | dag-core | Imported -- Storage port interface |
| IQueuePort | dag-core | Imported -- Queue port interface |
| IClockPort | dag-core | Imported -- Clock port interface |
| IQueueMessage | dag-core | Imported -- Queue message structure |
| IRunProgressEventReporter | dag-core | Imported -- Event reporter port |
| DagRunStateMachine | dag-core | Imported -- Run state transition logic |
| TaskRunStateMachine | dag-core | Imported -- Task state transition logic |
| TimeSemanticsService | dag-core | Imported -- Logical date resolution |

Public API Surface

| Export | Kind | Signature Summary |
| --- | --- | --- |
| RunOrchestratorService | Class | constructor(storage: IStoragePort, queue: IQueuePort, clock: IClockPort, runProgressEventReporter?: IRunProgressEventReporter) |
| RunOrchestratorService.createRun | Method | (input: ICreateRunInput) => Promise<TResult<ICreateRunResult, IDagError>> |
| RunOrchestratorService.startCreatedRun | Method | (dagRunId: string) => Promise<TResult<IStartRunResult, IDagError>> |
| RunOrchestratorService.startRun | Method | (input: IStartRunInput) => Promise<TResult<IStartRunResult, IDagError>> |
| RunQueryService | Class | constructor(storage: IStoragePort) |
| RunQueryService.getRun | Method | (dagRunId: string) => Promise<TResult<IRunQueryResult, IDagError>> |
| RunCancelService | Class | constructor(storage: IStoragePort, clock: IClockPort) |
| RunCancelService.cancelRun | Method | (dagRunId: string) => Promise<TResult<IRunCancelResult, IDagError>> |
| IStartRunInput | Interface | { dagId, version?, trigger, logicalDate?, rerunKey?, input } |
| ICreateRunInput | Interface | Extends IStartRunInput |
| ICreateRunResult | Interface | { dagRunId, dagId, version, logicalDate, status } |
| IStartRunResult | Interface | { dagRunId, dagId, version, logicalDate, taskRunIds } |
| IRunQueryResult | Interface | { dagRun: IDagRun, taskRuns: ITaskRun[] } |
| IRunCancelResult | Interface | { dagRunId, status: 'cancelled' } |
| DAG_RUNTIME_PACKAGE_NAME | Constant | '@robota-sdk/dag-runtime' |

Extension Points

  1. Storage port -- Any implementation of IStoragePort can be injected. The package ships no storage implementation; tests use InMemoryStoragePort from dag-core.

  2. Queue port -- Any implementation of IQueuePort can be injected. Tests use InMemoryQueuePort from dag-core.

  3. Clock port -- Any implementation of IClockPort can be injected. Tests use FakeClockPort from dag-core.

  4. Run progress event reporter -- Optional IRunProgressEventReporter can be provided to the orchestrator to receive execution progress events. When omitted, no events are published.

  5. Rerun key -- The rerunKey field in IStartRunInput allows creating distinct runs for the same DAG and logical date, enabling controlled rerun semantics without colliding with the original run key.
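The optional reporter in point 4 can be pictured with a small sketch. SketchReporter, its report method, and SketchOrchestrator are hypothetical; the real IRunProgressEventReporter interface is defined in dag-core and its method names are not given in this specification.

```typescript
// Hypothetical reporter shape; only the STARTED and FAILED event names come from this spec.
interface SketchReporter {
  report(eventType: "STARTED" | "FAILED", dagRunId: string): void;
}

class SketchOrchestrator {
  private readonly reporter: SketchReporter | undefined;

  constructor(reporter?: SketchReporter) {
    this.reporter = reporter;
  }

  markFailed(dagRunId: string): void {
    // With no reporter injected, this is a no-op: nothing is published.
    this.reporter?.report("FAILED", dagRunId);
  }
}

const seenEvents: string[] = [];
const reporting = new SketchOrchestrator({
  report: (eventType, dagRunId) => {
    seenEvents.push(`${eventType}:${dagRunId}`);
  },
});
reporting.markFailed("run-1");

const silent = new SketchOrchestrator();
silent.markFailed("run-2"); // no reporter, so no event is recorded
```

Keeping the reporter optional means callers that do not need progress events pay no wiring cost.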

Error Taxonomy

All errors use IDagError from dag-core and are constructed via buildValidationError or buildDispatchError.

Validation Errors

| Code | Source | Condition |
| --- | --- | --- |
| DAG_VALIDATION_DEFINITION_NOT_FOUND | createRun | No published definition found for the given dagId (and optional version) |
| DAG_VALIDATION_DEFINITION_NOT_PUBLISHED | createRun | Definition exists but its status is not published |
| DAG_VALIDATION_MISSING_LOGICAL_DATE | createRun | Scheduled trigger without a logicalDate (via TimeSemanticsService) |
| DAG_VALIDATION_DAG_RUN_NOT_FOUND | startCreatedRun, getRun, cancelRun | No DagRun found for the given dagRunId |
| DAG_VALIDATION_DEFINITION_SNAPSHOT_MISSING | startCreatedRun | DagRun record has empty or missing definitionSnapshot |
| DAG_VALIDATION_DEFINITION_SNAPSHOT_INVALID | startCreatedRun | definitionSnapshot fails structural parse |
| DAG_VALIDATION_RUN_INPUT_SNAPSHOT_INVALID | startCreatedRun | inputSnapshot fails JSON object parse |
| DAG_VALIDATION_NO_ENTRY_NODE | startCreatedRun | Definition contains no nodes with empty dependsOn |
| DAG_VALIDATION_PAYLOAD_INVALID | internal | Parsed payload is not a JSON object |
| DAG_VALIDATION_PAYLOAD_PARSE_FAILED | internal | Payload string is not valid JSON |
| DAG_VALIDATION_DEFINITION_SNAPSHOT_PARSE_FAILED | internal | Definition snapshot string is not valid JSON |
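The distinction between the two internal payload codes can be sketched as a two-stage check: first whether the string parses as JSON, then whether the parsed value is an object. parsePayloadSketch and its result shape are hypothetical, and treating arrays as "not a JSON object" is an assumption of this sketch rather than something the spec states.

```typescript
// Hypothetical result shape; the package itself returns TResult with IDagError.
type SketchParse =
  | { ok: true; value: Record<string, unknown> }
  | { ok: false; code: string };

function parsePayloadSketch(raw: string): SketchParse {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    // Stage 1: the string is not valid JSON at all.
    return { ok: false, code: "DAG_VALIDATION_PAYLOAD_PARSE_FAILED" };
  }
  // Stage 2: valid JSON, but not an object (assumption: arrays are rejected too).
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    return { ok: false, code: "DAG_VALIDATION_PAYLOAD_INVALID" };
  }
  return { ok: true, value: parsed as Record<string, unknown> };
}

const brokenJson = parsePayloadSketch("{");
const notAnObject = parsePayloadSketch("[1, 2]");
const validObject = parsePayloadSketch('{"region": "eu"}');
```

Separating the two codes lets a caller tell a corrupted snapshot apart from a well-formed value of the wrong type.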

Dispatch Errors

| Code | Source | Condition |
| --- | --- | --- |
| DAG_DISPATCH_DAG_RUN_CREATE_FAILED | createRun | Storage-level failure when creating the DagRun record (and no raced run found) |
| DAG_DISPATCH_ENQUEUE_FAILED | startCreatedRun | Queue enqueue call throws for an entry task; triggers run failure and task cancellation |

State Machine Errors

State transition failures (e.g., attempting to cancel a terminal run) are returned directly from DagRunStateMachine.transition and TaskRunStateMachine.transition in dag-core. This package does not wrap or remap those errors.

Class Contract Registry

Interface Implementations

No classes in this package use the implements keyword. All port dependencies are consumed via constructor injection.

Inheritance Chains

None. Service classes are standalone (no extends).

Port Consumption via DI

| Service Class | Injected Port (from dag-core) | Location |
| --- | --- | --- |
| RunOrchestratorService | IStoragePort, IQueuePort, IClockPort, IRunProgressEventReporter | src/services/run-orchestrator-service.ts |
| RunQueryService | IStoragePort | src/services/run-query-service.ts |
| RunCancelService | IStoragePort, IClockPort | src/services/run-cancel-service.ts |

Cross-Package Port Consumers

| Port (Owner) | Consumer Class | Location |
| --- | --- | --- |
| IStoragePort (dag-core) | RunOrchestratorService, RunQueryService, RunCancelService | src/services/ |
| IQueuePort (dag-core) | RunOrchestratorService | src/services/run-orchestrator-service.ts |
| IClockPort (dag-core) | RunOrchestratorService, RunCancelService | src/services/ |
| IRunProgressEventReporter (dag-core) | RunOrchestratorService | src/services/run-orchestrator-service.ts |

Test Strategy

Tests are located in packages/dag-runtime/src/__tests__/ and executed via vitest.

Test files

  • run-orchestrator-service.test.ts -- Tests for RunOrchestratorService covering:

    • Successful run creation and entry task enqueue
    • Missing definition error
    • Scheduled trigger without logical date error
    • Idempotent duplicate run key handling
    • Two-phase create-then-start workflow
    • Idempotent startCreatedRun on already-running run
    • No-entry-node validation
    • Enqueue failure transitions (run to failed, task to cancelled)
    • Race condition handling during concurrent createDagRun
  • run-query-cancel-service.test.ts -- Tests for RunQueryService and RunCancelService covering:

    • Query of run with associated task runs
    • Cancel from running status with timestamp verification

Test infrastructure

All tests use in-memory fakes from dag-core:

  • InMemoryStoragePort -- in-memory storage implementation
  • InMemoryQueuePort -- in-memory queue implementation
  • FakeClockPort -- deterministic clock for reproducible timestamps

Custom test doubles are defined locally:

  • FailingQueuePort -- simulates enqueue failure at a configurable call count
  • RacyDagRunStoragePort -- extends InMemoryStoragePort to simulate storage race conditions

Run command

```bash
pnpm --filter @robota-sdk/dag-runtime test
```

Released under the MIT License.