Skip to content

dag-orchestrator SPEC.md

Scope

@robota-sdk/dag-orchestrator is the orchestration layer that bridges DAG definitions to Prompt API (ComfyUI-compatible) execution. It owns the translation of IDagDefinition into IPromptRequest, run lifecycle management (create, start, poll, result), cost estimation gating, and HTTP communication with a Prompt API server. This package is backend-agnostic: it works against both the Robota dag-runtime-server and a native ComfyUI server through the IPromptApiClientPort abstraction.

Boundaries

ResponsibilityOwnerNOT this package
DAG domain types (IDagDefinition, IPromptRequest, TResult, IDagError, IRunResult)@robota-sdk/dag-coreImported, not owned
Runtime execution (node execution, asset storage, queue processing)dag-runtime-serverNot touched
HTTP server routing, request handling, SSE/WebSocket eventsdag-orchestrator-serverConsumer of this package
DAG storage and persistencedag-orchestrator-server (in-memory or external)Not owned
UI and designer interactionsdag-designer, web appNot owned
Node definitions and catalogdag-core, dag-nodes-*Not owned

Architecture Overview

The package follows a ports-and-adapters architecture with two service layers:

index.ts (public surface)
  |
  +-- services/
  |     +-- OrchestratorRunService      (run lifecycle: create, start, poll, result)
  |     +-- PromptOrchestratorService   (prompt submission with cost policy gating)
  |
  +-- adapters/
  |     +-- HttpPromptApiClient         (IPromptApiClientPort -> HTTP fetch)
  |     +-- translateDefinitionToPrompt (IDagDefinition -> IPromptRequest pure function)
  |
  +-- interfaces/
  |     +-- IPromptApiClientPort        (port to Prompt API server)
  |     +-- ICostEstimatorPort          (port for cost estimation)
  |     +-- ICostPolicyEvaluatorPort    (port for cost policy evaluation)
  |
  +-- types/
        +-- orchestrator-types.ts       (SSOT types for orchestration config)

Key design decisions:

  • All public methods return TResult<T, IDagError>. No exceptions are thrown for domain failures.
  • OrchestratorRunService manages run state in-memory via a Map<string, IRunState>. It is stateful and scoped to the server process lifetime.
  • PromptOrchestratorService is stateless; each call delegates to the injected ports.
  • translateDefinitionToPrompt is a pure function (no side effects, no I/O).

Type Ownership

Types owned by this package (SSOT):

TypeLocationPurpose
IPromptCostEstimatetypes/orchestrator-types.tsPer-prompt cost estimation result (totalEstimatedCredits, perNode with estimatedCredits)
IPromptCostPolicytypes/orchestrator-types.tsMax cost threshold configuration (maxCreditsPerPrompt)
IRetryPolicytypes/orchestrator-types.tsRetry configuration (max retries, backoff, retryable error codes)
ITimeoutPolicytypes/orchestrator-types.tsPrompt timeout configuration
IOrchestratorConfigtypes/orchestrator-types.tsCombined orchestrator configuration (cost + retry + timeout policies)
IOrchestratedPromptRequesttypes/orchestrator-types.tsPrompt request bundled with orchestrator config
IOrchestratedPromptResponsetypes/orchestrator-types.tsPrompt response bundled with optional cost estimate
IPromptApiClientPortinterfaces/prompt-api-client-port.tsPort interface for Prompt API server communication
ICostEstimatorPortinterfaces/orchestrator-policy-port.tsPort interface for cost estimation
ICostPolicyEvaluatorPortinterfaces/orchestrator-policy-port.tsPort interface for cost policy evaluation

Types imported from @robota-sdk/dag-core (not owned here):

IDagDefinition, IDagError, IPromptRequest, IPromptResponse, IQueueStatus, IQueueAction, THistory, TObjectInfo, ISystemStats, TResult, TPortPayload, IRunResult, IRunNodeTrace, IRunNodeError, TRunProgressEvent, TPromptInputValue, TPrompt, IDagEdgeDefinition

Public API Surface

