
feat: add ExecutionGraph, CompletionTracker, and Task model for async scheduler #356

Open
andreatgretel wants to merge 5 commits into main from
andreatgretel/feat/async-generators-and-task-queue-foundation

Conversation

@andreatgretel (Contributor) commented Feb 26, 2026

Summary

PR 1 of 4 in the async generators & task-queue builder plan. Adds the foundational data structures — ExecutionGraph, CompletionTracker, and Task/TaskResult/TaskTrace — that the async scheduler (PR 3) will consume. No existing behavior changes; all new modules under engine/dataset_builders/utils/.

Changes

Added

  • execution_graph.py — Column-level DAG built from config dependencies. Supports topological ordering (Kahn's, cached), critical path, cell-level dependency resolution, side-effect column mapping, Mermaid visualization, upfront task count estimation, cached upstream_by_strategy, and a create() factory classmethod.
  • completion_tracker.py — Tracks per-cell and per-batch completion state across row groups. Uses an event-driven frontier — readiness is computed incrementally on mark_complete/mark_batch_complete/drop_row via _enqueue_downstream, so get_ready_tasks returns in O(frontier) instead of scanning all columns × rows × row groups (O(C × R × G)) per tick. Handles row drops and batch-level markers.
  • task_model.py — Frozen dataclasses for Task (hashable work unit), TaskResult (outcome), and TaskTrace (timing trace). Includes ColumnName, RowGroupIndex, RowIndex type aliases for self-documenting signatures.
  • test_execution_graph.py (381 lines) — Tests for graph construction, topological order, critical path, cell dependencies, side-effects, Mermaid output, cycle detection, task counts.
  • test_completion_tracker.py (257 lines) — Tests for mark/query, batch completion, row drops, frontier-based readiness resolution, multi-row-group scenarios.
  • test_task_model.py (87 lines) — Tests for equality, hashing, set membership, defaults.

Changed

Total: +1,250 / -29 lines across 9 files (6 new, 3 modified). ~58% of added lines are tests (725 test / 506 source).

Attention Areas

Reviewers: Please pay special attention to the following:

  • completion_tracker.py — Event-driven frontier logic in _enqueue_downstream and _reevaluate_batch_tasks. This is the core optimization: cell completions do O(fan_out), batch completions check downstream rows, and get_ready_tasks is just a frontier filter.
  • execution_graph.py — Core DAG logic. The cell_dependencies method resolves side-effect columns and maps generation strategy to readiness granularity (cell vs batch). upstream_by_strategy is cached and used by the frontier logic. This is the contract that PR 3's scheduler will rely on.
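The frontier idea above can be illustrated with a deliberately simplified sketch — this is not the PR's implementation (it ignores batch granularity, row groups, and row drops, and invents a toy `upstream` mapping), but it shows why completions cost O(fan_out) and why `get_ready_tasks` is just a filter:

```python
from collections import defaultdict


class ToyFrontierTracker:
    """Simplified event-driven readiness tracker (cell granularity only).

    upstream: column -> set of upstream columns. The downstream map is
    derived once so each completion only touches its direct fan-out.
    """

    def __init__(self, upstream: dict[str, set[str]], n_rows: int) -> None:
        self.upstream = upstream
        self.downstream: dict[str, set[str]] = defaultdict(set)
        for col, ups in upstream.items():
            for up in ups:
                self.downstream[up].add(col)
        self.completed: dict[str, set[int]] = defaultdict(set)
        # Seed the frontier with cells of columns that have no dependencies.
        self.frontier = {(c, r) for c, ups in upstream.items() if not ups for r in range(n_rows)}

    def mark_complete(self, column: str, row: int) -> None:
        self.completed[column].add(row)
        self.frontier.discard((column, row))
        # O(fan_out): only re-check cells directly downstream of this one.
        for down in self.downstream[column]:
            if all(row in self.completed[up] for up in self.upstream[down]):
                if row not in self.completed[down]:
                    self.frontier.add((down, row))

    def get_ready_tasks(self, dispatched: set) -> list:
        # Just a frontier filter -- no scan over columns x rows x row groups.
        return [t for t in self.frontier if t not in dispatched]
```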

Test plan

  • All new tests pass — 188 passed (pytest tests/engine/dataset_builders/utils/)
  • make check-all passes (lint + format)
  • Existing test suite unaffected — no imports from these modules yet


… scheduler

Add the foundational data structures for the async task-queue dataset
builder (plan #346, PR 1/4):

- ExecutionGraph: column-level static DAG with topological ordering,
  critical path, task counts, cell-dependency resolution, Mermaid output,
  and side-effect column mapping (__trace, __reasoning_content).
- CompletionTracker: lightweight (column, row_group, row_index) completion
  state with row dropping and ready-task enumeration.
- Task/TaskResult/TaskTrace: frozen hashable task dataclass, result
  container, and opt-in tracing record.

All three are pure data structures with no side effects on the existing
codebase. They live in new modules under engine/dataset_builders/utils/
and are only imported by code introduced in later PRs.

56 unit tests covering graph construction, validation, dependency
resolution, completion tracking, row drops, and task model semantics.

Refs #346
Add `is_ready` and `is_batch_ready` methods to CompletionTracker to
simplify `ready_tasks`. Cache topological order in ExecutionGraph since
the graph is immutable after construction. Move DatasetBuilderColumnConfigT
type alias to multi_column_configs. Fix license header years.
@andreatgretel andreatgretel requested a review from a team as a code owner February 26, 2026 21:59
greptile-apps bot commented Feb 26, 2026

Greptile Summary

This PR introduces the foundational data structures for the async task-queue scheduler (part of a 4-PR initiative). The implementation adds three core components that work together to enable dependency-aware task scheduling:

What was added:

  • ExecutionGraph - A column-level DAG that encodes dependencies between columns, handles side-effect column resolution, computes topological ordering using Kahn's algorithm, and provides cell-level dependency resolution based on generation strategies (cell-by-cell vs full-column)
  • CompletionTracker - An event-driven frontier-based tracker that maintains which cells/batches are complete, determines which tasks are ready to execute by consulting the execution graph, and efficiently handles row drops and batch completion markers in O(frontier) time
  • Task/TaskResult/TaskTrace - Immutable work units and their outcomes, with the Task being frozen and hashable for use in sets and as frontier elements

Code quality observations:

  • Excellent test coverage (62% of added lines are tests: 725 test lines / 446 source lines)
  • Clean separation of concerns between static graph structure (ExecutionGraph) and dynamic completion state (CompletionTracker)
  • Proper use of caching (topological order, upstream-by-strategy splits) to avoid recomputation
  • Well-documented with clear docstrings explaining O(n) complexity and design decisions
  • No behavioral changes to existing code (these modules are not yet imported anywhere)

Architecture notes:

  • The frontier-based design in CompletionTracker enables efficient ready-task queries without scanning all column×row×row_group combinations
  • Side-effect columns (e.g., __trace, __reasoning_content) are properly resolved to their producers during graph construction
  • The code correctly handles both cell-by-cell dependencies (same row index) and batch dependencies (entire row group) when determining task readiness
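Kahn's algorithm, as referenced in both the summary and this review, fits in a few lines. This is an illustrative sketch only — the PR's implementation additionally caches the result and works over its own internal maps:

```python
from collections import deque


def kahn_topological_order(upstream: dict[str, set[str]]) -> list[str]:
    """Return columns in dependency order; raise ValueError on a cycle."""
    indegree = {col: len(ups) for col, ups in upstream.items()}
    downstream: dict[str, set[str]] = {col: set() for col in upstream}
    for col, ups in upstream.items():
        for up in ups:
            downstream[up].add(col)
    queue = deque(col for col, deg in indegree.items() if deg == 0)
    order: list[str] = []
    while queue:
        col = queue.popleft()
        order.append(col)
        for down in downstream[col]:
            indegree[down] -= 1
            if indegree[down] == 0:
                queue.append(down)
    if len(order) != len(upstream):
        # Some columns never reached indegree 0 -> circular dependency.
        raise ValueError("cycle detected in column dependencies")
    return order
```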

Confidence Score: 5/5

  • This PR is safe to merge - it adds isolated foundational modules with no integration points to existing code
  • Perfect score due to: (1) all new code in isolated utils/ directory with no imports from existing modules, (2) comprehensive test suite with 188 passing tests covering edge cases like circular dependencies, row drops, and complex DAGs, (3) clean implementations of well-understood algorithms (Kahn's topological sort, longest path), (4) excellent code quality with proper caching and O(n) complexity documentation, and (5) zero risk of breaking existing functionality since nothing imports these modules yet
  • No files require special attention - all implementations are solid with excellent test coverage
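The critical path mentioned above is the longest dependency chain in the DAG, which falls out of a single pass over the topological order. A sketch, assuming unit cost per column (the real method may weight columns differently):

```python
def critical_path(upstream: dict[str, set[str]], topo_order: list[str]) -> list[str]:
    """Longest chain of dependent columns, computed in topological order."""
    # best[col] = longest path (as a list of columns) ending at col
    best: dict[str, list[str]] = {}
    for col in topo_order:
        longest_prefix: list[str] = []
        for up in upstream[col]:
            if len(best[up]) > len(longest_prefix):
                longest_prefix = best[up]
        best[col] = longest_prefix + [col]
    return max(best.values(), key=len) if best else []
```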

Important Files Changed

Filename Overview
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py Column-level DAG implementation with Kahn's topological sort, critical path analysis, side-effect resolution, and cell dependency calculation - clean implementation with proper caching
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py Event-driven frontier-based completion tracking with proper handling of batch/cell granularity and row drops - efficient O(frontier) ready task queries
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/task_model.py Simple frozen dataclasses for Task (hashable), TaskResult, and TaskTrace with clear type aliases - straightforward and correct

Class Diagram

%%{init: {'theme': 'neutral'}}%%
classDiagram
    class ExecutionGraph {
        -dict~ColumnName, set~ _upstream
        -dict~ColumnName, set~ _downstream
        -dict~ColumnName, GenerationStrategy~ _strategies
        -dict~ColumnName, ColumnName~ _side_effect_map
        -list~ColumnName~ _columns
        -set~Task~ _frontier
        +add_column(name, strategy)
        +add_edge(upstream, downstream)
        +topological_order() list
        +critical_path() list
        +cell_dependencies() list
        +create(configs, strategies)$ ExecutionGraph
    }

    class CompletionTracker {
        -dict~RowGroupIndex, dict~ _completed
        -dict~RowGroupIndex, set~ _dropped
        -ExecutionGraph _graph
        -set~Task~ _frontier
        +mark_complete(column, row_group, row_index)
        +mark_batch_complete(column, row_group, size)
        +get_ready_tasks(dispatched) list~Task~
        +drop_row(row_group, row_index)
        +is_complete(column, row_group, row_index) bool
        -_enqueue_downstream(column, row_group, row_index)
        -_seed_frontier()
    }

    class Task {
        <<frozen>>
        +ColumnName column
        +RowGroupIndex row_group
        +RowIndex row_index
        +str task_type
    }

    class TaskResult {
        +Task task
        +str status
        +Any output
        +Exception error
        +bool retryable
    }

    class TaskTrace {
        +ColumnName column
        +RowGroupIndex row_group
        +RowIndex row_index
        +str task_type
        +float dispatched_at
        +float completed_at
        +from_task(task)$ TaskTrace
    }

    CompletionTracker --> ExecutionGraph : uses for dependencies
    CompletionTracker --> Task : manages in frontier
    TaskResult --> Task : wraps outcome
    TaskTrace --> Task : tracks timing

Last reviewed commit: c30abb4

@@ -0,0 +1,133 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
Reviewer comment:

super nit: thoughts on putting these resources in a dedicated module somewhere outside of dataset_builders? May be async_helpers?

Author reply:

thought about it but these modules are only consumed by the dataset builder and share types with the other utils here (dag.py, concurrency.py, async_concurrency.py). moving them out would scatter tightly-coupled code without reducing coupling. keeping them in dataset_builders/utils/ for now — happy to revisit if they get reused elsewhere.

def is_complete(self, column: str, row_group: int, row_index: int) -> bool:
return row_index in self._completed.get(row_group, {}).get(column, set())

def all_complete(self, cells: list[tuple[str, int, int | None]]) -> bool:
Reviewer comment:

super nit: all_complete -> is_all_complete

Author reply:

done


def __init__(self) -> None:
# row_group → column → set of completed local row indices
self._completed: dict[int, dict[str, set[int]]] = defaultdict(lambda: defaultdict(set))
Reviewer comment:

Suggestion: Use type aliases (or a NamedTuple) for row/column/row-group coordinates

The nested type dict[int, dict[str, set[int]]] in CompletionTracker._completed is hard to reason about at a glance — you have to mentally map "outer int = row group, str = column, inner int = row index" every time you read it. The same (str, int, int | None) tuple pattern also appears repeatedly across both CompletionTracker and ExecutionGraph.

Type aliases would help:

# In task_model.py or a shared types module
from typing import TypeAlias

RowIndex: TypeAlias = int
RowGroup: TypeAlias = int
ColumnName: TypeAlias = str

Then signatures become self-documenting:

# Before
self._completed: dict[int, dict[str, set[int]]]

# After
self._completed: dict[RowGroup, dict[ColumnName, set[RowIndex]]]
# Before
def mark_complete(self, column: str, row_group: int, row_index: int) -> None:

# After
def mark_complete(self, column: ColumnName, row_group: RowGroup, row_index: RowIndex) -> None:

You could also replace the tuple[str, int, int | None] scattered across both modules with a NamedTuple:

class CellCoord(NamedTuple):
    column: ColumnName
    row_group: RowGroup
    row_index: RowIndex | None

This lets you write coord.column instead of coord[0], is still hashable and tuple-compatible, and makes cell_dependencies and all_complete easier to follow.

Author reply:

added ColumnName, RowGroup, RowIndex type aliases in task_model.py and applied them across CompletionTracker and ExecutionGraph. skipped the CellCoord namedtuple — the destructuring pattern for col, rg, ri in cells is already clear and used consistently, and the namedtuple adds allocation overhead in a hot loop.

Comment on lines +180 to +181
graph._columns.append(name)
graph._strategies[name] = strategies[name]
Reviewer comment:

these can change to accessing public api right? There are a few instances of this pattern in this file.

Author reply:

good call. added add_column(), add_edge(), set_side_effect(), and resolve_side_effect() to ExecutionGraph and rewrote build_execution_graph to use them.

status: str = ""
error: str | None = None

@staticmethod
Reviewer comment:

Nit: Prefer @classmethod over @staticmethod for TaskTrace.from_task

@classmethod is the standard Python convention for alternative constructors:

# Current
@staticmethod
def from_task(task: Task) -> TaskTrace:
    return TaskTrace(...)

# Preferred
@classmethod
def from_task(cls, task: Task) -> TaskTrace:
    return cls(...)

Using cls(...) instead of hardcoding TaskTrace(...) means the constructor works correctly with subclasses, and more importantly signals "this is an alternative constructor" idiomatically. Minor point since TaskTrace is unlikely to be subclassed, but it's a common convention worth following.

Author reply:

done, switched to @classmethod + cls(...)

- Rename all_complete → is_all_complete for boolean method convention
- Add ColumnName, RowGroup, RowIndex type aliases for readability
- Add public mutation API to ExecutionGraph (add_column, add_edge,
  set_side_effect, resolve_side_effect) and rewrite build_execution_graph
  to use it instead of private attributes
- Change TaskTrace.from_task from @staticmethod to @classmethod
from typing import Any, Literal, TypeAlias

ColumnName: TypeAlias = str
RowGroup: TypeAlias = int
Reviewer comment:

nit: This is technically also an index right? RowGroup > RowGroupIndex?

Author reply:

done, renamed across all three modules

from data_designer.engine.dataset_builders.utils.task_model import ColumnName, RowGroup, RowIndex


@dataclass
Reviewer comment:

Does this need to be a dataclass?

Author reply:

nope, converted to a plain class with __init__

return "\n".join(lines)


def build_execution_graph(
Reviewer comment:

nit: probably mostly stylistic:

This could be a factory create class method of the ExecutionGraph class itself:

@classmethod
def create(cls, column_configs: list[DatasetBuilderColumnConfigT], strategies: dict[ColumnName, GenerationStrategy]) -> Self:
...

Author reply:

moved the logic into ExecutionGraph.create(), kept build_execution_graph as a thin deprecated wrapper so existing call sites still work

- Rename RowGroup type alias to RowGroupIndex for consistency
- Convert ExecutionGraph from dataclass to plain class
- Move build_execution_graph logic to ExecutionGraph.create() classmethod
Comment on lines +52 to +75
def is_ready(
self,
column: ColumnName,
row_group: RowGroupIndex,
row_index: RowIndex,
graph: ExecutionGraph,
row_group_size: int,
) -> bool:
"""Check if all upstream columns are done for this (column, row_group, row_index)."""
deps = graph.cell_dependencies(column, row_group, row_index, row_group_size)
return self.is_all_complete(deps)

def is_batch_ready(
self,
column: ColumnName,
row_group: RowGroupIndex,
row_group_size: int,
graph: ExecutionGraph,
) -> bool:
"""Check if all upstream columns are done for all non-dropped rows in the row group."""
deps = graph.cell_dependencies(column, row_group, None, row_group_size)
# Dropped rows don't need their upstream cells complete
deps = [(c, rg, ri) for c, rg, ri in deps if ri is None or not self.is_dropped(rg, ri)]
return self.is_all_complete(deps)
Reviewer comment:

nit detail: could these methods instead take whatever graph.cell_dependencies returns as a dependency?

Author reply:

moot now — get_ready_tasks no longer scans or checks dependencies. it just returns [t for t in self._frontier if t not in dispatched]. dependency resolution moved into _enqueue_downstream, which fires incrementally on each mark_complete / mark_batch_complete using graph.upstream_by_strategy. this turns the scheduler tick from O(C × R × G) to O(downstream_fan_out) per completion.

@nabinchha (Contributor) left a comment:

@andreatgretel a few more comments related to perf!

Optimization Review

High Impact

1. get_ready_tasks is O(C × R × G) on every scheduler tick

This scans every column × every row × every row group on each call. With 10 columns, 10k records, buffer_size=100, that's ~100k iterations per tick, each triggering cell_dependencies() + is_all_complete().

Two suggestions:

  • Early skip for completed column×row_group pairs in the cell-by-cell branch. Before the inner row loop, a quick check like len(completed.get(col, set())) + len(dropped) >= rg_size would let you skip entire blocks.
  • Incremental/event-driven readiness (future PR): maintain a frontier set updated on mark_complete instead of full-scanning. This turns the scheduler from poll-based to event-driven.

2. cell_dependencies allocates a new list + tuples every call

Called per-cell inside the hot loop. For a 100-row batch with 3 upstream columns: 100 list allocations + 300 tuple allocations per column per row group per tick. Since the graph is immutable, the dependency pattern for a given column is always the same — only (row_group, row_index) varies. A cached descriptor that is_all_complete interprets directly could avoid most allocations.

3. is_batch_ready builds full dep list then filters it

deps = graph.cell_dependencies(column, row_group, None, row_group_size)
deps = [(c, rg, ri) for c, rg, ri in deps if ri is None or not self.is_dropped(rg, ri)]

For a full-column downstream of a 1000-row cell-by-cell column, this builds 1000 tuples then creates a second filtered list. Consider checking dropped rows inline or passing the dropped set into the dependency resolution.

Low Impact (fine to defer)

4. topological_order() and columns copy on every access — topological_order() does return list(cache) and is called once per column per row group in get_ready_tasks. Since the graph is immutable and callers don't mutate the result, an internal _topological_order that returns the cached list directly (skipping the copy) would help in the hot path. Same for the columns property.

5. is_all_complete repeated dict lookups — Each (col, rg, ri) tuple triggers self._completed.get(rg, {}).get(col, set()) with temporary empty dict/set allocations on misses. Hoisting the row-group lookup outside the per-cell loop would reduce overhead.

6. _upstream/_downstream are defaultdict but accessors use .get(key, set()) — Allocates a fresh empty set on every miss. Minor, but switching to plain dict would make the no-side-effect intent explicit and avoid the allocation.

Summary

The two highest-impact changes are (1) early-skip logic in get_ready_tasks and (2) reducing per-cell allocations in cell_dependencies. Everything else is micro-optimization that can wait until profiling confirms it matters. Great foundation overall.

@andreatgretel (Author) commented:

@nabinchha update on the optimization review after the event-driven frontier refactor:

1. get_ready_tasks O(C × R × G) per tick — addressed. get_ready_tasks is now [t for t in self._frontier if t not in dispatched]. Readiness is computed incrementally in _enqueue_downstream on each mark_complete/mark_batch_complete, so cost is O(downstream_fan_out) per completion instead of O(C × R × G) per tick.

2. cell_dependencies allocations per call — no longer in the hot path. The frontier logic uses graph.upstream_by_strategy (cached) directly. No per-cell list/tuple allocations on each tick.

3. is_batch_ready builds full dep list then filters — removed. Batch readiness is checked inline by _are_cell_ups_complete inside _enqueue_downstream and _reevaluate_batch_tasks, no intermediate list construction.

4–6 (topological_order copies, is_all_complete lookups, defaultdict) — already addressed in previous commits or no longer in the hot path.

Replace the poll-based get_ready_tasks (O(C × R × G) per tick) with an
event-driven frontier maintained on mark_complete/mark_batch_complete/
drop_row. get_ready_tasks now returns O(frontier) instead of scanning
all columns × rows × row groups.
@andreatgretel andreatgretel requested a review from nabinchha March 2, 2026 20:13