Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions phlex/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ cet_make_library(
edge_creation_policy.cpp
edge_maker.cpp
filter.cpp
flush_counter.cpp
framework_graph.cpp
glue.cpp
message.cpp
Expand Down
15 changes: 8 additions & 7 deletions phlex/core/declared_unfold.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "phlex/core/declared_unfold.hpp"
#include "phlex/model/data_cell_counter.hpp"
#include "phlex/model/handle.hpp"
#include "phlex/utilities/hashing.hpp"

#include "fmt/std.h"
#include "spdlog/spdlog.h"
Expand All @@ -12,23 +13,23 @@ namespace phlex::experimental {
std::string const& child_layer_name) :
parent_{std::const_pointer_cast<product_store>(parent)},
node_name_{std::move(node_name)},
child_layer_name_{child_layer_name}
child_layer_name_{child_layer_name},
child_layer_hash_{hash(parent->index()->layer_hash(), identifier{child_layer_name_}.hash())}
{
}

product_store_const_ptr generator::make_child(std::size_t const i, products new_products)
{
auto child_index = parent_->index()->make_child(child_layer_name_, i);
++child_counts_[child_index->layer_hash()];
++child_counts_;
return std::make_shared<product_store>(child_index, node_name_, std::move(new_products));
}

flush_counts_ptr generator::flush_result() const
data_cell_counts_ptr generator::flush_result() const
{
if (not child_counts_.empty()) {
return std::make_shared<flush_counts const>(std::move(child_counts_));
}
return nullptr;
auto flush_counts = std::make_shared<data_cell_counts>();
flush_counts->emplace(child_layer_hash_, child_counts_);
return flush_counts;
}

declared_unfold::declared_unfold(algorithm_name name,
Expand Down
47 changes: 21 additions & 26 deletions phlex/core/declared_unfold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "phlex/core/products_consumer.hpp"
#include "phlex/model/algorithm_name.hpp"
#include "phlex/model/data_cell_index.hpp"
#include "phlex/model/data_cell_tracker.hpp"
#include "phlex/model/handle.hpp"
#include "phlex/model/identifier.hpp"
#include "phlex/model/product_specification.hpp"
Expand Down Expand Up @@ -39,22 +40,18 @@ namespace phlex::experimental {
explicit generator(product_store_const_ptr const& parent,
algorithm_name node_name,
std::string const& child_layer_name);
flush_counts_ptr flush_result() const;

product_store_const_ptr make_child_for(std::size_t const data_cell_number,
products new_products)
{
return make_child(data_cell_number, std::move(new_products));
}
data_cell_counts_ptr flush_result() const;
product_store_const_ptr make_child(std::size_t i, products new_products);

private:
product_store_const_ptr make_child(std::size_t i, products new_products);
product_store_ptr parent_;
algorithm_name node_name_;
// References declared_unfold::child_layer_, which outlives this short-lived object.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
std::string const& child_layer_name_;
std::map<data_cell_index::hash_type, std::size_t> child_counts_;
std::size_t child_layer_hash_;
std::size_t child_counts_ = 0;
};

class PHLEX_CORE_EXPORT declared_unfold : public products_consumer {
Expand All @@ -66,10 +63,10 @@ namespace phlex::experimental {
~declared_unfold() override;

virtual tbb::flow::sender<message>& output_port() = 0;
virtual tbb::flow::sender<data_cell_index_ptr>& output_index_port() = 0;
virtual tbb::flow::sender<index_message>& output_index_port() = 0;
virtual tbb::flow::sender<index_flush>& flush_sender() = 0;
virtual product_specifications const& output() const = 0;
virtual std::size_t product_count() const = 0;
virtual flusher_t& flusher() = 0;

std::string const& child_layer() const noexcept { return child_layer_; }

Expand Down Expand Up @@ -109,19 +106,14 @@ namespace phlex::experimental {
unfold_{g,
concurrency,
[this, p = std::move(predicate), ufold = std::move(unfold)](
messages_t<num_inputs> const& messages, auto&) {
messages_t<num_inputs> const& messages, auto& outputs) {
auto const& msg = most_derived(messages);
auto const& store = msg.store;

std::size_t const original_message_id{msg_counter_};
generator g{store, this->full_name(), child_layer()};
call(p, ufold, store->index(), g, messages, std::make_index_sequence<num_inputs>{});

flusher_.try_put({.index = store->index(),
.counts = g.flush_result(),
.original_id = original_message_id});
}},
flusher_{g}
std::get<2>(outputs).try_put({.index = store->index(), .counts = g.flush_result()});
}}
{
if constexpr (num_inputs > 1ull) {
make_edge(join_, unfold_);
Expand All @@ -142,12 +134,15 @@ namespace phlex::experimental {
{
return tbb::flow::output_port<0>(unfold_);
}
tbb::flow::sender<data_cell_index_ptr>& output_index_port() override
tbb::flow::sender<index_message>& output_index_port() override
{
return tbb::flow::output_port<1>(unfold_);
}
// Exposes output port 2 of unfold_. The node body puts one index_flush on this
// port per processed input, carrying the parent store's index and the
// generator's flush_result().
tbb::flow::sender<index_flush>& flush_sender() override
{
return tbb::flow::output_port<2>(unfold_);
}
product_specifications const& output() const override { return output_; }
flusher_t& flusher() override { return flusher_; }

template <std::size_t... Is>
void call(Predicate const& predicate,
Expand Down Expand Up @@ -181,10 +176,10 @@ namespace phlex::experimental {
}
++product_count_;

auto child = g.make_child_for(counter++, std::move(new_products));
tbb::flow::output_port<0>(unfold_).try_put(
{.store = child, .id = msg_counter_.fetch_add(1)});
tbb::flow::output_port<1>(unfold_).try_put(child->index());
auto child = g.make_child(counter++, std::move(new_products));
auto const msg_id = msg_counter_.fetch_add(1);
tbb::flow::output_port<0>(unfold_).try_put({.store = child, .id = msg_id});
tbb::flow::output_port<1>(unfold_).try_put({.index = child->index(), .msg_id = msg_id});
}
}

Expand All @@ -195,9 +190,9 @@ namespace phlex::experimental {
input_retriever_types<input_args> input_{input_arguments<input_args>()};
product_specifications output_;
join_or_none_t<num_inputs> join_;
tbb::flow::multifunction_node<messages_t<num_inputs>, std::tuple<message, data_cell_index_ptr>>
tbb::flow::multifunction_node<messages_t<num_inputs>,
std::tuple<message, index_message, index_flush>>
unfold_;
flusher_t flusher_;
std::atomic<std::size_t> msg_counter_{}; // Is this sufficient? Probably not.
std::atomic<std::size_t> calls_{};
std::atomic<std::size_t> product_count_{};
Expand Down
26 changes: 12 additions & 14 deletions phlex/core/edge_maker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ namespace phlex::experimental {
edge_maker(Args&... args);

template <typename... Args>
void operator()(tbb::flow::graph& g,
index_router& multi,
std::map<std::string, filter>& filters,
declared_outputs& outputs,
declared_providers& providers,
Args&... consumers);
std::tuple<index_router::provider_input_ports_t, std::map<std::string, named_index_ports>>
operator()(std::map<std::string, filter>& filters,
declared_outputs& outputs,
declared_providers& providers,
Args&... consumers);

private:
template <typename T>
Expand Down Expand Up @@ -100,12 +99,11 @@ namespace phlex::experimental {
}

template <typename... Args>
void edge_maker::operator()(tbb::flow::graph& g,
index_router& multi,
std::map<std::string, filter>& filters,
declared_outputs& outputs,
declared_providers& providers,
Args&... consumers)
std::tuple<index_router::provider_input_ports_t, std::map<std::string, named_index_ports>>
edge_maker::operator()(std::map<std::string, filter>& filters,
declared_outputs& outputs,
declared_providers& providers,
Args&... consumers)
{
// Create edges to outputs
for (auto const& [output_name, output_node] : outputs) {
Expand All @@ -126,15 +124,15 @@ namespace phlex::experimental {

if (head_ports.empty()) {
// This can happen for jobs that only execute the driver, which is helpful for debugging
return;
return {};
}

auto provider_input_ports = make_provider_edges(std::move(head_ports), providers);

std::map<std::string, named_index_ports> multilayer_join_index_ports;
(multilayer_join_index_ports.merge(multilayer_ports(consumers)), ...);

multi.finalize(g, std::move(provider_input_ports), std::move(multilayer_join_index_ports));
return std::make_tuple(std::move(provider_input_ports), std::move(multilayer_join_index_ports));
}
}

Expand Down
105 changes: 105 additions & 0 deletions phlex/core/flush_counter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#include "phlex/core/flush_counter.hpp"

#include "spdlog/spdlog.h"

#include <algorithm>
#include <functional>
#include <mutex>
#include <ranges>
#include <utility>

namespace phlex::experimental {

// Binds this counter to `index` and records how many flush messages done()
// has to see before it may report completion.
flush_counter::flush_counter(data_cell_index_ptr index, std::size_t expected_flush_count) :
  index_{std::move(index)},
  expected_flush_count_{expected_flush_count}
{
}

// Sum of the expected child counts over every layer recorded in expected_counts_.
std::size_t flush_counter::expected_total_count() const
{
  std::size_t total = 0;
  for (auto const& [layer_hash, expected] : expected_counts_) {
    total = total + expected;
  }
  return total;
}

// Sum of the per-layer counts currently held in counts_ (the counts that
// increment() adds to and commit() clears).
std::size_t flush_counter::processed_total_count() const
{
  std::size_t total = 0;
  for (auto const& [layer_hash, processed] : counts_) {
    total = total + processed;
  }
  return total;
}

// Sum of the per-layer counts held in committed_counts_.
std::size_t flush_counter::committed_total_count() const
{
  std::size_t total = 0;
  for (auto const& [layer_hash, committed] : committed_counts_) {
    total = total + committed;
  }
  return total;
}

// Looks up `layer_hash` in committed_counts_ and returns what
// data_cell_counts::count reports for it.
// NOTE(review): done() compares the same count() call against an expected
// number of children, so count() presumably yields the accumulated count for
// the layer rather than a std::map-style 0/1 presence flag -- confirm against
// data_cell_counts.
std::size_t flush_counter::committed_count_for_layer(
  data_cell_index::hash_type const layer_hash) const
{
  std::size_t const committed_for_layer = committed_counts_.count(layer_hash);
  return committed_for_layer;
}

// Adds every per-layer count in `committed_counts` to this counter's
// committed_counts_.
void flush_counter::update_committed_counts(data_cell_counts const& committed_counts)
{
  for (auto const& entry : committed_counts) {
    auto const& [layer_hash, count] = entry;
    committed_counts_.add_to(layer_hash, count);
  }
}

// Merges one flush message's per-layer expectations into expected_counts_ and
// then records that the message has arrived.
void flush_counter::update_expected_counts(data_cell_counts const& expected_counts)
{
  for (auto const& entry : expected_counts) {
    auto const& [layer_hash, count] = entry;
    expected_counts_.add_to(layer_hash, count);
  }
  // The increment must come after the loop: it is a release operation, so a
  // thread that acquire-loads received_flush_count_ in done() and observes this
  // increment also observes the add_to writes made above.
  received_flush_count_.fetch_add(1, std::memory_order_release);
}

// Hands a shared_ptr to this counter to the registered flush callback. If no
// callback has been set, only a warning naming the index is logged.
void flush_counter::send_flush()
{
  if (not flush_callback_) {
    spdlog::warn("No flush callback set for index: {}", index_->to_string());
    return;
  }
  flush_callback_(shared_from_this());
}

// Reports whether every expected child of this index has been counted.
// Returns false until enough flush messages have arrived, and afterwards until
// the processed count of every expected layer equals its expected count. The
// first call that finds all layers complete also runs commit(), exactly once.
bool flush_counter::done()
{
  // Acquire pairs with the release increment in update_expected_counts: having
  // observed N arrivals, the expected_counts_ writes of those N messages are
  // visible here as well.
  auto const flushes_seen = received_flush_count_.load(std::memory_order_acquire);

  // At least one flush message is always required. When expected_flush_count_
  // is non-zero, that many messages are required, so that expected_counts_
  // covers the child layers of every unfold and not only the first to finish.
  bool const awaiting_flushes =
    flushes_seen == 0 or (expected_flush_count_ > 0 and flushes_seen < expected_flush_count_);
  if (awaiting_flushes) {
    return false;
  }

  // Every expected layer must have been processed exactly as often as announced.
  bool const all_layers_complete =
    std::ranges::all_of(expected_counts_, [this](auto const& entry) {
      auto const& [layer_hash, expected] = entry;
      return counts_.count(layer_hash) == expected.load();
    });
  if (not all_layers_complete) {
    return false;
  }

  std::call_once(commit_once_, [this] { commit(); });
  return true;
}

// Folds the processed per-layer counts into committed_counts_ and then resets
// counts_. Invoked from done() through std::call_once, so it runs at most once
// per flush_counter.
void flush_counter::commit()
{
  for (auto const& entry : counts_) {
    auto const& [layer_hash, count] = entry;
    committed_counts_.add_to(layer_hash, count.load());
  }

  // Drop the processed counts to free memory; expected_counts_ is deliberately
  // kept. With counts_ reset, a later done() is intended to fail its per-layer
  // comparison and return false instead of reporting completion again.
  // NOTE(review): that relies on count() of an absent layer differing from the
  // expected value, which would not hold for a layer whose expected count is
  // zero -- confirm. Repeated commits are prevented by call_once regardless.
  counts_ = {};
}

} // namespace phlex::experimental
63 changes: 63 additions & 0 deletions phlex/core/flush_counter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#ifndef PHLEX_CORE_FLUSH_COUNTER_HPP
#define PHLEX_CORE_FLUSH_COUNTER_HPP

#include "phlex/model/data_cell_index.hpp"
#include "phlex/model/data_cell_tracker.hpp"

#include <atomic>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>

namespace phlex::experimental {

// Tracks, for a single data-cell index, how many child data cells are expected
// per layer (as announced by flush messages) versus how many have been
// processed, and commits the processed counts once the two agree.
//
// send_flush() passes shared_from_this() to the callback, so instances on which
// send_flush() is called must be owned by a std::shared_ptr.
class flush_counter : public std::enable_shared_from_this<flush_counter> {
  using flush_counter_callback_t = std::function<void(std::shared_ptr<flush_counter>)>;

public:
  // expected_flush_count is the number of update_expected_counts() calls (flush
  // messages) that must arrive before done() can return true.
  //   - 0: no specific number is required; done() only needs at least one call
  //     to have arrived.
  //   - 1: exactly one flush message is awaited, which done() treats the same
  //     as 0.
  //   - N > 1: done() stays false until N calls have arrived, i.e. several
  //     unfolds produce children from the same parent layer.
  explicit flush_counter(data_cell_index_ptr index, std::size_t expected_flush_count);

  // Returns a copy of the index handle. The return type is deliberately not
  // const-qualified: a const by-value return gives callers nothing and prevents
  // moving from the returned temporary.
  data_cell_index_ptr index() const { return index_; }

  // Sums over all layers of the expected, processed and committed counts.
  std::size_t expected_total_count() const;
  std::size_t processed_total_count() const;
  std::size_t committed_total_count() const;
  // Committed count recorded for a single layer.
  std::size_t committed_count_for_layer(data_cell_index::hash_type layer_hash) const;
  data_cell_counts const& committed_counts() const { return committed_counts_; }

  // Merges expected_counts into the accumulated expected counts. Each call represents one
  // flush message arriving (e.g. one unfold completing for this index).
  void update_expected_counts(data_cell_counts const& expected_counts);
  // Adds the given per-layer counts to the committed counts.
  void update_committed_counts(data_cell_counts const& committed_counts);
  // Records one processed data cell for the given layer.
  void increment(data_cell_index::hash_type const layer_hash) { counts_.increment(layer_hash); }

  // Installs the callback that send_flush() invokes, replacing any previous one.
  void set_flush_callback(flush_counter_callback_t callback)
  {
    flush_callback_ = std::move(callback);
  }
  // Invokes the flush callback with a shared_ptr to this counter; logs a
  // warning instead if no callback has been set.
  void send_flush();
  // Returns true once the required flush messages have arrived and the
  // processed count of every expected layer equals its expected count. The
  // first such call also commits the processed counts (at most once).
  bool done();

private:
  // Moves the processed counts into committed_counts_ and resets counts_.
  void commit();

  data_cell_index_ptr const index_;
  // Ensures commit() runs at most once, even if done() is entered concurrently.
  std::once_flag commit_once_;
  data_cell_counts committed_counts_;
  // Processed counts per layer, fed by increment() and reset by commit().
  data_cell_counts counts_;
  // Accumulated expected child counts from all unfold flush messages.
  data_cell_counts expected_counts_;
  // Number of update_expected_counts() calls seen so far.
  std::atomic<std::size_t> received_flush_count_{0};
  // Number of flush messages expected from unfolds. Zero means any single
  // update_expected_counts call is sufficient to unblock done().
  std::size_t expected_flush_count_{0};
  flush_counter_callback_t flush_callback_;
};

} // namespace phlex::experimental

#endif // PHLEX_CORE_FLUSH_COUNTER_HPP
Loading
Loading