ExportKindDescription
PromptOrchestratorServiceclassStateless orchestrator: submits prompts with optional cost policy gating, delegates queue/history/objectInfo/systemStats to API client
OrchestratorRunServiceclassStateful run lifecycle manager: createRun, startRun, createAndStartRun, getRunStatus, getRunResult, recordEvent
HttpPromptApiClientclassHTTP adapter implementing IPromptApiClientPort via fetch()
translateDefinitionToPromptfunctionPure translator: IDagDefinition + TPortPayload -> IPromptRequest
IPromptApiClientPortinterface (type export)Port for Prompt API server communication
ICostEstimatorPortinterface (type export)Port for cost estimation logic
ICostPolicyEvaluatorPortinterface (type export)Port for cost policy evaluation logic
IPromptCostEstimateinterface (type export)Cost estimation result shape
IPromptCostPolicyinterface (type export)Cost policy configuration shape
IRetryPolicyinterface (type export)Retry policy configuration shape
ITimeoutPolicyinterface (type export)Timeout policy configuration shape
IOrchestratorConfiginterface (type export)Combined orchestrator configuration
IOrchestratedPromptRequestinterface (type export)Request bundled with config
IOrchestratedPromptResponseinterface (type export)Response bundled with cost estimate
CelCostEstimatorAdapterclassICostEstimatorPort adapter using CEL-based cost formulas from @robota-sdk/dag-cost

Extension Points

PortPurposeImplementors
IPromptApiClientPortCommunicate with a Prompt API server (ComfyUI or Robota runtime)HttpPromptApiClient (built-in), in-memory stubs (tests)
ICostEstimatorPortEstimate execution cost given TPrompt and TObjectInfoCelCostEstimatorAdapter (built-in), consumer-provided
ICostPolicyEvaluatorPortEvaluate whether an estimated cost passes a policy thresholdConsumer-provided; no built-in implementation

All three ports are injected via constructor DI. Consumers can provide custom implementations without modifying this package.

Error Taxonomy

All errors use IDagError from @robota-sdk/dag-core with TResult<T, IDagError> return types.

CodeCategoryRetryableSourceDescription
ORCHESTRATOR_EMPTY_DEFINITIONvalidationNotranslateDefinitionToPromptDefinition has zero nodes
ORCHESTRATOR_RUN_NOT_FOUNDvalidationNoOrchestratorRunServicedagRunId does not exist in the run map
ORCHESTRATOR_RUN_ALREADY_STARTEDvalidationNoOrchestratorRunService.startRunAttempted to start a run that is not in pending status
ORCHESTRATOR_RUN_NOT_COMPLETEDvalidationNoOrchestratorRunService.getRunResultRun has not been started or has not completed yet
NETWORK_ERRORdispatchYesHttpPromptApiClientFailed to connect to Prompt API server
HTTP_<status>validationYes (5xx) / No (4xx)HttpPromptApiClientNon-OK HTTP response from Prompt API server
COST_LIMIT_EXCEEDEDvalidationNoConsumer-provided ICostPolicyEvaluatorPortCost estimate exceeds policy threshold

Test Strategy

Current test files

FileTypeCoverage
prompt-orchestrator-service.test.tsUnitSubmits with/without cost policy, cost rejection, delegation of getQueue/getHistory/getSystemStats
prompt-api-client-port.test.tsContractVerifies IPromptApiClientPort is implementable as an in-memory stub
backend-interchangeability.test.tsContract (integration-style)Proves HttpPromptApiClient works identically against mock Robota and ComfyUI servers; cross-backend response shape parity
translator-contract.test.tsContractValidates translateDefinitionToPrompt: primitive/object/array config, edge bindings, slot indices, input node, class_type, _meta, empty definition error
run-service-contract.test.tsContractValidates OrchestratorRunService: createRun/startRun lifecycle, dagRunId=promptId, recordEvent accumulation, getRunStatus/getRunResult shapes, dual-index lookup

Coverage gaps

  • Retry and timeout policies: IRetryPolicy and ITimeoutPolicy are defined in types but not implemented in any service. No tests exist for these paths.

Verification commands

bash
pnpm --filter @robota-sdk/dag-orchestrator test
pnpm --filter @robota-sdk/dag-orchestrator build

Class Contract Registry

Class / FunctionImplements / ExtendsPort Consumer
HttpPromptApiClientimplements IPromptApiClientPort--
CelCostEstimatorAdapterimplements ICostEstimatorPortInjects ICostMetaStoragePort from @robota-sdk/dag-cost
PromptOrchestratorService--Injects IPromptApiClientPort, ICostEstimatorPort, ICostPolicyEvaluatorPort
OrchestratorRunService--Injects IPromptApiClientPort
translateDefinitionToPromptpure function (no port)--

Cross-package consumers

Consumer (external)Port consumedPackage
dag-orchestrator-server routesOrchestratorRunService, PromptOrchestratorServiceapps/dag-orchestrator-server

Dependencies

DependencyKindPurpose
@robota-sdk/dag-coreproductionDomain types (IDagDefinition, IPromptRequest, TResult, IDagError, etc.)
@robota-sdk/dag-costproductionCost meta types and CEL evaluator (ICostMetaStoragePort, CelCostEvaluator)
expressdevMock server for backend interchangeability tests
vitestdevTest runner
tsupdevBuild tool

Released under the MIT License.