feat: add ExecutionGraph, CompletionTracker, and Task model for async scheduler (#356)
Conversation
Add the foundational data structures for the async task-queue dataset builder (plan #346, PR 1/4):

- ExecutionGraph: column-level static DAG with topological ordering, critical path, task counts, cell-dependency resolution, Mermaid output, and side-effect column mapping (__trace, __reasoning_content).
- CompletionTracker: lightweight (column, row_group, row_index) completion state with row dropping and ready-task enumeration.
- Task/TaskResult/TaskTrace: frozen hashable task dataclass, result container, and opt-in tracing record.

All three are pure data structures with no side effects on the existing codebase. They live in new modules under engine/dataset_builders/utils/ and are only imported by code introduced in later PRs.

56 unit tests cover graph construction, validation, dependency resolution, completion tracking, row drops, and task model semantics.

Refs #346
- Add `is_ready` and `is_batch_ready` methods to CompletionTracker to simplify `ready_tasks`
- Cache topological order in ExecutionGraph since the graph is immutable after construction
- Move DatasetBuilderColumnConfigT type alias to multi_column_configs
- Fix license header years
Greptile Summary

This PR introduces the foundational data structures for the async task-queue scheduler (part of a 4-PR initiative). The implementation adds three core components that work together to enable dependency-aware task scheduling.

What was added:
Code quality observations:
Architecture notes:
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py | Column-level DAG implementation with Kahn's topological sort, critical path analysis, side-effect resolution, and cell dependency calculation - clean implementation with proper caching |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py | Event-driven frontier-based completion tracking with proper handling of batch/cell granularity and row drops - efficient O(frontier) ready task queries |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/task_model.py | Simple frozen dataclasses for Task (hashable), TaskResult, and TaskTrace with clear type aliases - straightforward and correct |
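The file overview above names Kahn's algorithm for the topological sort. A generic standalone sketch of that approach (not the PR's actual `ExecutionGraph` code; the `upstream` mapping shape is an assumption for illustration):

```python
from collections import deque

def topological_order(upstream: dict[str, set[str]]) -> list[str]:
    """Kahn's algorithm: repeatedly emit columns whose upstream deps are all emitted."""
    # in-degree = number of unmet upstream dependencies per column
    indegree = {col: len(deps) for col, deps in upstream.items()}
    # invert the edges so we can decrement downstream columns on emit
    downstream: dict[str, set[str]] = {col: set() for col in upstream}
    for col, deps in upstream.items():
        for dep in deps:
            downstream[dep].add(col)
    ready = deque(sorted(col for col, deg in indegree.items() if deg == 0))
    order: list[str] = []
    while ready:
        col = ready.popleft()
        order.append(col)
        for nxt in sorted(downstream[col]):
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(upstream):
        raise ValueError("cycle detected")  # leftover columns still have unmet deps
    return order
```

For example, `topological_order({"a": set(), "b": {"a"}})` yields `["a", "b"]`, and a two-column cycle raises `ValueError`, which matches the cycle-detection behavior the tests cover.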
Class Diagram
```mermaid
%%{init: {'theme': 'neutral'}}%%
classDiagram
    class ExecutionGraph {
        -dict~ColumnName, set~ _upstream
        -dict~ColumnName, set~ _downstream
        -dict~ColumnName, GenerationStrategy~ _strategies
        -dict~ColumnName, ColumnName~ _side_effect_map
        -list~ColumnName~ _columns
        -set~Task~ _frontier
        +add_column(name, strategy)
        +add_edge(upstream, downstream)
        +topological_order() list
        +critical_path() list
        +cell_dependencies() list
        +create(configs, strategies)$ ExecutionGraph
    }
    class CompletionTracker {
        -dict~RowGroupIndex, dict~ _completed
        -dict~RowGroupIndex, set~ _dropped
        -ExecutionGraph _graph
        -set~Task~ _frontier
        +mark_complete(column, row_group, row_index)
        +mark_batch_complete(column, row_group, size)
        +get_ready_tasks(dispatched) list~Task~
        +drop_row(row_group, row_index)
        +is_complete(column, row_group, row_index) bool
        -_enqueue_downstream(column, row_group, row_index)
        -_seed_frontier()
    }
    class Task {
        <<frozen>>
        +ColumnName column
        +RowGroupIndex row_group
        +RowIndex row_index
        +str task_type
    }
    class TaskResult {
        +Task task
        +str status
        +Any output
        +Exception error
        +bool retryable
    }
    class TaskTrace {
        +ColumnName column
        +RowGroupIndex row_group
        +RowIndex row_index
        +str task_type
        +float dispatched_at
        +float completed_at
        +from_task(task)$ TaskTrace
    }
    CompletionTracker --> ExecutionGraph : uses for dependencies
    CompletionTracker --> Task : manages in frontier
    TaskResult --> Task : wraps outcome
    TaskTrace --> Task : tracks timing
```
Last reviewed commit: c30abb4
```
@@ -0,0 +1,133 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
```
Reviewer comment:
super nit: thoughts on putting these resources in a dedicated module somewhere outside of dataset_builders? May be async_helpers?
Author reply:
thought about it but these modules are only consumed by the dataset builder and share types with the other utils here (dag.py, concurrency.py, async_concurrency.py). moving them out would scatter tightly-coupled code without reducing coupling. keeping them in dataset_builders/utils/ for now — happy to revisit if they get reused elsewhere.
```python
def is_complete(self, column: str, row_group: int, row_index: int) -> bool:
    return row_index in self._completed.get(row_group, {}).get(column, set())

def all_complete(self, cells: list[tuple[str, int, int | None]]) -> bool:
```
Reviewer comment:
super nit: all_complete -> is_all_complete
```python
def __init__(self) -> None:
    # row_group → column → set of completed local row indices
    self._completed: dict[int, dict[str, set[int]]] = defaultdict(lambda: defaultdict(set))
```
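The lambda-defaultdict pattern in this `__init__` is what lets write paths do `self._completed[rg][col].add(ri)` without pre-creating row-group or column entries, while read paths use `.get()` to avoid materializing them as a side effect. A standalone sketch of that behavior:

```python
from collections import defaultdict

# row_group → column → set of completed local row indices
completed: dict[int, dict[str, set[int]]] = defaultdict(lambda: defaultdict(set))

completed[0]["name"].add(3)  # no KeyError: both nesting levels auto-create
completed[0]["name"].add(4)

assert 3 in completed[0]["name"]
# .get() never triggers __missing__, so reads don't create entries:
assert 9 not in completed.get(1, {}).get("name", set())
assert 1 not in completed  # row group 1 was not materialized by the read
```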
Reviewer comment:
Suggestion: Use type aliases (or a NamedTuple) for row/column/row-group coordinates
The nested type dict[int, dict[str, set[int]]] in CompletionTracker._completed is hard to reason about at a glance — you have to mentally map "outer int = row group, str = column, inner int = row index" every time you read it. The same (str, int, int | None) tuple pattern also appears repeatedly across both CompletionTracker and ExecutionGraph.
Type aliases would help:

```python
# In task_model.py or a shared types module
from typing import TypeAlias

RowIndex: TypeAlias = int
RowGroup: TypeAlias = int
ColumnName: TypeAlias = str
```

Then signatures become self-documenting:

```python
# Before
self._completed: dict[int, dict[str, set[int]]]
# After
self._completed: dict[RowGroup, dict[ColumnName, set[RowIndex]]]
```

```python
# Before
def mark_complete(self, column: str, row_group: int, row_index: int) -> None:
# After
def mark_complete(self, column: ColumnName, row_group: RowGroup, row_index: RowIndex) -> None:
```

You could also replace the tuple[str, int, int | None] scattered across both modules with a NamedTuple:

```python
class CellCoord(NamedTuple):
    column: ColumnName
    row_group: RowGroup
    row_index: RowIndex | None
```

This lets you write coord.column instead of coord[0], is still hashable and tuple-compatible, and makes cell_dependencies and all_complete easier to follow.
Author reply:
added ColumnName, RowGroup, RowIndex type aliases in task_model.py and applied them across CompletionTracker and ExecutionGraph. skipped the CellCoord namedtuple — the destructuring pattern for col, rg, ri in cells is already clear and used consistently, and the namedtuple adds allocation overhead in a hot loop.
```python
graph._columns.append(name)
graph._strategies[name] = strategies[name]
```
Reviewer comment:
these can change to accessing public api right? There are a few instances of this pattern in this file.
Author reply:
good call. added add_column(), add_edge(), set_side_effect(), and resolve_side_effect() to ExecutionGraph and rewrote build_execution_graph to use them.
```python
status: str = ""
error: str | None = None

@staticmethod
```
Reviewer comment:
Nit: Prefer @classmethod over @staticmethod for TaskTrace.from_task
@classmethod is the standard Python convention for alternative constructors:

```python
# Current
@staticmethod
def from_task(task: Task) -> TaskTrace:
    return TaskTrace(...)

# Preferred
@classmethod
def from_task(cls, task: Task) -> TaskTrace:
    return cls(...)
```

Using cls(...) instead of hardcoding TaskTrace(...) means the constructor works correctly with subclasses and, more importantly, idiomatically signals "this is an alternative constructor". A minor point since TaskTrace is unlikely to be subclassed, but it's a common convention worth following.
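The subclassing benefit can be shown directly; this sketch uses cut-down hypothetical `Task`/`TaskTrace` classes, not the PR's real ones:

```python
from dataclasses import dataclass

@dataclass
class Task:
    column: str
    row_group: int

@dataclass
class TaskTrace:
    column: str
    row_group: int

    @classmethod
    def from_task(cls, task: Task) -> "TaskTrace":
        return cls(column=task.column, row_group=task.row_group)  # cls, not TaskTrace

class TimedTrace(TaskTrace):
    pass

trace = TimedTrace.from_task(Task("name", 0))
# With @staticmethod returning TaskTrace(...), this would be a TaskTrace instead:
assert type(trace) is TimedTrace
```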
Author reply:
done, switched to @classmethod + cls(...)
- Rename all_complete → is_all_complete for boolean method convention
- Add ColumnName, RowGroup, RowIndex type aliases for readability
- Add a public mutation API to ExecutionGraph (add_column, add_edge, set_side_effect, resolve_side_effect) and rewrite build_execution_graph to use it instead of private attributes
- Change TaskTrace.from_task from @staticmethod to @classmethod
```python
from typing import Any, Literal, TypeAlias

ColumnName: TypeAlias = str
RowGroup: TypeAlias = int
```
Reviewer comment:
nit: This is technically also an index right? RowGroup > RowGroupIndex?
Author reply:
done, renamed across all three modules
```python
from data_designer.engine.dataset_builders.utils.task_model import ColumnName, RowGroup, RowIndex

@dataclass
```
Reviewer comment:
Does this need to be a dataclass?
Author reply:
nope, converted to a plain class with __init__
```python
    return "\n".join(lines)


def build_execution_graph(
```
Reviewer comment:
nit: probably mostly stylistic. This could be a factory `create` classmethod of the ExecutionGraph class itself:

```python
@classmethod
def create(cls, column_configs: list[DatasetBuilderColumnConfigT], strategies: dict[ColumnName, GenerationStrategy]) -> Self:
    ...
```
Author reply:
moved the logic into ExecutionGraph.create(), kept build_execution_graph as a thin deprecated wrapper so existing call sites still work
- Rename RowGroup type alias to RowGroupIndex for consistency
- Convert ExecutionGraph from dataclass to plain class
- Move build_execution_graph logic to ExecutionGraph.create() classmethod
```python
def is_ready(
    self,
    column: ColumnName,
    row_group: RowGroupIndex,
    row_index: RowIndex,
    graph: ExecutionGraph,
    row_group_size: int,
) -> bool:
    """Check if all upstream columns are done for this (column, row_group, row_index)."""
    deps = graph.cell_dependencies(column, row_group, row_index, row_group_size)
    return self.is_all_complete(deps)

def is_batch_ready(
    self,
    column: ColumnName,
    row_group: RowGroupIndex,
    row_group_size: int,
    graph: ExecutionGraph,
) -> bool:
    """Check if all upstream columns are done for all non-dropped rows in the row group."""
    deps = graph.cell_dependencies(column, row_group, None, row_group_size)
    # Dropped rows don't need their upstream cells complete
    deps = [(c, rg, ri) for c, rg, ri in deps if ri is None or not self.is_dropped(rg, ri)]
    return self.is_all_complete(deps)
```
Reviewer comment:
nit detail: could these methods instead take whatever graph.cell_dependencies returns as a dependency?
Author reply:
moot now — get_ready_tasks no longer scans or checks dependencies. it just returns [t for t in self._frontier if t not in dispatched]. dependency resolution moved into _enqueue_downstream, which fires incrementally on each mark_complete / mark_batch_complete using graph.upstream_by_strategy. this turns the scheduler tick from O(C × R × G) to O(downstream_fan_out) per completion.
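The incremental scheme described in this reply can be sketched independently of the PR's classes (all names here are assumed; single row group and cell granularity only):

```python
from collections import defaultdict

class FrontierTracker:
    """Minimal event-driven readiness: the frontier is updated on completion, not polled."""

    def __init__(self, upstream: dict[str, set[str]], n_rows: int) -> None:
        self._upstream = upstream
        self._downstream: dict[str, set[str]] = defaultdict(set)
        for col, deps in upstream.items():
            for dep in deps:
                self._downstream[dep].add(col)
        self._done: dict[str, set[int]] = defaultdict(set)
        # seed: every row of every dependency-free column is immediately ready
        self._frontier = {(c, r) for c, deps in upstream.items() if not deps for r in range(n_rows)}

    def mark_complete(self, column: str, row: int) -> None:
        self._done[column].add(row)
        self._frontier.discard((column, row))
        # O(fan_out): only re-check cells directly downstream of the completed cell
        for down in self._downstream[column]:
            if all(row in self._done[up] for up in self._upstream[down]):
                self._frontier.add((down, row))

    def ready_tasks(self, dispatched: set[tuple[str, int]]) -> set[tuple[str, int]]:
        return self._frontier - dispatched  # O(frontier), no full column × row scan

tracker = FrontierTracker({"a": set(), "b": {"a"}}, n_rows=2)
assert tracker.ready_tasks(set()) == {("a", 0), ("a", 1)}
tracker.mark_complete("a", 0)
assert tracker.ready_tasks(set()) == {("a", 1), ("b", 0)}
```

Completing a cell does work proportional to its downstream fan-out, and the ready query is a plain set difference, mirroring the O(frontier) claim above.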
Reviewer comment:
@andreatgretel a few more comments related to perf!
Optimization Review
High Impact
1. get_ready_tasks is O(C × R × G) on every scheduler tick
This scans every column × every row × every row group on each call. With 10 columns, 10k records, buffer_size=100, that's ~100k iterations per tick, each triggering cell_dependencies() + is_all_complete().
Two suggestions:

- Early skip for completed column×row_group pairs in the cell-by-cell branch. Before the inner row loop, a quick check like `len(completed.get(col, set())) + len(dropped) >= rg_size` would let you skip entire blocks.
- Incremental/event-driven readiness (future PR): maintain a frontier set updated on `mark_complete` instead of full-scanning. This turns the scheduler from poll-based to event-driven.
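The early-skip check in the first suggestion amounts to a small predicate (variable names assumed from context; it also assumes the completed and dropped sets are disjoint, otherwise the length sum overcounts):

```python
def column_block_done(completed: dict[str, set[int]], dropped: set[int], col: str, rg_size: int) -> bool:
    # every row is either completed or dropped → the per-row inner loop can be skipped
    # (assumes completed[col] and dropped never overlap)
    return len(completed.get(col, set())) + len(dropped) >= rg_size

assert column_block_done({"a": {0, 1}}, {2}, "a", 3)
assert not column_block_done({"a": {0}}, set(), "a", 3)
```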
2. cell_dependencies allocates a new list + tuples every call
Called per-cell inside the hot loop. For a 100-row batch with 3 upstream columns: 100 list allocations + 300 tuple allocations per column per row group per tick. Since the graph is immutable, the dependency pattern for a given column is always the same — only (row_group, row_index) varies. A cached descriptor that is_all_complete interprets directly could avoid most allocations.
3. is_batch_ready builds full dep list then filters it
```python
deps = graph.cell_dependencies(column, row_group, None, row_group_size)
deps = [(c, rg, ri) for c, rg, ri in deps if ri is None or not self.is_dropped(rg, ri)]
```

For a full-column downstream of a 1000-row cell-by-cell column, this builds 1000 tuples and then creates a second filtered list. Consider checking dropped rows inline or passing the dropped set into the dependency resolution.
Low Impact (fine to defer)
4. topological_order() and columns copy on every access — topological_order() returns a fresh copy (`return list(cache)`) and is called once per column per row group in get_ready_tasks. Since the graph is immutable and callers don't mutate the result, an internal _topological_order that returns the cached list directly (skipping the copy) would help in the hot path. Same for the columns property.
5. is_all_complete repeated dict lookups — Each (col, rg, ri) tuple triggers self._completed.get(rg, {}).get(col, set()), and the empty-dict/empty-set default arguments are allocated on every call. Hoisting the row-group lookup outside the per-cell loop would reduce overhead.
6. _upstream/_downstream are defaultdict but accessors use .get(key, set()) — Allocates a fresh empty set on every miss. Minor, but switching to plain dict would make the no-side-effect intent explicit and avoid the allocation.
Summary
The two highest-impact changes are (1) early-skip logic in get_ready_tasks and (2) reducing per-cell allocations in cell_dependencies. Everything else is micro-optimization that can wait until profiling confirms it matters. Great foundation overall.
@nabinchha update on the optimization review after the event-driven frontier refactor: 1. 2. 3. 4–6 (topological_order copies, is_all_complete lookups, defaultdict) — already addressed in previous commits or no longer in the hot path.
Replace the poll-based get_ready_tasks (O(C × R × G) per tick) with an event-driven frontier maintained on mark_complete / mark_batch_complete / drop_row. get_ready_tasks now returns in O(frontier) instead of scanning all columns × rows × row groups.
Summary
PR 1 of 4 in the async generators & task-queue builder plan. Adds the foundational data structures —
ExecutionGraph,CompletionTracker, andTask/TaskResult/TaskTrace— that the async scheduler (PR 3) will consume. No existing behavior changes; all new modules underengine/dataset_builders/utils/.Changes
Added
- `execution_graph.py` — Column-level DAG built from config dependencies. Supports topological ordering (Kahn's, cached), critical path, cell-level dependency resolution, side-effect column mapping, Mermaid visualization, upfront task count estimation, cached `upstream_by_strategy`, and a `create()` factory classmethod.
- `completion_tracker.py` — Tracks per-cell and per-batch completion state across row groups. Uses an event-driven frontier — readiness is computed incrementally on `mark_complete`/`mark_batch_complete`/`drop_row` via `_enqueue_downstream`, so `get_ready_tasks` returns in O(frontier) instead of scanning all columns × rows × row groups (O(C × R × G)) per tick. Handles row drops and batch-level markers.
- `task_model.py` — Frozen dataclasses for `Task` (hashable work unit), `TaskResult` (outcome), and `TaskTrace` (timing trace). Includes `ColumnName`, `RowGroupIndex`, `RowIndex` type aliases for self-documenting signatures.
- `test_execution_graph.py` (381 lines) — Tests for graph construction, topological order, critical path, cell dependencies, side-effects, Mermaid output, cycle detection, task counts.
- `test_completion_tracker.py` (257 lines) — Tests for mark/query, batch completion, row drops, frontier-based readiness resolution, multi-row-group scenarios.
- `test_task_model.py` (87 lines) — Tests for equality, hashing, set membership, defaults.

Changed
Total: +1,250 / -29 lines across 9 files (6 new, 3 modified). ~58% of added lines are tests (725 test / 506 source).
Attention Areas
- `completion_tracker.py` — Event-driven frontier logic in `_enqueue_downstream` and `_reevaluate_batch_tasks`. This is the core optimization: cell completions do O(fan_out), batch completions check downstream rows, and `get_ready_tasks` is just a frontier filter.
- `execution_graph.py` — Core DAG logic. The `cell_dependencies` method resolves side-effect columns and maps generation strategy to readiness granularity (cell vs batch). `upstream_by_strategy` is cached and used by the frontier logic. This is the contract that PR 3's scheduler will rely on.

Test plan
- Unit tests pass (`pytest tests/engine/dataset_builders/utils/`)
- `make check-all` passes (lint + format)

Description updated with AI