
Check for inputs with Unites #251

@NiveditJain

Check input providers when a node unites other nodes.

Labels: core, validation, graph-semantics

Context
Given a linear graph A -> B -> C -> D, if D unites B, then D may take inputs only from B and from the ancestors of B (here, A). D must not accept inputs from C, because C is not in the ancestry of B.

This generalizes to any DAG: a node N that unites a set of nodes {U1, U2, ...} may only source inputs from the union of each Ui and their transitive ancestors.


Problem

Nodes marked as uniting can currently bind inputs from any upstream provider. This breaks the intended semantics of uniting, which is meant to restrict the visible input surface. We need compile-time and runtime validation that enforces the rule.

Definitions

  • Unites: a property on a node template that lists one or more node names it unites. Example: D.unites = ["B"].
  • Allowed Providers Set (APS): for a node N, APS is closure(unites(N)) = ancestors(U1) ∪ {U1} ∪ ... ∪ ancestors(Uk) ∪ {Uk}.
  • Input Provider: a node name or state that produced an output referenced by N.inputs.

Expected behavior

  1. When a node N defines unites = [U1, ..., Uk], any input binding of N that references outputs must come from nodes in APS(N).
  2. Literals and system-provided inputs are always allowed.
  3. If an input references a node outside APS(N), validation should fail with a clear error at graph compile time if possible, and at runtime before N transitions to RUNNABLE.
  4. If unites is empty or not set, existing behavior is unchanged.
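Rules 1, 2, and 4 above can be read as a single predicate over one input binding. The following is a minimal sketch under stated assumptions: the `Binding` shape, its `kind` values, and the function name `is_binding_allowed` are hypothetical and not part of the existing schema.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Binding:
    # Hypothetical shape of one input binding; not the real schema.
    name: str
    kind: str                       # "ref", "literal", or "system"
    provider: Optional[str] = None  # node name, set only when kind == "ref"


def is_binding_allowed(binding, unites, aps):
    """Apply rules 1, 2, and 4 to a single binding."""
    if not unites:
        return True  # rule 4: unites unset or empty, nothing to enforce
    if binding.kind != "ref":
        return True  # rule 2: literals and system-provided inputs always pass
    return binding.provider in aps  # rule 1: provider must be in APS(N)


# APS(D) for A -> B -> C -> D with D.unites = ["B"]
aps_d = {"A", "B"}
print(is_binding_allowed(Binding("x", "ref", "A"), ["B"], aps_d))  # True
print(is_binding_allowed(Binding("y", "ref", "C"), ["B"], aps_d))  # False
print(is_binding_allowed(Binding("z", "literal"), ["B"], aps_d))   # True
```

Rule 3 (when and how the failure is reported) is left to the caller, which can turn a `False` result into a compile-time or runtime error.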

Examples

  • Linear: A -> B -> C -> D, D.unites = [B].

    • Allowed: A, B.
    • Not allowed: C.
  • Fan-in: A -> B -> D, A -> C -> D, D.unites = [B].

    • Allowed: B and ancestors of B which include A.
    • Not allowed: C.
  • Multiple targets: D.unites = [B, C].

    • Allowed: A, B, C and all of their ancestors.
  • Invalid target: D.unites = [X] where X is not an ancestor of D.

    • Compile time error: "D.unites lists X which is not an ancestor of D".
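The first three examples can be checked mechanically. This is a small sketch, assuming the graph is given as a list of `(source, destination)` edges; the helper name `closure` is illustrative only.

```python
def closure(edges, targets):
    """Return the targets plus all of their transitive ancestors."""
    parents = {}
    for src, dst in edges:
        parents.setdefault(dst, set()).add(src)
    seen = set()
    stack = list(targets)
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(parents.get(node, ()))
    return seen


linear = [("A", "B"), ("B", "C"), ("C", "D")]
fan_in = [("A", "B"), ("B", "D"), ("A", "C"), ("C", "D")]

print(sorted(closure(linear, ["B"])))       # ['A', 'B']       -> C is rejected
print(sorted(closure(fan_in, ["B"])))       # ['A', 'B']       -> C is rejected
print(sorted(closure(fan_in, ["B", "C"])))  # ['A', 'B', 'C']
```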

Edge cases and policy

  • Cross-namespace or cross-run references: disallow. Providers must be in the same namespace, graph_name, and run_id.
  • Skipped or failed unite targets: if a target is SKIPPED or FAILED, the scheduler should follow the existing policy for missing inputs. This proposal does not alter retry or skip semantics.
  • Dynamic edges: APS is computed from the realized graph for the run_id at the time N is prepared. Dynamic edge additions must revalidate APS.
  • Aliases: if inputs support aliasing like from: B.output_x as x, the provider check uses the source node B.
  • Literals and secrets: literals are allowed. Secrets or server side config resolved at runtime are allowed.
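For the alias case, the provider check only needs the source node name. A possible sketch, assuming references look like `Node.output` or `Node.output as alias` and that node names contain no dots (both are assumptions taken from the example syntax above, not a confirmed grammar); `provider_of` is a hypothetical helper.

```python
def provider_of(source_expr):
    """Extract the provider node name from 'Node.output' or 'Node.output as alias'."""
    # Drop the optional alias part, keeping only the reference itself.
    reference = source_expr.split(" as ", 1)[0].strip()
    # Everything before the first dot is treated as the node name.
    node_name, sep, _output = reference.partition(".")
    if not sep or not node_name:
        raise ValueError(f"not a node output reference: {source_expr!r}")
    return node_name


print(provider_of("B.output_x as x"))  # B
print(provider_of("B.output_x"))       # B
```

The returned name is what gets compared against APS(N); the alias `x` never takes part in the check.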

Data model

We already have does_unites: bool on state. Extend the node template schema to carry unites: list[str].

// NodeTemplate (SDK and API)
interface NodeTemplate {
  name: string
  // ...
  does_unites?: boolean
  unites?: string[] // names of nodes N unites
}

Compile time validation (graph builder)

  1. For every node N with unites:

    • Assert each Ui exists and is an ancestor of N in the template DAG.
    • Precompute APS(N) by walking reverse edges from each Ui and store it on the compiled plan for fast checks.
  2. For any static input wiring known at compile time, assert providers are in APS(N).

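# Python sketch of the APS computation
from collections import deque

def ancestors(adj_rev, u):
    """Return u together with every transitive ancestor of u.

    adj_rev maps a node name to the list of its direct parents (reverse edges).
    """
    seen = {u}  # u itself is included, matching the APS definition
    q = deque([u])
    while q:
        x = q.popleft()
        for p in adj_rev.get(x, []):
            if p not in seen:
                seen.add(p)
                q.append(p)
    return seen

def compute_aps(adj_rev, unites_list):
    """Union the closures of all unite targets to get APS(N)."""
    aps = set()
    for u in unites_list:
        aps |= ancestors(adj_rev, u)
    return aps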
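Step 1 of the compile-time validation (each Ui exists and is an ancestor of N) could look roughly like the sketch below. The function names are hypothetical, and the "does not exist" message is an assumed wording; only the "not an ancestor" message comes from the examples in this issue.

```python
def strict_ancestors(adj_rev, node):
    """Transitive ancestors of node, excluding node itself."""
    seen = set()
    stack = list(adj_rev.get(node, []))
    while stack:
        x = stack.pop()
        if x not in seen:
            seen.add(x)
            stack.extend(adj_rev.get(x, []))
    return seen


def check_unite_targets(adj_rev, all_nodes, node, unites_list):
    """Return compile-time error messages for invalid unite targets of node."""
    upstream = strict_ancestors(adj_rev, node)
    errors = []
    for target in unites_list:
        if target not in all_nodes:
            # Assumed wording for the missing-node case.
            errors.append(f"{node}.unites lists {target} which does not exist in the graph")
        elif target not in upstream:
            errors.append(f"{node}.unites lists {target} which is not an ancestor of {node}")
    return errors


# Reverse edges for A -> B -> C -> D
adj_rev = {"B": ["A"], "C": ["B"], "D": ["C"]}
nodes = {"A", "B", "C", "D"}
print(check_unite_targets(adj_rev, nodes, "D", ["B"]))  # []
print(check_unite_targets(adj_rev, nodes, "A", ["B"]))
# ['A.unites lists B which is not an ancestor of A']
```

Returning a list instead of raising on the first problem lets the graph builder report every invalid target in one pass.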

Runtime validation (StateManager)

Before transitioning N from READY to RUNNABLE or RUNNING, validate all referenced providers for N.inputs against APS(N).

// On state materialization for node N
const aps = getPrecomputedAPS(N) // from compiled plan
for (const input of N.inputs) {
  if (input.type === 'ref') {
    const provider = input.nodeName
    if (!aps.has(provider)) {
      throw new ValidationError(
        `Input provider ${provider} is not allowed by unites on ${N.name}. ` +
        `Allowed: ${[...aps].sort().join(', ')}`
      )
    }
  }
}

Additionally, enforce namespace, graph_name, and run_id equality between N and every referenced state.
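The scope check is independent of APS and can be sketched separately. The `StateRef` type and `scope_mismatches` helper below are hypothetical; only the three field names come from this issue.

```python
from dataclasses import dataclass

SCOPE_FIELDS = ("namespace", "graph_name", "run_id")


@dataclass(frozen=True)
class StateRef:
    # Hypothetical identity of a state; field names follow the issue text.
    node_name: str
    namespace: str
    graph_name: str
    run_id: str


def scope_mismatches(consumer, provider):
    """Return the scope fields on which the provider differs from the consumer."""
    return [f for f in SCOPE_FIELDS if getattr(consumer, f) != getattr(provider, f)]


d = StateRef("D", "prod", "etl", "run-1")
print(scope_mismatches(d, StateRef("B", "prod", "etl", "run-1")))  # []
print(scope_mismatches(d, StateRef("B", "prod", "etl", "run-2")))  # ['run_id']
```

Reporting which fields differ, rather than a bare boolean, makes the cross-run rejection easier to diagnose.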

API and SDK

  • The SDK should expose a helper to declare uniting behavior.
Node(
    name="D",
    unites=["B"],
)
  • Provide a small linter rule: validate_unites(graph) that prints a diff of invalid bindings.
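One way the linter rule might work is sketched here. The graph shape (a dict with `edges` and `nodes`, where each input maps to a provider node name or `None` for a literal) is purely an assumption for illustration, and this version returns messages instead of printing a diff.

```python
def validate_unites(graph):
    """Return one message per input binding whose provider is outside APS."""
    parents = {}
    for src, dst in graph["edges"]:
        parents.setdefault(dst, []).append(src)

    def closure(targets):
        seen, stack = set(), list(targets)
        while stack:
            node = stack.pop()
            if node not in seen:
                seen.add(node)
                stack.extend(parents.get(node, []))
        return seen

    problems = []
    for name, spec in graph["nodes"].items():
        unites = spec.get("unites") or []
        if not unites:
            continue  # no unites declared: existing behavior is unchanged
        aps = closure(unites)
        for input_name, provider in spec.get("inputs", {}).items():
            # provider is None for literals, which are always allowed
            if provider is not None and provider not in aps:
                problems.append(
                    f"{name}.{input_name}: provider {provider} not in allowed set {sorted(aps)}"
                )
    return problems


graph = {
    "edges": [("A", "B"), ("B", "C"), ("C", "D")],
    "nodes": {
        "A": {}, "B": {}, "C": {},
        "D": {"unites": ["B"], "inputs": {"x": "A", "y": "C", "z": None}},
    },
}
for line in validate_unites(graph):
    print(line)  # D.y: provider C not in allowed set ['A', 'B']
```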

Tests

  • Linear happy path where D.unites = [B] and inputs from A and B pass.
  • Linear negative where D reads from C and fails with the expected error message.
  • Fan-in where D.unites = [B] rejects inputs from C.
  • Multiple targets [B, C] accepts both providers and their ancestors.
  • Invalid target: a unites entry that is not an ancestor of D fails at compile time.
  • Cross-run reference is rejected.

Acceptance criteria

  • Graphs that violate unites provider rules fail early with clear errors that list the allowed providers.
  • Valid graphs run without regression in scheduling or retries.
  • APS computation is cached per compiled plan and adds negligible overhead.
  • Documentation updated with examples and rationale.

Docs

Add a section to "Graph semantics" that explains Unites with diagrams. Include the running example A -> B -> C -> D and a fan-in example, and show APS visually.

Status: Done