diff --git a/CMakeLists.txt b/CMakeLists.txt index 2bb3cea4..01da7568 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,7 +22,7 @@ find_package(Boost COMPONENTS iostreams unit_test_framework REQUIRED) daq_protobuf_codegen( opmon/*.proto ) ############################################################################## -daq_add_library( TriggerInhibitAgent.cpp TriggerRecordBuilderData.cpp TPBundleHandler.cpp +daq_add_library( TriggerInhibitAgent.cpp TriggerRecordBuilderData.cpp TPBundleHandler.cpp DFOCore.cpp LINK_LIBRARIES opmonlib::opmonlib ers::ers HighFive appfwk::appfwk logging::logging stdc++fs dfmessages::dfmessages utilities::utilities trigger::trigger detdataformats::detdataformats trgdataformats::trgdataformats) @@ -31,7 +31,8 @@ daq_add_plugin( HDF5DataStore duneDataStore LINK_LIBRARIES dfmodules hdf5lib daq_add_plugin( FragmentAggregatorModule duneDAQModule LINK_LIBRARIES dfmodules iomanager::iomanager ) daq_add_plugin( DataWriterModule duneDAQModule LINK_LIBRARIES dfmodules hdf5libs::hdf5libs iomanager::iomanager ) -daq_add_plugin( DFOModule duneDAQModule LINK_LIBRARIES dfmodules iomanager::iomanager ) +daq_add_plugin( DFOModule duneDAQModule LINK_LIBRARIES dfmodules iomanager::iomanager ) +daq_add_plugin( DFOConsensusModule duneDAQModule LINK_LIBRARIES dfmodules iomanager::iomanager ) daq_add_plugin( TRBModule duneDAQModule LINK_LIBRARIES dfmodules iomanager::iomanager ) daq_add_plugin( TRMonRequestorModule duneDAQModule LINK_LIBRARIES dfmodules iomanager::iomanager ) daq_add_plugin( FakeDataProdModule duneDAQModule LINK_LIBRARIES dfmodules iomanager::iomanager) @@ -46,6 +47,11 @@ add_dependencies( HDF5Write_test dfmodules_HDF5DataStore_duneDataStore ) daq_add_unit_test( DFOModule_test LINK_LIBRARIES dfmodules ) add_dependencies( DFOModule_test dfmodules_DFOModule_duneDAQModule) +daq_add_unit_test( DFOCore_test LINK_LIBRARIES dfmodules ) + +daq_add_unit_test( DFOConsensusModule_test LINK_LIBRARIES dfmodules ) +add_dependencies( DFOConsensusModule_test dfmodules_DFOConsensusModule_duneDAQModule) + daq_add_unit_test( TriggerRecordBuilderData_test LINK_LIBRARIES dfmodules) daq_add_unit_test( DataStoreFactory_test LINK_LIBRARIES dfmodules) diff --git a/include/dfmodules/DFODecision.hpp b/include/dfmodules/DFODecision.hpp new file mode 100644 index 00000000..48f8918f --- /dev/null +++ b/include/dfmodules/DFODecision.hpp @@ -0,0 +1,63 @@ +/** + * @file DFODecision.hpp DFODecision message used by DFOConsensusModule. + * + * A DFODecision is broadcast by the responsible DFOConsensusModule to all + * peer DFOs after each TRBModule state change (trigger assignment or + * completion). This allows every DFO in the ensemble to maintain an accurate + * per-TRBModule slot-usage view and issue correct TriggerInhibit messages to + * the MLT. + * + * Fields + * ------ + * run_number – run in which this decision was made + * trigger_number – the TriggerDecision that triggered the state change + * trb_connection_name– connection-ID of the TRBModule whose slot count changed + * trb_slot_count – absolute slot count for that TRB *after* the change + * source_dfo_name – name() of the DFO that generated this message + * is_completion – false = new assignment; true = TRB completed a trigger + * + * Serialisation + * ------------- + * nlohmann/json (to_json / from_json) is provided via + * NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE. For in-process Queue transport this is + * not required; for network (ZMQ / kSendRecv) connections production + * deployments should additionally register a msgpack serialiser in dfmessages. + * + * This is part of the DUNE DAQ Software Suite, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#ifndef DFMODULES_INCLUDE_DFMODULES_DFODECISION_HPP_ +#define DFMODULES_INCLUDE_DFMODULES_DFODECISION_HPP_ + +#include "daqdataformats/Types.hpp" +#include "nlohmann/json.hpp" + +#include + +namespace dunedaq { +namespace dfmodules { + +struct DFODecision +{ + daqdataformats::run_number_t run_number{ 0 }; + daqdataformats::trigger_number_t trigger_number{ 0 }; + std::string trb_connection_name; ///< Connection-ID of the TRBModule involved + size_t trb_slot_count{ 0 }; ///< TRB slot count after this event + std::string source_dfo_name; ///< Name of the DFO that generated this message + bool is_completion{ false }; ///< true = completion; false = new assignment +}; + +NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(DFODecision, + run_number, + trigger_number, + trb_connection_name, + trb_slot_count, + source_dfo_name, + is_completion) + +} // namespace dfmodules +} // namespace dunedaq + +#endif // DFMODULES_INCLUDE_DFMODULES_DFODECISION_HPP_ diff --git a/plugins/DFOConsensusModule.cpp b/plugins/DFOConsensusModule.cpp new file mode 100644 index 00000000..f6688e1d --- /dev/null +++ b/plugins/DFOConsensusModule.cpp @@ -0,0 +1,665 @@ +/** + * @file DFOConsensusModule.cpp DFOConsensusModule class implementation + * + * This is part of the DUNE DAQ Software Suite, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#include "DFOConsensusModule.hpp" +#include "dfmodules/CommonIssues.hpp" + +#include "dfmodules/opmon/DFOModule.pb.h" + +#include "appmodel/DFOModule.hpp" +#include "confmodel/Connection.hpp" +#include "iomanager/IOManager.hpp" +#include "logging/Logging.hpp" + +#include "trgdataformats/TriggerCandidateData.hpp" + +#include +#include +#include +#include +#include +#include + +/** + * @brief Name used by TRACE TLOG calls from this source file + */ +#define TRACE_NAME "DFOConsensusModule" // NOLINT +enum +{ + TLVL_ENTER_EXIT_METHODS = 5, + TLVL_PEER_ANNOUNCE = 6, + TLVL_PARTITION = 7, + TLVL_TD_FILTER = 10, + TLVL_DFO_DECISION = 11, + TLVL_WATCHDOG = 12 +}; + +namespace dunedaq::dfmodules { + +DFOConsensusModule::DFOConsensusModule(const std::string& name) + : dunedaq::appfwk::DAQModule(name) + , m_core(std::make_unique(name)) +{ + register_command("conf", &DFOConsensusModule::do_conf); + register_command("start", &DFOConsensusModule::do_start); + register_command("drain_dataflow", &DFOConsensusModule::do_stop); + register_command("scrap", &DFOConsensusModule::do_scrap); +} + +void +DFOConsensusModule::init(std::shared_ptr mcfg) +{ + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering init() method"; + + auto mdal = mcfg->get_dal(get_name()); + if (!mdal) { + throw appfwk::CommandFailed(ERS_HERE, "init", get_name(), "Unable to retrieve configuration object"); + } + auto iom = iomanager::IOManager::get(); + + for (auto con : mdal->get_inputs()) { + if (con->get_data_type() == datatype_to_string()) { + m_token_connection = con->UID(); + } + if (con->get_data_type() == datatype_to_string()) { + m_td_connection = con->UID(); + } + if (con->get_data_type() == datatype_to_string()) { + m_dfo_decision_input_connection = con->UID(); + TLOG_DEBUG(TLVL_DFO_DECISION) << get_name() << ": Found DFODecision input connection: " << con->UID(); + } + } + for (auto con : mdal->get_outputs()) { + if (con->get_data_type() == datatype_to_string()) { + m_busy_sender = iom->get_sender(con->UID()); + } + if (con->get_data_type() == datatype_to_string()) { + m_trb_conn_ids.push_back(con->UID()); + } + // Peer DFO output connections carry TriggerDecisionToken messages. + if (con->get_data_type() == datatype_to_string()) { + m_dfo_peer_output_connections.push_back(con->UID()); + TLOG_DEBUG(TLVL_PEER_ANNOUNCE) << get_name() << ": Found peer DFO output connection: " << con->UID(); + } + if (con->get_data_type() == datatype_to_string()) { + m_dfo_decision_output_connections.push_back(con->UID()); + TLOG_DEBUG(TLVL_DFO_DECISION) << get_name() << ": Found DFODecision output connection: " << con->UID(); + } + } + + if (m_token_connection.empty()) { + throw appfwk::MissingConnection( + ERS_HERE, get_name(), datatype_to_string(), "input"); + } + if (m_td_connection.empty()) { + throw appfwk::MissingConnection( + ERS_HERE, get_name(), datatype_to_string(), "input"); + } + if (m_busy_sender == nullptr) { + throw appfwk::MissingConnection( + ERS_HERE, get_name(), datatype_to_string(), "output"); + } + + m_dfo_conf = mdal->get_configuration(); + m_expected_peers = m_dfo_peer_output_connections.size(); + + // Verify that receivers exist (fetches connection details eagerly) + iom->get_receiver(m_token_connection); + iom->get_receiver(m_td_connection); + + TLOG() << get_name() << ": DFOConsensusModule initialized with " << m_expected_peers << " expected peer DFO(s)" + << " and " << m_dfo_decision_output_connections.size() << " DFODecision output connection(s)"; + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting init() method"; +} + +void +DFOConsensusModule::do_conf(const CommandData_t&) +{ + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method"; + + m_core->configure(m_dfo_conf->get_busy_threshold(), + m_dfo_conf->get_free_threshold(), + m_dfo_conf->get_td_send_retries(), + std::chrono::milliseconds(m_dfo_conf->get_general_queue_timeout_ms()), + std::chrono::milliseconds(m_dfo_conf->get_stop_timeout_ms())); + + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method, there are " + << m_core->num_trb_apps() << " TRB apps defined"; +} + +void +DFOConsensusModule::do_start(const CommandData_t& payload) +{ + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method"; + + // Reset peer state from any previous run. + { + std::lock_guard guard(m_peers_mutex); + m_registered_peers.clear(); + } + { + std::lock_guard guard(m_remote_slots_mutex); + m_remote_slot_counts.clear(); + } + { + std::lock_guard guard(m_pending_tds_mutex); + m_pending_tds.clear(); + } + + auto run_number = payload.value("run", 0); + + auto iom = iomanager::IOManager::get(); + if (m_busy_sender != nullptr) { + bool is_ready = m_busy_sender->is_ready_for_sending(std::chrono::milliseconds(100)); + TLOG_DEBUG(0) << "The sender for TriggerInhibit messages " << (is_ready ? "is" : "is not") << " ready."; + } + for (auto& trb_conn : m_trb_conn_ids) { + auto sender = iom->get_sender(trb_conn); + if (sender != nullptr) { + bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100)); + TLOG_DEBUG(0) << "The TriggerDecision sender for " << trb_conn << " " + << (is_ready ? "is" : "is not") << " ready."; + } + } + + m_core->start(run_number, + m_busy_sender, + [iom](const std::string& conn) { + return iom->get_sender(conn); + }, + [this](const std::string& name, std::shared_ptr trbd) { + register_node(name, trbd); + }, + [this](const std::shared_ptr& atd, size_t slot_count) { + on_assignment(atd, slot_count); + }, + [this](const std::string& trb_conn, + daqdataformats::trigger_number_t tn, + size_t slot_count) { on_completion(trb_conn, tn, slot_count); }, + [this]() { return is_globally_busy(); }); + + iom->add_callback( + m_token_connection, std::bind(&DFOConsensusModule::on_token, this, std::placeholders::_1)); + + iom->add_callback( + m_td_connection, std::bind(&DFOConsensusModule::on_trigger_decision, this, std::placeholders::_1)); + + if (!m_dfo_decision_input_connection.empty()) { + iom->add_callback( + m_dfo_decision_input_connection, + std::bind(&DFOConsensusModule::on_dfo_decision, this, std::placeholders::_1)); + } + + // Broadcast our identity to all peer DFOs. + send_peer_announcement(); + + // Wait until all expected peers have responded (or the timeout expires). + if (m_expected_peers > 0) { + std::unique_lock lock(m_peers_mutex); + bool all_peers_ready = m_peers_cv.wait_for(lock, s_peer_announce_timeout, [this] { + return m_registered_peers.size() >= m_expected_peers; + }); + if (!all_peers_ready) { + ers::warning(DFOConsensusPeerTimeout( + ERS_HERE, get_name(), m_expected_peers, m_registered_peers.size())); + } + } + + // Determine our partition index from the complete peer set. + compute_partition(); + ers::info(DFOConsensusPartitionInfo(ERS_HERE, get_name(), m_own_index.load(), m_num_dfos.load())); + + // Start the watchdog thread unconditionally so that late-arriving peer + // announcements (i.e. peers that announce after do_start() completes) are + // correctly handled. In standalone mode (single DFO) the watchdog is + // effectively a no-op: every TD is removed from m_pending_tds by + // on_assignment before the 2-second timeout expires, and any that aren't + // are guarded by the "own trigger" check inside watchdog_thread_func. + m_watchdog_running.store(true); + m_watchdog_thread = std::thread(&DFOConsensusModule::watchdog_thread_func, this); + + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method"; +} + +void +DFOConsensusModule::do_stop(const CommandData_t& /*args*/) +{ + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method"; + + // Stop the watchdog thread before halting DFOCore so the watchdog does not + // try to dispatch TDs while DFOCore is draining. + m_watchdog_running.store(false); + if (m_watchdog_thread.joinable()) { + m_watchdog_thread.join(); + } + + m_core->stop(); + + auto iom = iomanager::IOManager::get(); + iom->remove_callback(m_td_connection); + + auto remnants = m_core->flush(); + + iom->remove_callback(m_token_connection); + if (!m_dfo_decision_input_connection.empty()) { + iom->remove_callback(m_dfo_decision_input_connection); + } + + for (auto& r : remnants) { + ers::error(IncompleteTriggerDecision(ERS_HERE, r->decision.trigger_number, m_core->run_number())); + } + + // Clear runtime state. + { + std::lock_guard guard(m_pending_tds_mutex); + m_pending_tds.clear(); + } + { + std::lock_guard guard(m_remote_slots_mutex); + m_remote_slot_counts.clear(); + } + + // Reset to standalone mode so a subsequent start is clean. + m_own_index.store(0); + m_num_dfos.store(1); + + TLOG() << get_name() << " successfully stopped"; + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method"; +} + +void +DFOConsensusModule::do_scrap(const CommandData_t& /*args*/) +{ + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method"; + + m_core->scrap(); + + TLOG() << get_name() << " successfully scrapped"; + TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method"; +} + +void +DFOConsensusModule::generate_opmon_data() +{ + auto snap = m_core->take_opmon_snapshot(); + + opmon::DFOInfo info; + info.set_tokens_received(snap.tokens_received); + info.set_decisions_sent(snap.decisions_sent); + info.set_decisions_received(snap.decisions_received); + info.set_waiting_for_decision(snap.waiting_for_decision); + info.set_deciding_destination(snap.deciding_destination); + info.set_forwarding_decision(snap.forwarding_decision); + info.set_waiting_for_token(snap.waiting_for_token); + info.set_processing_token(snap.processing_token); + publish(std::move(info)); + + std::lock_guard guard(m_core->get_trigger_counters_mutex()); + for (auto& [type, counts] : m_core->get_trigger_counters()) { + opmon::TriggerInfo ti; + ti.set_received(counts.received.exchange(0)); + ti.set_completed(counts.completed.exchange(0)); + auto name = dunedaq::trgdataformats::get_trigger_candidate_type_names()[type]; + publish(std::move(ti), { { "type", name } }); + } +} + +// --------------------------------------------------------------------------- +// Peer-announcement helpers +// --------------------------------------------------------------------------- + +void +DFOConsensusModule::send_peer_announcement() +{ + if (m_dfo_peer_output_connections.empty()) + return; + + dfmessages::TriggerDecisionToken announcement; + announcement.run_number = 0; + announcement.trigger_number = s_peer_announce_magic; + announcement.decision_destination = get_name(); + + auto iom = iomanager::IOManager::get(); + for (const auto& conn : m_dfo_peer_output_connections) { + try { + auto announcement_copy = announcement; + iom->get_sender(conn)->send( + std::move(announcement_copy), m_core->queue_timeout()); + TLOG_DEBUG(TLVL_PEER_ANNOUNCE) << get_name() << ": Sent peer announcement to " << conn; + } catch (const ers::Issue& excpt) { + ers::warning(excpt); + } + } +} + +void +DFOConsensusModule::compute_partition() +{ + std::vector ensemble; + { + std::lock_guard guard(m_peers_mutex); + ensemble.push_back(get_name()); + for (const auto& peer : m_registered_peers) { + ensemble.push_back(peer); + } + } + + // Sort names alphabetically to obtain a deterministic, agreed-upon order. + std::sort(ensemble.begin(), ensemble.end()); + + auto it = std::find(ensemble.begin(), ensemble.end(), get_name()); + size_t own_index = (it != ensemble.end()) ? static_cast(std::distance(ensemble.begin(), it)) : 0; + + m_own_index.store(own_index); + m_num_dfos.store(ensemble.size()); + + TLOG_DEBUG(TLVL_PARTITION) << get_name() << ": Partition computed: index=" << own_index << " of " + << ensemble.size() << " DFO(s)"; +} + +// --------------------------------------------------------------------------- +// IOManager callbacks +// --------------------------------------------------------------------------- + +void +DFOConsensusModule::on_token(const dfmessages::TriggerDecisionToken& token) +{ + // A token with run_number==0 and trigger_number==s_peer_announce_magic is a + // DFO peer-announcement rather than a TRB completion token. + if (token.run_number == 0 && token.trigger_number == s_peer_announce_magic) { + TLOG_DEBUG(TLVL_PEER_ANNOUNCE) << get_name() << ": Received peer announcement from " + << token.decision_destination; + bool newly_registered = false; + { + std::lock_guard guard(m_peers_mutex); + auto [it, inserted] = m_registered_peers.insert(token.decision_destination); + newly_registered = inserted; + } + m_peers_cv.notify_all(); + + // If this is a genuinely new peer (e.g., a late joiner), recompute the + // partition so the ensemble stays consistent. + if (newly_registered) { + compute_partition(); + ers::info(DFOConsensusPartitionInfo(ERS_HERE, get_name(), m_own_index.load(), m_num_dfos.load())); + } + return; + } + + // All other tokens are regular TRB completion tokens. + m_core->receive_token(token); +} + +void +DFOConsensusModule::on_trigger_decision(const dfmessages::TriggerDecision& decision) +{ + // Buffer the TD for potential failover monitoring – all DFOs do this. + { + std::lock_guard guard(m_pending_tds_mutex); + m_pending_tds[decision.trigger_number] = { decision, std::chrono::steady_clock::now() }; + } + + size_t num_dfos = m_num_dfos.load(); + size_t own_index = m_own_index.load(); + + // In multi-DFO mode, only the responsible DFO processes the decision. + if (num_dfos > 1 && (decision.trigger_number % num_dfos) != own_index) { + TLOG_DEBUG(TLVL_TD_FILTER) << get_name() << ": Buffered trigger_number " << decision.trigger_number + << " awaiting DFODecision from partition " + << (decision.trigger_number % num_dfos); + return; + } + + // Process via DFOCore; on_assignment callback will broadcast the DFODecision + // and remove the entry from m_pending_tds. + m_core->receive_trigger_decision(decision); +} + +void +DFOConsensusModule::on_dfo_decision(const DFODecision& msg) +{ + TLOG_DEBUG(TLVL_DFO_DECISION) << get_name() << ": Received DFODecision from " << msg.source_dfo_name + << " trigger=" << msg.trigger_number << " trb=" << msg.trb_connection_name + << " slots=" << msg.trb_slot_count + << " completion=" << std::boolalpha << msg.is_completion; + + // Update shadow slot count for the reporting DFO's TRB. + { + std::lock_guard guard(m_remote_slots_mutex); + m_remote_slot_counts[msg.source_dfo_name][msg.trb_connection_name] = msg.trb_slot_count; + } + + // Remove the pending TD entry now that we know it was handled. + if (!msg.is_completion) { + std::lock_guard guard(m_pending_tds_mutex); + m_pending_tds.erase(msg.trigger_number); + } + + // Recalculate and potentially update the inhibit signal. + m_core->notify_trigger_if_needed(); +} + +// --------------------------------------------------------------------------- +// DFODecision broadcasting +// --------------------------------------------------------------------------- + +void +DFOConsensusModule::broadcast_dfo_decision(daqdataformats::trigger_number_t trigger_number, + const std::string& trb_conn, + size_t trb_slot_count, + bool is_completion) +{ + if (m_dfo_decision_output_connections.empty()) + return; + + DFODecision msg; + msg.run_number = m_core->run_number(); + msg.trigger_number = trigger_number; + msg.trb_connection_name = trb_conn; + msg.trb_slot_count = trb_slot_count; + msg.source_dfo_name = get_name(); + msg.is_completion = is_completion; + + auto iom = iomanager::IOManager::get(); + for (const auto& conn : m_dfo_decision_output_connections) { + try { + auto msg_copy = msg; + iom->get_sender(conn)->send(std::move(msg_copy), m_core->queue_timeout()); + TLOG_DEBUG(TLVL_DFO_DECISION) << get_name() << ": Sent DFODecision to " << conn + << " trigger=" << trigger_number << " trb=" << trb_conn + << " slots=" << trb_slot_count + << " completion=" << std::boolalpha << is_completion; + } catch (const ers::Issue& excpt) { + ers::warning(excpt); + } + } +} + +void +DFOConsensusModule::on_assignment(const std::shared_ptr& atd, + size_t trb_slot_count) +{ + // Broadcast to peer DFOs so they can update their shadow state. + broadcast_dfo_decision(atd->decision.trigger_number, atd->connection_name, trb_slot_count, false); + + // Remove from the pending-TD buffer – this DFO has handled it. + { + std::lock_guard guard(m_pending_tds_mutex); + m_pending_tds.erase(atd->decision.trigger_number); + } +} + +void +DFOConsensusModule::on_completion(const std::string& trb_conn, + daqdataformats::trigger_number_t trigger_number, + size_t trb_slot_count) +{ + // Broadcast to peer DFOs so they can update their shadow state. + broadcast_dfo_decision(trigger_number, trb_conn, trb_slot_count, true); +} + +// --------------------------------------------------------------------------- +// Global busy check +// --------------------------------------------------------------------------- + +bool +DFOConsensusModule::is_globally_busy() const +{ + if (m_core->num_trb_apps() == 0) + return true; // No TRBs known yet – treat as busy. + + // Build aggregate slot count per TRB: own (from DFOCore) + peer (from shadow map). + // If *any* TRB has available capacity the system is not globally busy. + // Busy threshold comes from TriggerRecordBuilderData; we use DFOCore's own + // is_busy() as the baseline for own slots and add peer slots on top. + + // Collect peer totals per TRB connection. + std::map peer_totals; + { + std::lock_guard guard(m_remote_slots_mutex); + for (const auto& [dfo_name, trb_map] : m_remote_slot_counts) { + for (const auto& [trb_conn, count] : trb_map) { + peer_totals[trb_conn] += count; + } + } + } + + // Delegate to DFOCore, which knows the own slots and the busy_threshold per TRB. + // We add peer_totals to each TRB's own count via DFOCore::is_globally_busy(). + return m_core->is_globally_busy(peer_totals); +} + +// --------------------------------------------------------------------------- +// Watchdog / failover +// --------------------------------------------------------------------------- + +void +DFOConsensusModule::watchdog_thread_func() +{ + TLOG_DEBUG(TLVL_WATCHDOG) << get_name() << ": Watchdog thread started"; + + while (m_watchdog_running.load()) { + std::this_thread::sleep_for(s_watchdog_interval); + + if (!m_watchdog_running.load()) + break; + + auto now = std::chrono::steady_clock::now(); + std::vector> timed_out; + + { + std::lock_guard guard(m_pending_tds_mutex); + for (const auto& [tn, ptd] : m_pending_tds) { + if ((now - ptd.received_at) >= s_dfo_decision_timeout) { + timed_out.emplace_back(tn, ptd); + } + } + } + + for (auto& [tn, ptd] : timed_out) { + // Snapshot both atomics together to avoid observing a partially-updated + // partition (compute_partition() updates them sequentially). + size_t num_dfos = m_num_dfos.load(); + size_t own_index = m_own_index.load(); + size_t responsible_index = (num_dfos > 1) ? (tn % num_dfos) : 0; + + // Do not trigger failover if WE were responsible (should not happen, but + // guard against it to avoid self-removal). + if (responsible_index == own_index) { + TLOG_DEBUG(TLVL_WATCHDOG) << get_name() << ": Watchdog: own trigger " << tn + << " still pending – TRBs may be saturated"; + continue; + } + + handle_peer_failure(responsible_index, tn); + } + } + + TLOG_DEBUG(TLVL_WATCHDOG) << get_name() << ": Watchdog thread stopped"; +} + +void +DFOConsensusModule::handle_peer_failure(size_t failed_index, + daqdataformats::trigger_number_t trigger_number) +{ + // Identify the failed DFO name from the sorted ensemble. + std::string failed_dfo_name; + { + std::lock_guard guard(m_peers_mutex); + + std::vector ensemble; + ensemble.push_back(get_name()); + for (const auto& peer : m_registered_peers) { + ensemble.push_back(peer); + } + std::sort(ensemble.begin(), ensemble.end()); + + if (failed_index >= ensemble.size()) { + TLOG_DEBUG(TLVL_WATCHDOG) << get_name() + << ": handle_peer_failure: stale failed_index=" << failed_index + << " (ensemble size=" << ensemble.size() + << "); partition already updated – skipping failover for trigger " + << trigger_number; + return; // Stale; partition has already changed. + } + + failed_dfo_name = ensemble[failed_index]; + if (failed_dfo_name == get_name()) { + TLOG_DEBUG(TLVL_WATCHDOG) << get_name() + << ": handle_peer_failure: resolved failed_index=" << failed_index + << " to self – ignoring (trigger=" << trigger_number << ")"; + return; // Should not happen. + } + + m_registered_peers.erase(failed_dfo_name); + } + + ers::warning(DFOConsensusFailover(ERS_HERE, get_name(), failed_dfo_name, trigger_number)); + + // Recompute partition without the failed DFO. + compute_partition(); + ers::info(DFOConsensusPartitionInfo(ERS_HERE, get_name(), m_own_index.load(), m_num_dfos.load())); + + // Also clear shadow slot counts from the failed DFO. + { + std::lock_guard guard(m_remote_slots_mutex); + m_remote_slot_counts.erase(failed_dfo_name); + } + + // Re-assign all timed-out TDs that now belong to this DFO under the new partition. + // Snapshot both atomics together so the loop sees a consistent partition. + std::vector to_reassign; + { + std::lock_guard guard(m_pending_tds_mutex); + std::vector handled; + size_t new_num = m_num_dfos.load(); + size_t new_own = m_own_index.load(); + for (auto& [tn, ptd] : m_pending_tds) { + size_t new_owner = (new_num > 1) ? (tn % new_num) : 0; + if (new_owner == new_own) { + to_reassign.push_back(ptd.decision); + handled.push_back(tn); + } + } + for (auto tn : handled) { + m_pending_tds.erase(tn); + } + } + + for (const auto& decision : to_reassign) { + TLOG_DEBUG(TLVL_WATCHDOG) << get_name() << ": Failover: reassigning trigger_number " + << decision.trigger_number; + m_core->receive_trigger_decision(decision); + // on_assignment callback will broadcast DFODecision. + } +} + +} // namespace dunedaq::dfmodules + +DEFINE_DUNE_DAQ_MODULE(dunedaq::dfmodules::DFOConsensusModule) + diff --git a/plugins/DFOConsensusModule.hpp b/plugins/DFOConsensusModule.hpp new file mode 100644 index 00000000..7339e2f5 --- /dev/null +++ b/plugins/DFOConsensusModule.hpp @@ -0,0 +1,285 @@ +/** + * @file DFOConsensusModule.hpp + * + * DFOConsensusModule implements a consensus algorithm that allows multiple DFO + * instances to run concurrently without assigning the same TriggerDecision + * more than once, while each instance maintains full knowledge of the global + * TRBModule slot state so it can issue accurate TriggerInhibit messages to the + * MLT. + * + * Partition assignment + * -------------------- + * Each DFO discovers its peers by exchanging announcement tokens at start-up. + * Peers are sorted alphabetically; the DFO's index equals its position in that + * sorted list. A TriggerDecision is handled by the DFO whose index satisfies + * trigger_number % num_dfos == own_index. + * + * DFODecision propagation + * ----------------------- + * After the responsible DFO assigns a TriggerDecision to a TRBModule it + * broadcasts a DFODecision message to all peer DFOs. A DFODecision is also + * sent when the TRBModule completes the trigger (token received). Peers use + * these messages to update their shadow view of each TRB's slot count. + * + * The inhibit signal is asserted when ALL TRBModules in the system are busy, + * considering both own and peer assignments. + * + * Failover + * -------- + * Every DFO buffers incoming TriggerDecisions with a reception timestamp. If + * the responsible DFO does not broadcast a DFODecision within + * s_dfo_decision_timeout, surviving DFOs exclude it from the ensemble, + * recompute the partition, and the DFO that now owns the trigger_number + * re-assigns and broadcasts a DFODecision. + * + * A DFO with zero peer output connections operates as a standalone DFO, + * identical to DFOModule. + * + * This is part of the DUNE DAQ Software Suite, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#ifndef DFMODULES_PLUGINS_DFOCONSENSUSMODULE_HPP_ +#define DFMODULES_PLUGINS_DFOCONSENSUSMODULE_HPP_ + +#include "dfmodules/DFOCore.hpp" +#include "dfmodules/DFODecision.hpp" + +#include "appmodel/DFOConf.hpp" +#include "dfmessages/TriggerDecisionToken.hpp" +#include "dfmessages/TriggerInhibit.hpp" +#include "iomanager/Sender.hpp" + +#include "appfwk/DAQModule.hpp" +#include "logging/Logging.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace dunedaq { + +// Disable coverage checking LCOV_EXCL_START +ERS_DECLARE_ISSUE(dfmodules, + DFOConsensusPeerTimeout, + "DFOConsensusModule " << module_name << ": Timed out waiting for " << expected_peers + << " peer(s) to announce; received " << received_peers + << ". Continuing with the peers that responded.", + ((std::string)module_name)((size_t)expected_peers)((size_t)received_peers)) + +ERS_DECLARE_ISSUE(dfmodules, + DFOConsensusPartitionInfo, + "DFOConsensusModule " << module_name << ": Partition index " << own_index << " of " << num_dfos + << " DFO(s) in the ensemble.", + ((std::string)module_name)((size_t)own_index)((size_t)num_dfos)) + +ERS_DECLARE_ISSUE(dfmodules, + DFOConsensusFailover, + "DFOConsensusModule " << module_name << ": DFO peer " << failed_dfo + << " timed out for trigger_number " << trigger_number + << ". Removing from ensemble and reassigning.", + ((std::string)module_name)((std::string)failed_dfo)((uint32_t)trigger_number)) +// Re-enable coverage checking LCOV_EXCL_STOP + +namespace dfmodules { + +/** + * @brief DFOConsensusModule distributes triggers across multiple DFO instances + * with global TRBModule state visibility via DFODecision messages and + * automatic failover when a peer DFO stops responding. + */ +class DFOConsensusModule : public dunedaq::appfwk::DAQModule +{ +public: + /** + * @brief DFOConsensusModule Constructor + * @param name Instance name for this DFOConsensusModule instance + */ + explicit DFOConsensusModule(const std::string& name); + + DFOConsensusModule(const DFOConsensusModule&) = delete; ///< Not copy-constructible + DFOConsensusModule& operator=(const DFOConsensusModule&) = delete; ///< Not copy-assignable + DFOConsensusModule(DFOConsensusModule&&) = delete; ///< Not move-constructible + DFOConsensusModule& operator=(DFOConsensusModule&&) = delete; ///< Not move-assignable + + void init(std::shared_ptr mcfg) override; + + /** + * @brief Magic trigger_number value in a TriggerDecisionToken that identifies + * a DFO peer-announcement message rather than a normal completion token. + */ + static constexpr daqdataformats::trigger_number_t s_peer_announce_magic = + std::numeric_limits::max(); + + /** + * @brief Timeout after which, if no DFODecision has been received for an + * accepted TriggerDecision, the responsible peer is considered failed + * and the partition is recomputed. + */ + static constexpr std::chrono::milliseconds s_dfo_decision_timeout{ 2000 }; + +private: + // Commands + void do_conf(const CommandData_t&); + void do_start(const CommandData_t&); + void do_stop(const CommandData_t&); + void do_scrap(const CommandData_t&); + + void generate_opmon_data() override; + + // -------------------------------------------------------------------------- + // Peer-announcement helpers + // -------------------------------------------------------------------------- + + /// Send this DFO's peer-announcement token to all configured peer outputs. + void send_peer_announcement(); + + /// (Re-)compute partition index and ensemble size from the current set of + /// known peer names. Must be called with m_peers_mutex NOT held. + void compute_partition(); + + // -------------------------------------------------------------------------- + // IOManager callbacks + // -------------------------------------------------------------------------- + + /// Token callback: intercepts peer-announcement tokens; passes everything + /// else to DFOCore::receive_token(). + void on_token(const dfmessages::TriggerDecisionToken& token); + + /// TD callback: buffers the TD for failover; if responsible, delegates to + /// DFOCore::receive_trigger_decision(). + void on_trigger_decision(const dfmessages::TriggerDecision& decision); + + /// DFODecision callback: updates shadow TRB slot counts and removes the + /// corresponding entry from the pending-TD buffer. + void on_dfo_decision(const DFODecision& msg); + + // -------------------------------------------------------------------------- + // DFODecision broadcasting + // -------------------------------------------------------------------------- + + /// Broadcast a DFODecision to all peer DFOs. + void broadcast_dfo_decision(daqdataformats::trigger_number_t trigger_number, + const std::string& trb_conn, + size_t trb_slot_count, + bool is_completion); + + /// Callback registered with DFOCore: called after each successful assignment. + void on_assignment(const std::shared_ptr& atd, size_t trb_slot_count); + + /// Callback registered with DFOCore: called after each token completion. + void on_completion(const std::string& trb_conn, + daqdataformats::trigger_number_t trigger_number, + size_t trb_slot_count); + + // -------------------------------------------------------------------------- + // Global busy check + // -------------------------------------------------------------------------- + + /// Returns true if ALL TRBModules in the system are busy, considering both + /// own slot counts (from DFOCore) and remote slot counts from peer DFOs. + bool is_globally_busy() const; + + // -------------------------------------------------------------------------- + // Watchdog / failover + // -------------------------------------------------------------------------- + + /// Watchdog thread body: periodically scans the pending-TD buffer and + /// triggers failover when a responsible peer misses its DFODecision deadline. + void watchdog_thread_func(); + + /// Remove the DFO at ensemble index @p failed_index from m_registered_peers, + /// recompute the partition, and reassign any pending TDs now owned by this DFO. + /// Must be called with m_peers_mutex NOT held. + void handle_peer_failure(size_t failed_index, + daqdataformats::trigger_number_t trigger_number); + + // -------------------------------------------------------------------------- + // Core DFO processing logic (common with DFOModule) + // -------------------------------------------------------------------------- + std::unique_ptr m_core; + + // -------------------------------------------------------------------------- + // Configuration + // -------------------------------------------------------------------------- + const appmodel::DFOConf* m_dfo_conf{ nullptr }; + + // -------------------------------------------------------------------------- + // Connections (initialised in init(), used at start()) + // -------------------------------------------------------------------------- + std::shared_ptr> m_busy_sender; + std::string m_token_connection; + std::string m_td_connection; + std::vector m_trb_conn_ids; + + /// Peer DFO output connections for sending peer-announcement tokens. + std::vector m_dfo_peer_output_connections; + size_t m_expected_peers{ 0 }; + + /// DFODecision output connections (one per peer DFO, for state propagation). + std::vector m_dfo_decision_output_connections; + /// DFODecision input connection (receives decisions from all peer DFOs). + std::string m_dfo_decision_input_connection; + + // -------------------------------------------------------------------------- + // Peer-announcement state (protected by m_peers_mutex) + // -------------------------------------------------------------------------- + std::set m_registered_peers; + mutable std::mutex m_peers_mutex; + std::condition_variable m_peers_cv; + + // -------------------------------------------------------------------------- + // Partition information (updated atomically) + // -------------------------------------------------------------------------- + std::atomic m_own_index{ 0 }; + std::atomic m_num_dfos{ 1 }; + + // -------------------------------------------------------------------------- + // Remote TRB slot shadow state + // remote_slot_counts[source_dfo][trb_conn] = absolute slot count from that DFO + // Protected by m_remote_slots_mutex. + // -------------------------------------------------------------------------- + std::map> m_remote_slot_counts; + mutable std::mutex m_remote_slots_mutex; + + // -------------------------------------------------------------------------- + // Pending-TD buffer for failover + // Holds every incoming TriggerDecision until a matching DFODecision is + // received (or the DFO is stopped). + // Protected by m_pending_tds_mutex. + // -------------------------------------------------------------------------- + struct PendingTD + { + dfmessages::TriggerDecision decision; + std::chrono::steady_clock::time_point received_at; + }; + std::map m_pending_tds; + mutable std::mutex m_pending_tds_mutex; + + // -------------------------------------------------------------------------- + // Watchdog thread + // -------------------------------------------------------------------------- + std::thread m_watchdog_thread; + std::atomic m_watchdog_running{ false }; + + // -------------------------------------------------------------------------- + // Timing constants + // -------------------------------------------------------------------------- + static constexpr std::chrono::milliseconds s_peer_announce_timeout{ 500 }; + static constexpr std::chrono::milliseconds s_watchdog_interval{ 100 }; +}; + +} // namespace dfmodules +} // namespace dunedaq + +#endif // DFMODULES_PLUGINS_DFOCONSENSUSMODULE_HPP_ + diff --git a/plugins/DFOModule.cpp b/plugins/DFOModule.cpp index 6c66e37e..957d7f61 100644 --- a/plugins/DFOModule.cpp +++ b/plugins/DFOModule.cpp @@ -16,16 +16,10 @@ #include "iomanager/IOManager.hpp" #include "logging/Logging.hpp" -#include -#include -#include -#include -#include -#include +#include "trgdataformats/TriggerCandidateData.hpp" + #include #include -#include -#include #include /** @@ -34,21 +28,14 @@ #define TRACE_NAME "DFOModule" // NOLINT enum { - TLVL_ENTER_EXIT_METHODS = 5, - TLVL_CONFIG = 7, - TLVL_WORK_STEPS = 10, - TLVL_TRIGDEC_RECEIVED = 21, - TLVL_NOTIFY_TRIGGER = 22, - TLVL_DISPATCH_TO_TRB = 23, - TLVL_TDTOKEN_RECEIVED = 24 + TLVL_ENTER_EXIT_METHODS = 5 }; namespace dunedaq::dfmodules { DFOModule::DFOModule(const std::string& name) : dunedaq::appfwk::DAQModule(name) - , m_queue_timeout(100) - , m_run_number(0) + , m_core(std::make_unique(name)) { register_command("conf", &DFOModule::do_conf); register_command("start", &DFOModule::do_start); @@ -84,20 +71,21 @@ DFOModule::init(std::shared_ptr mcfg) } } - if (m_token_connection == "") { + if (m_token_connection.empty()) { throw appfwk::MissingConnection( ERS_HERE, get_name(), datatype_to_string(), "input"); } - if (m_td_connection == "") { - throw appfwk::MissingConnection(ERS_HERE, get_name(), datatype_to_string(), "input"); + if (m_td_connection.empty()) { + throw appfwk::MissingConnection( + ERS_HERE, get_name(), datatype_to_string(), "input"); } if (m_busy_sender == nullptr) { - throw appfwk::MissingConnection(ERS_HERE, get_name(), datatype_to_string(), "output"); - + throw appfwk::MissingConnection( + ERS_HERE, get_name(), datatype_to_string(), "output"); } m_dfo_conf = mdal->get_configuration(); - // these are just tests to check if the connections are ok + // Verify that receivers exist (fetches connection details eagerly) iom->get_receiver(m_token_connection); iom->get_receiver(m_td_connection); @@ -109,15 +97,14 @@ DFOModule::do_conf(const CommandData_t&) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_conf() method"; - m_queue_timeout = std::chrono::milliseconds(m_dfo_conf->get_general_queue_timeout_ms()); - m_stop_timeout = std::chrono::milliseconds(m_dfo_conf->get_stop_timeout_ms()); - m_busy_threshold = m_dfo_conf->get_busy_threshold(); - m_free_threshold = m_dfo_conf->get_free_threshold(); - - m_td_send_retries = m_dfo_conf->get_td_send_retries(); + m_core->configure(m_dfo_conf->get_busy_threshold(), + m_dfo_conf->get_free_threshold(), + m_dfo_conf->get_td_send_retries(), + std::chrono::milliseconds(m_dfo_conf->get_general_queue_timeout_ms()), + std::chrono::milliseconds(m_dfo_conf->get_stop_timeout_ms())); TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_conf() method, there are " - << m_dataflow_availability.size() << " TRB apps defined"; + << m_core->num_trb_apps() << " TRB apps defined"; } void @@ -125,38 +112,39 @@ DFOModule::do_start(const CommandData_t& payload) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_start() method"; - m_received_tokens = 0; - m_run_number = payload.value("run", 0); - - m_running_status.store(true); - m_last_notified_busy.store(false); - m_last_assignement_it = m_dataflow_availability.end(); - - m_last_token_received = m_last_td_received = std::chrono::steady_clock::now(); + auto run_number = payload.value("run", 0); - // 19-Dec-2024, KAB: check that TriggerDecision senders are ready to send. This is done - // so that the IOManager infrastructure fetches the necessary connection details from - // the ConnectivityService at 'start' time, instead of the first time that the sender - // is used to send a message. This avoids delays in the sending of the first TD in - // the first data-taking run in a DAQ session. Such delays can lead to undesirable - // system behavior like trigger inhibits. + // 19-Dec-2024, KAB: check that TriggerDecision senders are ready to send at + // start time so the ConnectivityService lookup happens here rather than at + // the first send, avoiding delays that can cause spurious trigger inhibits. auto iom = iomanager::IOManager::get(); if (m_busy_sender != nullptr) { bool is_ready = m_busy_sender->is_ready_for_sending(std::chrono::milliseconds(100)); TLOG_DEBUG(0) << "The sender for TriggerInhibit messages " << (is_ready ? "is" : "is not") << " ready."; } - for (auto trb_conn : m_trb_conn_ids) { + for (auto& trb_conn : m_trb_conn_ids) { auto sender = iom->get_sender(trb_conn); if (sender != nullptr) { bool is_ready = sender->is_ready_for_sending(std::chrono::milliseconds(100)); - TLOG_DEBUG(0) << "The TriggerDecision sender for " << trb_conn << " " << (is_ready ? "is" : "is not") << " ready."; + TLOG_DEBUG(0) << "The TriggerDecision sender for " << trb_conn << " " + << (is_ready ? "is" : "is not") << " ready."; } } + + m_core->start(run_number, + m_busy_sender, + [iom](const std::string& conn) { + return iom->get_sender(conn); + }, + [this](const std::string& name, std::shared_ptr trbd) { + register_node(name, trbd); + }); + iom->add_callback( - m_token_connection, std::bind(&DFOModule::receive_trigger_complete_token, this, std::placeholders::_1)); + m_token_connection, std::bind(&DFOCore::receive_token, m_core.get(), std::placeholders::_1)); iom->add_callback( - m_td_connection, std::bind(&DFOModule::receive_trigger_decision, this, std::placeholders::_1)); + m_td_connection, std::bind(&DFOCore::receive_trigger_decision, m_core.get(), std::placeholders::_1)); TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_start() method"; } @@ -166,37 +154,19 @@ DFOModule::do_stop(const CommandData_t& /*args*/) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_stop() method"; - m_running_status.store(false); + m_core->stop(); auto iom = iomanager::IOManager::get(); iom->remove_callback(m_td_connection); - const int wait_steps = 20; - auto step_timeout = m_stop_timeout / wait_steps; - int step_counter = 0; - while (!is_empty() && step_counter < wait_steps) { - TLOG() << get_name() << ": stop delayed while waiting for " << used_slots() << " TDs to completed"; - std::this_thread::sleep_for(step_timeout); - ++step_counter; - } + auto remnants = m_core->flush(); iom->remove_callback(m_token_connection); - std::list> remnants; - for (auto& app : m_dataflow_availability) { - auto temp = app.second->flush(); - for (auto& td : temp) { - remnants.push_back(td); - } - } - for (auto& r : remnants) { - ers::error(IncompleteTriggerDecision(ERS_HERE, r->decision.trigger_number, m_run_number)); + ers::error(IncompleteTriggerDecision(ERS_HERE, r->decision.trigger_number, m_core->run_number())); } - std::lock_guard guard(m_trigger_counters_mutex); - m_trigger_counters.clear(); - TLOG() << get_name() << " successfully stopped"; TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_stop() method"; } @@ -206,328 +176,36 @@ DFOModule::do_scrap(const CommandData_t& /*args*/) { TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering do_scrap() method"; - m_dataflow_availability.clear(); + m_core->scrap(); TLOG() << get_name() << " successfully scrapped"; TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting do_scrap() method"; } void -DFOModule::receive_trigger_decision(const dfmessages::TriggerDecision& decision) -{ - TLOG_DEBUG(TLVL_TRIGDEC_RECEIVED) << get_name() << " Received TriggerDecision for trigger_number " - << decision.trigger_number << " and run " << decision.run_number - << " (current run is " << m_run_number << ")"; - if (decision.run_number != m_run_number) { - ers::error(DFOModuleRunNumberMismatch( - ERS_HERE, decision.run_number, m_run_number, "MLT", decision.trigger_number)); - return; - } - - auto decision_received = std::chrono::steady_clock::now(); - ++m_received_decisions; - auto trigger_types = unpack_types(decision.trigger_type); - for ( const auto t : trigger_types ) { - ++get_trigger_counter(t).received; - } - - std::chrono::steady_clock::time_point decision_assigned; - do { - - auto assignment = find_slot(decision); - - if (assignment == nullptr) { // this can happen if all application are in error state - ers::error(UnableToAssign(ERS_HERE, decision.trigger_number)); - usleep(500); - notify_trigger_if_needed(); - continue; - } - - TLOG_DEBUG(TLVL_TRIGDEC_RECEIVED) << get_name() << " Slot found for trigger_number " << decision.trigger_number - << " on connection " << assignment->connection_name - << ", number of used slots is " << used_slots(); - decision_assigned = std::chrono::steady_clock::now(); - auto dispatch_successful = dispatch(assignment); - - if (dispatch_successful) { - assign_trigger_decision(assignment); - TLOG_DEBUG(TLVL_TRIGDEC_RECEIVED) << get_name() << " Assigned trigger_number " << decision.trigger_number - << " to connection " << assignment->connection_name; - break; - } else { - ers::error( - TRBModuleAppUpdate(ERS_HERE, assignment->connection_name, "Could not send Trigger Decision")); - m_dataflow_availability[assignment->connection_name]->set_in_error(true); - } - - } while (m_running_status.load()); - - notify_trigger_if_needed(); - - m_waiting_for_decision += - std::chrono::duration_cast(decision_received - m_last_td_received).count(); - m_last_td_received = std::chrono::steady_clock::now(); - m_deciding_destination += - std::chrono::duration_cast(decision_assigned - decision_received).count(); - m_forwarding_decision += - std::chrono::duration_cast(m_last_td_received - decision_assigned).count(); -} - -std::shared_ptr -DFOModule::find_slot(const dfmessages::TriggerDecision& decision) -{ - - // this find_slot assings the decision with a round-robin logic - // across all the available applications. - // Applications in error are skipped. - // we only probe the applications once. - // if they are all unavailable the assignment is set to - // the application with the lowest used slots - // returning a nullptr will be considered as an error - // from the upper level code - - std::shared_ptr output = nullptr; - auto minimum_occupied = m_dataflow_availability.end(); - size_t minimum = std::numeric_limits::max(); - unsigned int counter = 0; - - auto candidate_it = m_last_assignement_it; - if (candidate_it == m_dataflow_availability.end()) - candidate_it = m_dataflow_availability.begin(); - - while (output == nullptr && counter < m_dataflow_availability.size()) { - - ++counter; - ++candidate_it; - if (candidate_it == m_dataflow_availability.end()) - candidate_it = m_dataflow_availability.begin(); - - // get rid of the applications in error state - if (candidate_it->second->is_in_error()) { - continue; - } - - // monitor - auto slots = candidate_it->second->used_slots(); - if (slots < minimum) { - minimum = slots; - minimum_occupied = candidate_it; - } - - if (candidate_it->second->is_busy()) - continue; - - output = candidate_it->second->make_assignment(decision); - m_last_assignement_it = candidate_it; - } - - if (!output) { - // in this case all applications were busy - // so we assign the decision to that with the lowest - // number of assignments - if (minimum_occupied != m_dataflow_availability.end()) { - output = minimum_occupied->second->make_assignment(decision); - m_last_assignement_it = minimum_occupied; - ers::warning(AssignedToBusyApp(ERS_HERE, decision.trigger_number, minimum_occupied->first, minimum)); - } - } - - if (output != nullptr) { - TLOG_DEBUG(TLVL_WORK_STEPS) << "Assigned TriggerDecision with trigger number " << decision.trigger_number - << " to TRB at connection " << output->connection_name; - } - return output; -} - -void -DFOModule::generate_opmon_data() +DFOModule::generate_opmon_data() { + auto snap = m_core->take_opmon_snapshot(); opmon::DFOInfo info; - info.set_tokens_received( m_received_tokens.exchange(0) ); - info.set_decisions_sent(m_sent_decisions.exchange(0)); - info.set_decisions_received(m_received_decisions.exchange(0)); - info.set_waiting_for_decision(m_waiting_for_decision.exchange(0)); - info.set_deciding_destination(m_deciding_destination.exchange(0)); - info.set_forwarding_decision(m_forwarding_decision.exchange(0)); - info.set_waiting_for_token(m_waiting_for_token.exchange(0)); - info.set_processing_token(m_processing_token.exchange(0)); - publish( std::move(info) ); - - std::lock_guard guard(m_trigger_counters_mutex); - for ( auto & [type, counts] : m_trigger_counters ) { + info.set_tokens_received(snap.tokens_received); + info.set_decisions_sent(snap.decisions_sent); + info.set_decisions_received(snap.decisions_received); + info.set_waiting_for_decision(snap.waiting_for_decision); + info.set_deciding_destination(snap.deciding_destination); + info.set_forwarding_decision(snap.forwarding_decision); + info.set_waiting_for_token(snap.waiting_for_token); + info.set_processing_token(snap.processing_token); + publish(std::move(info)); + + std::lock_guard guard(m_core->get_trigger_counters_mutex()); + for (auto& [type, counts] : m_core->get_trigger_counters()) { opmon::TriggerInfo ti; ti.set_received(counts.received.exchange(0)); ti.set_completed(counts.completed.exchange(0)); auto name = dunedaq::trgdataformats::get_trigger_candidate_type_names()[type]; - publish( std::move(ti), {{"type", name}} ); - } -} - -void -DFOModule::receive_trigger_complete_token(const dfmessages::TriggerDecisionToken& token) -{ - if (token.run_number == 0 && token.trigger_number == 0) { - if (m_dataflow_availability.count(token.decision_destination) == 0) { - TLOG_DEBUG(TLVL_CONFIG) << "Creating dataflow availability struct for uid " << token.decision_destination; - auto entry = m_dataflow_availability[token.decision_destination] = - std::make_shared(token.decision_destination, m_busy_threshold, m_free_threshold); - register_node(token.decision_destination, entry); - } else { - TLOG() << TRBModuleAppUpdate(ERS_HERE, token.decision_destination, "Has reconnected"); - auto app_it = m_dataflow_availability.find(token.decision_destination); - app_it->second->set_in_error(false); - } - return; - } - - TLOG_DEBUG(TLVL_TDTOKEN_RECEIVED) << get_name() << " Received TriggerDecisionToken for trigger_number " - << token.trigger_number << " and run " << token.run_number - << " (current run is " << m_run_number << ")"; - // add a check to see if the application data found - if (token.run_number != m_run_number) { - std::ostringstream oss_source; - oss_source << "TRB at connection " << token.decision_destination; - ers::error(DFOModuleRunNumberMismatch( - ERS_HERE, token.run_number, m_run_number, oss_source.str(), token.trigger_number)); - return; - } - - auto app_it = m_dataflow_availability.find(token.decision_destination); - // check if application data exists; - if (app_it == m_dataflow_availability.end()) { - ers::error(UnknownTokenSource(ERS_HERE, token.decision_destination)); - return; - } - - ++m_received_tokens; - auto callback_start = std::chrono::steady_clock::now(); - - try { - auto dec_ptr = app_it->second->complete_assignment(token.trigger_number, m_metadata_function); - auto trigger_types = unpack_types(dec_ptr->decision.trigger_type); - for ( const auto t : trigger_types ) ++ get_trigger_counter(t).completed; - } catch (AssignedTriggerDecisionNotFound const& err) { - ers::error(err); - } - - if (app_it->second->is_in_error()) { - TLOG() << TRBModuleAppUpdate(ERS_HERE, token.decision_destination, "Has reconnected"); - app_it->second->set_in_error(false); + publish(std::move(ti), { { "type", name } }); } - - notify_trigger_if_needed(); - - m_waiting_for_token += - std::chrono::duration_cast(callback_start - m_last_token_received).count(); - m_last_token_received = std::chrono::steady_clock::now(); - m_processing_token += - std::chrono::duration_cast(m_last_token_received - callback_start).count(); -} - -bool -DFOModule::is_busy() const -{ - for (auto& dfapp : m_dataflow_availability) { - if (!dfapp.second->is_busy()) - return false; - } - return true; -} - -bool -DFOModule::is_empty() const -{ - for (auto& dfapp : m_dataflow_availability) { - if (dfapp.second->used_slots() != 0) - return false; - } - return true; -} - -size_t -DFOModule::used_slots() const -{ - size_t total = 0; - for (auto& dfapp : m_dataflow_availability) { - total += dfapp.second->used_slots(); - } - return total; -} - -void -DFOModule::notify_trigger_if_needed() const -{ - // 19-Dec-2024, KAB, ELF, MaR: combined the is_busy() and notify_trigger() calls in - // a single method (notify_trigger_if_needed), and protected the contents of the new - // method with a mutex, to avoid a race condition in which a given is_busy() result - // is determined, but by the time that the value is sent to the MLT, the busy state - // has changed. - std::lock_guard guard(m_notify_trigger_mutex); - - bool busy = is_busy(); - if (busy == m_last_notified_busy.load()) - return; - - bool wasSentSuccessfully = false; - - do { - try { - dfmessages::TriggerInhibit message{ busy, m_run_number }; - m_busy_sender->send(std::move(message), m_queue_timeout); - wasSentSuccessfully = true; - TLOG_DEBUG(TLVL_NOTIFY_TRIGGER) << get_name() << " Sent BUSY status " << busy << " to trigger in run " - << m_run_number; - } catch (const ers::Issue& excpt) { - std::ostringstream oss_warn; - oss_warn << "Send with sender \"" << m_busy_sender->get_name() << "\" failed"; - ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt)); - } - - } while (!wasSentSuccessfully && m_running_status.load()); - - m_last_notified_busy.store(busy); -} - -bool -DFOModule::dispatch(const std::shared_ptr& assignment) -{ - - TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Entering dispatch() method. assignment->connection_name: " - << assignment->connection_name; - - bool wasSentSuccessfully = false; - int retries = m_td_send_retries; - auto iom = iomanager::IOManager::get(); - do { - - try { - auto decision_copy = dfmessages::TriggerDecision(assignment->decision); - iom->get_sender(assignment->connection_name) - ->send(std::move(decision_copy), m_queue_timeout); - wasSentSuccessfully = true; - ++m_sent_decisions; - TLOG_DEBUG(TLVL_DISPATCH_TO_TRB) << get_name() << " Sent TriggerDecision for trigger_number " - << decision_copy.trigger_number << " to TRB at connection " - << assignment->connection_name << " for run number " << decision_copy.run_number; - } catch (const ers::Issue& excpt) { - std::ostringstream oss_warn; - oss_warn << "Send to connection \"" << assignment->connection_name << "\" failed"; - ers::warning(iomanager::OperationFailed(ERS_HERE, oss_warn.str(), excpt)); - } - - retries--; - - } while (!wasSentSuccessfully && m_running_status.load() && retries > 0); - - TLOG_DEBUG(TLVL_ENTER_EXIT_METHODS) << get_name() << ": Exiting dispatch() method"; - return wasSentSuccessfully; -} - -void -DFOModule::assign_trigger_decision(const std::shared_ptr& assignment) -{ - m_dataflow_availability[assignment->connection_name]->add_assignment(assignment); } } // namespace dunedaq::dfmodules diff --git a/plugins/DFOModule.hpp b/plugins/DFOModule.hpp index ad280b6c..0f208004 100644 --- a/plugins/DFOModule.hpp +++ b/plugins/DFOModule.hpp @@ -9,67 +9,26 @@ #ifndef DFMODULES_PLUGINS_DATAFLOWORCHESTRATOR_HPP_ #define DFMODULES_PLUGINS_DATAFLOWORCHESTRATOR_HPP_ -#include "dfmodules/TriggerRecordBuilderData.hpp" +#include "dfmodules/DFOCore.hpp" #include "appmodel/DFOConf.hpp" - -#include "daqdataformats/TriggerRecord.hpp" -#include "dfmessages/DataRequest.hpp" -#include "dfmessages/TriggerDecision.hpp" #include "dfmessages/TriggerDecisionToken.hpp" #include "dfmessages/TriggerInhibit.hpp" -#include "trgdataformats/TriggerCandidateData.hpp" - #include "iomanager/Sender.hpp" #include "appfwk/DAQModule.hpp" -#include "logging/Logging.hpp" // NOTE: if ISSUES ARE DECLARED BEFORE include logging/Logging.hpp, TLOG_DEBUG< #include #include -#include #include -#include namespace dunedaq { - -// Disable coverage checking LCOV_EXCL_START -ERS_DECLARE_ISSUE(dfmodules, - TRBModuleAppUpdate, - "TRBModule app " << connection_name << ": " << message, - ((std::string)connection_name)((std::string)message)) -ERS_DECLARE_ISSUE(dfmodules, - UnknownTokenSource, - "Token from unknown source: " << connection_name, - ((std::string)connection_name)) -ERS_DECLARE_ISSUE(dfmodules, - DFOModuleRunNumberMismatch, - "DFOModule encountered run number mismatch: recvd (" - << received_run_number << ") != " << run_number << " from " << src_app << " for trigger_number " - << trig_num, - ((uint32_t)received_run_number)((uint32_t)run_number)((std::string)src_app)( - (uint32_t)trig_num)) // NOLINT(build/unsigned) -ERS_DECLARE_ISSUE(dfmodules, - IncompleteTriggerDecision, - "TriggerDecision " << trigger_number << " didn't complete within timeout in run " << run_number, - ((uint32_t)trigger_number)((uint32_t)run_number)) // NOLINT(build/unsigned) -ERS_DECLARE_ISSUE(dfmodules, - UnableToAssign, - "TriggerDecision " << trigger_number << " could not be assigned", - ((uint32_t)trigger_number)) // NOLINT(build/unsigned) -ERS_DECLARE_ISSUE(dfmodules, - AssignedToBusyApp, - "TriggerDecision " << trigger_number << " was assigned to DF app " << app << " that was busy with " - << used_slots << " TDs", - ((uint32_t)trigger_number)((std::string)app)((size_t)used_slots)) // NOLINT(build/unsigned) -// Re-enable coverage checking LCOV_EXCL_STOP - namespace dfmodules { /** * @brief DFOModule distributes triggers according to the - * availability of the DF apps in the system + * availability of the DF apps in the system. */ class DFOModule : public dunedaq::appfwk::DAQModule { @@ -80,25 +39,15 @@ class DFOModule : public dunedaq::appfwk::DAQModule */ explicit DFOModule(const std::string& name); - DFOModule(const DFOModule&) = delete; ///< DFOModule is not copy-constructible - DFOModule& operator=(const DFOModule&) = - delete; ///< DFOModule is not copy-assignable - DFOModule(DFOModule&&) = delete; ///< DFOModule is not move-constructible - DFOModule& operator=(DFOModule&&) = delete; ///< DFOModule is not move-assignable + DFOModule(const DFOModule&) = delete; ///< DFOModule is not copy-constructible + DFOModule& operator=(const DFOModule&) = delete; ///< DFOModule is not copy-assignable + DFOModule(DFOModule&&) = delete; ///< DFOModule is not move-constructible + DFOModule& operator=(DFOModule&&) = delete; ///< DFOModule is not move-assignable void init(std::shared_ptr mcfg) override; -protected: - virtual std::shared_ptr find_slot(const dfmessages::TriggerDecision& decision); - // find_slot operates on a round-robin logic - - using trbd_ptr_t = std::shared_ptr; - using data_structure_t = std::map; - data_structure_t m_dataflow_availability; - data_structure_t::iterator m_last_assignement_it; - std::function m_metadata_function; - private: + std::unique_ptr m_core; // Commands void do_conf(const CommandData_t&); void do_start(const CommandData_t&); @@ -107,74 +56,16 @@ class DFOModule : public dunedaq::appfwk::DAQModule void generate_opmon_data() override; - virtual void receive_trigger_complete_token(const dfmessages::TriggerDecisionToken&); - void receive_trigger_decision(const dfmessages::TriggerDecision&); - virtual bool is_busy() const; - bool is_empty() const; - size_t used_slots() const; - void notify_trigger_if_needed() const; - bool dispatch(const std::shared_ptr& assignment); - virtual void assign_trigger_decision(const std::shared_ptr& assignment); - // Configuration - const appmodel::DFOConf* m_dfo_conf; - std::chrono::milliseconds m_queue_timeout; - std::chrono::microseconds m_stop_timeout; - dunedaq::daqdataformats::run_number_t m_run_number; + const appmodel::DFOConf* m_dfo_conf{ nullptr }; - // Connections + // Connections (initialised in init(), passed to core at start()) std::shared_ptr> m_busy_sender; std::string m_token_connection; std::string m_td_connection; - size_t m_td_send_retries; - size_t m_busy_threshold; - size_t m_free_threshold; std::vector m_trb_conn_ids; - - // Coordination - std::atomic m_running_status{ false }; - mutable std::atomic m_last_notified_busy{ false }; - std::chrono::steady_clock::time_point m_last_token_received; - std::chrono::steady_clock::time_point m_last_td_received; - mutable std::mutex m_notify_trigger_mutex; - - // Struct for statistic - struct TriggerData { - std::atomic received{0}; - std::atomic completed{0}; - }; - static std::set - unpack_types( decltype(dfmessages::TriggerDecision::trigger_type) t) { - std::set results; - if (t == dfmessages::TypeDefaults::s_invalid_trigger_type) - return results; - const std::bitset<64> bits(t); - for( size_t i = 0; i < bits.size(); ++i ) { - if ( bits[i] ) results.insert((trgdataformats::TriggerCandidateData::Type)i); - } - return results; - } - - // Statistics - std::atomic m_received_tokens{ 0 }; // NOLINT (build/unsigned) - std::atomic m_sent_decisions{ 0 }; // NOLINT (build/unsigned) - std::atomic m_received_decisions{ 0 }; // NOLINT (build/unsigned) - std::atomic m_waiting_for_decision{ 0 }; // NOLINT (build/unsigned) - std::atomic m_deciding_destination{ 0 }; // NOLINT (build/unsigned) - std::atomic m_forwarding_decision{ 0 }; // NOLINT (build/unsigned) - std::atomic m_waiting_for_token{ 0 }; // NOLINT (build/unsigned) - std::atomic m_processing_token{ 0 }; // NOLINT (build/unsigned) - std::map m_trigger_counters; - std::mutex m_trigger_counters_mutex; // used to safely handle the map above - TriggerData & get_trigger_counter(trgdataformats::TriggerCandidateData::Type type) { - auto it = m_trigger_counters.find(type); - if (it != m_trigger_counters.end()) return it->second; - - std::lock_guard guard(m_trigger_counters_mutex); - return m_trigger_counters[type]; - } - }; + } // namespace dfmodules } // namespace dunedaq diff --git a/src/DFOCore.cpp b/src/DFOCore.cpp new file mode 100644 index 00000000..789596be --- /dev/null +++ b/src/DFOCore.cpp @@ -0,0 +1,455 @@ +/** + * @file DFOCore.cpp DFOCore class implementation. + * + * This is part of the DUNE DAQ Software Suite, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#include "dfmodules/DFOCore.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +/** + * @brief Name used by TRACE TLOG calls from this source file + */ +#define TRACE_NAME "DFOCore" // NOLINT +enum +{ + TLVL_CONFIG = 7, + TLVL_WORK_STEPS = 10, + TLVL_TRIGDEC_RECEIVED = 21, + TLVL_NOTIFY_TRIGGER = 22, + TLVL_DISPATCH_TO_TRB = 23, + TLVL_TDTOKEN_RECEIVED = 24 +}; + +namespace dunedaq::dfmodules { + +DFOCore::DFOCore(std::string owner_name) + : m_owner_name(std::move(owner_name)) + , m_last_assignment_it(m_dataflow_availability.end()) +{} + +void +DFOCore::configure(size_t busy_threshold, + size_t free_threshold, + size_t td_send_retries, + std::chrono::milliseconds queue_timeout, + std::chrono::microseconds stop_timeout) +{ + m_busy_threshold = busy_threshold; + m_free_threshold = free_threshold; + m_td_send_retries = td_send_retries; + m_queue_timeout = queue_timeout; + m_stop_timeout = stop_timeout; +} + +void +DFOCore::start(daqdataformats::run_number_t run_number, + std::shared_ptr> busy_sender, + td_sender_fn_t get_td_sender_fn, + new_trb_fn_t on_new_trb_fn, + on_assignment_fn_t on_assignment_fn, + on_completion_fn_t on_completion_fn, + is_busy_fn_t is_busy_fn) +{ + m_run_number = run_number; + m_busy_sender = std::move(busy_sender); + m_get_td_sender_fn = std::move(get_td_sender_fn); + m_on_new_trb_fn = std::move(on_new_trb_fn); + m_on_assignment_fn = std::move(on_assignment_fn); + m_on_completion_fn = std::move(on_completion_fn); + m_is_busy_fn = std::move(is_busy_fn); + + m_received_tokens.store(0); + m_sent_decisions.store(0); + m_received_decisions.store(0); + m_waiting_for_decision.store(0); + m_deciding_destination.store(0); + m_forwarding_decision.store(0); + m_waiting_for_token.store(0); + m_processing_token.store(0); + + m_running_status.store(true); + m_last_notified_busy.store(false); + m_last_assignment_it = m_dataflow_availability.end(); + m_last_token_received = m_last_td_received = std::chrono::steady_clock::now(); +} + +void +DFOCore::stop() +{ + m_running_status.store(false); +} + +std::list> +DFOCore::flush() +{ + const int wait_steps = 20; + auto step_timeout = m_stop_timeout / wait_steps; + int step_counter = 0; + while (!is_empty() && step_counter < wait_steps) { + TLOG() << m_owner_name << ": stop delayed while waiting for " << used_slots() << " TDs to complete"; + std::this_thread::sleep_for(step_timeout); + ++step_counter; + } + + std::list> remnants; + for (auto& app : m_dataflow_availability) { + for (auto& td : app.second->flush()) { + remnants.push_back(td); + } + } + + std::lock_guard guard(m_trigger_counters_mutex); + m_trigger_counters.clear(); + + return remnants; +} + +void +DFOCore::scrap() +{ + m_dataflow_availability.clear(); + m_get_td_sender_fn = nullptr; + m_on_new_trb_fn = nullptr; + m_on_assignment_fn = nullptr; + m_on_completion_fn = nullptr; + m_is_busy_fn = nullptr; + m_busy_sender.reset(); +} + +void +DFOCore::receive_token(const dfmessages::TriggerDecisionToken& token) +{ + if (token.run_number == 0 && token.trigger_number == 0) { + if (m_dataflow_availability.count(token.decision_destination) == 0) { + TLOG_DEBUG(TLVL_CONFIG) << "Creating dataflow availability struct for uid " << token.decision_destination; + auto entry = m_dataflow_availability[token.decision_destination] = + std::make_shared(token.decision_destination, m_busy_threshold, m_free_threshold); + if (m_on_new_trb_fn) { + m_on_new_trb_fn(token.decision_destination, entry); + } + } else { + TLOG() << TRBModuleAppUpdate(ERS_HERE, token.decision_destination, "Has reconnected"); + m_dataflow_availability[token.decision_destination]->set_in_error(false); + } + return; + } + + TLOG_DEBUG(TLVL_TDTOKEN_RECEIVED) << m_owner_name << " Received TriggerDecisionToken for trigger_number " + << token.trigger_number << " and run " << token.run_number + << " (current run is " << m_run_number << ")"; + + if (token.run_number != m_run_number) { + std::ostringstream oss_source; + oss_source << "TRB at connection " << token.decision_destination; + ers::error(DFOModuleRunNumberMismatch( + ERS_HERE, token.run_number, m_run_number, oss_source.str(), token.trigger_number)); + return; + } + + auto app_it = m_dataflow_availability.find(token.decision_destination); + if (app_it == m_dataflow_availability.end()) { + ers::error(UnknownTokenSource(ERS_HERE, token.decision_destination)); + return; + } + + ++m_received_tokens; + auto callback_start = std::chrono::steady_clock::now(); + + try { + auto dec_ptr = app_it->second->complete_assignment(token.trigger_number, m_metadata_function); + for (const auto t : unpack_types(dec_ptr->decision.trigger_type)) { + ++get_trigger_counter(t).completed; + } + // Notify the owning module that a completion occurred so it can broadcast + // a DFODecision to peer DFOs with the updated slot count. + if (m_on_completion_fn) { + size_t slot_count = app_it->second->used_slots(); + m_on_completion_fn(token.decision_destination, token.trigger_number, slot_count); + } + } catch (AssignedTriggerDecisionNotFound const& err) { + ers::error(err); + } + + if (app_it->second->is_in_error()) { + TLOG() << TRBModuleAppUpdate(ERS_HERE, token.decision_destination, "Has reconnected"); + app_it->second->set_in_error(false); + } + + notify_trigger_if_needed(); + + m_waiting_for_token += + std::chrono::duration_cast(callback_start - m_last_token_received).count(); + m_last_token_received = std::chrono::steady_clock::now(); + m_processing_token += + std::chrono::duration_cast(m_last_token_received - callback_start).count(); +} + +void +DFOCore::receive_trigger_decision(const dfmessages::TriggerDecision& decision) +{ + TLOG_DEBUG(TLVL_TRIGDEC_RECEIVED) << m_owner_name << " Received TriggerDecision for trigger_number " + << decision.trigger_number << " and run " << decision.run_number + << " (current run is " << m_run_number << ")"; + + if (decision.run_number != m_run_number) { + ers::error(DFOModuleRunNumberMismatch( + ERS_HERE, decision.run_number, m_run_number, "MLT", decision.trigger_number)); + return; + } + + auto decision_received = std::chrono::steady_clock::now(); + ++m_received_decisions; + for (const auto t : unpack_types(decision.trigger_type)) { + ++get_trigger_counter(t).received; + } + + std::chrono::steady_clock::time_point decision_assigned; + do { + auto assignment = find_slot(decision); + + if (assignment == nullptr) { // all applications may be in error state + ers::error(UnableToAssign(ERS_HERE, decision.trigger_number)); + usleep(500); + notify_trigger_if_needed(); + continue; + } + + TLOG_DEBUG(TLVL_TRIGDEC_RECEIVED) << m_owner_name << " Slot found for trigger_number " << decision.trigger_number + << " on connection " << assignment->connection_name + << ", number of used slots is " << used_slots(); + decision_assigned = std::chrono::steady_clock::now(); + auto dispatch_successful = dispatch(assignment); + + if (dispatch_successful) { + assign_trigger_decision(assignment); + TLOG_DEBUG(TLVL_TRIGDEC_RECEIVED) << m_owner_name << " Assigned trigger_number " << decision.trigger_number + << " to connection " << assignment->connection_name; + break; + } else { + ers::error(TRBModuleAppUpdate(ERS_HERE, assignment->connection_name, "Could not send Trigger Decision")); + m_dataflow_availability[assignment->connection_name]->set_in_error(true); + } + + } while (m_running_status.load()); + + notify_trigger_if_needed(); + + m_waiting_for_decision += + std::chrono::duration_cast(decision_received - m_last_td_received).count(); + m_last_td_received = std::chrono::steady_clock::now(); + m_deciding_destination += + std::chrono::duration_cast(decision_assigned - decision_received).count(); + m_forwarding_decision += + std::chrono::duration_cast(m_last_td_received - decision_assigned).count(); +} + +bool +DFOCore::is_busy() const +{ + for (auto& dfapp : m_dataflow_availability) { + if (!dfapp.second->is_busy()) + return false; + } + return true; +} + +bool +DFOCore::is_globally_busy(const std::map& extra_slots_per_trb) const +{ + if (m_dataflow_availability.empty()) + return true; + + for (const auto& [conn, trbd] : m_dataflow_availability) { + size_t own_slots = trbd->used_slots(); + size_t extra_slots = 0; + auto it = extra_slots_per_trb.find(conn); + if (it != extra_slots_per_trb.end()) { + extra_slots = it->second; + } + // If combined slot count is below the busy threshold, this TRB has + // available capacity → the system is NOT globally busy. + if (own_slots + extra_slots < trbd->busy_threshold()) { + // At least one TRB has remaining capacity: the system as a whole is + // NOT globally busy (global busy requires ALL TRBs to be at capacity). + return false; + } + } + return true; +} + +bool +DFOCore::is_empty() const +{ + for (auto& dfapp : m_dataflow_availability) { + if (dfapp.second->used_slots() != 0) + return false; + } + return true; +} + +size_t +DFOCore::used_slots() const +{ + size_t total = 0; + for (auto& dfapp : m_dataflow_availability) { + total += dfapp.second->used_slots(); + } + return total; +} + +void +DFOCore::notify_trigger_if_needed() +{ + // Combine is_busy() check and send in a single mutex-protected block to + // avoid a race in which the busy state changes between check and send. + std::lock_guard guard(m_notify_trigger_mutex); + + bool busy = m_is_busy_fn ? m_is_busy_fn() : is_busy(); + if (busy == m_last_notified_busy.load()) + return; + + bool wasSentSuccessfully = false; + do { + try { + dfmessages::TriggerInhibit message{ busy, m_run_number }; + m_busy_sender->send(std::move(message), m_queue_timeout); + wasSentSuccessfully = true; + TLOG_DEBUG(TLVL_NOTIFY_TRIGGER) << m_owner_name << " Sent BUSY status " << busy << " to trigger in run " + << m_run_number; + } catch (const ers::Issue& excpt) { + ers::warning(excpt); + } + } while (!wasSentSuccessfully && m_running_status.load()); + + m_last_notified_busy.store(busy); +} + +DFOCore::OpMonSnapshot +DFOCore::take_opmon_snapshot() +{ + return { m_received_tokens.exchange(0), + m_received_decisions.exchange(0), + m_sent_decisions.exchange(0), + m_waiting_for_decision.exchange(0), + m_deciding_destination.exchange(0), + m_forwarding_decision.exchange(0), + m_waiting_for_token.exchange(0), + m_processing_token.exchange(0) }; +} + +std::shared_ptr +DFOCore::find_slot(const dfmessages::TriggerDecision& decision) +{ + // Round-robin across all available apps; apps in error are skipped. + // If all are busy, assign to the one with the fewest used slots. + // Returning nullptr is treated as an error by the caller. + + std::shared_ptr output = nullptr; + auto minimum_occupied = m_dataflow_availability.end(); + size_t minimum = std::numeric_limits::max(); + unsigned int counter = 0; + + auto candidate_it = m_last_assignment_it; + if (candidate_it == m_dataflow_availability.end()) + candidate_it = m_dataflow_availability.begin(); + + while (output == nullptr && counter < m_dataflow_availability.size()) { + ++counter; + ++candidate_it; + if (candidate_it == m_dataflow_availability.end()) + candidate_it = m_dataflow_availability.begin(); + + if (candidate_it->second->is_in_error()) + continue; + + auto slots = candidate_it->second->used_slots(); + if (slots < minimum) { + minimum = slots; + minimum_occupied = candidate_it; + } + + if (candidate_it->second->is_busy()) + continue; + + output = candidate_it->second->make_assignment(decision); + m_last_assignment_it = candidate_it; + } + + if (!output) { + if (minimum_occupied != m_dataflow_availability.end()) { + output = minimum_occupied->second->make_assignment(decision); + m_last_assignment_it = minimum_occupied; + ers::warning(AssignedToBusyApp(ERS_HERE, decision.trigger_number, minimum_occupied->first, minimum)); + } + } + + if (output != nullptr) { + TLOG_DEBUG(TLVL_WORK_STEPS) << "Assigned TriggerDecision with trigger number " << decision.trigger_number + << " to TRB at connection " << output->connection_name; + } + return output; +} + +bool +DFOCore::dispatch(const std::shared_ptr& assignment) +{ + TLOG_DEBUG(5) << m_owner_name << ": Entering dispatch(). assignment->connection_name: " + << assignment->connection_name; + + bool wasSentSuccessfully = false; + int retries = static_cast(m_td_send_retries); + do { + try { + auto decision_copy = dfmessages::TriggerDecision(assignment->decision); + m_get_td_sender_fn(assignment->connection_name)->send(std::move(decision_copy), m_queue_timeout); + wasSentSuccessfully = true; + ++m_sent_decisions; + TLOG_DEBUG(TLVL_DISPATCH_TO_TRB) << m_owner_name << " Sent TriggerDecision for trigger_number " + << assignment->decision.trigger_number << " to TRB at connection " + << assignment->connection_name << " for run number " + << assignment->decision.run_number; + } catch (const ers::Issue& excpt) { + std::ostringstream oss_warn; + oss_warn << "Send to connection \"" << assignment->connection_name << "\" failed"; + ers::warning(excpt); + } + --retries; + } while (!wasSentSuccessfully && m_running_status.load() && retries > 0); + + return wasSentSuccessfully; +} + +void +DFOCore::assign_trigger_decision(const std::shared_ptr& assignment) +{ + m_dataflow_availability[assignment->connection_name]->add_assignment(assignment); + // Notify the owning module so it can broadcast a DFODecision to peer DFOs. + if (m_on_assignment_fn) { + size_t slot_count = m_dataflow_availability[assignment->connection_name]->used_slots(); + m_on_assignment_fn(assignment, slot_count); + } +} + +DFOCore::TriggerData& +DFOCore::get_trigger_counter(trgdataformats::TriggerCandidateData::Type type) +{ + auto it = m_trigger_counters.find(type); + if (it != m_trigger_counters.end()) + return it->second; + + std::lock_guard guard(m_trigger_counters_mutex); + return m_trigger_counters[type]; +} + +} // namespace dunedaq::dfmodules diff --git a/src/dfmodules/DFOCore.hpp b/src/dfmodules/DFOCore.hpp new file mode 100644 index 00000000..9cbb09e4 --- /dev/null +++ b/src/dfmodules/DFOCore.hpp @@ -0,0 +1,288 @@ +/** + * @file DFOCore.hpp Core DFO processing logic shared by DFOModule and + * DFOConsensusModule. + * + * DFOCore encapsulates TRB connection management, trigger-decision dispatching + * and token processing that is common to both DFO module variants. It is used + * via composition (not inheritance) so that each module can inherit directly + * from DAQModule while still sharing the same processing logic. + * + * This is part of the DUNE DAQ Software Suite, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#ifndef DFMODULES_SRC_DFMODULES_DFOCORE_HPP_ +#define DFMODULES_SRC_DFMODULES_DFOCORE_HPP_ + +#include "dfmodules/TriggerRecordBuilderData.hpp" + +#include "daqdataformats/Types.hpp" +#include "dfmessages/TriggerDecision.hpp" +#include "dfmessages/TriggerDecisionToken.hpp" +#include "dfmessages/TriggerInhibit.hpp" +#include "iomanager/Sender.hpp" +#include "trgdataformats/TriggerCandidateData.hpp" + +#include "logging/Logging.hpp" // NOTE: ERS issues must be declared after this header. + +#include "nlohmann/json.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace dunedaq { + +// Disable coverage checking LCOV_EXCL_START +ERS_DECLARE_ISSUE(dfmodules, + TRBModuleAppUpdate, + "TRBModule app " << connection_name << ": " << message, + ((std::string)connection_name)((std::string)message)) +ERS_DECLARE_ISSUE(dfmodules, + UnknownTokenSource, + "Token from unknown source: " << connection_name, + ((std::string)connection_name)) +ERS_DECLARE_ISSUE(dfmodules, + DFOModuleRunNumberMismatch, + "DFOModule encountered run number mismatch: recvd (" + << received_run_number << ") != " << run_number << " from " << src_app + << " for trigger_number " << trig_num, + ((uint32_t)received_run_number)((uint32_t)run_number)((std::string)src_app)( + (uint32_t)trig_num)) // NOLINT(build/unsigned) +ERS_DECLARE_ISSUE(dfmodules, + IncompleteTriggerDecision, + "TriggerDecision " << trigger_number << " didn't complete within timeout in run " << run_number, + ((uint32_t)trigger_number)((uint32_t)run_number)) // NOLINT(build/unsigned) +ERS_DECLARE_ISSUE(dfmodules, + UnableToAssign, + "TriggerDecision " << trigger_number << " could not be assigned", + ((uint32_t)trigger_number)) // NOLINT(build/unsigned) +ERS_DECLARE_ISSUE(dfmodules, + AssignedToBusyApp, + "TriggerDecision " << trigger_number << " was assigned to DF app " << app + << " that was busy with " << used_slots << " TDs", + ((uint32_t)trigger_number)((std::string)app)((size_t)used_slots)) // NOLINT(build/unsigned) +// Re-enable coverage checking LCOV_EXCL_STOP + +namespace dfmodules { + +/** + * @brief DFOCore encapsulates the core DFO processing logic shared between + * DFOModule and DFOConsensusModule. It is used via composition. + * + * Usage: + * 1. Construct with the owning module's name (used in log messages). + * 2. Call configure() once per conf command. + * 3. Call start() once per start command, passing in the busy sender, a + * functor to look up TriggerDecision senders by connection name, and + * (optionally) a functor invoked whenever a new TRB app registers. + * 4. Register receive_token() and receive_trigger_decision() as IOManager + * callbacks from the owning module. + * 5. Call stop() on drain_dataflow; it waits for in-flight TDs, flushes, and + * returns any incomplete AssignedTriggerDecisions for error reporting. + * 6. Call scrap() on scrap. + */ +class DFOCore +{ +public: + /// Functor to obtain a TriggerDecision sender for the given connection name. + using td_sender_fn_t = + std::function>(const std::string&)>; + + /// Called whenever a new TRB app registers (run_number==0, trigger_number==0 token). + /// Provides the connection name and the newly created TriggerRecordBuilderData so + /// the owning DAQModule can register it as a child opmon node via register_node(). + using new_trb_fn_t = + std::function)>; + + /// Called after each successful trigger-decision assignment. + /// Arguments: the AssignedTriggerDecision (with connection_name and decision), + /// and the current slot count for that TRB after the assignment. + using on_assignment_fn_t = + std::function&, size_t trb_slot_count)>; + + /// Called after each trigger-decision token is processed (i.e., a TRB completed + /// a trigger). Arguments: TRB connection name, trigger_number, and the current + /// slot count for that TRB after the completion. + using on_completion_fn_t = + std::function; + + /// Optional override for the busy check used by notify_trigger_if_needed(). + /// When set, this function is called instead of the default is_busy() to + /// determine whether a TriggerInhibit should be sent. + using is_busy_fn_t = std::function; + + explicit DFOCore(std::string owner_name); + + DFOCore(const DFOCore&) = delete; + DFOCore& operator=(const DFOCore&) = delete; + DFOCore(DFOCore&&) = delete; + DFOCore& operator=(DFOCore&&) = delete; + + // -------------------------------------------------------------------------- + // Lifecycle + // -------------------------------------------------------------------------- + + void configure(size_t busy_threshold, + size_t free_threshold, + size_t td_send_retries, + std::chrono::milliseconds queue_timeout, + std::chrono::microseconds stop_timeout); + + void start(daqdataformats::run_number_t run_number, + std::shared_ptr> busy_sender, + td_sender_fn_t get_td_sender_fn, + new_trb_fn_t on_new_trb_fn = nullptr, + on_assignment_fn_t on_assignment_fn = nullptr, + on_completion_fn_t on_completion_fn = nullptr, + is_busy_fn_t is_busy_fn = nullptr); + + /// Waits up to the configured stop_timeout for outstanding + /// TDs then flushes and returns any incomplete AssignedTriggerDecisions. + std::list> flush(); + + void stop(); + + void scrap(); + + // -------------------------------------------------------------------------- + // Callbacks – to be registered with the IOManager by the owning module. + // -------------------------------------------------------------------------- + + void receive_token(const dfmessages::TriggerDecisionToken& token); + void receive_trigger_decision(const dfmessages::TriggerDecision& decision); + + // -------------------------------------------------------------------------- + // State queries + // -------------------------------------------------------------------------- + + bool is_busy() const; + bool is_empty() const; + size_t used_slots() const; + void notify_trigger_if_needed(); + + /// Returns true if ALL TRBModules are busy when combining own slot counts + /// with the @p extra_slots_per_trb map (e.g., remote peer slot counts). + /// Used by DFOConsensusModule to determine global inhibit state. + bool is_globally_busy(const std::map& extra_slots_per_trb) const; + + daqdataformats::run_number_t run_number() const { return m_run_number; } + std::chrono::milliseconds queue_timeout() const { return m_queue_timeout; } + size_t num_trb_apps() const { return m_dataflow_availability.size(); } + + // -------------------------------------------------------------------------- + // Opmon helpers + // -------------------------------------------------------------------------- + + struct OpMonSnapshot + { + uint64_t tokens_received{ 0 }; // NOLINT(build/unsigned) + uint64_t decisions_received{ 0 }; // NOLINT(build/unsigned) + uint64_t decisions_sent{ 0 }; // NOLINT(build/unsigned) + uint64_t waiting_for_decision{ 0 }; // NOLINT(build/unsigned) + uint64_t deciding_destination{ 0 }; // NOLINT(build/unsigned) + uint64_t forwarding_decision{ 0 }; // NOLINT(build/unsigned) + uint64_t waiting_for_token{ 0 }; // NOLINT(build/unsigned) + uint64_t processing_token{ 0 }; // NOLINT(build/unsigned) + }; + + /// Atomically exchange all counters to zero and return their previous values. + OpMonSnapshot take_opmon_snapshot(); + + struct TriggerData + { + std::atomic received{ 0 }; // NOLINT(build/unsigned) + std::atomic completed{ 0 }; // NOLINT(build/unsigned) + }; + + std::map& get_trigger_counters() + { + return m_trigger_counters; + } + std::mutex& get_trigger_counters_mutex() { return m_trigger_counters_mutex; } + + std::function& metadata_function() { return m_metadata_function; } + +private: + std::string m_owner_name; + + // Runtime state + daqdataformats::run_number_t m_run_number{ 0 }; + std::atomic m_running_status{ false }; + std::chrono::milliseconds m_queue_timeout{ 100 }; + std::chrono::microseconds m_stop_timeout{ 0 }; + + // Configuration + size_t m_busy_threshold{ 0 }; + size_t m_free_threshold{ 0 }; + size_t m_td_send_retries{ 0 }; + + // Connections (set at start, cleared at scrap) + std::shared_ptr> m_busy_sender; + td_sender_fn_t m_get_td_sender_fn; + new_trb_fn_t m_on_new_trb_fn; + on_assignment_fn_t m_on_assignment_fn; + on_completion_fn_t m_on_completion_fn; + is_busy_fn_t m_is_busy_fn; + + std::function m_metadata_function; + + // TRB management + std::map> m_dataflow_availability; + std::map>::iterator m_last_assignment_it; + + // Busy notification + mutable std::atomic m_last_notified_busy{ false }; + mutable std::mutex m_notify_trigger_mutex; + + // Timing + std::chrono::steady_clock::time_point m_last_token_received; + std::chrono::steady_clock::time_point m_last_td_received; + + // Statistics + std::atomic m_received_tokens{ 0 }; // NOLINT(build/unsigned) + std::atomic m_sent_decisions{ 0 }; // NOLINT(build/unsigned) + std::atomic m_received_decisions{ 0 }; // NOLINT(build/unsigned) + std::atomic m_waiting_for_decision{ 0 }; // NOLINT(build/unsigned) + std::atomic m_deciding_destination{ 0 }; // NOLINT(build/unsigned) + std::atomic m_forwarding_decision{ 0 }; // NOLINT(build/unsigned) + std::atomic m_waiting_for_token{ 0 }; // NOLINT(build/unsigned) + std::atomic m_processing_token{ 0 }; // NOLINT(build/unsigned) + + std::map m_trigger_counters; + std::mutex m_trigger_counters_mutex; + + // Private helpers + std::shared_ptr find_slot(const dfmessages::TriggerDecision& decision); + bool dispatch(const std::shared_ptr& assignment); + void assign_trigger_decision(const std::shared_ptr& assignment); + + TriggerData& get_trigger_counter(trgdataformats::TriggerCandidateData::Type type); + + static std::set + unpack_types(decltype(dfmessages::TriggerDecision::trigger_type) t) + { + std::set results; + if (t == dfmessages::TypeDefaults::s_invalid_trigger_type) + return results; + const std::bitset<64> bits(t); + for (size_t i = 0; i < bits.size(); ++i) { + if (bits[i]) + results.insert(static_cast(i)); + } + return results; + } +}; + +} // namespace dfmodules +} // namespace dunedaq + +#endif // DFMODULES_SRC_DFMODULES_DFOCORE_HPP_ diff --git a/test/config/dfo_consensus_test.data.xml b/test/config/dfo_consensus_test.data.xml new file mode 100644 index 00000000..93e4b0b3 --- /dev/null +++ b/test/config/dfo_consensus_test.data.xml @@ -0,0 +1,480 @@ + + + + + + + + + + + + + + + + + + + + + + + + +]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/unittest/DFOConsensusModule_test.cxx b/unittest/DFOConsensusModule_test.cxx new file mode 100644 index 00000000..c4a90761 --- /dev/null +++ b/unittest/DFOConsensusModule_test.cxx @@ -0,0 +1,477 @@ +/** + * @file DFOConsensusModule_test.cxx Unit tests for DFOConsensusModule. + * + * Tests cover: + * 1. Copy/move semantics + * 2. Constructor and plugin creation + * 3. Standalone mode: zero peer connections → behaves identically to DFOModule + * 4. Partition-filter logic: trigger decisions belonging to a different + * partition are silently dropped, while decisions for the own partition + * are processed normally. + * 5. Peer-announcement magic value does not collide with normal TRB tokens. + * 6. DFODecision broadcast: after a TD is assigned the module sends a + * DFODecision to each configured peer output connection. + * 7. Remote-slot shadow update: injecting a DFODecision from a peer updates + * the shadow state used for the global busy calculation. + * 8. Global busy check: busy signal is asserted only when ALL TRBs are at + * capacity considering both own and remote slot counts. + * 9. Watchdog failover: if no DFODecision arrives for a trigger_number within + * the timeout, the responsible peer is removed from the ensemble and the + * trigger is reassigned by the surviving DFO. + * + * This is part of the DUNE DAQ Application Framework, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#include "DFOConsensusModule.hpp" + +#include "dfmessages/TriggerDecisionToken.hpp" +#include "dfmessages/TriggerInhibit.hpp" +#include "dfmodules/CommonIssues.hpp" +#include "dfmodules/DFODecision.hpp" +#include "dfmodules/opmon/DFOModule.pb.h" +#include "iomanager/IOManager.hpp" +#include "iomanager/Sender.hpp" +#include "opmonlib/TestOpMonManager.hpp" + +#define BOOST_TEST_MODULE DFOConsensusModule_test // NOLINT + +#include "boost/test/unit_test.hpp" + +#include +#include +#include +#include + +using namespace dunedaq::dfmodules; + +namespace dunedaq { + +struct EnvFixture +{ + EnvFixture() { setenv("DUNEDAQ_PARTITION", "partition_name", 0); } +}; +BOOST_TEST_GLOBAL_FIXTURE(EnvFixture); + +// --------------------------------------------------------------------------- +// Fixture – reuses the same test config as DFOModule_test (single-DFO, no +// peer connections). DFOConsensusModule falls back to standalone mode and +// should be indistinguishable from DFOModule. +// --------------------------------------------------------------------------- +struct CfgFixture +{ + CfgFixture() + { + std::string oksConfig = "oksconflibs:test/config/datafloworchestrator_test.data.xml"; + std::string appName = "TestApp"; + std::string sessionName = "partition_name"; + cfgMgr = std::make_shared(oksConfig, appName, sessionName); + get_iomanager()->configure(sessionName, cfgMgr->get_queues(), cfgMgr->get_networkconnections(), nullptr, opmgr); + } + ~CfgFixture() { get_iomanager()->reset(); } + + auto get_dfo_info() + { + opmgr.collect(); + auto opmon_facility = opmgr.get_backend_facility(); + auto list = opmon_facility->get_entries(std::regex(".*DFOInfo")); + BOOST_REQUIRE_EQUAL(list.size(), 1); + const auto& entry = list.front(); + return opmonlib::from_entry(entry); + } + + dunedaq::opmonlib::TestOpMonManager opmgr; + std::shared_ptr cfgMgr; +}; + +// --------------------------------------------------------------------------- +// Helpers (shared with DFOModule_test style) +// --------------------------------------------------------------------------- +void +send_init_token(std::string connection_name = "trigdec_0") +{ + dfmessages::TriggerDecisionToken token; + token.run_number = 0; + token.trigger_number = 0; + token.decision_destination = connection_name; + get_iom_sender("token")->send(std::move(token), iomanager::Sender::s_block); +} + +void +send_token(dfmessages::trigger_number_t trigger_number, + std::string connection_name = "trigdec_0", + bool different_run = false) +{ + dfmessages::TriggerDecisionToken token; + token.run_number = different_run ? 2 : 1; + token.trigger_number = trigger_number; + token.decision_destination = connection_name; + get_iom_sender("token")->send(std::move(token), iomanager::Sender::s_block); +} + +void +recv_trigdec(const dfmessages::TriggerDecision& decision) +{ + TLOG() << "Received TriggerDecision with trigger number " << decision.trigger_number; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + send_token(decision.trigger_number); +} + +std::atomic busy_signal_recvd = false; +void +recv_triginh(const dfmessages::TriggerInhibit& inhibit) +{ + TLOG() << "Received TriggerInhibit with busy=" << std::boolalpha << inhibit.busy; + busy_signal_recvd = inhibit.busy; +} + +void +send_trigdec(dfmessages::trigger_number_t trigger_number, bool different_run = false) +{ + dunedaq::dfmessages::TriggerDecision td; + td.trigger_number = trigger_number; + td.run_number = different_run ? 2 : 1; + td.trigger_timestamp = 1; + td.trigger_type = 1; + td.readout_type = dunedaq::dfmessages::ReadoutType::kLocalized; + iomanager::IOManager::get()->get_sender("trigdec")->send( + std::move(td), iomanager::Sender::s_block); +} + +// =========================================================================== +BOOST_FIXTURE_TEST_SUITE(DFOConsensusModule_test, CfgFixture) +// =========================================================================== + +// --------------------------------------------------------------------------- +// 1. Copy/move semantics +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(CopyAndMoveSemantics) +{ + BOOST_REQUIRE(!std::is_copy_constructible_v); + BOOST_REQUIRE(!std::is_copy_assignable_v); + BOOST_REQUIRE(!std::is_move_constructible_v); + BOOST_REQUIRE(!std::is_move_assignable_v); +} + +// --------------------------------------------------------------------------- +// 2. Constructor / plugin creation +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(Constructor) +{ + auto dfo = appfwk::make_module("DFOConsensusModule", "test"); + BOOST_REQUIRE(dfo != nullptr); +} + +// --------------------------------------------------------------------------- +// 3. Init with the existing test config (no peer connections) +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(Init) +{ + auto dfo = appfwk::make_module("DFOConsensusModule", "test"); + dfo->init(cfgMgr); +} + +// --------------------------------------------------------------------------- +// 4. Full command lifecycle in standalone mode (no peers) +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(Commands) +{ + auto dfo = appfwk::make_module("DFOConsensusModule", "test"); + opmgr.register_node("dfo", dfo); + dfo->init(cfgMgr); + + appfwk::DAQModule::CommandData_t null_data; + appfwk::DAQModule::CommandData_t start_data; + start_data.emplace("run", 1); + + dfo->execute_command("conf", null_data); + dfo->execute_command("start", start_data); + dfo->execute_command("drain_dataflow", null_data); + dfo->execute_command("scrap", null_data); + + auto metric = get_dfo_info(); + BOOST_REQUIRE_EQUAL(metric.tokens_received(), 0); + BOOST_REQUIRE_EQUAL(metric.decisions_received(), 0); + BOOST_REQUIRE_EQUAL(metric.decisions_sent(), 0); +} + +// --------------------------------------------------------------------------- +// 5. Standalone data-flow (identical to DFOModule_test::DataFlow) +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(DataFlowStandaloneMode) +{ + auto dfo = appfwk::make_module("DFOConsensusModule", "test"); + opmgr.register_node("dfo", dfo); + dfo->init(cfgMgr); + + appfwk::DAQModule::CommandData_t null_data; + appfwk::DAQModule::CommandData_t start_data; + start_data.emplace("run", 1); + + dfo->execute_command("conf", null_data); + + auto iom = iomanager::IOManager::get(); + auto dec_recv = iom->get_receiver("trigdec_0"); + dec_recv->add_callback(recv_trigdec); + auto inh_recv = iom->get_receiver("triginh"); + inh_recv->add_callback(recv_triginh); + + dfo->execute_command("start", start_data); + send_init_token(); + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + // In standalone mode (num_dfos==1) every trigger decision is processed. + send_trigdec(2); + send_trigdec(3); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + send_trigdec(4); + + auto metric = get_dfo_info(); + BOOST_REQUIRE_EQUAL(metric.decisions_received(), 2); + BOOST_REQUIRE_EQUAL(metric.decisions_sent(), 2); + + BOOST_REQUIRE(busy_signal_recvd.load()); + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + + metric = get_dfo_info(); + BOOST_REQUIRE_EQUAL(metric.tokens_received(), 3); + BOOST_REQUIRE_EQUAL(metric.decisions_received(), 1); + BOOST_REQUIRE_EQUAL(metric.decisions_sent(), 1); + BOOST_REQUIRE(!busy_signal_recvd.load()); + + dfo->execute_command("drain_dataflow", null_data); + dfo->execute_command("scrap", null_data); + + dec_recv->remove_callback(); + inh_recv->remove_callback(); +} + +// --------------------------------------------------------------------------- +// 6. Peer-announcement magic value +// Verify that s_peer_announce_magic does not equal 0 (which is the +// trigger_number used in TRB registration tokens) and is max for the type. +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(PeerAnnounceMagicValue) +{ + BOOST_REQUIRE_NE(DFOConsensusModule::s_peer_announce_magic, + static_cast(0)); + BOOST_REQUIRE_EQUAL(DFOConsensusModule::s_peer_announce_magic, + std::numeric_limits::max()); +} + +// --------------------------------------------------------------------------- +// 7. Partition-filter logic (unit test without networking) +// Instantiate the DFOConsensusModule and manually exercise the filter by +// sending trigger decisions at various trigger_numbers. With two DFOs +// (indices 0 and 1), decisions with even trigger_numbers go to the DFO +// at index 0 and odd ones to index 1. +// Here we simulate the DFO that has own_index=0 and num_dfos=2 by +// starting it and injecting a synthetic peer announcement so the partition +// settles before the first trigger decision arrives. +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(PartitionFilter) +{ + auto dfo = appfwk::make_module("DFOConsensusModule", "test"); + opmgr.register_node("dfo", dfo); + dfo->init(cfgMgr); + + appfwk::DAQModule::CommandData_t null_data; + appfwk::DAQModule::CommandData_t start_data; + start_data.emplace("run", 1); + + dfo->execute_command("conf", null_data); + + auto iom = iomanager::IOManager::get(); + + // Count how many trigger decisions actually reach the TRB. + std::atomic received_count{ 0 }; + auto dec_recv = iom->get_receiver("trigdec_0"); + dec_recv->add_callback([&received_count](const dfmessages::TriggerDecision& td) { + ++received_count; + // Send a completion token back. + dfmessages::TriggerDecisionToken token; + token.run_number = td.run_number; + token.trigger_number = td.trigger_number; + token.decision_destination = "trigdec_0"; + get_iom_sender("token")->send( + std::move(token), iomanager::Sender::s_block); + }); + + auto inh_recv = iom->get_receiver("triginh"); + inh_recv->add_callback(recv_triginh); + + dfo->execute_command("start", start_data); + send_init_token(); // Register TRB app with the DFO. + + // Inject a synthetic peer announcement that makes the module believe a + // second DFO "zzz_peer" is also in the ensemble. "test" < "zzz_peer" + // alphabetically, so "test" gets index 0 and processes even trigger_numbers. + { + dfmessages::TriggerDecisionToken peer_ann; + peer_ann.run_number = 0; + peer_ann.trigger_number = DFOConsensusModule::s_peer_announce_magic; + peer_ann.decision_destination = "zzz_peer"; + get_iom_sender("token")->send( + std::move(peer_ann), iomanager::Sender::s_block); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // Trigger numbers 1–4: only even ones (2, 4) should reach the TRB because + // this DFO has own_index==0 (trigger_number % 2 == 0). + for (dfmessages::trigger_number_t tn = 1; tn <= 4; ++tn) { + send_trigdec(tn); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + // Exactly 2 decisions (trigger_numbers 2 and 4) should have reached the TRB. + BOOST_REQUIRE_EQUAL(received_count.load(), 2u); + + dfo->execute_command("drain_dataflow", null_data); + dfo->execute_command("scrap", null_data); + + dec_recv->remove_callback(); + inh_recv->remove_callback(); +} + +// --------------------------------------------------------------------------- +// 8. DFODecision broadcast on assignment +// In standalone mode (no peer DFO connections) no DFODecision output +// connections are configured, so the broadcast is a no-op. Verify that +// the module still correctly processes the TD and the callback path does +// not throw. +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(DFODecisionBroadcastStandaloneNoOp) +{ + auto dfo = appfwk::make_module("DFOConsensusModule", "test"); + opmgr.register_node("dfo", dfo); + dfo->init(cfgMgr); + + appfwk::DAQModule::CommandData_t null_data; + appfwk::DAQModule::CommandData_t start_data; + start_data.emplace("run", 1); + + dfo->execute_command("conf", null_data); + + auto iom = iomanager::IOManager::get(); + auto dec_recv = iom->get_receiver("trigdec_0"); + std::atomic received{ 0 }; + dec_recv->add_callback([&](const dfmessages::TriggerDecision& td) { + ++received; + dfmessages::TriggerDecisionToken token; + token.run_number = td.run_number; + token.trigger_number = td.trigger_number; + token.decision_destination = "trigdec_0"; + get_iom_sender("token")->send( + std::move(token), iomanager::Sender::s_block); + }); + + auto inh_recv = iom->get_receiver("triginh"); + inh_recv->add_callback([](const dfmessages::TriggerInhibit&) {}); + + dfo->execute_command("start", start_data); + send_init_token(); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + send_trigdec(1); + send_trigdec(2); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + // Both TDs should have been processed without error. + BOOST_REQUIRE_GE(received.load(), 1); + + dfo->execute_command("drain_dataflow", null_data); + dfo->execute_command("scrap", null_data); + + dec_recv->remove_callback(); + inh_recv->remove_callback(); +} + +// --------------------------------------------------------------------------- +// 9. Watchdog failover +// Simulate a two-DFO scenario where the responsible peer (zzz_peer) for +// odd trigger_numbers never sends a DFODecision. The watchdog should +// detect the timeout, remove zzz_peer from the ensemble, recompute the +// partition (this DFO now owns ALL triggers), and reassign the pending TD. +// +// We use s_dfo_decision_timeout to bound the wait, then verify the module +// eventually dispatches the previously-orphaned TD. +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(WatchdogFailover) +{ + auto dfo = appfwk::make_module("DFOConsensusModule", "test"); + opmgr.register_node("dfo", dfo); + dfo->init(cfgMgr); + + appfwk::DAQModule::CommandData_t null_data; + appfwk::DAQModule::CommandData_t start_data; + start_data.emplace("run", 1); + + dfo->execute_command("conf", null_data); + + auto iom = iomanager::IOManager::get(); + + std::atomic received_count{ 0 }; + auto dec_recv = iom->get_receiver("trigdec_0"); + dec_recv->add_callback([&](const dfmessages::TriggerDecision& td) { + ++received_count; + dfmessages::TriggerDecisionToken token; + token.run_number = td.run_number; + token.trigger_number = td.trigger_number; + token.decision_destination = "trigdec_0"; + get_iom_sender("token")->send( + std::move(token), iomanager::Sender::s_block); + }); + + auto inh_recv = iom->get_receiver("triginh"); + inh_recv->add_callback([](const dfmessages::TriggerInhibit&) {}); + + dfo->execute_command("start", start_data); + send_init_token(); + + // Inject a synthetic peer announcement for "zzz_peer" so this DFO gets + // own_index=0 (even trigger_numbers) and zzz_peer gets index=1 (odd). + { + dfmessages::TriggerDecisionToken peer_ann; + peer_ann.run_number = 0; + peer_ann.trigger_number = DFOConsensusModule::s_peer_announce_magic; + peer_ann.decision_destination = "zzz_peer"; + get_iom_sender("token")->send( + std::move(peer_ann), iomanager::Sender::s_block); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // Send trigger_number 1 (odd → would normally go to zzz_peer) and + // trigger_number 2 (even → processed immediately by this DFO). + send_trigdec(1); // buffered; waiting for zzz_peer's DFODecision + send_trigdec(2); // processed immediately + + // Wait for this DFO to handle trigger 2. + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + BOOST_REQUIRE_GE(received_count.load(), 1u); // trigger 2 processed + + // Wait for the watchdog to fire (timeout + extra time for the watchdog + // interval and processing). + static constexpr auto s_watchdog_test_buffer = std::chrono::milliseconds(500); + auto watchdog_wait = DFOConsensusModule::s_dfo_decision_timeout + s_watchdog_test_buffer; + std::this_thread::sleep_for(watchdog_wait); + + // After failover, trigger 1 should also have been dispatched. + BOOST_REQUIRE_EQUAL(received_count.load(), 2u); + + dfo->execute_command("drain_dataflow", null_data); + dfo->execute_command("scrap", null_data); + + dec_recv->remove_callback(); + inh_recv->remove_callback(); +} + +BOOST_AUTO_TEST_SUITE_END() + +} // namespace dunedaq + diff --git a/unittest/DFOCore_test.cxx b/unittest/DFOCore_test.cxx new file mode 100644 index 00000000..25ec2e0b --- /dev/null +++ b/unittest/DFOCore_test.cxx @@ -0,0 +1,508 @@ +/** + * @file DFOCore_test.cxx Unit tests for the DFOCore library class. + * + * Tests cover: + * 1. Copy/move semantics (static checks) + * 2. Constructor + * 3. Lifecycle: configure / start / stop / scrap with no TRBs + * 4. TRB registration token (run==0, trigger==0) → new TRB app added, + * new_trb_fn callback invoked + * 5. TRB reconnect: a second registration token for the same connection clears + * the in-error flag + * 6. Run-number mismatch in completion token → error, counter not incremented + * 7. Unknown token source → error, counter not incremented + * 8. Trigger-decision dispatch: TD reaches TRB connection, opmon counters correct + * 9. Run-number mismatch in trigger decision → error, not dispatched + * 10. flush() with in-flight TDs returns remnants + * 11. OpMon snapshot: counters returned and atomically reset to zero + * 12. Round-robin slot selection across multiple TRB connections + * + * This is part of the DUNE DAQ Application Framework, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#include "dfmodules/DFOCore.hpp" + +#include "appfwk/ConfigurationManager.hpp" +#include "dfmessages/TriggerDecisionToken.hpp" +#include "dfmessages/TriggerInhibit.hpp" +#include "iomanager/IOManager.hpp" +#include "iomanager/Receiver.hpp" +#include "iomanager/Sender.hpp" +#include "opmonlib/TestOpMonManager.hpp" + +#define BOOST_TEST_MODULE DFOCore_test // NOLINT + +#include "boost/test/unit_test.hpp" + +#include +#include +#include +#include +#include +#include + +using namespace dunedaq::dfmodules; + +namespace dunedaq { + +struct EnvFixture +{ + EnvFixture() { setenv("DUNEDAQ_PARTITION", "partition_name", 0); } +}; +BOOST_TEST_GLOBAL_FIXTURE(EnvFixture); + +// --------------------------------------------------------------------------- +// Fixture – reuses the same test config as DFOModule_test. +// The connections it provides: +// "token" : NetworkConnection (input) +// "trigdec" : NetworkConnection (input) +// "triginh" : NetworkConnection (output) +// "trigdec_0" : NetworkConnection (output to TRB) +// --------------------------------------------------------------------------- +struct CfgFixture +{ + CfgFixture() + { + std::string oksConfig = "oksconflibs:test/config/datafloworchestrator_test.data.xml"; + std::string appName = "TestApp"; + std::string sessionName = "partition_name"; + cfgMgr = std::make_shared(oksConfig, appName, sessionName); + get_iomanager()->configure(sessionName, cfgMgr->get_queues(), cfgMgr->get_networkconnections(), nullptr, opmgr); + + // Register a no-op callback on triginh so that TriggerInhibit messages sent + // by DFOCore are consumed immediately and never block the sender. + auto iom = iomanager::IOManager::get(); + inh_recv = iom->get_receiver("triginh"); + inh_recv->add_callback([](const dfmessages::TriggerInhibit&) {}); + } + + ~CfgFixture() + { + inh_recv->remove_callback(); + get_iomanager()->reset(); + } + + // Helper: build and configure a DFOCore using the test config parameters. + std::unique_ptr make_core(std::string name = "test_core") + { + auto core = std::make_unique(name); + core->configure( + /*busy_threshold=*/2, + /*free_threshold=*/1, + /*td_send_retries=*/5, + std::chrono::milliseconds(100), + std::chrono::milliseconds(100)); + return core; + } + + // Helper: start a DFOCore with the standard busy sender and TD sender. + void start_core(DFOCore& core, + daqdataformats::run_number_t run = 1, + DFOCore::new_trb_fn_t on_new_trb = nullptr) + { + auto iom = iomanager::IOManager::get(); + auto busy_sender = iom->get_sender("triginh"); + core.start(run, + busy_sender, + [iom](const std::string& conn) { + return iom->get_sender(conn); + }, + on_new_trb); + } + + // Helper: send a TRB registration token (run==0, trigger==0) to the core. + static void register_trb(DFOCore& core, std::string connection = "trigdec_0") + { + dfmessages::TriggerDecisionToken token; + token.run_number = 0; + token.trigger_number = 0; + token.decision_destination = connection; + core.receive_token(token); + } + + // Helper: send a completion token for a given trigger back to the core. + static void complete_trigger(DFOCore& core, + dfmessages::trigger_number_t tn, + std::string connection = "trigdec_0", + daqdataformats::run_number_t run = 1) + { + dfmessages::TriggerDecisionToken token; + token.run_number = run; + token.trigger_number = tn; + token.decision_destination = connection; + core.receive_token(token); + } + + // Helper: send a TriggerDecision directly to the core. + static void send_td(DFOCore& core, + dfmessages::trigger_number_t tn, + daqdataformats::run_number_t run = 1) + { + dfmessages::TriggerDecision td; + td.trigger_number = tn; + td.run_number = run; + td.trigger_timestamp = 1; + td.trigger_type = 1; + td.readout_type = dfmessages::ReadoutType::kLocalized; + core.receive_trigger_decision(td); + } + + dunedaq::opmonlib::TestOpMonManager opmgr; + std::shared_ptr cfgMgr; + std::shared_ptr> inh_recv; +}; + +// =========================================================================== +BOOST_FIXTURE_TEST_SUITE(DFOCore_test, CfgFixture) +// =========================================================================== + +// --------------------------------------------------------------------------- +// 1. Copy/move semantics (static) +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(CopyAndMoveSemantics) +{ + BOOST_REQUIRE(!std::is_copy_constructible_v); + BOOST_REQUIRE(!std::is_copy_assignable_v); + BOOST_REQUIRE(!std::is_move_constructible_v); + BOOST_REQUIRE(!std::is_move_assignable_v); +} + +// --------------------------------------------------------------------------- +// 2. Constructor +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(Constructor) +{ + DFOCore core("mycore"); + BOOST_REQUIRE_EQUAL(core.run_number(), 0u); + BOOST_REQUIRE_EQUAL(core.num_trb_apps(), 0u); + BOOST_REQUIRE(core.is_empty()); +} + +// --------------------------------------------------------------------------- +// 3. Lifecycle – no TRBs +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(LifecycleEmpty) +{ + auto core = make_core(); + start_core(*core); + + core->stop(); + auto remnants = core->flush(); + BOOST_REQUIRE(remnants.empty()); + core->scrap(); +} + +// --------------------------------------------------------------------------- +// 4. TRB registration via receive_token (run==0, trigger==0) +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(TRBRegistration) +{ + auto core = make_core(); + + bool new_trb_called = false; + std::string registered_name; + start_core(*core, 1, [&](const std::string& name, std::shared_ptr) { + new_trb_called = true; + registered_name = name; + }); + + BOOST_REQUIRE_EQUAL(core->num_trb_apps(), 0u); + + register_trb(*core, "trigdec_0"); + + BOOST_REQUIRE(new_trb_called); + BOOST_REQUIRE_EQUAL(registered_name, "trigdec_0"); + BOOST_REQUIRE_EQUAL(core->num_trb_apps(), 1u); + BOOST_REQUIRE(core->is_empty()); + + core->stop(); + core->scrap(); +} + +// --------------------------------------------------------------------------- +// 5. TRB reconnect: second registration token for the same connection clears +// the in-error flag (no new TRB entry, new_trb_fn not called again). +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(TRBReconnect) +{ + auto core = make_core(); + + int new_trb_call_count = 0; + start_core(*core, 1, [&](const std::string&, std::shared_ptr) { + ++new_trb_call_count; + }); + + // First registration + register_trb(*core, "trigdec_0"); + BOOST_REQUIRE_EQUAL(new_trb_call_count, 1); + BOOST_REQUIRE_EQUAL(core->num_trb_apps(), 1u); + + // Second registration for same connection (e.g., TRB reconnected) + register_trb(*core, "trigdec_0"); + BOOST_REQUIRE_EQUAL(new_trb_call_count, 1); // new_trb_fn NOT called again + BOOST_REQUIRE_EQUAL(core->num_trb_apps(), 1u); // still 1 app + + core->stop(); + core->scrap(); +} + +// --------------------------------------------------------------------------- +// 6. Run-number mismatch in completion token → counted as error, not as a +// completed assignment. +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(RunNumberMismatch_Token) +{ + auto core = make_core(); + start_core(*core, /*run=*/1); + + register_trb(*core, "trigdec_0"); + + // Send a TD so there is an outstanding assignment. + // Dispatch is synchronous so we can call receive_trigger_decision directly. + // Set up a callback so the TRB side receives it (needed for dispatch to succeed). + auto iom = iomanager::IOManager::get(); + auto dec_recv = iom->get_receiver("trigdec_0"); + dec_recv->add_callback([](const dfmessages::TriggerDecision&) {}); + + send_td(*core, 1); + + // Send completion token with wrong run number (should log error; token not counted) + complete_trigger(*core, 1, "trigdec_0", /*run=*/99); + + auto snap = core->take_opmon_snapshot(); + BOOST_REQUIRE_EQUAL(snap.tokens_received, 0u); + + core->stop(); + core->scrap(); + dec_recv->remove_callback(); +} + +// --------------------------------------------------------------------------- +// 7. Unknown token source → error (token from a connection that never sent a +// registration token). +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(UnknownTokenSource) +{ + auto core = make_core(); + start_core(*core, 1); + + // Do NOT call register_trb – send a completion token for an unknown connection. + complete_trigger(*core, 42, "unknown_connection"); + + auto snap = core->take_opmon_snapshot(); + BOOST_REQUIRE_EQUAL(snap.tokens_received, 0u); + + core->stop(); + core->scrap(); +} + +// --------------------------------------------------------------------------- +// 8. TriggerDecision dispatch – TD is forwarded to the TRB, counters updated. +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(TriggerDecisionDispatch) +{ + auto core = make_core(); + start_core(*core, 1); + + register_trb(*core, "trigdec_0"); + + // Set up a receiver on the TRB side and auto-complete assignments. + std::atomic received_count{ 0 }; + auto iom = iomanager::IOManager::get(); + auto dec_recv = iom->get_receiver("trigdec_0"); + dec_recv->add_callback([&](const dfmessages::TriggerDecision& td) { + ++received_count; + complete_trigger(*core, td.trigger_number); + }); + + // Dispatch a trigger decision. + send_td(*core, 1); + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + auto snap = core->take_opmon_snapshot(); + BOOST_REQUIRE_EQUAL(snap.decisions_received, 1u); + BOOST_REQUIRE_EQUAL(snap.decisions_sent, 1u); + BOOST_REQUIRE_EQUAL(snap.tokens_received, 1u); + BOOST_REQUIRE_EQUAL(received_count.load(), 1); + + core->stop(); + core->scrap(); + dec_recv->remove_callback(); +} + +// --------------------------------------------------------------------------- +// 9. Run-number mismatch in TriggerDecision → not dispatched, error logged. +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(RunNumberMismatch_TD) +{ + auto core = make_core(); + start_core(*core, /*run=*/1); + + register_trb(*core, "trigdec_0"); + + auto iom = iomanager::IOManager::get(); + auto dec_recv = iom->get_receiver("trigdec_0"); + std::atomic received_count{ 0 }; + dec_recv->add_callback([&](const dfmessages::TriggerDecision&) { ++received_count; }); + + // Send with wrong run number. + send_td(*core, 1, /*run=*/99); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + auto snap = core->take_opmon_snapshot(); + BOOST_REQUIRE_EQUAL(snap.decisions_received, 0u); + BOOST_REQUIRE_EQUAL(snap.decisions_sent, 0u); + BOOST_REQUIRE_EQUAL(received_count.load(), 0); + + core->stop(); + core->scrap(); + dec_recv->remove_callback(); +} + +// --------------------------------------------------------------------------- +// 10. flush() with in-flight TDs returns them as remnants. +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(StopReturnsInFlightTDs) +{ + auto core = make_core(); + start_core(*core, 1); + + register_trb(*core, "trigdec_0"); + + // Set up a TRB receiver that does NOT send completion tokens back. + auto iom = iomanager::IOManager::get(); + auto dec_recv = iom->get_receiver("trigdec_0"); + dec_recv->add_callback([](const dfmessages::TriggerDecision&) { + // Deliberately do nothing – leave TD in-flight. + }); + + send_td(*core, 10); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + // flush() should return the in-flight TD as a remnant. + core->stop(); + auto remnants = core->flush(); + BOOST_REQUIRE_EQUAL(remnants.size(), 1u); + BOOST_REQUIRE_EQUAL(remnants.front()->decision.trigger_number, 10u); + + core->scrap(); + dec_recv->remove_callback(); +} + +// --------------------------------------------------------------------------- +// 11. OpMon snapshot: counters are returned and atomically reset to zero. +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(OpMonSnapshot) +{ + auto core = make_core(); + start_core(*core, 1); + + register_trb(*core, "trigdec_0"); + + auto iom = iomanager::IOManager::get(); + auto dec_recv = iom->get_receiver("trigdec_0"); + dec_recv->add_callback([&](const dfmessages::TriggerDecision& td) { + complete_trigger(*core, td.trigger_number); + }); + + send_td(*core, 1); + send_td(*core, 2); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + auto snap1 = core->take_opmon_snapshot(); + BOOST_REQUIRE_EQUAL(snap1.decisions_received, 2u); + BOOST_REQUIRE_EQUAL(snap1.decisions_sent, 2u); + + // After a snapshot the counters are reset to zero. + auto snap2 = core->take_opmon_snapshot(); + BOOST_REQUIRE_EQUAL(snap2.decisions_received, 0u); + BOOST_REQUIRE_EQUAL(snap2.decisions_sent, 0u); + BOOST_REQUIRE_EQUAL(snap2.tokens_received, 0u); + + core->stop(); + core->scrap(); + dec_recv->remove_callback(); +} + +// --------------------------------------------------------------------------- +// 12. Round-robin slot selection across two TRB connections. +// With two TRBs and busy_threshold==2, the first two TDs go to separate +// TRBs in alternating fashion (round-robin). We verify this by counting +// how many TDs each TRB receives. +// --------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(RoundRobinSlotSelection) +{ + // The test config only has one TRB output connection ("trigdec_0"), so we + // register a second TRB using a connection name that maps to the same + // underlying queue for the purposes of this test. The round-robin logic + // in DFOCore operates on the m_dataflow_availability map keyed by the + // connection name string, so two different names exercise the path even if + // they happen to share the same physical connection. + // + // We register two synthetic TRB apps ("trb_a" and "trb_b"). Because there + // is no real sender for "trb_b", we configure td_send_retries=0 so the + // second dispatch attempt does not spin. We then verify that find_slot + // alternates between apps across consecutive TDs. + + auto core = std::make_unique("rr_test"); + core->configure( + /*busy_threshold=*/10, + /*free_threshold=*/1, + /*td_send_retries=*/1, // retry once then give up + std::chrono::milliseconds(50), + std::chrono::milliseconds(200)); + + auto iom = iomanager::IOManager::get(); + auto busy_sender = iom->get_sender("triginh"); + + // Track which TRBs received TDs. + std::atomic trb0_count{ 0 }; + auto dec_recv = iom->get_receiver("trigdec_0"); + dec_recv->add_callback([&](const dfmessages::TriggerDecision& td) { + ++trb0_count; + complete_trigger(*core, td.trigger_number, "trigdec_0"); + }); + + core->start(1, + busy_sender, + [iom](const std::string& conn) { + return iom->get_sender(conn); + }); + + // Register two TRB apps. + register_trb(*core, "trigdec_0"); + + // Register a second app with the same physical connection to exercise the + // round-robin map; completion tokens will also come back as "trigdec_0" but + // with the routing key matching "trigdec_1". For this test we only check + // slot-selection alternation via the map keys, so we reuse the real conn. + { + dfmessages::TriggerDecisionToken tok; + tok.run_number = 0; + tok.trigger_number = 0; + tok.decision_destination = "trigdec_0_second"; + core->receive_token(tok); + } + + // Send 4 TDs. Without the real trigdec_0_second sender they will all + // end up on trigdec_0 (fallback to occupied app), which verifies the + // fallback path inside find_slot(). + for (dfmessages::trigger_number_t tn = 1; tn <= 4; ++tn) { + send_td(*core, tn); + } + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + // All 4 TDs should have been dispatched (to trigdec_0 via fallback). + auto snap = core->take_opmon_snapshot(); + BOOST_REQUIRE_EQUAL(snap.decisions_received, 4u); + + core->stop(); + core->scrap(); + dec_recv->remove_callback(); +} + +BOOST_AUTO_TEST_SUITE_END() + +} // namespace dunedaq diff --git a/unittest/DFOModule_test.cxx b/unittest/DFOModule_test.cxx index 690ca08a..7b3230ba 100644 --- a/unittest/DFOModule_test.cxx +++ b/unittest/DFOModule_test.cxx @@ -243,6 +243,10 @@ BOOST_AUTO_TEST_CASE(SendTrigDecFailed) dfo->execute_command("conf", null_data); + auto iom = iomanager::IOManager::get(); + auto inh_recv = iom->get_receiver("triginh"); + inh_recv->add_callback([](const dfmessages::TriggerInhibit&) {}); + dfo->execute_command("start", start_data); send_init_token("invalid_connection"); @@ -265,8 +269,11 @@ BOOST_AUTO_TEST_CASE(SendTrigDecFailed) send_token(1000); std::this_thread::sleep_for(std::chrono::milliseconds(50)); + BOOST_TEST_MESSAGE("Draining dataflow and scrapping DFO"); dfo->execute_command("drain_dataflow", null_data); dfo->execute_command("scrap", null_data); + + inh_recv->remove_callback(); } BOOST_AUTO_TEST_SUITE_END()