From f091da4ae62cfae57e03e1b43feb530455f3e059 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Fri, 24 Apr 2026 08:02:17 -0500 Subject: [PATCH 1/4] Demonstrate failure for multi-layer transforms to receive a data product from an unfold --- test/unfold.cpp | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/test/unfold.cpp b/test/unfold.cpp index dfbd7bd5b..01edb2497 100644 --- a/test/unfold.cpp +++ b/test/unfold.cpp @@ -137,3 +137,51 @@ TEST_CASE("Splitting the processing", "[graph]") CHECK(g.execution_count("add_numbers") == 20); CHECK(g.execution_count("check_sum_same") == index_limit); } + +// ======================================================================================= +// This test exercises a multi-layer transform whose two inputs come from different data +// layers: one from the unfolded (child) layer and one from the parent (event) layer. +// +/* Index Router */ +/* | */ +/* provide_max_number (event layer) */ +/* | \ */ +/* unfold/iota (creates "subevent" children) */ +/* | \ */ +/* | \ */ +/* (subevent) (event, repeated) */ +/* new_number max_number */ +/* \ / */ +/* multi-layer transform: max_number + new_number */ +// ======================================================================================= +TEST_CASE("Multi-layer transform with one input from an unfold", "[graph]") +{ + constexpr auto index_limit = 2u; + + experimental::layer_generator gen; + gen.add_layer("event", {"job", index_limit}); + + experimental::framework_graph g{driver_for_test(gen)}; + + g.provide("provide_max_number", provide_max_number, concurrency::unlimited) + .output_product(product_query{.creator = "input", .layer = "event", .suffix = "max_number"}); + + g.unfold("iota", &iota::predicate, &iota::unfold, concurrency::unlimited, "subevent") + .input_family(product_query{.creator = "input", .layer = "event", .suffix = "max_number"}) + .output_product_suffixes("new_number"); + + g.transform( + "add_max_and_new", + 
[](unsigned int i, unsigned int j) { return i + j; }, + concurrency::unlimited) + .input_family(product_query{.creator = "iota", .layer = "subevent", .suffix = "new_number"}, + product_query{.creator = "input", .layer = "event", .suffix = "max_number"}) + .output_product_suffixes("result"); + + g.execute(); + + // event 0: max_number=10, new_number in [0,9] -> 10 executions + // event 1: max_number=20, new_number in [0,19] -> 20 executions + CHECK(g.execution_count("iota") == index_limit); + CHECK(g.execution_count("add_max_and_new") == 30u); +} From 2f4e2fb7aa661989922dbb904f976afbda73d467 Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Mon, 11 May 2026 15:31:33 -0500 Subject: [PATCH 2/4] Introduce data_cell_tracker and child_tracker constructs --- phlex/model/CMakeLists.txt | 4 + phlex/model/child_tracker.cpp | 106 ++++++++++ phlex/model/child_tracker.hpp | 66 ++++++ phlex/model/data_cell_tracker.cpp | 150 ++++++++++++++ phlex/model/data_cell_tracker.hpp | 92 +++++++++ test/CMakeLists.txt | 8 + test/child_tracker_propagation_test.cpp | 259 ++++++++++++++++++++++++ test/data_cell_tracker_test.cpp | 114 +++++++++++ 8 files changed, 799 insertions(+) create mode 100644 phlex/model/child_tracker.cpp create mode 100644 phlex/model/child_tracker.hpp create mode 100644 phlex/model/data_cell_tracker.cpp create mode 100644 phlex/model/data_cell_tracker.hpp create mode 100644 test/child_tracker_propagation_test.cpp create mode 100644 test/data_cell_tracker_test.cpp diff --git a/phlex/model/CMakeLists.txt b/phlex/model/CMakeLists.txt index 340ac8eae..5e9bb87cb 100644 --- a/phlex/model/CMakeLists.txt +++ b/phlex/model/CMakeLists.txt @@ -4,7 +4,9 @@ cet_make_library( SHARED SOURCE algorithm_name.cpp + child_tracker.cpp data_cell_counter.cpp + data_cell_tracker.cpp fixed_hierarchy.cpp data_layer_hierarchy.cpp data_cell_index.cpp @@ -29,7 +31,9 @@ install( fixed_hierarchy.hpp fwd.hpp handle.hpp + child_tracker.hpp data_cell_counter.hpp + data_cell_tracker.hpp 
data_layer_hierarchy.hpp data_cell_index.hpp identifier.hpp diff --git a/phlex/model/child_tracker.cpp b/phlex/model/child_tracker.cpp new file mode 100644 index 000000000..31f942c7f --- /dev/null +++ b/phlex/model/child_tracker.cpp @@ -0,0 +1,106 @@ +#include "phlex/model/child_tracker.hpp" + +#include "spdlog/spdlog.h" + +#include +#include +#include +#include + +namespace phlex::experimental { + + child_tracker::child_tracker(data_cell_index_ptr index, std::size_t expected_flush_count) : + index_{std::move(index)}, expected_flush_count_{expected_flush_count} + { + } + + std::size_t child_tracker::expected_total_count() const + { + return std::ranges::fold_left(expected_counts_ | std::views::values, 0uz, std::plus{}); + } + + std::size_t child_tracker::processed_total_count() const + { + return std::ranges::fold_left(processed_counts_ | std::views::values, 0uz, std::plus{}); + } + + std::size_t child_tracker::committed_total_count() const + { + return std::ranges::fold_left(committed_counts_ | std::views::values, 0uz, std::plus{}); + } + + std::size_t child_tracker::committed_count_for_layer( + data_cell_index::hash_type const layer_hash) const + { + return committed_counts_.count(layer_hash); + } + + void child_tracker::update_committed_counts(data_cell_counts const& committed_counts) + { + for (auto const& [layer_hash, count] : committed_counts) { + committed_counts_.add_to(layer_hash, count); + } + } + + void child_tracker::update_expected_counts(data_cell_counts const& expected_counts) + { + for (auto const& [layer_hash, count] : expected_counts) { + expected_counts_.add_to(layer_hash, count); + } + ++received_flush_count_; + } + + void child_tracker::update_expected_count(data_cell_index::hash_type const layer_hash, + std::size_t const count) + { + expected_counts_.add_to(layer_hash, count); + ++received_flush_count_; + } + + void child_tracker::send_flush() + { + if (flush_callback_) { + flush_callback_(shared_from_this()); + } else { + spdlog::warn("No 
flush callback set for index: {}", index_->to_string()); + } + } + + bool child_tracker::all_children_accounted() + { + auto const received = received_flush_count_.load(); + if (received == 0) { + return false; + } + + // Block until all flush counts expected from unfolds have arrived so that expected_counts_ + // reflects the union of all child layers. + if (expected_flush_count_ > 0 and received < expected_flush_count_) { + return false; + } + + // All expected flush messages have arrived; check that processed child counts match. + bool const result = std::ranges::all_of(expected_counts_, [this](auto const& entry) { + auto const& [layer_hash, expected] = entry; + return processed_counts_.count(layer_hash) == expected.load(); + }); + + if (result) { + std::call_once(commit_once_, [this] { commit(); }); + } + + return result; + } + + void child_tracker::commit() + { + for (auto const& [layer_hash, count] : processed_counts_) { + committed_counts_.add_to(layer_hash, count.load()); + } + + // At some point, we might consider clearing the processed_counts_ and expected_counts_ maps + // to free memory, but for now we can just leave them as-is since the child_tracker will + // likely be destroyed soon after commit() is called. 
+ } + +} // namespace phlex::experimental diff --git a/phlex/model/child_tracker.hpp b/phlex/model/child_tracker.hpp new file mode 100644 index 000000000..ad694ca44 --- /dev/null +++ b/phlex/model/child_tracker.hpp @@ -0,0 +1,66 @@ +#ifndef PHLEX_MODEL_CHILD_TRACKER_HPP +#define PHLEX_MODEL_CHILD_TRACKER_HPP + +#include "phlex/model/data_cell_index.hpp" +#include "phlex/model/data_cell_tracker.hpp" +#include "phlex/phlex_model_export.hpp" + +#include +#include +#include +#include +#include + +namespace phlex::experimental { + + class PHLEX_MODEL_EXPORT child_tracker : public std::enable_shared_from_this { + using flush_callback_t = std::function)>; + + public: + // expected_flush_count controls how many update_expected_counts() calls must arrive before + // all_children_accounted() can return true. The default of 0 means a single call is + // sufficient (the common case when only one unfold consumes this index's layer). A value + // greater than 1 means that multiple unfolds produce children from the same parent layer. + explicit child_tracker(data_cell_index_ptr index, std::size_t expected_flush_count); + + data_cell_index_ptr const index() const { return index_; } + std::size_t expected_total_count() const; + std::size_t processed_total_count() const; + std::size_t committed_total_count() const; + std::size_t committed_count_for_layer(data_cell_index::hash_type layer_hash) const; + data_cell_counts const& committed_counts() const { return committed_counts_; } + + // Merges expected_counts into the accumulated expected counts. Each call represents one + // flush message arriving (e.g. one unfold completing for this index). + void update_expected_counts(data_cell_counts const& expected_counts); + // Single-entry variant used when an unfold reports its child count directly (no map needed). 
+ void update_expected_count(data_cell_index::hash_type layer_hash, std::size_t count); + void update_committed_counts(data_cell_counts const& committed_counts); + void increment(data_cell_index::hash_type const layer_hash) + { + processed_counts_.increment(layer_hash); + } + + void set_flush_callback(flush_callback_t callback) { flush_callback_ = std::move(callback); } + void send_flush(); + bool all_children_accounted(); + + private: + void commit(); + + data_cell_index_ptr const index_; + std::once_flag commit_once_; + data_cell_counts committed_counts_; + data_cell_counts processed_counts_; + // Accumulated expected child counts from all unfolds. + data_cell_counts expected_counts_; + std::atomic received_flush_count_{0}; + // Number of flush messages expected from unfolds. Zero means source-driven: any single + // update_expected_counts call is sufficient to unblock all_children_accounted(). + std::size_t expected_flush_count_{0}; + flush_callback_t flush_callback_; + }; + +} // namespace phlex::experimental + +#endif // PHLEX_MODEL_CHILD_TRACKER_HPP diff --git a/phlex/model/data_cell_tracker.cpp b/phlex/model/data_cell_tracker.cpp new file mode 100644 index 000000000..a82b7bfb6 --- /dev/null +++ b/phlex/model/data_cell_tracker.cpp @@ -0,0 +1,150 @@ +#include "phlex/model/data_cell_tracker.hpp" + +#include "spdlog/spdlog.h" + +#include +#include +#include + +namespace { + auto make_data_cell_counts(phlex::data_cell_index_ptr const& index) + { + auto result = std::make_shared(); + result->emplace(index->layer_hash(), 1); + return result; + } +} + +namespace phlex::experimental { + + // ========================================================================================= + // data_cell_counts implementation + void data_cell_counts::emplace(std::size_t layer_hash, std::size_t value) + { + map_.emplace(layer_hash, value); + } + + // ========================================================================================= + // data_cell_tracker implementation + 
data_cell_tracker::~data_cell_tracker() + { + if (pending_flushes_.empty()) { + return; + } + spdlog::warn("Cached pending flushes at destruction:"); + for (auto const& [index, flush_counts] : pending_flushes_ | std::views::values) { + spdlog::warn(" Index: {}", index->to_string()); + for (auto const& [layer_hash, count] : *flush_counts) { + spdlog::warn(" {} = {}", layer_hash, count.load()); + } + } + } + + index_flushes data_cell_tracker::closeout(data_cell_index_ptr const& received_index) + { + // Always update the cached index. The logic below uses the previous cached index to + // determine what flushes to emit. + auto cached_index = std::exchange(cached_index_, received_index); + + // Just beginning job (or ending a job that immediately threw an exception) + if (cached_index == nullptr) { + return {}; + } + + // Ending job. Backout to the job layer and emit flush tokens for all closed-out indices. + // + // Example: + // Current index: [run: 4, spill: 7] + // Received index: nullptr (end of job) + // Actions: + // a. Emit flush token for [run: 4, spill: 7] + // b. Emit flush token for [run: 4] + // c. Remove remaining flushes from cache + if (received_index == nullptr) { + // The "std::move" empties the cache, so we don't need to manually clear it after. + return std::move(pending_flushes_) | std::views::values | std::ranges::to(); + } + + assert(received_index); + assert(cached_index); + + // A parent must exist at this point + auto received_parent = received_index->parent(); + assert(received_parent); + + // Received index is immediate child of current index. + // + // Example: + // Current index: [run: 4] + // Received index: [run: 4, spill: 6] + // Actions: + // a. Initialize count for [run: 4] + if (received_parent->hash() == cached_index->hash()) { + create_parent_count(received_parent, received_index); + return {}; + } + + auto cached_parent = cached_index->parent(); + + // Received index is a sibling of the current index. 
Increment parent count and move + // current index to the new sibling. + // + // Example: + // Current index: [run: 4, spill: 6] + // Received index: [run: 4, spill: 7] + // Actions: + // a. Increment count for [run: 4] + if (cached_parent and received_parent->hash() == cached_parent->hash()) { + increment_parent_count(received_parent, received_index); + return {}; + } + + // Received index is a parent of the cached index. This means we've closed out the + // cached index and all of its siblings, and are moving back up the hierarchy. We need + // to emit flush tokens for all closed-out indices. + // + // Example: + // Cached index: [run: 4, spill: 6, subspill: 2, subsubspill: 3] + // Received index: [run: 4, spill: 7] + // Actions: + // a. Emit flush token for [run: 4, spill: 6, subspill: 2] + // b. Emit flush token for [run: 4, spill: 6] + // c. Remove relevant flush tokens from cache + // d. Increment count for [run: 4] + index_flushes result; + auto recursive_parent = cached_parent; + while (recursive_parent != received_parent) { + if (recursive_parent == nullptr) { + // We will get here if the received parent is at a layer lower than the cached parent and is + // not an immediate child of the cached parent. The recursive parent walks all the way back + // up to the root without finding the received parent. This means the received index is not + // an ancestor of the cached index, and is invalid. 
+ throw std::runtime_error( + fmt::format("Received index {}, which is not an immediate child of {}", + received_index->to_string(), + cached_index->to_string())); + } + auto fh = pending_flushes_.extract(recursive_parent->hash()); + result.push_back(fh.mapped()); + recursive_parent = recursive_parent->parent(); + } + increment_parent_count(received_parent, received_index); + return result; + } + + void data_cell_tracker::create_parent_count(data_cell_index_ptr const& parent, + data_cell_index_ptr const& child) + { + pending_flushes_.emplace(parent->hash(), + index_flush{.index = parent, .counts = make_data_cell_counts(child)}); + } + + void data_cell_tracker::increment_parent_count(data_cell_index_ptr const& parent, + data_cell_index_ptr const& child) + { + auto it = pending_flushes_.find(parent->hash()); + // This is only called for siblings, so the parent count must already exist in the cache. + assert(it != pending_flushes_.end()); + it->second.counts->increment(child->layer_hash()); + } +} diff --git a/phlex/model/data_cell_tracker.hpp b/phlex/model/data_cell_tracker.hpp new file mode 100644 index 000000000..ba37574fc --- /dev/null +++ b/phlex/model/data_cell_tracker.hpp @@ -0,0 +1,92 @@ +#ifndef PHLEX_MODEL_DATA_CELL_TRACKER_HPP +#define PHLEX_MODEL_DATA_CELL_TRACKER_HPP + +#include "phlex/phlex_model_export.hpp" + +#include "phlex/model/data_cell_index.hpp" + +#include "oneapi/tbb/concurrent_unordered_map.h" + +#include +#include +#include +#include +#include + +namespace phlex::experimental { + class PHLEX_MODEL_EXPORT data_cell_counts { + public: + void emplace(std::size_t layer_hash, std::size_t value); + + void increment(data_cell_index::hash_type layer_hash) { ++map_[layer_hash]; } + void add_to(std::size_t layer_hash, std::size_t value) { map_[layer_hash] += value; } + + auto begin() const { return map_.begin(); } + auto end() const { return map_.end(); } + + auto size() const { return map_.size(); } + + std::size_t count(data_cell_index::hash_type 
layer_hash) const + { + auto it = map_.find(layer_hash); + return it != map_.end() ? it->second.load() : 0; + } + + private: + tbb::concurrent_unordered_map> map_; + }; + + using data_cell_counts_ptr = std::shared_ptr; + using data_cell_counts_const_ptr = std::shared_ptr; + + struct PHLEX_MODEL_EXPORT index_flush { + data_cell_index_ptr index; + // Ideally, the counts field should be a `data_cell_counts_const_ptr` to ensure immutability. + // However, this type is also used for incrementing counters, so it must be mutable. + data_cell_counts_ptr counts; + }; + + using index_flushes = std::vector; + + // A simpler flush message sent by an unfold to the index_router. Unlike index_flush, which + // carries a map of child counts, unfold_flush carries a single (layer_hash, count) pair + // because each unfold produces children in exactly one child layer. + struct PHLEX_MODEL_EXPORT unfold_flush { + data_cell_index_ptr index; + data_cell_index::hash_type layer_hash{}; + std::size_t count{}; + }; + + // The `closeout_then_emit` struct carries flushes that must be emitted + // (to close out already-emitted indices) before emitting `index_to_emit`. + struct PHLEX_MODEL_EXPORT closeout_then_emit { + index_flushes closeout_flushes{}; + data_cell_index_ptr index_to_emit{nullptr}; + }; + + class PHLEX_MODEL_EXPORT data_cell_tracker { + public: + data_cell_tracker() = default; + ~data_cell_tracker(); + + data_cell_tracker(data_cell_tracker const&) = delete; + data_cell_tracker(data_cell_tracker&&) = delete; + data_cell_tracker& operator=(data_cell_tracker const&) = delete; + data_cell_tracker& operator=(data_cell_tracker&&) = delete; + + // Computes and returns the set of indices whose processing is now complete, given that + // the next index to be processed is `index`. A null `index` signals end-of-job and + // returns all remaining pending flushes. 
+ index_flushes closeout(data_cell_index_ptr const& index); + + private: + void create_parent_count(data_cell_index_ptr const& parent, data_cell_index_ptr const& child); + void increment_parent_count(data_cell_index_ptr const& parent, + data_cell_index_ptr const& child); + + data_cell_index_ptr cached_index_{nullptr}; + std::map pending_flushes_; + }; +} + +#endif // PHLEX_MODEL_DATA_CELL_TRACKER_HPP diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 79b60c0f4..924ba8400 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -176,6 +176,14 @@ cet_test(data_cell_index USE_CATCH2_MAIN SOURCE data_cell_index.cpp LIBRARIES cet_test(fixed_hierarchy_test USE_CATCH2_MAIN SOURCE fixed_hierarchy_test.cpp LIBRARIES phlex::model ) +cet_test(data_cell_tracker USE_CATCH2_MAIN SOURCE data_cell_tracker_test.cpp LIBRARIES + phlex::model + TBB::tbb +) +cet_test(child_tracker_propagation USE_CATCH2_MAIN SOURCE child_tracker_propagation_test.cpp LIBRARIES + phlex::core_internal + layer_generator_internal +) cet_test(product_handle USE_CATCH2_MAIN SOURCE product_handle.cpp LIBRARIES phlex::core_internal ) diff --git a/test/child_tracker_propagation_test.cpp b/test/child_tracker_propagation_test.cpp new file mode 100644 index 000000000..f044f8b56 --- /dev/null +++ b/test/child_tracker_propagation_test.cpp @@ -0,0 +1,259 @@ +// ======================================================================================= +// This test verifies that child_trackers correctly accumulate committed child counts +// through a two-level unfold hierarchy (job → run → spill). A standalone +// `index_counter` node manages flush counting without the rest of the framework_graph +// machinery. 
The graph is: +// +// source (job index) +// | +// index_counter +// | +// unfold_into_run (8 runs per job) +// | +// unfold_into_spill (2 spills per run) +// +// The test checks that: +// - each run-level counter records 2 committed spill children +// - the job-level counter records 8 committed run children and +// 16 committed spill children (propagated up from the run level) +// ======================================================================================= + +#include "phlex/model/child_tracker.hpp" +#include "phlex/model/data_cell_index.hpp" +#include "phlex/model/data_cell_tracker.hpp" +#include "phlex/utilities/hashing.hpp" +#include "plugins/layer_generator.hpp" + +#include "catch2/catch_test_macros.hpp" +#include "oneapi/tbb/concurrent_hash_map.h" +#include "oneapi/tbb/flow_graph.h" +#include "spdlog/cfg/env.h" +#include "spdlog/spdlog.h" + +#include +#include +#include + +using namespace phlex; +using namespace phlex::experimental; +using namespace oneapi; + +namespace { + class index_counter { + public: + using node_t = tbb::flow::multifunction_node< + closeout_then_emit, + std::tuple>>; + + index_counter(index_counter const&) = delete; + index_counter(index_counter&&) = delete; + index_counter& operator=(index_counter const&) = delete; + index_counter& operator=(index_counter&&) = delete; + + explicit index_counter(tbb::flow::graph& g, std::vector layer_names) : + node_{g, + tbb::flow::unlimited, + [this](closeout_then_emit input, auto& output_ports) { + auto&& index = input.index_to_emit; + auto&& flushes = input.closeout_flushes; + process(index, std::move(flushes)); + std::get<0>(output_ports).try_put(index); + }}, + index_receiver_{g, + tbb::flow::unlimited, + [this](data_cell_index_ptr index) -> tbb::flow::continue_msg { + assert(index); + flush_if_done(index); + return {}; + }}, + flush_receiver_{g, + tbb::flow::unlimited, + [this](unfold_flush input) { + auto&& [index, layer_hash, count] = input; + 
counter(index)->update_expected_count(layer_hash, count); + flush_if_done(index); + }}, + layer_names_{std::move(layer_names)} + { + } + + ~index_counter() + { + assert(counters_.empty() && "All counters should have been flushed by the end of the test"); + } + + tbb::flow::receiver& input_port() { return node_; } + tbb::flow::receiver& index_receiver() { return index_receiver_; } + auto& output_port() { return tbb::flow::output_port<0>(node_); } + auto& flush_port() { return tbb::flow::output_port<1>(node_); } + auto& flush_receiver() { return flush_receiver_; } + + void process(data_cell_index_ptr const& index, index_flushes flushes) + { + assert(index); + update_flush_counts(std::move(flushes)); + flush_if_done(index); + } + + bool index_is_lowest_layer(data_cell_index_ptr const& index) const + { + return index->layer_name() == layer_names_.back(); + } + + void update_flush_counts(index_flushes flushes) + { + for (auto const& [index, flush_counts] : flushes) { + counter(index)->update_expected_counts(*flush_counts); + } + } + + std::shared_ptr counter(data_cell_index_ptr const& index) + { + if (const_accessor a; counters_.find(a, index->hash())) { + return a->second; + } + + accessor a; + counters_.emplace(a, index->hash(), std::make_shared(index, 0)); + a->second->set_flush_callback( + [this](std::shared_ptr fc) { flush_port().try_put(fc); }); + return a->second; + } + + void flush_if_done(data_cell_index_ptr index) + { + assert(index); + + auto parent = index->parent(); + + // Lowest-layer indices do not have their own counters. + // Increment the parent counter for the lowest-layer index instead. + if (index_is_lowest_layer(index)) { + // Sometimes the lowest layer is the job, which has no parent + if (parent) { + counter(parent)->increment(index->layer_hash()); + } + index = parent; + } + + while (index) { + // Use an exclusive accessor so that only one thread can release a given counter. 
+ // This prevents double-release when multiple threads call + // flush_if_done concurrently for the same index. + accessor a; + if (not counters_.find(a, index->hash())) { + // This can happen when two threads process the same parent index, + // and one of them releases it before the other completes. + return; + } + + if (not a->second->all_children_accounted()) { + return; + } + + a->second->send_flush(); + + if (auto parent = index->parent()) { + auto parent_counter = counter(parent); + parent_counter->update_committed_counts(a->second->committed_counts()); + parent_counter->increment(index->layer_hash()); + index = parent; + } + counters_.erase(a); + } + } + + private: + using counters_t = tbb::concurrent_hash_map>; + using accessor = counters_t::accessor; + using const_accessor = counters_t::const_accessor; + + counters_t counters_; + node_t node_; + tbb::flow::function_node index_receiver_; + tbb::flow::function_node flush_receiver_; + std::vector layer_names_; + }; + + using unfold_node_t = tbb::flow::multifunction_node< + data_cell_index_ptr, + std::tuple>; + + unfold_node_t make_unfold_node(tbb::flow::graph& g, std::string layer_name, int count) + { + return unfold_node_t{ + g, + tbb::flow::unlimited, + [layer_name = std::move(layer_name), count](data_cell_index_ptr input, auto& output_ports) { + auto& [output_port, to_index_receiver, to_flush_receiver] = output_ports; + for (int const i : std::views::iota(0, count)) { + auto child = input->make_child(layer_name, i); + output_port.try_put(child); + to_index_receiver.try_put(child); + } + auto const layer_hash = + experimental::hash(input->layer_hash(), identifier{layer_name}.hash()); + to_flush_receiver.try_put( + {.index = input, .layer_hash = layer_hash, .count = static_cast(count)}); + }}; + } +} + +TEST_CASE("Flush counts propagate through nested unfolds", "[graph]") +{ + spdlog::cfg::load_env_levels(); + + experimental::layer_generator gen; + experimental::framework_driver 
driver{driver_for_test(gen).driver}; + data_cell_tracker cell_tracker; + + tbb::flow::graph g; + tbb::flow::input_node src{ + g, [&driver, &cell_tracker](tbb::flow_control& fc) mutable -> closeout_then_emit { + if (auto item = driver()) { + return {.closeout_flushes = cell_tracker.closeout(*item), .index_to_emit = *item}; + } + fc.stop(); + return {}; + }}; + + index_counter ic{g, {"job", "run", "spill"}}; + + auto unfold_into_run = make_unfold_node(g, "run", 8); + auto unfold_into_spill = make_unfold_node(g, "spill", 2); + + auto run_layer_hash = experimental::hash("job"_id.hash(), "run"_id.hash()); + auto spill_layer_hash = experimental::hash("job"_id.hash(), "run"_id.hash(), "spill"_id.hash()); + + tbb::flow::function_node> flusher{ + g, + tbb::flow::serial, + [run_layer_hash, spill_layer_hash](std::shared_ptr counter) { + if (counter->index()->layer_name() == "run") { + CHECK(counter->committed_count_for_layer(spill_layer_hash) == 2); + CHECK(counter->expected_total_count() == 2); + CHECK(counter->processed_total_count() == 2); + return; + } + + REQUIRE(counter->index()->layer_name() == "job"); + // Checking for nested layers to ensure that flush counts are properly propagated up + // the hierarchy + CHECK(counter->committed_count_for_layer(spill_layer_hash) == 16); + CHECK(counter->committed_count_for_layer(run_layer_hash) == 8); + CHECK(counter->expected_total_count() == 8); // Only immediate children are expected. + CHECK(counter->processed_total_count() == 8); // Only immediate children are processed. 
+ }}; + + make_edge(src, ic.input_port()); + make_edge(ic.output_port(), unfold_into_run); + make_edge(output_port<0>(unfold_into_run), unfold_into_spill); + make_edge(output_port<1>(unfold_into_run), ic.index_receiver()); + make_edge(output_port<2>(unfold_into_run), ic.flush_receiver()); + make_edge(output_port<1>(unfold_into_spill), ic.index_receiver()); + make_edge(output_port<2>(unfold_into_spill), ic.flush_receiver()); + make_edge(ic.flush_port(), flusher); + + src.activate(); + g.wait_for_all(); +} diff --git a/test/data_cell_tracker_test.cpp b/test/data_cell_tracker_test.cpp new file mode 100644 index 000000000..5afc40051 --- /dev/null +++ b/test/data_cell_tracker_test.cpp @@ -0,0 +1,114 @@ +#include "phlex/model/data_cell_index.hpp" +#include "phlex/model/data_cell_tracker.hpp" + +#include "catch2/catch_test_macros.hpp" +#include "catch2/matchers/catch_matchers_string.hpp" +#include "spdlog/sinks/ostream_sink.h" +#include "spdlog/spdlog.h" + +using namespace phlex; +using namespace phlex::experimental; + +namespace { + void use_ostream_logger(std::ostringstream& oss) + { + auto ostream_sink = std::make_shared(oss); + auto ostream_logger = std::make_shared("my_logger", ostream_sink); + spdlog::set_default_logger(ostream_logger); + } +} + +TEST_CASE("Test data-cell tracker", "[graph]") +{ + data_cell_tracker tracker; + + auto job_index = data_cell_index::job(); + auto run4 = job_index->make_child("run", 4); + auto spill5 = run4->make_child("spill", 5); + auto spill6 = run4->make_child("spill", 6); + auto subspill2 = spill6->make_child("subspill", 2); + auto run5 = job_index->make_child("run", 5); + + CHECK(tracker.closeout(job_index).empty()); + CHECK(tracker.closeout(run4).empty()); + CHECK(tracker.closeout(spill5).empty()); + CHECK(tracker.closeout(spill6).empty()); + CHECK(tracker.closeout(subspill2).empty()); + + auto flushes = tracker.closeout(run5); + REQUIRE(flushes.size() == 2); + + auto spill6_flush = flushes[0]; + CHECK(spill6_flush.index == 
spill6); + REQUIRE(spill6_flush.counts->size() == 1); // Should only be "subspill" layer + CHECK(spill6_flush.counts->count(subspill2->layer_hash()) == 1); // subspill 2 + + auto run4_flush = flushes[1]; + CHECK(run4_flush.index == run4); + REQUIRE(run4_flush.counts->size() == 1); // Should only be "spill" layer + CHECK(run4_flush.counts->count(spill5->layer_hash()) == 2); // spills 5 and 6 + + flushes = tracker.closeout(nullptr); + REQUIRE(flushes.size() == 1); // only job should have a flush count + + auto job_flush = flushes[0]; + CHECK(job_flush.index == job_index); + REQUIRE(job_flush.counts->size() == 1); // Should only be "run" layer + CHECK(job_flush.counts->count(run4->layer_hash()) == 2); // runs 4 and 5 +} + +TEST_CASE("Test data-cell tracker with multiple hierarchy branches", "[graph]") +{ + data_cell_tracker tracker; + + auto job_index = data_cell_index::job(); + auto run4 = job_index->make_child("run", 4); + auto calib1 = job_index->make_child("calib", 1); + auto run5 = job_index->make_child("run", 5); + + CHECK(tracker.closeout(job_index).empty()); + CHECK(tracker.closeout(run4).empty()); + CHECK(tracker.closeout(calib1).empty()); + CHECK(tracker.closeout(run5).empty()); + + auto flushes = tracker.closeout(nullptr); + REQUIRE(flushes.size() == 1); // only job should have a flush count + auto job_flush = flushes[0]; + CHECK(job_flush.index == job_index); + REQUIRE(job_flush.counts->size() == 2); // Should have "run" and "calib" layers + CHECK(job_flush.counts->count(run4->layer_hash()) == 2); // run 4 and 5 + CHECK(job_flush.counts->count(calib1->layer_hash()) == 1); // calib 1 +} + +TEST_CASE("Test data-cell tracker with missing intermediate layers", "[graph]") +{ + data_cell_tracker tracker; + + auto job_index = data_cell_index::job(); + auto run4 = job_index->make_child("run", 4); + auto spill2 = run4->make_child("spill", 2); + + CHECK(tracker.closeout(job_index).empty()); + + CHECK_THROWS_WITH(tracker.closeout(spill2), + "Received index [run:4, 
spill:2], which is not an immediate child of []"); +} + +TEST_CASE("Cached flush counts at destruction generate warning message", "[graph]") +{ + std::ostringstream oss; + use_ostream_logger(oss); + auto tracker = std::make_unique(); + + auto job_index = data_cell_index::job(); + auto run4 = job_index->make_child("run", 4); + + CHECK(tracker->closeout(job_index).empty()); + CHECK(tracker->closeout(run4).empty()); + + tracker.reset(); // Invoke destructor to trigger warning message + auto const warning = oss.str(); + CHECK_THAT(warning, Catch::Matchers::ContainsSubstring("Cached pending flushes at destruction:")); + CHECK_THAT(warning, Catch::Matchers::ContainsSubstring("Index: []")); + CHECK_THAT(warning, Catch::Matchers::ContainsSubstring("7457871974376244100 = 1")); +} From b26f4f078f5543f0a2e61247d54084700ede503f Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Mon, 27 Apr 2026 17:54:59 -0500 Subject: [PATCH 3/4] Support multi-layer transforms with products from unfolds --- phlex/core/declared_unfold.cpp | 15 +- phlex/core/declared_unfold.hpp | 50 ++-- phlex/core/edge_maker.hpp | 26 +- phlex/core/framework_graph.cpp | 104 +++++--- phlex/core/framework_graph.hpp | 5 +- phlex/core/index_router.cpp | 434 +++++++++++++++++++++----------- phlex/core/index_router.hpp | 136 +++++----- phlex/model/fixed_hierarchy.cpp | 7 +- phlex/model/fixed_hierarchy.hpp | 4 + 9 files changed, 475 insertions(+), 306 deletions(-) diff --git a/phlex/core/declared_unfold.cpp b/phlex/core/declared_unfold.cpp index 640a92433..f3849baff 100644 --- a/phlex/core/declared_unfold.cpp +++ b/phlex/core/declared_unfold.cpp @@ -1,6 +1,6 @@ #include "phlex/core/declared_unfold.hpp" -#include "phlex/model/data_cell_counter.hpp" #include "phlex/model/handle.hpp" +#include "phlex/utilities/hashing.hpp" #include "fmt/std.h" #include "spdlog/spdlog.h" @@ -12,25 +12,18 @@ namespace phlex::experimental { std::string const& child_layer_name) : parent_{std::const_pointer_cast(parent)}, 
node_name_{std::move(node_name)}, - child_layer_name_{child_layer_name} + child_layer_name_{child_layer_name}, + child_layer_hash_{hash(parent->index()->layer_hash(), identifier{child_layer_name_}.hash())} { } product_store_const_ptr generator::make_child(std::size_t const i, products new_products) { auto child_index = parent_->index()->make_child(child_layer_name_, i); - ++child_counts_[child_index->layer_hash()]; + ++child_counts_; return std::make_shared(child_index, node_name_, std::move(new_products)); } - flush_counts_ptr generator::flush_result() const - { - if (not child_counts_.empty()) { - return std::make_shared(std::move(child_counts_)); - } - return nullptr; - } - declared_unfold::declared_unfold(algorithm_name name, std::vector predicates, product_queries input_products, diff --git a/phlex/core/declared_unfold.hpp b/phlex/core/declared_unfold.hpp index 76855927b..de481e47c 100644 --- a/phlex/core/declared_unfold.hpp +++ b/phlex/core/declared_unfold.hpp @@ -11,6 +11,7 @@ #include "phlex/core/products_consumer.hpp" #include "phlex/model/algorithm_name.hpp" #include "phlex/model/data_cell_index.hpp" +#include "phlex/model/data_cell_tracker.hpp" #include "phlex/model/handle.hpp" #include "phlex/model/identifier.hpp" #include "phlex/model/product_specification.hpp" @@ -39,22 +40,19 @@ namespace phlex::experimental { explicit generator(product_store_const_ptr const& parent, algorithm_name node_name, std::string const& child_layer_name); - flush_counts_ptr flush_result() const; - product_store_const_ptr make_child_for(std::size_t const data_cell_number, - products new_products) - { - return make_child(data_cell_number, std::move(new_products)); - } + std::size_t child_layer_hash() const { return child_layer_hash_; } + std::size_t child_count() const { return child_counts_; } + product_store_const_ptr make_child(std::size_t i, products new_products); private: - product_store_const_ptr make_child(std::size_t i, products new_products); product_store_ptr 
parent_; algorithm_name node_name_; // References declared_unfold::child_layer_, which outlives this short-lived object. // NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members) std::string const& child_layer_name_; - std::map child_counts_; + std::size_t child_layer_hash_; + std::size_t child_counts_ = 0; }; class PHLEX_CORE_EXPORT declared_unfold : public products_consumer { @@ -66,10 +64,10 @@ namespace phlex::experimental { ~declared_unfold() override; virtual tbb::flow::sender& output_port() = 0; - virtual tbb::flow::sender& output_index_port() = 0; + virtual tbb::flow::sender& output_index_port() = 0; + virtual tbb::flow::sender& flush_sender() = 0; virtual product_specifications const& output() const = 0; virtual std::size_t product_count() const = 0; - virtual flusher_t& flusher() = 0; std::string const& child_layer() const noexcept { return child_layer_; } @@ -109,19 +107,16 @@ namespace phlex::experimental { unfold_{g, concurrency, [this, p = std::move(predicate), ufold = std::move(unfold)]( - messages_t const& messages, auto&) { + messages_t const& messages, auto& outputs) { auto const& msg = most_derived(messages); auto const& store = msg.store; - std::size_t const original_message_id{msg_counter_}; generator g{store, this->full_name(), child_layer()}; call(p, ufold, store->index(), g, messages, std::make_index_sequence{}); - - flusher_.try_put({.index = store->index(), - .counts = g.flush_result(), - .original_id = original_message_id}); - }}, - flusher_{g} + std::get<2>(outputs).try_put({.index = store->index(), + .layer_hash = g.child_layer_hash(), + .count = g.child_count()}); + }} { if constexpr (num_inputs > 1ull) { make_edge(join_, unfold_); @@ -142,12 +137,15 @@ namespace phlex::experimental { { return tbb::flow::output_port<0>(unfold_); } - tbb::flow::sender& output_index_port() override + tbb::flow::sender& output_index_port() override { return tbb::flow::output_port<1>(unfold_); } + tbb::flow::sender& flush_sender() override + { + 
return tbb::flow::output_port<2>(unfold_); + } product_specifications const& output() const override { return output_; } - flusher_t& flusher() override { return flusher_; } template void call(Predicate const& predicate, @@ -181,10 +179,10 @@ namespace phlex::experimental { } ++product_count_; - auto child = g.make_child_for(counter++, std::move(new_products)); - tbb::flow::output_port<0>(unfold_).try_put( - {.store = child, .id = msg_counter_.fetch_add(1)}); - tbb::flow::output_port<1>(unfold_).try_put(child->index()); + auto child = g.make_child(counter++, std::move(new_products)); + auto const msg_id = msg_counter_.fetch_add(1); + tbb::flow::output_port<0>(unfold_).try_put({.store = child, .id = msg_id}); + tbb::flow::output_port<1>(unfold_).try_put({.index = child->index(), .msg_id = msg_id}); } } @@ -195,9 +193,9 @@ namespace phlex::experimental { input_retriever_types input_{input_arguments()}; product_specifications output_; join_or_none_t join_; - tbb::flow::multifunction_node, std::tuple> + tbb::flow::multifunction_node, + std::tuple> unfold_; - flusher_t flusher_; std::atomic msg_counter_{}; // Is this sufficient? Probably not. std::atomic calls_{}; std::atomic product_count_{}; diff --git a/phlex/core/edge_maker.hpp b/phlex/core/edge_maker.hpp index c9bedb832..a6e8c90e4 100644 --- a/phlex/core/edge_maker.hpp +++ b/phlex/core/edge_maker.hpp @@ -33,12 +33,11 @@ namespace phlex::experimental { edge_maker(Args&... args); template - void operator()(tbb::flow::graph& g, - index_router& multi, - std::map& filters, - declared_outputs& outputs, - declared_providers& providers, - Args&... consumers); + std::tuple> + operator()(std::map& filters, + declared_outputs& outputs, + declared_providers& providers, + Args&... 
consumers); private: template @@ -100,12 +99,11 @@ namespace phlex::experimental { } template - void edge_maker::operator()(tbb::flow::graph& g, - index_router& multi, - std::map& filters, - declared_outputs& outputs, - declared_providers& providers, - Args&... consumers) + std::tuple> + edge_maker::operator()(std::map& filters, + declared_outputs& outputs, + declared_providers& providers, + Args&... consumers) { // Create edges to outputs for (auto const& [output_name, output_node] : outputs) { @@ -126,7 +124,7 @@ namespace phlex::experimental { if (head_ports.empty()) { // This can happen for jobs that only execute the driver, which is helpful for debugging - return; + return {}; } auto provider_input_ports = make_provider_edges(std::move(head_ports), providers); @@ -134,7 +132,7 @@ namespace phlex::experimental { std::map multilayer_join_index_ports; (multilayer_join_index_ports.merge(multilayer_ports(consumers)), ...); - multi.finalize(g, std::move(provider_input_ports), std::move(multilayer_join_index_ports)); + return std::make_tuple(std::move(provider_input_ports), std::move(multilayer_join_index_ports)); } } diff --git a/phlex/core/framework_graph.cpp b/phlex/core/framework_graph.cpp index 3e32feb69..b8d897ef7 100644 --- a/phlex/core/framework_graph.cpp +++ b/phlex/core/framework_graph.cpp @@ -29,15 +29,20 @@ namespace phlex::experimental { fixed_hierarchy_{std::move(bundle.hierarchy)}, driver_{std::move(bundle.driver)}, src_{graph_, - [this](tbb::flow_control& fc) mutable -> data_cell_index_ptr { + [this](tbb::flow_control& fc) mutable -> closeout_then_emit { if (auto item = driver_()) { - return index_router_.route(*item); + return {.closeout_flushes = cell_tracker_.closeout(*item), .index_to_emit = *item}; } - index_router_.drain(); fc.stop(); return {}; }}, index_router_{graph_}, + index_receiver_{graph_, + tbb::flow::unlimited, + [this](closeout_then_emit const& input) -> data_cell_index_ptr { + auto&& [closeout_flushes, index_to_emit] = input; + 
return index_router_.route(index_to_emit, closeout_flushes); + }}, hierarchy_node_{graph_, tbb::flow::unlimited, [this](data_cell_index_ptr const& index) -> tbb::flow::continue_msg { @@ -53,7 +58,8 @@ namespace phlex::experimental { { if (shutdown_on_error_) { // When in an error state, we need to sanely pop the layer stack and wait for any tasks to finish. - index_router_.drain(); + auto remaining_flushes = cell_tracker_.closeout(nullptr); + index_router_.drain(std::move(remaining_flushes)); graph_.wait_for_all(); } } @@ -89,6 +95,10 @@ namespace phlex::experimental { { src_.activate(); graph_.wait_for_all(); + + // Now back out of all remaining layers + index_router_.drain(cell_tracker_.closeout(nullptr)); + graph_.wait_for_all(); } namespace { @@ -135,49 +145,65 @@ namespace phlex::experimental { filters_.merge(internal_edges_for_predicates(graph_, nodes_.predicates, nodes_.unfolds)); filters_.merge(internal_edges_for_predicates(graph_, nodes_.predicates, nodes_.transforms)); - edge_maker make_edges{nodes_.transforms, nodes_.folds, nodes_.unfolds}; - make_edges(graph_, - index_router_, - filters_, - nodes_.outputs, - nodes_.providers, - nodes_.predicates, - nodes_.observers, - nodes_.folds, - nodes_.unfolds, - nodes_.transforms); - - std::map flushers_from_unfolds; + std::set unfold_input_layer_names; + // Count how many distinct unfold nodes consume each input layer. When that count is + // greater than one, the child_tracker for an index in that layer must collect a flush + // message from every unfold before it knows the total number of children it will see. 
+ std::map unfold_count_per_input_layer; for (auto const& n : nodes_.unfolds | std::views::values) { - flushers_from_unfolds.try_emplace(identifier{n->child_layer()}, &n->flusher()); + for (auto const& input : n->input()) { + if (!static_cast(input.layer).empty()) { + unfold_input_layer_names.insert(input.layer); + ++unfold_count_per_input_layer[identifier{input.layer}]; + } + } } - // Connect edges between all nodes, the graph-wide flusher, and the unfolds' flushers - auto connect_with_flusher = - [this, unfold_flushers = std::move(flushers_from_unfolds)](auto& consumers) { - for (auto& n : consumers | std::views::values) { - std::set flushers; - // For providers - for (product_query const& pq : n->input()) { - if (auto it = unfold_flushers.find(pq.layer); it != unfold_flushers.end()) { - flushers.insert(it->second); - } else { - flushers.insert(&index_router_.flusher()); - } - } - for (flusher_t* flusher : flushers) { - make_edge(*flusher, n->flush_port()); - } - } - }; - connect_with_flusher(nodes_.folds); + std::vector unfold_output_layer_names; + for (auto const& n : nodes_.unfolds | std::views::values) { + unfold_output_layer_names.emplace_back(n->child_layer()); + } + + index_router_.establish_layers( + fixed_hierarchy_.layer_paths(), + std::vector(unfold_input_layer_names.begin(), unfold_input_layer_names.end()), + unfold_output_layer_names); + index_router_.register_unfold_count_per_input_layer(std::move(unfold_count_per_input_layer)); + + edge_maker make_edges{nodes_.transforms, nodes_.folds, nodes_.unfolds}; + auto [provider_input_ports, multilayer_join_index_ports] = + make_edges(filters_, + nodes_.outputs, + nodes_.providers, + nodes_.unfolds, + // Consumers of data products below + nodes_.predicates, + nodes_.observers, + nodes_.folds, + nodes_.unfolds, + nodes_.transforms); + if (not std::empty(provider_input_ports)) { + index_router_.finalize( + graph_, std::move(provider_input_ports), std::move(multilayer_join_index_ports)); + } // The hierarchy 
node is used to report which data layers have been seen by the // framework. To assemble the report, data-cell indices emitted by the input node are // recorded as well as any data-cell indices emitted by an unfold. - make_edge(src_, hierarchy_node_); + + // FIXME: Eventually the separate index_receiver_ and index_router_.index_receiver() may be combined. + // Should also consider whether inline tasks can be used. + make_edge(src_, index_receiver_); + make_edge(index_receiver_, hierarchy_node_); + make_edge(index_router_.index_receiver(), hierarchy_node_); + + for (auto& [_, node] : nodes_.folds) { + make_edge(index_router_.flusher(), node->flush_port()); + } + for (auto& [_, node] : nodes_.unfolds) { - make_edge(node->output_index_port(), hierarchy_node_); + make_edge(node->output_index_port(), index_router_.index_receiver()); + make_edge(node->flush_sender(), index_router_.flush_receiver()); } } } diff --git a/phlex/core/framework_graph.hpp b/phlex/core/framework_graph.hpp index 373917517..ee8a75567 100644 --- a/phlex/core/framework_graph.hpp +++ b/phlex/core/framework_graph.hpp @@ -11,6 +11,7 @@ #include "phlex/core/message.hpp" #include "phlex/core/node_catalog.hpp" #include "phlex/driver.hpp" +#include "phlex/model/data_cell_tracker.hpp" #include "phlex/model/data_layer_hierarchy.hpp" #include "phlex/model/product_store.hpp" #include "phlex/module.hpp" @@ -166,8 +167,10 @@ namespace phlex::experimental { tbb::flow::graph graph_{}; framework_driver driver_; std::vector registration_errors_{}; - tbb::flow::input_node src_; + data_cell_tracker cell_tracker_{}; + tbb::flow::input_node src_; index_router index_router_; + tbb::flow::function_node index_receiver_; tbb::flow::function_node hierarchy_node_; bool shutdown_on_error_{false}; }; diff --git a/phlex/core/index_router.cpp b/phlex/core/index_router.cpp index 821731494..7b0ce143f 100644 --- a/phlex/core/index_router.cpp +++ b/phlex/core/index_router.cpp @@ -1,8 +1,10 @@ #include "phlex/core/index_router.hpp" 
+#include "phlex/model/child_tracker.hpp" #include "phlex/model/product_store.hpp" #include "fmt/std.h" #include "oneapi/tbb/flow_graph.h" +#include "phlex/utilities/hashing.hpp" #include "spdlog/spdlog.h" #include @@ -13,118 +15,155 @@ using namespace phlex::experimental; namespace { - std::string delimited_layer_path(std::string_view const layer_path) + using layer_path_t = std::vector; + + std::size_t layer_hash_for_path(layer_path_t const& layer_path) { - if (not layer_path.starts_with("/")) { - return fmt::format("/{}", layer_path); + std::size_t result = "job"_id.hash(); + for (auto const& layer_name : layer_path | std::views::drop(1)) { + result = hash(result, identifier{layer_name}.hash()); } - return std::string{layer_path}; + return result; } - void send_messages(phlex::data_cell_index_ptr const& index, - std::size_t message_id, - phlex::experimental::detail::multilayer_slots const& slots) + bool is_strict_prefix(layer_path_t const& candidate, layer_path_t const& other) { - for (auto& slot : slots) { - slot->put_message(index, message_id); - } + // FIXME: Use std::ranges::starts_with(other, candidate) once the compilers support it (C++23) + return candidate.size() < other.size() and + std::ranges::mismatch(other, candidate).in2 == std::ranges::end(candidate); } -} -namespace phlex::experimental { - - //======================================================================================== - // layer_scope implementation - - detail::layer_scope::layer_scope(flush_counters& counters, - flusher_t& flusher, - detail::multilayer_slots const& slots_for_layer, - data_cell_index_ptr index, - std::size_t const message_id) : - counters_{counters}, - flusher_{flusher}, - slots_{slots_for_layer}, - index_{index}, - message_id_{message_id} + std::string delimited_layer_path(std::string_view const layer_path) { - // FIXME: Only for folds right now - counters_.update(index_); + if (not layer_path.starts_with("/")) { + return fmt::format("/{}", layer_path); + } + return 
std::string{layer_path}; } - detail::layer_scope::~layer_scope() + flush_counts_ptr make_flush_counts_ptr(std::shared_ptr const& fc) { - // To consider: We may want to skip the following logic if the framework prematurely - // needs to shut down. Keeping it enabled allows in-flight folds to - // complete. However, in some cases it may not be desirable to do this. - - for (auto& slot : slots_) { - slot->put_end_token(index_); + auto const& committed = fc->committed_counts(); + if (committed.size() == 0) { + return nullptr; } - - // The following is for fold nodes only (temporary until the release of fold results are incorporated - // into the above paradigm). - auto flush_result = counters_.extract(index_); - flush_counts_ptr result; - if (not flush_result.empty()) { - result = std::make_shared(std::move(flush_result)); + std::map child_counts; + for (auto const& [layer_hash, count] : committed) { + child_counts.emplace(layer_hash, count.load()); } - flusher_.try_put({index_, std::move(result), message_id_}); + return std::make_shared(std::move(child_counts)); } +} - std::size_t detail::layer_scope::depth() const { return index_->depth(); } +namespace phlex::experimental { //======================================================================================== // multilayer_slot implementation + namespace detail { + class multilayer_slot { + public: + multilayer_slot(tbb::flow::graph& g, + identifier layer, + tbb::flow::receiver* flush_port, + tbb::flow::receiver* input_port); + + void put_message(data_cell_index_ptr const& index, std::size_t message_id); + void put_end_token(data_cell_index_ptr const& index, child_tracker const& fc); + + bool matches_exactly(std::string const& layer_path) const; + bool is_parent_of(data_cell_index_ptr const& index) const; + + private: + identifier layer_; + index_set_node broadcaster_; + flush_node flusher_; + }; + + multilayer_slot::multilayer_slot(tbb::flow::graph& g, + identifier layer, + tbb::flow::receiver* flush_port, + 
tbb::flow::receiver* input_port) : + layer_{std::move(layer)}, broadcaster_{g}, flusher_{g} + { + make_edge(broadcaster_, *input_port); + make_edge(flusher_, *flush_port); + } - detail::multilayer_slot::multilayer_slot(tbb::flow::graph& g, - identifier layer, - tbb::flow::receiver* flush_port, - tbb::flow::receiver* input_port) : - layer_{std::move(layer)}, broadcaster_{g}, flusher_{g} - { - make_edge(broadcaster_, *input_port); - make_edge(flusher_, *flush_port); - } + void multilayer_slot::put_message(data_cell_index_ptr const& index, std::size_t message_id) + { + if (layer_ == index->layer_name()) { + broadcaster_.try_put({.index = index, .msg_id = message_id, .cache = false}); + return; + } - void detail::multilayer_slot::put_message(data_cell_index_ptr const& index, - std::size_t message_id) - { - if (layer_ == index->layer_name()) { - broadcaster_.try_put({.index = index, .msg_id = message_id, .cache = false}); - return; + broadcaster_.try_put({.index = index->parent(layer_), .msg_id = message_id}); } - // Flush values are only used for indices that are *not* the "lowest" in the branch - // of the hierarchy. - ++counter_; - broadcaster_.try_put({.index = index->parent(layer_), .msg_id = message_id}); - } + void multilayer_slot::put_end_token(data_cell_index_ptr const& index, child_tracker const& fc) + { + // We're going to have to be a little more careful about this. The committed total count may + // not be enough granularity for some downstream nodes. 
+ flusher_.try_put({.index = index, .count = static_cast(fc.committed_total_count())}); + } - void detail::multilayer_slot::put_end_token(data_cell_index_ptr const& index) - { - auto count = std::exchange(counter_, 0); - if (count == 0) { - // See comment above about flush values - return; + bool multilayer_slot::matches_exactly(std::string const& layer_path) const + { + return layer_path.ends_with(delimited_layer_path(static_cast(layer_))); } - flusher_.try_put({.index = index, .count = count}); + bool multilayer_slot::is_parent_of(data_cell_index_ptr const& index) const + { + return index->parent(layer_) != nullptr; + } } - bool detail::multilayer_slot::matches_exactly(std::string const& layer_path) const + //======================================================================================== + // index_router implementation + index_router::index_router(tbb::flow::graph& g) : + index_receiver_{g, + tbb::flow::unlimited, + [this](index_message const& msg) -> data_cell_index_ptr { + auto const& [index, message_id, _] = msg; + assert(index); + return route(index, index_is_lowest_layer(index), message_id); + }}, + flush_receiver_{g, + tbb::flow::unlimited, + [this](unfold_flush input) -> tbb::flow::continue_msg { + auto&& [index, layer_hash, count] = input; + counter(index)->update_expected_count(layer_hash, count); + // Because the flush receiver receives flush values, the index cannot + // correspond to a lowest layer. 
+ flush_if_done(index, false); + return {}; + }}, + flusher_{g} { - return layer_path.ends_with(delimited_layer_path(static_cast(layer_))); } - bool detail::multilayer_slot::is_parent_of(data_cell_index_ptr const& index) const + void index_router::establish_layers( + std::vector> const& layer_paths_from_driver, + std::vector unfold_input_layer_names, + std::vector unfold_output_layer_names) { - return index->parent(layer_) != nullptr; - } - - //======================================================================================== - // index_router implementation + auto sorted_layer_paths = layer_paths_from_driver; + std::ranges::sort(sorted_layer_paths); + + std::vector> lowest_layer_candidates; + // In sorted order, a path can only be a prefix of paths that follow it. + for (std::size_t i = 0; i < sorted_layer_paths.size(); ++i) { + bool const is_not_lowest_layer = + i + 1 < sorted_layer_paths.size() and + is_strict_prefix(sorted_layer_paths[i], sorted_layer_paths[i + 1]); + if (is_not_lowest_layer) { + auto const layer_hash = layer_hash_for_path(sorted_layer_paths[i]); + is_lowest_layer_hashes_.emplace(layer_hash, false); + } + } - index_router::index_router(tbb::flow::graph& g) : flusher_{g} {} + unfold_input_layer_names_ = unfold_input_layer_names; + unfold_output_layer_names_ = unfold_output_layer_names; + } void index_router::finalize(tbb::flow::graph& g, provider_input_ports_t provider_input_ports, @@ -132,12 +171,10 @@ namespace phlex::experimental { { // We must have at least one provider port, or there can be no data to process. 
assert(!provider_input_ports.empty()); - provider_input_ports_ = std::move(provider_input_ports); // Create the index-set broadcast nodes for providers - for (auto& [pq, provider_port] : provider_input_ports_ | std::views::values) { - auto [it, _] = broadcasters_.try_emplace(static_cast(pq.layer), - std::make_shared(g)); + for (auto& [pq, provider_port] : provider_input_ports | std::views::values) { + auto [it, _] = broadcasters_.insert({pq.layer, std::make_shared(g)}); make_edge(*it->second, *provider_port); } @@ -150,76 +187,139 @@ namespace phlex::experimental { auto entry = std::make_shared(g, layer, flush_port, input_port); casters.push_back(entry); } - multibroadcasters_.try_emplace(identifier{node_name}, std::move(casters)); + multibroadcasters_.insert({identifier{node_name}, std::move(casters)}); } } - data_cell_index_ptr index_router::route(data_cell_index_ptr const index) + data_cell_index_ptr index_router::route(data_cell_index_ptr const index, index_flushes flushes) { - backout_to(index); + update_flush_counts(std::move(flushes)); + return route(index, index_is_lowest_layer(index), received_indices_.fetch_add(1)); + } - auto message_id = received_indices_.fetch_add(1); + data_cell_index_ptr index_router::route(data_cell_index_ptr index, + bool const is_lowest_layer, + std::size_t const message_id) + { + if (auto broadcaster = get_provider_broadcaster(index)) { + broadcaster->try_put({.index = index, .msg_id = message_id}); + } - send_to_provider_index_nodes(index, message_id); - auto const& slots_for_layer = send_to_multilayer_join_nodes(index, message_id); + auto [routing_slots, flushing_slots] = get_multilayer_slots(index); + for (auto const& slot : *routing_slots) { + slot->put_message(index, message_id); + } - layers_.emplace(counters_, flusher_, slots_for_layer, index, message_id); + // There should be no counter if the index is a lowest layer + if (not is_lowest_layer) { + counter(index)->set_flush_callback( + [this, flushing_slots = 
std::move(flushing_slots), index, message_id](
+          std::shared_ptr fc) {
+          for (auto const& slot : *flushing_slots) {
+            slot->put_end_token(index, *fc);
+          }
+
+          // Used only for folds, until folds use the slot infrastructure above.
+          flusher_.try_put({index, make_flush_counts_ptr(fc), message_id});
+        });
+    }
+
+    flush_if_done(index, is_lowest_layer);
 
     return index;
   }
 
-  void index_router::backout_to(data_cell_index_ptr const index)
+  void index_router::drain(index_flushes flushes) { update_flush_counts(std::move(flushes)); }
+
+  void index_router::register_unfold_count_per_input_layer(std::map counts)
   {
-    assert(index);
-    auto const new_depth = index->depth();
-    while (not empty(layers_) and new_depth <= layers_.top().depth()) {
-      layers_.pop();
-    }
+    // Called once during finalize(), before any indices are routed, so no concurrent access.
+    unfold_count_per_input_layer_ = std::move(counts);
   }
 
-  void index_router::drain()
+  bool index_router::index_is_lowest_layer(data_cell_index_ptr const& index)
   {
-    while (not empty(layers_)) {
-      layers_.pop();
+    auto it = is_lowest_layer_hashes_.find(index->layer_hash());
+    if (it != is_lowest_layer_hashes_.end()) {
+      return it->second;
+    }
+
+    if (std::ranges::contains(unfold_input_layer_names_, index->layer_name())) {
+      // FIXME: Need to make sure that the index is a child of existing layers
+      return is_lowest_layer_hashes_.emplace(index->layer_hash(), false).first->second;
    }
+
+    if (std::ranges::contains(unfold_output_layer_names_, index->layer_name())) {
+      return is_lowest_layer_hashes_.emplace(index->layer_hash(), true).first->second;
+    }
+
+    // If the index is neither an input nor an output to an unfold, it is assumed to be a lowest layer. 
+ return is_lowest_layer_hashes_.emplace(index->layer_hash(), true).first->second; } - void index_router::send_to_provider_index_nodes(data_cell_index_ptr const& index, - std::size_t const message_id) + detail::index_set_node_ptr index_router::get_provider_broadcaster( + data_cell_index_ptr const& index) { - if (auto it = matched_broadcasters_.find(index->layer_hash()); - it != matched_broadcasters_.end()) { - // Not all layers will have a corresponding broadcaster - if (it->second) { - it->second->try_put({.index = index, .msg_id = message_id}); - } - return; + auto const layer_hash = index->layer_hash(); + if (auto it = matched_broadcasters_.find(layer_hash); it != matched_broadcasters_.end()) { + return it->second; } std::string const layerish_path{static_cast(index->layer_name())}; auto broadcaster = index_node_for(layerish_path); - if (broadcaster) { - broadcaster->try_put({.index = index, .msg_id = message_id}); + matched_broadcasters_.insert({layer_hash, broadcaster}); + return broadcaster; + } + + auto index_router::index_node_for(std::string const& layer_path) -> detail::index_set_node_ptr + { + std::string const search_token = delimited_layer_path(layer_path); + + std::vector candidates; + for (auto it = broadcasters_.begin(), e = broadcasters_.end(); it != e; ++it) { + if (search_token.ends_with(delimited_layer_path(static_cast(it->first)))) { + candidates.push_back(it); + } } - // We cache the result of the lookup even if there is no broadcaster for this layer, - // to avoid repeated lookups for layers that don't have broadcasters. 
- matched_broadcasters_.try_emplace(index->layer_hash(), broadcaster); + + if (candidates.size() == 1ull) { + return candidates[0]->second; + } + + if (candidates.empty()) { + return nullptr; + } + + std::string msg = fmt::format("Multiple layers match specification {}:\n", layer_path); + for (auto const& it : candidates) { + msg += fmt::format("\n- {}", it->first); + } + throw std::runtime_error(msg); } - detail::multilayer_slots const& index_router::send_to_multilayer_join_nodes( - data_cell_index_ptr const& index, std::size_t const message_id) + std::pair + index_router::get_multilayer_slots(data_cell_index_ptr const& index) { auto const layer_hash = index->layer_hash(); - if (auto it = matched_routing_entries_.find(layer_hash); it != matched_routing_entries_.end()) { - send_messages(index, message_id, it->second); - return matched_flushing_entries_.find(layer_hash)->second; + // Fast path: shared lock allows concurrent reads of cached entries. + { + matched_multilayer_const_accessor acc; + if (matched_multilayer_entries_.find(acc, layer_hash)) { + return {acc->second.routing, acc->second.flushing}; + } } - auto routing_it = matched_routing_entries_.try_emplace(layer_hash).first; - auto flushing_it = matched_flushing_entries_.try_emplace(layer_hash).first; + // Slow path: exclusive lock serializes concurrent cache misses for the same layer. + matched_multilayer_accessor acc; + auto const inserted = matched_multilayer_entries_.insert(acc, layer_hash); + if (not inserted) { + return {acc->second.routing, acc->second.flushing}; + } auto const layer_path = index->layer_path(); + detail::multilayer_slots routing_slots; + detail::multilayer_slots flushing_slots; // For each multi-layer join node, determine which slots are relevant to this index. 
// Routing entries: All slots from a node are added if (1) at least one slot exactly @@ -236,7 +336,7 @@ namespace phlex::experimental { for (auto& slot : slots) { if (slot->matches_exactly(layer_path)) { has_exact_match = true; - flushing_it->second.push_back(slot); + flushing_slots.push_back(slot); matching_slots.push_back(slot); ++matched_count; } else if (slot->is_parent_of(index)) { @@ -248,38 +348,86 @@ namespace phlex::experimental { // Add all matching slots to routing entries only if we have an exact match and // all slots from this node matched something (either exactly or as a parent). if (has_exact_match and matched_count == slots.size()) { - routing_it->second.insert(routing_it->second.end(), - std::make_move_iterator(matching_slots.begin()), - std::make_move_iterator(matching_slots.end())); + routing_slots.insert(routing_slots.end(), + std::make_move_iterator(matching_slots.begin()), + std::make_move_iterator(matching_slots.end())); } } - send_messages(index, message_id, routing_it->second); - return flushing_it->second; + + acc->second.routing = + std::make_shared(std::move(routing_slots)); + acc->second.flushing = + std::make_shared(std::move(flushing_slots)); + return {acc->second.routing, acc->second.flushing}; } - auto index_router::index_node_for(std::string const& layer_path) -> detail::index_set_node_ptr + void index_router::update_flush_counts(index_flushes flushes) { - std::string const search_token = delimited_layer_path(layer_path); - - std::vector candidates; - for (auto it = broadcasters_.begin(), e = broadcasters_.end(); it != e; ++it) { - if (search_token.ends_with(delimited_layer_path(static_cast(it->first)))) { - candidates.push_back(it); - } + for (auto const& [index, flush_counts] : flushes) { + counter(index)->update_expected_counts(*flush_counts); + flush_if_done(index, false); } + } - if (candidates.size() == 1ull) { - return candidates[0]->second; + std::shared_ptr index_router::counter(data_cell_index_ptr const& index) + { + 
if (const_accessor a; child_trackers_.find(a, index->hash())) { + return a->second; } - if (candidates.empty()) { - return nullptr; + accessor a; + // If multiple unfolds consume this layer, the counter must wait for a flush message + // from each of them before it can evaluate done(). Without this, the first unfold to + // finish could cause the counter to fire before the others have reported their counts. + std::size_t const expected_flush_count = [&]() -> std::size_t { + auto it = unfold_count_per_input_layer_.find(index->layer_name()); + return it != unfold_count_per_input_layer_.end() ? it->second : 0; + }(); + child_trackers_.emplace( + a, index->hash(), std::make_shared(index, expected_flush_count)); + return a->second; + } + + void index_router::flush_if_done(data_cell_index_ptr index, bool const is_lowest_layer) + { + assert(index); + + auto parent = index->parent(); + + // Lowest-layer indices do not have their own counters. + // Increment the parent counter for the lowest-layer index instead. + if (is_lowest_layer) { + // Sometimes the lowest layer is the job, which has no parent + if (parent) { + counter(parent)->increment(index->layer_hash()); + } + index = parent; } - std::string msg = fmt::format("Multiple layers match specification {}:\n", layer_path); - for (auto const& it : candidates) { - msg += fmt::format("\n- {}", it->first); + while (index) { + // Use an exclusive accessor so that only one thread can release a given counter. + // This prevents double-release when multiple threads call + // flush_if_done concurrently for the same index. + accessor a; + if (not child_trackers_.find(a, index->hash())) { + // This can happen when two threads process the same parent index, + // and one of them releases it before the other completes. 
+ return; + } + + if (not a->second->all_children_accounted()) { + return; + } + + a->second->send_flush(); + + if (auto parent = index->parent()) { + auto parent_counter = counter(parent); + parent_counter->update_committed_counts(a->second->committed_counts()); + parent_counter->increment(index->layer_hash()); + index = parent; + } + child_trackers_.erase(a); } - throw std::runtime_error(msg); } } diff --git a/phlex/core/index_router.hpp b/phlex/core/index_router.hpp index 4dc7f08f6..2ab72d271 100644 --- a/phlex/core/index_router.hpp +++ b/phlex/core/index_router.hpp @@ -5,10 +5,13 @@ #include "phlex/core/fwd.hpp" #include "phlex/core/message.hpp" +#include "phlex/model/child_tracker.hpp" #include "phlex/model/data_cell_counter.hpp" #include "phlex/model/data_cell_index.hpp" #include "phlex/model/identifier.hpp" +#include "oneapi/tbb/concurrent_hash_map.h" +#include "oneapi/tbb/concurrent_unordered_map.h" #include "oneapi/tbb/flow_graph.h" #include @@ -32,55 +35,9 @@ namespace phlex::experimental { // join operation. It: // (a) routes index messages to either the matching layer or its data-layer parent, and // (b) emits flush tokens to the repeater to evict a cached data product from memory. - class PHLEX_CORE_EXPORT multilayer_slot { - public: - multilayer_slot(tbb::flow::graph& g, - identifier layer, - tbb::flow::receiver* flush_port, - tbb::flow::receiver* input_port); - - void put_message(data_cell_index_ptr const& index, std::size_t message_id); - void put_end_token(data_cell_index_ptr const& index); - - bool matches_exactly(std::string const& layer_path) const; - bool is_parent_of(data_cell_index_ptr const& index) const; - - private: - identifier layer_; - detail::index_set_node broadcaster_; - detail::flush_node flusher_; - int counter_ = 0; - }; - + class multilayer_slot; using multilayer_slots = std::vector>; - - // A layer_scope object is an RAII object that manages layer-scoped operations during - // data-cell-index routing. 
It updates flush counters on construction and ensures cleanup - // (flushing end tokens and releasing fold results) on destruction. - class PHLEX_CORE_EXPORT layer_scope { - public: - layer_scope(flush_counters& counters, - flusher_t& flusher, - multilayer_slots const& slots_for_layer, - data_cell_index_ptr index, - std::size_t message_id); - ~layer_scope(); - layer_scope(layer_scope const&) = delete; - layer_scope& operator=(layer_scope const&) = delete; - layer_scope(layer_scope&&) = delete; - layer_scope& operator=(layer_scope&&) = delete; - std::size_t depth() const; - - private: - // Non-owning references to externally-owned state; layer_scope is an RAII guard. - // NOLINTBEGIN(cppcoreguidelines-avoid-const-or-ref-data-members) - flush_counters& counters_; - flusher_t& flusher_; - multilayer_slots const& slots_; - // NOLINTEND(cppcoreguidelines-avoid-const-or-ref-data-members) - data_cell_index_ptr index_; - std::size_t message_id_; - }; + using multilayer_slots_ptr = std::shared_ptr; } class PHLEX_CORE_EXPORT index_router { @@ -101,49 +58,88 @@ namespace phlex::experimental { using provider_input_ports_t = std::map; explicit index_router(tbb::flow::graph& g); - data_cell_index_ptr route(data_cell_index_ptr index); + data_cell_index_ptr route(data_cell_index_ptr index, index_flushes flushes); + + void establish_layers(std::vector> const& layer_paths_from_driver, + std::vector unfold_input_layer_names, + std::vector unfold_output_layer_names); + + // Registers how many unfolds produce children from each input layer. Must be called + // before execution so that child_trackers are initialized with the correct expected + // child count when they are first created. 
+ void register_unfold_count_per_input_layer(std::map counts); void finalize(tbb::flow::graph& g, provider_input_ports_t provider_input_ports, std::map multilayers); - void drain(); + void drain(index_flushes flushes); flusher_t& flusher() { return flusher_; } + tbb::flow::function_node& index_receiver() + { + return index_receiver_; + } + tbb::flow::function_node& flush_receiver() { return flush_receiver_; } + private: - void backout_to(data_cell_index_ptr store); - void send_to_provider_index_nodes(data_cell_index_ptr const& index, std::size_t message_id); - detail::multilayer_slots const& send_to_multilayer_join_nodes(data_cell_index_ptr const& index, - std::size_t message_id); + data_cell_index_ptr route(data_cell_index_ptr index, + bool is_lowest_layer, + std::size_t message_id); + bool index_is_lowest_layer(data_cell_index_ptr const& index); detail::index_set_node_ptr index_node_for(std::string const& layer); - - provider_input_ports_t provider_input_ports_; + detail::index_set_node_ptr get_provider_broadcaster(data_cell_index_ptr const& index); + std::pair get_multilayer_slots( + data_cell_index_ptr const& index); + void update_flush_counts(index_flushes flushes); + std::shared_ptr counter(data_cell_index_ptr const& index); + void flush_if_done(data_cell_index_ptr index, bool is_lowest_layer); + + tbb::flow::function_node index_receiver_; + tbb::flow::function_node flush_receiver_; std::atomic received_indices_{}; flusher_t flusher_; - flush_counters counters_; - std::stack layers_; + tbb::concurrent_unordered_map is_lowest_layer_hashes_; + std::vector unfold_input_layer_names_; + std::vector unfold_output_layer_names_; // ========================================================================================== // Routing to provider nodes // The following maps are used to route data-cell indices to provider nodes. // The first map is from layer name to the corresponding broadcaster node. 
- std::unordered_map broadcasters_; + tbb::concurrent_unordered_map broadcasters_; // The second map is a cache from a layer hash matched to a broadcaster node, to avoid // repeated lookups for the same layer. - std::unordered_map matched_broadcasters_; + tbb::concurrent_unordered_map matched_broadcasters_; // ========================================================================================== // Routing to multi-layer join nodes - // The first map is from the node name to the corresponding broadcaster nodes and flush - // nodes. - std::unordered_map multibroadcasters_; - // The second map is a cache from a layer hash matched to a set of multilayer slots, to - // avoid repeated lookups for the same layer. - std::unordered_map matched_routing_entries_; - // The third map is a cache from a layer hash matched to a set of multilayer slots for the - // purposes of flushing, to avoid repeated lookups for the same layer during flushing. - std::unordered_map matched_flushing_entries_; - }; + // The first map is from the node name to the corresponding broadcaster nodes and flush nodes. + tbb::concurrent_unordered_map multibroadcasters_; + + // This struct lets get_multilayer_slots return routing and flushing slots together, + // instead of passing concurrent_hash_map accessors as output parameters. + struct matched_multilayer_slots { + detail::multilayer_slots_ptr routing; + detail::multilayer_slots_ptr flushing; + }; + // Cache from layer hash to matched routing/flushing slots for that layer. 
+ using matched_multilayer_entries_t = + tbb::concurrent_hash_map; + using matched_multilayer_accessor = matched_multilayer_entries_t::accessor; + using matched_multilayer_const_accessor = matched_multilayer_entries_t::const_accessor; + matched_multilayer_entries_t matched_multilayer_entries_; + // ========================================================================================== + // Child trackers + using trackers_t = tbb::concurrent_hash_map>; + using accessor = trackers_t::accessor; + using const_accessor = trackers_t::const_accessor; + trackers_t child_trackers_; + + // Number of unfolds that will send flush messages for each input layer. Used to + // initialize child_trackers with the correct expected child count. + std::map unfold_count_per_input_layer_; + }; } #endif // PHLEX_CORE_INDEX_ROUTER_HPP diff --git a/phlex/model/fixed_hierarchy.cpp b/phlex/model/fixed_hierarchy.cpp index 0508eeaa8..737474dcf 100644 --- a/phlex/model/fixed_hierarchy.cpp +++ b/phlex/model/fixed_hierarchy.cpp @@ -57,7 +57,8 @@ namespace { } namespace phlex { - + // ================================================================================ + // data_cell_cursor implementation data_cell_cursor::data_cell_cursor(data_cell_index_ptr index, fixed_hierarchy const& h, experimental::async_driver& d) : @@ -76,13 +77,15 @@ namespace phlex { std::string data_cell_cursor::layer_path() const { return index_->layer_path(); } + // ================================================================================ + // fixed_hierarchy implementation fixed_hierarchy::fixed_hierarchy(std::initializer_list> layer_paths) : fixed_hierarchy(std::vector>(layer_paths)) { } fixed_hierarchy::fixed_hierarchy(std::vector> layer_paths) : - layer_hashes_(std::from_range, build_hashes(layer_paths)) + layer_paths_(std::move(layer_paths)), layer_hashes_(std::from_range, build_hashes(layer_paths_)) { } diff --git a/phlex/model/fixed_hierarchy.hpp b/phlex/model/fixed_hierarchy.hpp index 18af5b606..07b9c3025 
100644
--- a/phlex/model/fixed_hierarchy.hpp
+++ b/phlex/model/fixed_hierarchy.hpp
@@ -49,6 +49,9 @@ namespace phlex {
     explicit fixed_hierarchy(std::initializer_list<std::vector<std::string>> layer_paths);
     explicit fixed_hierarchy(std::vector<std::vector<std::string>> layer_paths);
 
+    // Returns the layer paths for this fixed hierarchy.
+    auto const& layer_paths() const { return layer_paths_; }
+
     void validate(data_cell_index_ptr const& index) const;
 
     // Yields the job-level data-cell index to the provided driver and returns a
@@ -57,6 +60,7 @@ namespace phlex {
     data_cell_cursor yield_job(experimental::async_driver& d) const;
 
   private:
+    std::vector<std::vector<std::string>> layer_paths_;
     std::vector<std::size_t> layer_hashes_;
   };
 

From f4599e5dec2bf2bc880a9b925379a59a2bbb94d5 Mon Sep 17 00:00:00 2001
From: Kyle Knoepfel
Date: Fri, 8 May 2026 09:00:27 -0500
Subject: [PATCH 4/4] Re-enable different_hierarchies test (now fold_duplicate_layer_name)

---
 test/CMakeLists.txt                                        | 7 ++-----
 ..._hierarchies.cpp => fold_duplicate_layer_name_test.cpp} | 2 +-
 2 files changed, 3 insertions(+), 6 deletions(-)
 rename test/{different_hierarchies.cpp => fold_duplicate_layer_name_test.cpp} (97%)

diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 924ba8400..0f7b288c0 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -80,18 +80,15 @@ cet_test(
 )
 
 cet_test(
-  different_hierarchies
+  fold_duplicate_layer_name
   USE_CATCH2_MAIN
   SOURCE
-  different_hierarchies.cpp
+  fold_duplicate_layer_name_test.cpp
   LIBRARIES phlex::core_internal spdlog::spdlog layer_generator_internal
 )
-# Disable pending resolution of [Change fold-result caching to use
-# `multilayer_join_node`](https://github.com/Framework-R-D/phlex/issues/359)
 
 cet_test(filter_impl USE_CATCH2_MAIN SOURCE filter_impl.cpp LIBRARIES phlex::core_internal
diff --git a/test/different_hierarchies.cpp b/test/fold_duplicate_layer_name_test.cpp
similarity index 97%
rename from test/different_hierarchies.cpp
rename to
test/fold_duplicate_layer_name_test.cpp
index 399024d3e..d170f7da0 100644
--- a/test/different_hierarchies.cpp
+++ b/test/fold_duplicate_layer_name_test.cpp
@@ -48,7 +48,7 @@ namespace {
   void add(std::atomic<unsigned int>& counter, unsigned int number) { counter += number; }
 }
 
-TEST_CASE("Different hierarchies used with fold", "[graph]")
+TEST_CASE("Fold different layer paths with same trailing name", "[graph]")
 {
   // job -> run -> event layers
   constexpr auto index_limit = 2u;