DAG Runtime Specification
Package: @robota-sdk/dag-runtime v0.1.0
Scope
dag-runtime owns the runtime orchestration layer for DAG execution. Its responsibilities are:
- Creating DAG runs from published definitions with idempotent run-key semantics.
- Resolving time semantics (logical date) based on trigger type via TimeSemanticsService from dag-core.
- Transitioning DAG run and task run states through DagRunStateMachine and TaskRunStateMachine from dag-core.
- Identifying entry nodes (nodes with empty dependsOn) and enqueuing them as initial task runs.
- Querying DAG run status along with associated task runs.
- Cancelling DAG runs through state machine transition validation.
- Publishing execution progress events (STARTED, FAILED) via IRunProgressEventReporter.
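As an illustration of the entry-node rule above (nodes with empty dependsOn), the following is a minimal sketch. The SketchNode shape and the findEntryNodes helper are hypothetical names used for illustration only; the real definition structure is IDagDefinition from dag-core and may differ.

```typescript
// Hypothetical minimal node shape; the real structure is IDagDefinition from dag-core.
interface SketchNode {
  nodeId: string;
  dependsOn: string[];
}

// Entry nodes are the nodes that declare no upstream dependencies.
function findEntryNodes(nodes: SketchNode[]): SketchNode[] {
  return nodes.filter((node) => node.dependsOn.length === 0);
}

const nodes: SketchNode[] = [
  { nodeId: "extract", dependsOn: [] },
  { nodeId: "transform", dependsOn: ["extract"] },
  { nodeId: "load", dependsOn: ["transform"] },
];

// Only "extract" qualifies, so it would become the single initial task run.
const entryNodeIds = findEntryNodes(nodes).map((node) => node.nodeId);
```

An empty result from such a filter corresponds to the DAG_VALIDATION_NO_ENTRY_NODE condition listed in the error taxonomy.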
Boundaries
Depends on
- @robota-sdk/dag-core -- all domain types, state machines, port interfaces, error builders, time semantics, and execution progress event constants.
Does not own
- Worker execution loops -- owned by dag-worker.
- Scheduler triggers -- owned by dag-scheduler.
- Storage and queue implementations -- consumed through port interfaces (IStoragePort, IQueuePort, IClockPort) defined in dag-core.
- API transport -- owned by dag-api.
- DAG definition authoring or validation -- owned by dag-core and dag-designer.
- Projection or read-model concerns -- owned by dag-projection.
- Node definition or execution logic -- owned by dag-nodes and dag-worker.
Import direction
All imports flow toward dag-core. This package does not import from any sibling DAG package.
Architecture Overview
The package exposes three service classes, each with a single responsibility:
- RunOrchestratorService -- The primary orchestration service. Accepts port dependencies via constructor injection (IStoragePort, IQueuePort, IClockPort, optional IRunProgressEventReporter). Provides three public methods:
  - createRun -- Resolves the definition, validates publication status, resolves time semantics, generates an idempotent run key, and persists a new DagRun in created status. Handles race conditions by re-querying on storage conflict.
  - startCreatedRun -- Takes a dagRunId in created status, parses the definition snapshot, identifies entry nodes, transitions the run through created -> queued -> running, creates task runs for entry nodes, and enqueues queue messages. On enqueue failure, transitions the run to failed and all affected tasks to cancelled.
  - startRun -- Composes createRun and startCreatedRun into a single idempotent operation. If the run already exists and is past created, returns existing task run IDs.
Behavioral Contracts
Entry Task Enqueue Failure Recovery
When dispatchEntryTasks enqueues entry tasks and one enqueue fails mid-batch:
- Tasks already enqueued successfully remain in the queue (no rollback of successful enqueues).
- The failed task's TaskRun is transitioned to cancelled status.
- All previously created TaskRun records (including successfully enqueued ones) are transitioned to cancelled status. Workers that dequeue already-enqueued messages will encounter a cancelled task and skip execution via state machine rejection.
- The DAG run is transitioned to failed status.
- A FAILED execution progress event is published.
Entry nodes that have not yet been iterated in the dispatch loop do not have TaskRun records created, so no cleanup is needed for them. As a result, storage may not contain a complete audit trail of all intended entry tasks, only of those created before the failure.
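The recovery behaviour described above can be sketched roughly as follows. This is a simplified, synchronous model under stated assumptions: dispatchEntryTasksSketch, the SketchTaskRun shape, the task-{nodeId} ID format, and the timing of the STARTED event are all hypothetical, and the real service works through the async ports and state machines from dag-core.

```typescript
// Hypothetical, reduced task run record; the real type is ITaskRun from dag-core.
interface SketchTaskRun {
  taskRunId: string;
  status: "queued" | "cancelled";
}

interface DispatchOutcome {
  runStatus: "running" | "failed";
  taskRuns: SketchTaskRun[];
  events: string[];
}

function dispatchEntryTasksSketch(
  entryNodeIds: string[],
  enqueue: (taskRunId: string) => void,
): DispatchOutcome {
  const taskRuns: SketchTaskRun[] = [];
  for (const nodeId of entryNodeIds) {
    // A TaskRun record exists only for nodes the loop has already reached.
    const taskRun: SketchTaskRun = { taskRunId: `task-${nodeId}`, status: "queued" };
    taskRuns.push(taskRun);
    try {
      enqueue(taskRun.taskRunId);
    } catch {
      // Cancel every TaskRun created so far, including those already enqueued.
      for (const created of taskRuns) {
        created.status = "cancelled";
      }
      return { runStatus: "failed", taskRuns, events: ["FAILED"] };
    }
  }
  // Assumed timing: STARTED is reported once all entry tasks are enqueued.
  return { runStatus: "running", taskRuns, events: ["STARTED"] };
}

// Simulate a queue that rejects the second enqueue call.
const queue: string[] = [];
let enqueueCalls = 0;
const flakyEnqueue = (taskRunId: string): void => {
  enqueueCalls += 1;
  if (enqueueCalls === 2) {
    throw new Error("simulated enqueue failure");
  }
  queue.push(taskRunId);
};

const result = dispatchEntryTasksSketch(["a", "b", "c"], flakyEnqueue);
```

In this run the message for task-a stays in the queue, both task-a and task-b end up cancelled, and no record is ever created for c, which mirrors the incomplete audit trail noted above.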
startCreatedRun Idempotency
startCreatedRun is safe to call on a run that is already past the created state. If the run is in queued, running, or any terminal state, the method returns the existing task run IDs without re-enqueuing or re-transitioning. This prevents duplicate task creation on retry.
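A minimal sketch of this guard is shown below. It is an illustration only: startCreatedRunSketch, the SketchRun shape, and the task run ID format are hypothetical, the status union lists only the statuses named in this document, and the real method performs its transitions through DagRunStateMachine.

```typescript
// Only the statuses named in this document; the real union is TDagRunStatus from dag-core.
type SketchRunStatus = "created" | "queued" | "running" | "failed" | "cancelled";

interface SketchRun {
  dagRunId: string;
  status: SketchRunStatus;
  taskRunIds: string[];
}

interface StartOutcome {
  taskRunIds: string[];
  enqueuedCount: number;
}

function startCreatedRunSketch(run: SketchRun, entryNodeIds: string[]): StartOutcome {
  if (run.status !== "created") {
    // Already started or finished: return existing IDs, enqueue nothing.
    return { taskRunIds: [...run.taskRunIds], enqueuedCount: 0 };
  }
  // The real service walks created -> queued -> running; collapsed here for brevity.
  run.status = "running";
  run.taskRunIds = entryNodeIds.map((nodeId) => `${run.dagRunId}:${nodeId}`);
  return { taskRunIds: [...run.taskRunIds], enqueuedCount: run.taskRunIds.length };
}

const run: SketchRun = { dagRunId: "run-1", status: "created", taskRunIds: [] };
const firstCall = startCreatedRunSketch(run, ["extract", "seed"]);
const retryCall = startCreatedRunSketch(run, ["extract", "seed"]);
```

The retry returns the same task run IDs as the first call with an enqueuedCount of zero, which is the property that makes retries safe.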
- RunQueryService -- Read-only service. Accepts IStoragePort. Retrieves a DagRun and its associated ITaskRun[] by dagRunId.
- RunCancelService -- Accepts IStoragePort and IClockPort. Validates the cancel transition through DagRunStateMachine and updates run status to cancelled.
All services use the TResult<T, IDagError> pattern for error handling. There are no fallback paths or silent error swallowing.
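To make the result pattern concrete, here is a small sketch of how a caller might branch on a result value. The exact shape of TResult and IDagError is defined in dag-core and is not reproduced in this document, so the SketchResult and SketchDagError types below are assumptions, as is the getRunSketch helper.

```typescript
// Assumed shapes for illustration; the real TResult and IDagError live in dag-core.
type SketchResult<T, E> = { ok: true; value: T } | { ok: false; error: E };

interface SketchDagError {
  code: string;
  message: string;
}

interface SketchRun {
  dagRunId: string;
  status: string;
}

// Returns an explicit error value instead of throwing or falling back to a default.
function getRunSketch(
  runs: Map<string, SketchRun>,
  dagRunId: string,
): SketchResult<SketchRun, SketchDagError> {
  const run = runs.get(dagRunId);
  if (run === undefined) {
    return {
      ok: false,
      error: {
        code: "DAG_VALIDATION_DAG_RUN_NOT_FOUND",
        message: `No DagRun found for dagRunId ${dagRunId}`,
      },
    };
  }
  return { ok: true, value: run };
}

const runs = new Map<string, SketchRun>([
  ["run-1", { dagRunId: "run-1", status: "running" }],
]);
const found = getRunSketch(runs, "run-1");
const missing = getRunSketch(runs, "run-404");
```

Because both branches are part of the return type, the caller has to check the discriminant before touching the value, which is what rules out silent error swallowing.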
Run Key Idempotency
Run keys follow the format {dagId}:{logicalDate} or {dagId}:{logicalDate}:rerun:{rerunKey}. Duplicate run key detection prevents redundant DAG runs. A storage-level race condition (concurrent create) is handled by re-querying the existing run.
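The two key formats can be illustrated with a short sketch. The buildRunKey helper is a hypothetical name, and the ISO-8601 string used for logicalDate is an assumption about its serialized form.

```typescript
// Hypothetical helper; the real key generation happens inside RunOrchestratorService.
function buildRunKey(dagId: string, logicalDate: string, rerunKey?: string): string {
  const baseKey = `${dagId}:${logicalDate}`;
  // A rerun key yields a distinct run key for the same DAG and logical date.
  return rerunKey === undefined ? baseKey : `${baseKey}:rerun:${rerunKey}`;
}

const originalKey = buildRunKey("daily-report", "2024-01-01T00:00:00.000Z");
const rerunRunKey = buildRunKey("daily-report", "2024-01-01T00:00:00.000Z", "fix-1");
```

Two requests that produce the same key refer to the same run, while supplying a rerunKey produces a key that cannot collide with the original.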
State Transitions
All state transitions are delegated to DagRunStateMachine and TaskRunStateMachine from dag-core. The orchestrator does not define or override transition rules.
Type Ownership
| Type | Owner | Role |
|---|---|---|
| IStartRunInput | dag-runtime | Input contract for startRun and createRun |
| ICreateRunInput | dag-runtime | Alias of IStartRunInput for createRun |
| ICreateRunResult | dag-runtime | Return value of createRun (includes status) |
| IStartRunResult | dag-runtime | Return value of startRun (includes taskRunIds) |
| IRunQueryResult | dag-runtime | Return value of getRun (dagRun + taskRuns) |
| IRunCancelResult | dag-runtime | Return value of cancelRun |
| RunOrchestratorService | dag-runtime | Orchestration service class |
| RunQueryService | dag-runtime | Query service class |
| RunCancelService | dag-runtime | Cancel service class |
| DAG_RUNTIME_PACKAGE_NAME | dag-runtime | Package name constant |
| IDagDefinition | dag-core | Imported -- DAG definition structure |
| IDagRun | dag-core | Imported -- DAG run record |
| ITaskRun | dag-core | Imported -- Task run record |
| IDagError | dag-core | Imported -- Structured error type |
| TResult | dag-core | Imported -- Result monad |
| TDagRunStatus | dag-core | Imported -- Run status union |
| TDagTriggerType | dag-core | Imported -- Trigger type union |
| TPortPayload | dag-core | Imported -- Port payload type |
| IStoragePort | dag-core | Imported -- Storage port interface |
| IQueuePort | dag-core | Imported -- Queue port interface |
| IClockPort | dag-core | Imported -- Clock port interface |
| IQueueMessage | dag-core | Imported -- Queue message structure |
| IRunProgressEventReporter | dag-core | Imported -- Event reporter port |
| DagRunStateMachine | dag-core | Imported -- Run state transition logic |
| TaskRunStateMachine | dag-core | Imported -- Task state transition logic |
| TimeSemanticsService | dag-core | Imported -- Logical date resolution |
Public API Surface
| Export | Kind | Signature Summary |
|---|---|---|
| RunOrchestratorService | Class | constructor(storage: IStoragePort, queue: IQueuePort, clock: IClockPort, runProgressEventReporter?: IRunProgressEventReporter) |
| RunOrchestratorService.createRun | Method | (input: ICreateRunInput) => Promise<TResult<ICreateRunResult, IDagError>> |
| RunOrchestratorService.startCreatedRun | Method | (dagRunId: string) => Promise<TResult<IStartRunResult, IDagError>> |
| RunOrchestratorService.startRun | Method | (input: IStartRunInput) => Promise<TResult<IStartRunResult, IDagError>> |
| RunQueryService | Class | constructor(storage: IStoragePort) |
| RunQueryService.getRun | Method | (dagRunId: string) => Promise<TResult<IRunQueryResult, IDagError>> |
| RunCancelService | Class | constructor(storage: IStoragePort, clock: IClockPort) |
| RunCancelService.cancelRun | Method | (dagRunId: string) => Promise<TResult<IRunCancelResult, IDagError>> |
| IStartRunInput | Interface | { dagId, version?, trigger, logicalDate?, rerunKey?, input } |
| ICreateRunInput | Interface | Extends IStartRunInput |
| ICreateRunResult | Interface | { dagRunId, dagId, version, logicalDate, status } |
| IStartRunResult | Interface | { dagRunId, dagId, version, logicalDate, taskRunIds } |
| IRunQueryResult | Interface | { dagRun: IDagRun, taskRuns: ITaskRun[] } |
| IRunCancelResult | Interface | { dagRunId, status: 'cancelled' } |
| DAG_RUNTIME_PACKAGE_NAME | Constant | '@robota-sdk/dag-runtime' |
Extension Points
- Storage port -- Any implementation of IStoragePort can be injected. The package ships no storage implementation; tests use InMemoryStoragePort from dag-core.
- Queue port -- Any implementation of IQueuePort can be injected. Tests use InMemoryQueuePort from dag-core.
- Clock port -- Any implementation of IClockPort can be injected. Tests use FakeClockPort from dag-core.
- Run progress event reporter -- Optional IRunProgressEventReporter can be provided to the orchestrator to receive execution progress events. When omitted, no events are published.
- Rerun key -- The rerunKey field in IStartRunInput allows creating distinct runs for the same DAG and logical date, enabling controlled rerun semantics without colliding with the original run key.
Error Taxonomy
All errors use IDagError from dag-core and are constructed via buildValidationError or buildDispatchError.
Validation Errors
| Code | Source | Condition |
|---|---|---|
| DAG_VALIDATION_DEFINITION_NOT_FOUND | createRun | No published definition found for the given dagId (and optional version) |
| DAG_VALIDATION_DEFINITION_NOT_PUBLISHED | createRun | Definition exists but its status is not published |
| DAG_VALIDATION_MISSING_LOGICAL_DATE | createRun | Scheduled trigger without a logicalDate (via TimeSemanticsService) |
| DAG_VALIDATION_DAG_RUN_NOT_FOUND | startCreatedRun, getRun, cancelRun | No DagRun found for the given dagRunId |
| DAG_VALIDATION_DEFINITION_SNAPSHOT_MISSING | startCreatedRun | DagRun record has empty or missing definitionSnapshot |
| DAG_VALIDATION_DEFINITION_SNAPSHOT_INVALID | startCreatedRun | definitionSnapshot fails structural parse |
| DAG_VALIDATION_RUN_INPUT_SNAPSHOT_INVALID | startCreatedRun | inputSnapshot fails JSON object parse |
| DAG_VALIDATION_NO_ENTRY_NODE | startCreatedRun | Definition contains no nodes with empty dependsOn |
| DAG_VALIDATION_PAYLOAD_INVALID | internal | Parsed payload is not a JSON object |
| DAG_VALIDATION_PAYLOAD_PARSE_FAILED | internal | Payload string is not valid JSON |
| DAG_VALIDATION_DEFINITION_SNAPSHOT_PARSE_FAILED | internal | Definition snapshot string is not valid JSON |
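The two internal payload codes distinguish a string that is not valid JSON from valid JSON that is not an object. A rough sketch of that distinction follows; parsePayloadSketch and ParseOutcome are hypothetical names, and treating arrays and null as "not a JSON object" is an assumption about the real check.

```typescript
type ParseOutcome =
  | { ok: true; value: Record<string, unknown> }
  | { ok: false; code: "DAG_VALIDATION_PAYLOAD_PARSE_FAILED" | "DAG_VALIDATION_PAYLOAD_INVALID" };

function parsePayloadSketch(raw: string): ParseOutcome {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    // The string itself is not valid JSON.
    return { ok: false, code: "DAG_VALIDATION_PAYLOAD_PARSE_FAILED" };
  }
  // Assumption: arrays and null do not count as JSON objects here.
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    return { ok: false, code: "DAG_VALIDATION_PAYLOAD_INVALID" };
  }
  return { ok: true, value: parsed as Record<string, unknown> };
}

const validPayload = parsePayloadSketch('{"region":"eu"}');
const notJson = parsePayloadSketch("{region: eu");
const notObject = parsePayloadSketch("[1, 2, 3]");
```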
Dispatch Errors
| Code | Source | Condition |
|---|---|---|
| DAG_DISPATCH_DAG_RUN_CREATE_FAILED | createRun | Storage-level failure when creating the DagRun record (and no raced run found) |
| DAG_DISPATCH_ENQUEUE_FAILED | startCreatedRun | Queue enqueue call throws for an entry task; triggers run failure and task cancellation |
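The "no raced run found" qualifier on DAG_DISPATCH_DAG_RUN_CREATE_FAILED can be sketched as a create-then-re-query flow. Everything below is a simplified, synchronous model under stated assumptions: SketchRunStore, RacySketchRunStore, and createRunSketch are hypothetical stand-ins, and the real IStoragePort from dag-core is consumed asynchronously.

```typescript
// Hypothetical, reduced run record and store; not the real IDagRun or IStoragePort.
interface SketchDagRun {
  dagRunId: string;
  runKey: string;
}

class SketchRunStore {
  protected readonly runsByKey = new Map<string, SketchDagRun>();

  findByRunKey(runKey: string): SketchDagRun | undefined {
    return this.runsByKey.get(runKey);
  }

  // Throws on a duplicate run key, mimicking a storage-level conflict.
  create(run: SketchDagRun): void {
    if (this.runsByKey.has(run.runKey)) {
      throw new Error(`run key conflict: ${run.runKey}`);
    }
    this.runsByKey.set(run.runKey, run);
  }
}

// Simulates a concurrent writer: the first lookup misses although a run exists.
class RacySketchRunStore extends SketchRunStore {
  private missedOnce = false;

  constructor(existing: SketchDagRun) {
    super();
    this.runsByKey.set(existing.runKey, existing);
  }

  findByRunKey(runKey: string): SketchDagRun | undefined {
    if (!this.missedOnce) {
      this.missedOnce = true;
      return undefined;
    }
    return super.findByRunKey(runKey);
  }
}

type CreateOutcome =
  | { ok: true; dagRunId: string; reusedExisting: boolean }
  | { ok: false; code: "DAG_DISPATCH_DAG_RUN_CREATE_FAILED" };

function createRunSketch(store: SketchRunStore, candidate: SketchDagRun): CreateOutcome {
  const existing = store.findByRunKey(candidate.runKey);
  if (existing !== undefined) {
    return { ok: true, dagRunId: existing.dagRunId, reusedExisting: true };
  }
  try {
    store.create(candidate);
    return { ok: true, dagRunId: candidate.dagRunId, reusedExisting: false };
  } catch {
    // Creation failed: re-query in case a concurrent caller won the race.
    const raced = store.findByRunKey(candidate.runKey);
    if (raced !== undefined) {
      return { ok: true, dagRunId: raced.dagRunId, reusedExisting: true };
    }
    return { ok: false, code: "DAG_DISPATCH_DAG_RUN_CREATE_FAILED" };
  }
}

const winner: SketchDagRun = { dagRunId: "run-1", runKey: "daily-report:2024-01-01" };
const outcome = createRunSketch(new RacySketchRunStore(winner), {
  dagRunId: "run-2",
  runKey: winner.runKey,
});
```

Here the losing caller ends up with the winner's dagRunId rather than an error; the dispatch error is returned only when the re-query also finds nothing.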
State Machine Errors
State transition failures (e.g., attempting to cancel a terminal run) are returned directly from DagRunStateMachine.transition and TaskRunStateMachine.transition in dag-core. This package does not wrap or remap those errors.
Class Contract Registry
Interface Implementations
No classes in this package use the implements keyword. All port dependencies are consumed via constructor injection.
Inheritance Chains
None. Service classes are standalone (no extends).
Port Consumption via DI
| Service Class | Injected Port (from dag-core) | Location |
|---|---|---|
| RunOrchestratorService | IStoragePort, IQueuePort, IClockPort, IRunProgressEventReporter | src/services/run-orchestrator-service.ts |
| RunQueryService | IStoragePort | src/services/run-query-service.ts |
| RunCancelService | IStoragePort, IClockPort | src/services/run-cancel-service.ts |
Cross-Package Port Consumers
| Port (Owner) | Consumer Class | Location |
|---|---|---|
| IStoragePort (dag-core) | RunOrchestratorService, RunQueryService, RunCancelService | src/services/ |
| IQueuePort (dag-core) | RunOrchestratorService | src/services/run-orchestrator-service.ts |
| IClockPort (dag-core) | RunOrchestratorService, RunCancelService | src/services/ |
| IRunProgressEventReporter (dag-core) | RunOrchestratorService | src/services/run-orchestrator-service.ts |
Test Strategy
Tests are located in packages/dag-runtime/src/__tests__/ and executed via vitest.
Test files
- run-orchestrator-service.test.ts -- Tests for RunOrchestratorService covering:
  - Successful run creation and entry task enqueue
  - Missing definition error
  - Scheduled trigger without logical date error
  - Idempotent duplicate run key handling
  - Two-phase create-then-start workflow
  - Idempotent startCreatedRun on already-running run
  - No-entry-node validation
  - Enqueue failure transitions (run to failed, task to cancelled)
  - Race condition handling during concurrent createDagRun
- run-query-cancel-service.test.ts -- Tests for RunQueryService and RunCancelService covering:
  - Query of run with associated task runs
  - Cancel from running status with timestamp verification
Test infrastructure
All tests use in-memory fakes from dag-core:
- InMemoryStoragePort -- in-memory storage implementation
- InMemoryQueuePort -- in-memory queue implementation
- FakeClockPort -- deterministic clock for reproducible timestamps
Custom test doubles are defined locally:
- FailingQueuePort -- simulates enqueue failure at a configurable call count
- RacyDagRunStoragePort -- extends InMemoryStoragePort to simulate storage race conditions
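For readers unfamiliar with this kind of test double, a queue that fails at a configurable call count might look roughly like the sketch below. It is not the package's FailingQueuePort: the SketchQueuePort interface, the 1-based failOnCall parameter, and the enqueueAll helper are assumptions, and the real double targets IQueuePort from dag-core.

```typescript
// Hypothetical, reduced queue port; the real IQueuePort and IQueueMessage come from dag-core.
interface SketchQueueMessage {
  taskRunId: string;
}

interface SketchQueuePort {
  enqueue(message: SketchQueueMessage): Promise<void>;
}

class FailingQueuePortSketch {
  public readonly accepted: SketchQueueMessage[] = [];
  private readonly failOnCall: number;
  private callCount = 0;

  // failOnCall is 1-based: 2 means the second enqueue call rejects.
  constructor(failOnCall: number) {
    this.failOnCall = failOnCall;
  }

  async enqueue(message: SketchQueueMessage): Promise<void> {
    this.callCount += 1;
    if (this.callCount === this.failOnCall) {
      throw new Error(`simulated enqueue failure on call ${this.callCount}`);
    }
    this.accepted.push(message);
  }
}

// Records the outcome of each enqueue attempt without aborting on failure.
async function enqueueAll(queue: SketchQueuePort, taskRunIds: string[]): Promise<string[]> {
  const outcomes: string[] = [];
  for (const taskRunId of taskRunIds) {
    try {
      await queue.enqueue({ taskRunId });
      outcomes.push("ok");
    } catch {
      outcomes.push("failed");
    }
  }
  return outcomes;
}
```

Making the failing call configurable lets a single double cover both a failure on the first entry task and a failure mid-batch.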
Run command
pnpm --filter @robota-sdk/dag-runtime test