DAG Core Specification

Scope

@robota-sdk/dag-core is the single source of truth (SSOT) for all DAG domain contracts, state rules, and validation logic in the Robota monorepo. It owns the canonical type definitions for DAG definitions, runs, tasks, ports, nodes, edges, errors, and state machines. Every other dag-* package depends on dag-core and must import its contracts from this package rather than re-declaring them. This package contains no infrastructure adapters or runtime orchestration logic; it defines what the DAG domain looks like, not how it executes at scale.

Boundaries

  • No infrastructure adapters. Storage, queue, and lease implementations belong to consumer packages. dag-core defines only the port interfaces (IStoragePort, IQueuePort, ILeasePort, IClockPort, ITaskExecutorPort).
  • No orchestration runtime. DAG scheduling, worker polling, and run coordination belong to runtime and orchestration packages.
  • No node implementations. Concrete node types belong to node implementation packages.
  • No node authoring infrastructure. Base classes (AbstractNodeDefinition), accessors (NodeIoAccessor), registries, lifecycle wrappers, and value objects (MediaReference) belong to a dedicated node authoring package. dag-core defines the interfaces they implement but does not own the implementations.
  • No projection or read models. Event-sourced projections belong to projection packages.
  • No API layer. HTTP/REST composition belongs to API packages.
  • No designer UI. Visual graph editing belongs to designer packages.
  • Contract behavior must be deterministic and fail-fast. No fallback logic.

Architecture Overview

Layer Structure

dag-core/
  src/
    types/           -- Domain type definitions (SSOT contracts)
    interfaces/      -- Port interfaces for infrastructure boundaries
    constants/       -- Status enums, event name constants
    state-machines/  -- DagRun and TaskRun finite state machines
    lifecycle/       -- Node lifecycle runner, cost policy evaluator, task executor port
    services/        -- Domain services (validation, definition mgmt, cost policy)
    registry/        -- (emptied — extracted to node authoring package)
    schemas/         -- (emptied — extracted to node authoring package)
    value-objects/   -- (emptied — extracted to node authoring package)
    utils/           -- Error builder helpers
    testing/         -- (emptied — extracted to dag-adapters-local package)
    __tests__/       -- Unit tests

Design Patterns

  • Result pattern (TResult<T, E>): All domain operations return discriminated unions ({ ok: true; value: T } | { ok: false; error: E }) instead of throwing exceptions. This enforces explicit error handling at every call site.
  • Port/adapter (hexagonal): Infrastructure concerns are defined as port interfaces (IStoragePort, IQueuePort, ILeasePort, IClockPort, ITaskExecutorPort). dag-core owns the ports; consumer packages provide adapters. In-memory adapters for test harnesses are provided by @robota-sdk/dag-adapters-local.
  • Finite state machines: DagRunStateMachine and TaskRunStateMachine encode all legal state transitions as a lookup table. Invalid transitions return errors rather than silently succeeding. Terminal states have no outgoing transitions: for DagRun these are success, failed, and cancelled; for TaskRun they are success, upstream_failed, skipped, and cancelled. TaskRun.failed is not terminal because it keeps the explicit RETRY gate (failed -> queued).
  • Abstract template pattern: AbstractNodeDefinition<TSchema> provides a config-parsing template that delegates to *WithConfig methods, ensuring every lifecycle step receives a validated, typed config object. (Owned by node authoring package.)
  • Value object: MediaReference is an immutable value object with factory methods (fromAssetReference, fromBinary, fromCandidate) and no public constructor. (Owned by node authoring package.)
  • SSOT ownership: Every domain type is defined exactly once in this package. Other packages import from @robota-sdk/dag-core and never re-declare these contracts.
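
As an illustration of the Result pattern listed above, the sketch below reuses the union shape quoted in this section. The helper names (ok, fail, parseVersion, describeVersion) are hypothetical and are not exports of dag-core.

```typescript
// Union shape quoted from the Result pattern bullet; helpers are hypothetical.
type TResult<T, E> = { ok: true; value: T } | { ok: false; error: E };

const ok = <T>(value: T): TResult<T, never> => ({ ok: true, value });
const fail = <E>(error: E): TResult<never, E> => ({ ok: false, error });

// Hypothetical domain operation: accept only positive integer versions.
function parseVersion(raw: string): TResult<number, string> {
    const parsed = Number(raw);
    if (!Number.isInteger(parsed) || parsed <= 0) {
        return fail(`version must be a positive integer, got "${raw}"`);
    }
    return ok(parsed);
}

// The caller has to branch on `ok` before it can read `value` or `error`.
function describeVersion(raw: string): string {
    const result = parseVersion(raw);
    if (!result.ok) {
        return `rejected: ${result.error}`;
    }
    return `accepted: v${result.value}`;
}
```

Because the compiler narrows on the ok discriminant, a call site that forgets the error branch does not type-check, which is the explicit error handling the pattern is meant to enforce.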

Type Ownership

All types below are the canonical SSOT definitions. Other dag-* packages must import them from @robota-sdk/dag-core.

| Type | Location | Purpose |
| --- | --- | --- |
| TDagDefinitionStatus | types/domain.ts | Definition lifecycle status: draft, published, deprecated |
| TPortValueType | types/domain.ts | Port data types: string, number, boolean, object, array, binary |
| TBinaryKind | types/domain.ts | Binary payload kind: image, video, audio, file |
| TNodeConfigValue | types/domain.ts | Recursive config value type (primitives, objects, arrays) |
| TNodeConfigRecord | types/domain.ts | Node configuration record (alias for INodeConfigObject) |
| TAssetReference | types/domain.ts | Discriminated union for asset-by-id or asset-by-uri references |
| TDagRunStatus | types/domain.ts | DAG run states: created, queued, running, success, failed, cancelled |
| TTaskRunStatus | types/domain.ts | Task run states: created, queued, running, success, failed, upstream_failed, skipped, cancelled |
| TDagTriggerType | types/domain.ts | Trigger types: manual, scheduled, api |
| IPortDefinition | types/domain.ts | Port schema (key, type, required, binary constraints, list constraints) |
| INodeManifest | types/domain.ts | Node registration manifest (type, display name, category, ports, config schema) |
| ICostPolicy | types/domain.ts | Run-level cost budget configuration (runCreditLimit, costPolicyVersion) |
| IDagNode | types/domain.ts | Node instance within a DAG definition |
| IEdgeBinding | types/domain.ts | Single output-to-input port binding on an edge |
| IDagEdgeDefinition | types/domain.ts | Edge connecting two nodes with bindings |
| IDagDefinition | types/domain.ts | Complete DAG definition (nodes, edges, cost policy, schemas) |
| IDagRun | types/domain.ts | DAG execution run record |
| ITaskRun | types/domain.ts | Individual task execution record within a DAG run |
| IExecutionPathSegment | types/domain.ts | Segment of the hierarchical execution path |
| TErrorCategory | types/error.ts | Error categories: validation, state_transition, lease, dispatch, task_execution |
| IDagError | types/error.ts | Canonical error structure (code, category, message, retryable, context) |
| TResult<T, E> | types/result.ts | Discriminated union result type for all domain operations |
| INodeLifecycle | types/node-lifecycle.ts | Full node lifecycle interface (initialize, validateInput, estimateCost, execute, validateOutput, dispose) |
| INodeLifecycleFactory | types/node-lifecycle.ts | Factory interface for creating INodeLifecycle instances by node type |
| INodeManifestRegistry | types/node-lifecycle.ts | Registry interface for looking up node manifests |
| INodeTaskHandler | types/node-lifecycle.ts | Partial lifecycle handler (only execute is required) |
| INodeTaskHandlerRegistry | types/node-lifecycle.ts | Registry interface for looking up task handlers |
| IDagNodeDefinition | types/node-lifecycle.ts | Composite definition combining manifest fields with a task handler |
| INodeDefinitionAssembly | types/node-lifecycle.ts | Assembly result of manifests and handlers from node definitions |
| INodeExecutionContext | types/node-lifecycle.ts | Execution context passed to lifecycle methods |
| INodeExecutionResult | types/node-lifecycle.ts | Execution result with output payload and cost data |
| ICostEstimate | types/node-lifecycle.ts | Cost estimate returned from estimateCost (estimatedCredits, details?) |
| IRunCostPolicyEvaluator | types/node-lifecycle.ts | Interface for budget enforcement |
| TRunProgressEvent | types/run-progress.ts | Discriminated union of all run progress event types |
| IRunProgressEventReporter | types/run-progress.ts | Interface for publishing progress events |
| TPortValue | interfaces/ports.ts | Union of all port value types (primitives, binary, arrays, objects) |
| TPortPayload | interfaces/ports.ts | Key-value map of port values |
| IQueuePort | interfaces/ports.ts | Queue infrastructure port (enqueue, dequeue, ack, nack) |
| ILeasePort | interfaces/ports.ts | Lease infrastructure port (acquire, renew, release, get) |
| IStoragePort | interfaces/ports.ts | Storage infrastructure port (definitions, runs, tasks) |
| ITaskExecutorPort | interfaces/ports.ts | Task execution infrastructure port |
| IClockPort | interfaces/ports.ts | Clock infrastructure port (nowIso, nowEpochMs) |
| IQueueMessage | interfaces/ports.ts | Queue message structure |
| ITaskExecutionInput | interfaces/ports.ts | Input payload for task execution |
| TTaskExecutionResult | interfaces/ports.ts | Discriminated union result from task execution |

Public API Surface

| Export | Kind | Description |
| --- | --- | --- |
| DagRunStateMachine | Class | Static state machine for DAG run status transitions |
| TaskRunStateMachine | Class | Static state machine for task run status transitions |
| DagDefinitionValidator | Class | Validates IDagDefinition structure (graph acyclicity, port compatibility, binding integrity) |
| DagDefinitionService | Class | Domain service for DAG definition CRUD and publish lifecycle |
| TimeSemanticsService | Class | Resolves trigger type and logical date with UTC normalization |
| NodeLifecycleRunner | Class | Orchestrates the full node lifecycle sequence (init, validate, estimate, execute, validate output, dispose) |
| RunCostPolicyEvaluator | Class | Evaluates whether estimated cost fits within the run budget |
| MissingNodeLifecycleFactory | Class | Sentinel factory that always returns an error (used as default) |
| LifecycleTaskExecutorPort | Class | ITaskExecutorPort adapter that delegates to NodeLifecycleRunner |
| buildValidationError | Function | Error builder for validation category |
| buildDispatchError | Function | Error builder for dispatch category |
| buildLeaseError | Function | Error builder for lease category |
| buildTaskExecutionError | Function | Error builder for task_execution category |
| buildDagError | Function | Generic error builder for any category |
| buildListPortHandleKey | Function | Builds a list-port handle key string (e.g., images[0]) |
| parseListPortHandleKey | Function | Parses a list-port handle key back to port key and index |
| DAG_DEFINITION_STATUS | Constant | Definition status enum object |
| DAG_RUN_STATUS | Constant | DAG run status enum object |
| TASK_RUN_STATUS | Constant | Task run status enum object |
| RUN_EVENTS, TASK_EVENTS, WORKER_EVENTS, SCHEDULER_EVENTS | Constants | Domain event name constants |
| EXECUTION_PROGRESS_EVENTS, TASK_PROGRESS_EVENTS | Constants | Progress event name constants |
| RUN_EVENT_PREFIX, TASK_EVENT_PREFIX, WORKER_EVENT_PREFIX, SCHEDULER_EVENT_PREFIX, EXECUTION_EVENT_PREFIX | Constants | Event prefix strings |
| DAG_CORE_PACKAGE_NAME | Constant | Package name string @robota-sdk/dag-core |

Note: The in-memory port implementations have been moved to the @robota-sdk/dag-adapters-local package. Import InMemoryStoragePort, InMemoryQueuePort, InMemoryLeasePort, FakeClockPort, SystemClockPort, MockTaskExecutorPort, and related classes from that package.
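
The list-port handle key format mentioned in the export table (for example images[0]) can be pictured with a small round-trip sketch. The functions buildKey and parseKey below are illustrative stand-ins, not the real buildListPortHandleKey and parseListPortHandleKey; their signatures and the undefined return for malformed keys are assumptions, since this spec does not state them.

```typescript
// Illustrative stand-ins only; the real dag-core signatures are not given here.
interface IParsedHandleKey {
    portKey: string;
    index: number;
}

// Builds "<portKey>[<index>]", e.g. "images[0]".
function buildKey(portKey: string, index: number): string {
    return `${portKey}[${index}]`;
}

// Parses "<portKey>[<non-negative integer>]"; anything else yields undefined.
function parseKey(handleKey: string): IParsedHandleKey | undefined {
    const match = /^(.+)\[(\d+)\]$/.exec(handleKey);
    if (match === null) {
        return undefined;
    }
    return { portKey: match[1], index: Number(match[2]) };
}
```

A key built with buildKey("images", 0) parses back to the port key images and index 0, while a plain port key such as images is not treated as a list handle.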

Extension Points

AbstractNodeDefinition<TSchema>

The primary extension point for node implementors. dag-core defines the IDagNodeDefinition and INodeLifecycle interfaces; the abstract base class and supporting infrastructure are owned by the node authoring package.

Port Interfaces

Consumer packages implement these interfaces to provide infrastructure:

  • IStoragePort -- persistence for definitions, runs, and tasks.
  • IQueuePort -- message queue for task dispatch (enqueue, dequeue, ack, nack).
  • ILeasePort -- distributed lease management (acquire, renew, release, get).
  • IClockPort -- clock abstraction for deterministic time in tests.
  • ITaskExecutorPort -- task execution delegation.
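
To show what implementing one of these ports might look like, here is a minimal sketch of two IClockPort adapters. The method names nowIso and nowEpochMs come from the type table in this spec; the zero-argument signatures and the class names SystemClockSketch and FixedClockSketch are assumptions made for illustration (the shipped adapters live in @robota-sdk/dag-adapters-local).

```typescript
// Method names are from this spec; the exact signatures are assumed.
interface IClockPort {
    nowIso(): string;
    nowEpochMs(): number;
}

// Reads the real system time.
class SystemClockSketch implements IClockPort {
    nowIso(): string {
        return new Date().toISOString();
    }
    nowEpochMs(): number {
        return Date.now();
    }
}

// Deterministic clock for tests: time moves only when advance() is called.
class FixedClockSketch implements IClockPort {
    private currentMs: number;

    constructor(startMs: number) {
        this.currentMs = startMs;
    }
    nowIso(): string {
        return new Date(this.currentMs).toISOString();
    }
    nowEpochMs(): number {
        return this.currentMs;
    }
    advance(ms: number): void {
        this.currentMs += ms;
    }
}

// Hypothetical consumer: depends on the port, never on Date directly.
function isLeaseExpired(clock: IClockPort, leaseUntilEpochMs: number): boolean {
    return clock.nowEpochMs() >= leaseUntilEpochMs;
}
```

Code written against the port can then be exercised in tests by advancing the fixed clock instead of sleeping.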

INodeTaskHandler

A lighter alternative to full INodeLifecycle. Only execute is required; all other lifecycle methods are optional. The RegisteredNodeLifecycle wrapper fills in defaults and adds base port validation for handlers that omit validateInput/validateOutput.
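
The idea of a wrapper that fills in defaults can be sketched roughly as follows. Everything here is hypothetical: the interfaces are reduced to two steps, the methods are synchronous for brevity, and wrapHandler is not the actual RegisteredNodeLifecycle implementation. Only the error code string is taken from this spec.

```typescript
type TResult<T, E> = { ok: true; value: T } | { ok: false; error: E };
type TPayloadSketch = Record<string, unknown>;

// Only execute is required, mirroring the INodeTaskHandler contract.
interface IHandlerSketch {
    execute(input: TPayloadSketch): TResult<TPayloadSketch, string>;
    validateInput?(input: TPayloadSketch): TResult<undefined, string>;
}

// After wrapping, every step is guaranteed to be present.
interface ILifecycleSketch {
    validateInput(input: TPayloadSketch): TResult<undefined, string>;
    execute(input: TPayloadSketch): TResult<TPayloadSketch, string>;
}

function wrapHandler(handler: IHandlerSketch, requiredInputKeys: string[]): ILifecycleSketch {
    return {
        validateInput(input: TPayloadSketch): TResult<undefined, string> {
            // Prefer the handler's own validation when it provides one.
            if (handler.validateInput !== undefined) {
                return handler.validateInput(input);
            }
            // Default: base port validation for required input keys.
            for (const key of requiredInputKeys) {
                if (!(key in input)) {
                    return { ok: false, error: `DAG_VALIDATION_NODE_REQUIRED_INPUT_MISSING: ${key}` };
                }
            }
            return { ok: true, value: undefined };
        },
        execute(input: TPayloadSketch): TResult<TPayloadSketch, string> {
            return handler.execute(input);
        },
    };
}
```

A handler that supplies only execute still gets required-input checking this way, while a handler with its own validateInput keeps full control over that step.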

Error Taxonomy

Error Structure

All errors conform to IDagError:

```typescript
interface IDagError {
    code: string;
    category: TErrorCategory;
    message: string;
    retryable: boolean;
    context?: Record<string, string | number | boolean>;
}
```

Error Categories

| Category | Description | Default Retryable |
| --- | --- | --- |
| validation | Schema, structure, or constraint violations | false |
| state_transition | Invalid state machine transitions | false |
| lease | Lease acquisition or renewal failures | false |
| dispatch | Task dispatch/queue failures | true |
| task_execution | Errors during node execution | varies |
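
One way to picture how the category defaults could feed an error builder is sketched below. The IDagError shape and the default values are taken from this section; buildDefaultedError and buildTaskExecutionErrorSketch are hypothetical helpers whose signatures are assumptions, not the real buildDagError family, and EXAMPLE_DISPATCH_CODE in the usage note is a placeholder rather than a real code.

```typescript
type TErrorCategory = "validation" | "state_transition" | "lease" | "dispatch" | "task_execution";
type TErrorContext = Record<string, string | number | boolean>;

interface IDagError {
    code: string;
    category: TErrorCategory;
    message: string;
    retryable: boolean;
    context?: TErrorContext;
}

// Categories that have a fixed default in the category table.
type TDefaultedCategory = Exclude<TErrorCategory, "task_execution">;

const DEFAULT_RETRYABLE: Record<TDefaultedCategory, boolean> = {
    validation: false,
    state_transition: false,
    lease: false,
    dispatch: true,
};

// Hypothetical builder: retryable is derived from the category default.
function buildDefaultedError(
    category: TDefaultedCategory,
    code: string,
    message: string,
    context: TErrorContext = {},
): IDagError {
    return { code, category, message, retryable: DEFAULT_RETRYABLE[category], context };
}

// task_execution "varies", so the caller has to state retryability explicitly.
function buildTaskExecutionErrorSketch(
    code: string,
    message: string,
    retryable: boolean,
    context: TErrorContext = {},
): IDagError {
    return { code, category: "task_execution", message, retryable, context };
}
```

For example, buildDefaultedError("dispatch", "EXAMPLE_DISPATCH_CODE", "queue unavailable") yields a retryable error, whereas the same call with the validation category does not.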

Error Codes

Validation errors (category: validation, retryable: false):

| Code | Source | Description |
| --- | --- | --- |
| DAG_VALIDATION_EMPTY_DAG_ID | DagDefinitionValidator | dagId is empty |
| DAG_VALIDATION_INVALID_VERSION | DagDefinitionValidator | version is not a positive integer |
| DAG_VALIDATION_EMPTY_NODES | DagDefinitionValidator | DAG has no nodes |
| DAG_VALIDATION_EMPTY_NODE_ID | DagDefinitionValidator | nodeId is empty |
| DAG_VALIDATION_DUPLICATE_NODE_ID | DagDefinitionValidator | duplicate nodeId |
| DAG_VALIDATION_NODE_TYPE_REMOVED | DagDefinitionValidator | deprecated node type used |
| DAG_VALIDATION_EMPTY_INPUT_KEY | DagDefinitionValidator | input port key is empty |
| DAG_VALIDATION_EMPTY_OUTPUT_KEY | DagDefinitionValidator | output port key is empty |
| DAG_VALIDATION_DUPLICATE_INPUT_KEY | DagDefinitionValidator | duplicate input port key within a node |
| DAG_VALIDATION_DUPLICATE_OUTPUT_KEY | DagDefinitionValidator | duplicate output port key within a node |
| DAG_VALIDATION_INVALID_INPUT_ORDER | DagDefinitionValidator | input port order is not a non-negative integer |
| DAG_VALIDATION_INVALID_OUTPUT_ORDER | DagDefinitionValidator | output port order is not a non-negative integer |
| DAG_VALIDATION_INVALID_INPUT_MIN_ITEMS | DagDefinitionValidator | input port minItems is invalid |
| DAG_VALIDATION_INVALID_INPUT_MAX_ITEMS | DagDefinitionValidator | input port maxItems is invalid |
| DAG_VALIDATION_INVALID_INPUT_ITEM_RANGE | DagDefinitionValidator | minItems exceeds maxItems |
| DAG_VALIDATION_EDGE_FROM_NOT_FOUND | DagDefinitionValidator | edge references nonexistent source node |
| DAG_VALIDATION_EDGE_TO_NOT_FOUND | DagDefinitionValidator | edge references nonexistent target node |
| DAG_VALIDATION_BINDING_REQUIRED | DagDefinitionValidator | edge has no bindings |
| DAG_VALIDATION_BINDING_OUTPUT_NOT_FOUND | DagDefinitionValidator | binding references nonexistent output port |
| DAG_VALIDATION_BINDING_INPUT_NOT_FOUND | DagDefinitionValidator | binding references nonexistent input port |
| DAG_VALIDATION_BINDING_INPUT_KEY_DUPLICATE | DagDefinitionValidator | multiple outputs map to same input in one edge |
| DAG_VALIDATION_BINDING_INPUT_KEY_CONFLICT | DagDefinitionValidator | multiple upstream edges map to same input |
| DAG_VALIDATION_BINDING_TYPE_MISMATCH | DagDefinitionValidator | output and input port types are incompatible |
| DAG_VALIDATION_CYCLE_DETECTED | DagDefinitionValidator | DAG contains a cycle |
| DAG_VALIDATION_INVALID_COST_LIMIT | DagDefinitionValidator | cost limit is not positive |
| DAG_VALIDATION_INVALID_COST_POLICY_VERSION | DagDefinitionValidator | cost policy version is not positive |
| DAG_VALIDATION_TEST_ENTRY_NODE_COUNT_INVALID | DagDefinitionValidator | test DAG has wrong entry node count |
| DAG_VALIDATION_TEST_ENTRY_NODE_TYPE_INVALID | DagDefinitionValidator | test DAG entry node is wrong type |
| DAG_VALIDATION_DUPLICATE_VERSION | DagDefinitionService | definition with same dagId and version already exists |
| DAG_VALIDATION_DEFINITION_NOT_FOUND | DagDefinitionService | definition does not exist |
| DAG_VALIDATION_UPDATE_ONLY_DRAFT | DagDefinitionService | only draft definitions can be updated |
| DAG_VALIDATION_PUBLISH_ONLY_DRAFT | DagDefinitionService | only draft definitions can be published |
| DAG_VALIDATION_MISSING_LOGICAL_DATE | TimeSemanticsService | scheduled trigger requires logicalDate |
| DAG_VALIDATION_INVALID_LOGICAL_DATE | TimeSemanticsService | logicalDate is not valid ISO-8601 |
| DAG_VALIDATION_NODE_CONFIG_SCHEMA_INVALID | node definition base class | node config fails Zod schema parse |
| DAG_VALIDATION_NODE_INPUT_MISSING | node I/O accessor | required input key is missing |
| DAG_VALIDATION_NODE_INPUT_TYPE_MISMATCH | node I/O accessor, lifecycle wrapper | input value type does not match port type |
| DAG_VALIDATION_NODE_INPUT_MIN_ITEMS_NOT_SATISFIED | node I/O accessor, lifecycle wrapper | list input has fewer items than minItems |
| DAG_VALIDATION_NODE_INPUT_MAX_ITEMS_EXCEEDED | node I/O accessor, lifecycle wrapper | list input has more items than maxItems |
| DAG_VALIDATION_NODE_REQUIRED_INPUT_MISSING | lifecycle wrapper | required input port value is missing |
| DAG_VALIDATION_NODE_REQUIRED_OUTPUT_MISSING | lifecycle wrapper | required output port value is missing |
| DAG_VALIDATION_NODE_OUTPUT_TYPE_MISMATCH | lifecycle wrapper | output value type does not match port type |
| DAG_VALIDATION_NODE_OUTPUT_MIN_ITEMS_NOT_SATISFIED | lifecycle wrapper | list output has fewer items than minItems |
| DAG_VALIDATION_NODE_OUTPUT_MAX_ITEMS_EXCEEDED | lifecycle wrapper | list output has more items than maxItems |
| DAG_VALIDATION_NODE_LIFECYCLE_NOT_REGISTERED | lifecycle factory, MissingNodeLifecycleFactory | no lifecycle registered for node type |
| DAG_VALIDATION_NODE_DEFINITION_MISSING | LifecycleTaskExecutorPort | task execution input lacks nodeDefinition |
| DAG_VALIDATION_NODE_MANIFEST_NOT_FOUND | LifecycleTaskExecutorPort | no manifest registered for node type |
| DAG_VALIDATION_NEGATIVE_ESTIMATED_COST | RunCostPolicyEvaluator | estimated cost is negative |
| DAG_VALIDATION_COST_LIMIT_EXCEEDED | RunCostPolicyEvaluator | estimated run cost exceeds budget |
| DAG_VALIDATION_MEDIA_REFERENCE_INVALID | media reference value object | media reference structure is invalid |
| DAG_VALIDATION_MEDIA_REFERENCE_XOR_REQUIRED | media reference value object | exactly one of assetId or uri must be provided |
| DAG_VALIDATION_MEDIA_REFERENCE_TYPE_MISMATCH | media reference value object | referenceType does not match provided fields |
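
As a concrete example of the kind of check behind DAG_VALIDATION_CYCLE_DETECTED, the sketch below detects cycles with Kahn's algorithm. This is one common technique and is not necessarily the algorithm DagDefinitionValidator uses; the IEdgeSketch shape with from and to fields is an assumption for illustration.

```typescript
// Hypothetical edge shape; the real IDagEdgeDefinition fields may differ.
interface IEdgeSketch {
    from: string;
    to: string;
}

// Returns true when the graph contains a cycle (Kahn's algorithm).
function hasCycle(nodeIds: string[], edges: IEdgeSketch[]): boolean {
    const inDegree = new Map<string, number>();
    const outgoing = new Map<string, string[]>();
    for (const id of nodeIds) {
        inDegree.set(id, 0);
        outgoing.set(id, []);
    }
    for (const edge of edges) {
        // Assumes both endpoints exist; missing endpoints are a separate
        // validation concern (EDGE_FROM_NOT_FOUND / EDGE_TO_NOT_FOUND).
        outgoing.get(edge.from)?.push(edge.to);
        inDegree.set(edge.to, (inDegree.get(edge.to) ?? 0) + 1);
    }

    // Start from nodes with no incoming edges and peel the graph layer by layer.
    const ready = nodeIds.filter((id) => inDegree.get(id) === 0);
    let visited = 0;
    while (ready.length > 0) {
        const current = ready.pop() as string;
        visited += 1;
        for (const next of outgoing.get(current) ?? []) {
            const remaining = (inDegree.get(next) ?? 0) - 1;
            inDegree.set(next, remaining);
            if (remaining === 0) {
                ready.push(next);
            }
        }
    }
    // Nodes on a cycle never reach in-degree zero, so they are never visited.
    return visited < nodeIds.length;
}
```

In keeping with the fail-fast boundary stated earlier, a validator built this way would report the error as soon as the check returns true rather than attempting to repair the graph.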

State transition errors (category: state_transition, retryable: false):

| Code | Source | Description |
| --- | --- | --- |
| DAG_STATE_TRANSITION_INVALID | DagRunStateMachine, TaskRunStateMachine | attempted transition is not allowed |

Task execution errors (category: task_execution, retryable: varies):

| Code | Source | Description |
| --- | --- | --- |
| DAG_TASK_EXECUTION_DISPOSE_FAILED | NodeLifecycleRunner | node dispose step failed after successful execution |

State Lifecycle

DagRun State Machine

States: created, queued, running, success, failed, cancelled

Terminal states: success, failed, cancelled

created --QUEUE--> queued --START--> running --COMPLETE_SUCCESS--> success
created --CANCEL--> cancelled
queued --CANCEL--> cancelled
running --COMPLETE_FAILURE--> failed
running --CANCEL--> cancelled

Each transition emits a domain event with the run.* prefix (e.g., run.queued, run.running).
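
The DagRun transitions listed above map naturally onto a lookup table. The following is a minimal sketch of that idea, not the actual DagRunStateMachine source: the function name transitionDagRun and the use of a plain string for the error are assumptions, while the states, transition names, and the DAG_STATE_TRANSITION_INVALID code come from this spec.

```typescript
type TResult<T, E> = { ok: true; value: T } | { ok: false; error: E };
type TDagRunStatus = "created" | "queued" | "running" | "success" | "failed" | "cancelled";
type TDagRunTransition = "QUEUE" | "START" | "COMPLETE_SUCCESS" | "COMPLETE_FAILURE" | "CANCEL";

// One row per state; terminal states have no outgoing transitions.
const DAG_RUN_TRANSITIONS: Record<TDagRunStatus, Partial<Record<TDagRunTransition, TDagRunStatus>>> = {
    created: { QUEUE: "queued", CANCEL: "cancelled" },
    queued: { START: "running", CANCEL: "cancelled" },
    running: { COMPLETE_SUCCESS: "success", COMPLETE_FAILURE: "failed", CANCEL: "cancelled" },
    success: {},
    failed: {},
    cancelled: {},
};

// Illegal transitions return an error instead of silently succeeding.
function transitionDagRun(from: TDagRunStatus, transition: TDagRunTransition): TResult<TDagRunStatus, string> {
    const next = DAG_RUN_TRANSITIONS[from][transition];
    if (next === undefined) {
        return { ok: false, error: "DAG_STATE_TRANSITION_INVALID" };
    }
    return { ok: true, value: next };
}
```

Keeping the table as data means the set of legal transitions can be reviewed in one place and tested exhaustively by iterating over every state and transition pair.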

TaskRun State Machine

States: created, queued, running, success, failed, upstream_failed, skipped, cancelled

Terminal states: success, upstream_failed, skipped, cancelled

Note: The failed state is NOT terminal. It has a single explicit policy gate: RETRY transitions it back to queued, so that a failed task can be retried via the DLQ reinject mechanism. Consumer packages implementing run finalization must therefore treat a failed task as terminal for DAG completion evaluation only when it has no remaining retries.

created --QUEUE--> queued --START--> running --COMPLETE_SUCCESS--> success
created --CANCEL--> cancelled
queued --UPSTREAM_FAIL--> upstream_failed
queued --SKIP--> skipped
queued --CANCEL--> cancelled
running --COMPLETE_FAILURE--> failed
running --CANCEL--> cancelled
failed --RETRY--> queued

Each transition emits a domain event with the task.* prefix (e.g., task.queued, task.running).

Finalization Semantics

For DAG run finalization (determining success vs failed outcome):

  • failed is the only task status that contributes to a failed DAG run outcome.
  • upstream_failed, skipped, and cancelled are non-failure terminal states; they do not cause the DAG run to fail.
  • A DAG run is success when all tasks are in terminal states AND no task is failed.
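
The finalization rules in this section could be evaluated along the following lines. This is a sketch under stated assumptions: ITaskSnapshotSketch, its retriesRemaining field, and evaluateRunOutcome are hypothetical, and the choice to wait until every task has settled before reporting failed is an assumption that this spec does not confirm.

```typescript
type TTaskRunStatus =
    | "created" | "queued" | "running" | "success"
    | "failed" | "upstream_failed" | "skipped" | "cancelled";

// Hypothetical view of a task; retriesRemaining is assumed to be tracked by the caller.
interface ITaskSnapshotSketch {
    taskStatus: TTaskRunStatus;
    retriesRemaining: number;
}

const TERMINAL_TASK_STATUSES: ReadonlySet<TTaskRunStatus> = new Set<TTaskRunStatus>([
    "success", "upstream_failed", "skipped", "cancelled",
]);

// A failed task is settled only once no retries remain.
function isSettled(task: ITaskSnapshotSketch): boolean {
    if (task.taskStatus === "failed") {
        return task.retriesRemaining === 0;
    }
    return TERMINAL_TASK_STATUSES.has(task.taskStatus);
}

function evaluateRunOutcome(tasks: ITaskSnapshotSketch[]): "pending" | "success" | "failed" {
    if (!tasks.every(isSettled)) {
        return "pending";
    }
    // failed is the only status that makes the run fail.
    return tasks.some((task) => task.taskStatus === "failed") ? "failed" : "success";
}
```

Under these assumptions, a run whose tasks ended as upstream_failed, skipped, or cancelled still finalizes as success, and a failed task with retries left keeps the run pending.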

Event Architecture

dag-core defines event name constants but does not own an event bus or emitter. Event prefixes owned by this package:

| Prefix | Constant | Domain |
| --- | --- | --- |
| run | RUN_EVENT_PREFIX | DAG run state changes |
| task | TASK_EVENT_PREFIX | Task run state changes |
| worker | WORKER_EVENT_PREFIX | Worker lifecycle events |
| scheduler | SCHEDULER_EVENT_PREFIX | Scheduler evaluation events |
| execution | EXECUTION_EVENT_PREFIX | Execution progress events |

Progress event types are defined as TRunProgressEvent (discriminated union) with reporter interface IRunProgressEventReporter.

Dependencies

| Dependency | Purpose |
| --- | --- |
| zod | Runtime schema validation for node configs and media references |
| zod-to-json-schema | Converts Zod schemas to JSON Schema 7 for manifest configSchema |

No peer dependencies.

Class Contract Registry

Internal Implementations

Implementations owned by this package:

| Interface | Implementor | Kind | Location |
| --- | --- | --- | --- |
| INodeLifecycleFactory | MissingNodeLifecycleFactory | sentinel | src/lifecycle/node-lifecycle-runner.ts |
| IRunCostPolicyEvaluator | RunCostPolicyEvaluator | production | src/lifecycle/node-lifecycle-runner.ts |
| ITaskExecutorPort | LifecycleTaskExecutorPort | production | src/lifecycle/lifecycle-task-executor-port.ts |
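
To give a feel for the budget check that IRunCostPolicyEvaluator stands for, here is a hedged sketch. The field names runCreditLimit, costPolicyVersion, and estimatedCredits and the two error codes come from this spec; the function evaluateBudgetSketch, its spentCredits parameter, and the rule that a projected total exactly equal to the limit passes are assumptions, not the behavior of RunCostPolicyEvaluator.

```typescript
type TResult<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Field names follow the ICostPolicy and ICostEstimate descriptions in this spec.
interface ICostPolicy {
    runCreditLimit: number;
    costPolicyVersion: number;
}
interface ICostEstimate {
    estimatedCredits: number;
}

// Returns the projected run total when the estimate fits within the budget.
function evaluateBudgetSketch(
    policy: ICostPolicy,
    spentCredits: number, // assumed: credits already consumed by earlier tasks in the run
    estimate: ICostEstimate,
): TResult<number, string> {
    if (estimate.estimatedCredits < 0) {
        return { ok: false, error: "DAG_VALIDATION_NEGATIVE_ESTIMATED_COST" };
    }
    const projectedTotal = spentCredits + estimate.estimatedCredits;
    if (projectedTotal > policy.runCreditLimit) {
        return { ok: false, error: "DAG_VALIDATION_COST_LIMIT_EXCEEDED" };
    }
    return { ok: true, value: projectedTotal };
}
```

With a limit of 10 credits and 4 already spent, an estimate of 6 fits and an estimate of 7 is rejected, so the node never reaches its execute step.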

Note: The in-memory port adapters (InMemoryStoragePort, InMemoryQueuePort, InMemoryLeasePort, FakeClockPort, SystemClockPort, MockTaskExecutorPort) have been moved to the @robota-sdk/dag-adapters-local package. See that package's SPEC.md.

Interfaces Designed for External Implementation

The following interfaces are defined by dag-core and intended to be implemented by consumer packages. Each consumer package documents its own implementations in its SPEC.md.

| Interface | Expected Implementor Role |
| --- | --- |
| IDagNodeDefinition | Node authoring packages (abstract base class) |
| INodeLifecycle | Node authoring packages (lifecycle wrapper) |
| INodeLifecycleFactory | Node authoring packages (factory from handler registry) |
| INodeManifestRegistry | Node authoring packages (manifest lookup) |
| INodeTaskHandlerRegistry | Node authoring packages (handler lookup) |
| IStoragePort | Persistence adapters (file, database) |
| IQueuePort | Message queue adapters |
| ILeasePort | Distributed lease adapters |
| IClockPort | Clock adapters (system, deterministic) |
| ITaskExecutorPort | Task execution adapters |

Test Strategy

Current Test Files

| File | Coverage |
| --- | --- |
| __tests__/definition-service.test.ts | DagDefinitionValidator (duplicate nodeId, cycle detection), DagDefinitionService (publish invalid, update non-draft) |
| __tests__/time-semantics.test.ts | TimeSemanticsService (manual/api/scheduled triggers, UTC normalization, invalid date rejection) |

Coverage Gaps

The following areas lack dedicated unit tests in this package:

  • DagRunStateMachine and TaskRunStateMachine: No tests for valid transitions, invalid transitions, or domain event emission. May be tested indirectly by consumer packages.
  • NodeLifecycleRunner: No tests for the full lifecycle sequence (init, validate, estimate, budget check, execute, validate output, dispose) or cost policy evaluation.
  • RunCostPolicyEvaluator: No tests for budget enforcement (negative cost, limit exceeded, within budget).
  • LifecycleTaskExecutorPort: No tests for manifest lookup, node definition validation, or runner delegation.
  • DagDefinitionValidator: Partial coverage. Missing tests for edge binding validation, port type compatibility, cost policy validation, list port handle resolution, and many specific validation codes.
  • MediaReference, StaticNodeManifestRegistry, StaticNodeTaskHandlerRegistry: Owned by the node authoring package; tested there.
  • In-memory testing ports: Extracted to @robota-sdk/dag-adapters-local. Tests belong there. That package also provides FileStoragePort (file-based IStoragePort) and FileCostMetaStorage (file-based ICostMetaStoragePort from @robota-sdk/dag-cost).

Released under the MIT License.