Skip to content

Commit b2d087c

Browse files
committed
fmq: channel return code handling
1 parent 8fef219 commit b2d087c

6 files changed

Lines changed: 143 additions & 79 deletions

File tree

src/StfBuilder/StfBuilderInput.cxx

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ mRunning = false;
7777
/// Receiving thread
7878
void StfInputInterface::DataHandlerThread()
7979
{
80+
using namespace std::chrono_literals;
8081
constexpr std::uint32_t cInvalidStfId = ~0;
8182
std::vector<FairMQMessagePtr> lReadoutMsgs;
8283
lReadoutMsgs.reserve(4096);
@@ -94,18 +95,33 @@ void StfInputInterface::DataHandlerThread()
9495
lReadoutMsgs.clear();
9596

9697
// receive readout messages
97-
const auto lRet = lInputChan.Receive(lReadoutMsgs);
98-
if (lRet < 0 && !mAcceptingData) { // NOT in FMQ:running state
98+
const std::int64_t lRet = lInputChan.Receive(lReadoutMsgs);
99+
100+
// timeout ok
101+
if (lRet == static_cast<int64_t>(fair::mq::TransferCode::timeout)) {
102+
continue;
103+
}
104+
105+
// interrupted
106+
if (lRet == static_cast<int64_t>(fair::mq::TransferCode::interrupted)) {
107+
if (mAcceptingData) {
108+
IDDLOG_RL(1000, "READOUT INTERFACE: Receive failed. FMQ state interrupted.");
109+
}
110+
std::this_thread::sleep_for(10ms);
99111
continue;
100112
}
101113

102-
if (lRet >= 0 && !mAcceptingData) {
103-
WDDLOG_RL(1000, "READOUT INTERFACE: Receive data but we are not in FMQ:RUNNING state.");
114+
// not in running state
115+
if (lRet > 0 && !mAcceptingData) {
116+
WDDLOG_RL(1000, "READOUT INTERFACE: Discarding received data because we are not in the FMQ:RUNNING state.");
104117
continue;
105118
}
106119

107-
if (lRet < 0 && mRunning && mAcceptingData) {
108-
WDDLOG_RL(1000, "READOUT INTERFACE: Receive failed . err={}", std::to_string(lRet));
120+
// error
121+
if (lRet == static_cast<int64_t>(fair::mq::TransferCode::error)) {
122+
EDDLOG_RL(1000, "READOUT INTERFACE: Receive failed. fmq_error={} errno={} error={}",
123+
lRet, errno, std::string(strerror(errno)));
124+
std::this_thread::sleep_for(10ms);
109125
continue;
110126
}
111127

@@ -180,8 +196,9 @@ void StfInputInterface::DataHandlerThread()
180196
/// StfBuilding thread
181197
void StfInputInterface::StfBuilderThread()
182198
{
183-
static constexpr bool cBuildOnTimeout = false;
184199
using namespace std::chrono_literals;
200+
201+
static constexpr bool cBuildOnTimeout = false;
185202
// current TF Id
186203
constexpr std::uint32_t cInvalidStfId = ~0;
187204
std::uint32_t lCurrentStfId = cInvalidStfId;
@@ -397,12 +414,12 @@ void StfInputInterface::StfBuilderThread()
397414

398415
void StfInputInterface::StfSequencerThread()
399416
{
400-
static constexpr std::uint64_t sMaxMissingStfsForSeq = 2ull * 11234 / 256; // 1 seconds of STFs
417+
using namespace std::chrono_literals;
401418

402-
std::uint64_t lLastStfId = -std::uint64_t(1);
419+
static constexpr std::uint64_t sMaxMissingStfsForSeq = 2ull * 11234 / 256; // 2 seconds of STFs
403420

404421
while (mRunning) {
405-
auto lStf = mSeqStfQueue.pop_wait_for(std::chrono::microseconds(500000));
422+
auto lStf = mSeqStfQueue.pop_wait_for(500ms);
406423

407424
if (lStf == std::nullopt || !mAcceptingData) {
408425
continue;

src/TfScheduler/TfSchedulerStfInfo.cxx

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ void TfSchedulerStfInfo::StaleCleanupThread()
138138
const auto lNumStfSenders = mDiscoveryConfig->status().stf_sender_count();
139139
const std::set<std::string> lStfSenderIdSet = mConnManager.getStfSenderSet();
140140
std::vector<std::uint64_t> lStfsToErase;
141-
lStfsToErase.reserve(1000);
142141
auto lLastDiscardTime = std::chrono::steady_clock::now();
143142

144143
// count how many time an FLP was missing STFs
@@ -147,15 +146,8 @@ void TfSchedulerStfInfo::StaleCleanupThread()
147146
std::vector<StfInfo> lStfInfos;
148147

149148
while (mRunning) {
150-
std::this_thread::sleep_for(2s);
151-
152-
const auto lNow = std::chrono::steady_clock::now();
153-
154-
if (lNow - lLastDiscardTime < sStfDiscardTimeout) {
155-
continue;
156-
}
157-
158-
lLastDiscardTime = lNow;
149+
std::this_thread::sleep_for(std::chrono::seconds(sStfDiscardTimeout));
150+
lLastDiscardTime = std::chrono::steady_clock::now();
159151

160152
lStfsToErase.clear();
161153
{
@@ -171,13 +163,12 @@ void TfSchedulerStfInfo::StaleCleanupThread()
171163
lStfId, lStfInfoVec.size(), lNumStfSenders);
172164

173165
lStfsToErase.push_back(lStfId);
174-
requestDropAllLocked(lStfId);
175166
continue;
176167
}
177168

178169
// check reap
179170
const auto &lLastStfInfo = lStfInfoVec.back();
180-
const auto lTimeDiff = std::chrono::abs(lLastStfInfo.mUpdateLocalTime - lNow);
171+
const auto lTimeDiff = std::chrono::abs(lLastStfInfo.mUpdateLocalTime - lLastDiscardTime);
181172
if (lTimeDiff > sStfDiscardTimeout) {
182173
WDDLOG_RL(1000, "Discarding incomplete SubTimeFrame. stf_id={} received={} expected={}",
183174
lStfId, lStfInfoVec.size(), lNumStfSenders);
@@ -194,18 +185,28 @@ void TfSchedulerStfInfo::StaleCleanupThread()
194185

195186
for (const auto &lStf : lMissingStfSenders) {
196187
lStfSenderMissingCnt[lStf]++;
197-
DDDLOG("Missing STF ids: stfs_id={} missing_cnt={}", lStf, lStfSenderMissingCnt[lStf]);
198188
}
199189

200190
lStfsToErase.push_back(lStfId);
201-
requestDropAllLocked(lStfId);
202191
}
203192
}
193+
194+
// drop outside of the main iteration loop
195+
for(const auto &lStfIdToDrop : lStfsToErase) {
196+
requestDropAllLocked(lStfIdToDrop);
197+
}
204198
}
205199

206200
if (lStfsToErase.size() > 0) {
207201
WDDLOG("SchedulingThread: TFs have been discarded due to incomplete number of STFs. discarded_tf_count={}",
208202
lStfsToErase.size());
203+
204+
for (const auto &lStfSenderCnt : lStfSenderMissingCnt) {
205+
if (lStfSenderCnt.second > 0) {
206+
DDDLOG("StfSender with missing ids: stfsender_id={} missing_cnt={}",
207+
lStfSenderCnt.first, lStfSenderCnt.second);
208+
}
209+
}
209210
}
210211
}
211212

src/common/SubTimeFrameDPL.cxx

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818
#include <Framework/DataProcessingHeader.h>
1919
#include <Headers/Stack.h>
2020

21-
namespace o2
22-
{
23-
namespace DataDistribution
21+
namespace o2::DataDistribution
2422
{
2523

2624
using namespace o2::header;
@@ -322,24 +320,42 @@ std::unique_ptr<SubTimeFrame> DplToStfAdapter::deserialize(FairMQParts& pMsgs)
322320
return deserialize_impl();
323321
}
324322

325-
std::unique_ptr<SubTimeFrame> DplToStfAdapter::deserialize(FairMQChannel& pChan)
323+
std::unique_ptr<SubTimeFrame> DplToStfAdapter::deserialize(FairMQChannel& pChan, bool pLogError)
326324
{
327325
mMessages.clear();
328-
const std::int64_t ret = pChan.Receive(mMessages, 500 /* ms */);
326+
const std::int64_t lRet = pChan.Receive(mMessages, 500 /* ms */);
329327

330-
// timeout ?
331-
if (ret == -2) {
332-
return nullptr;
333-
}
334-
335-
if (ret < 0) {
336-
EDDLOG_RL(1000, "STF receive failed err={} errno={} error={}", ret, errno, std::string(strerror(errno)));
337-
mMessages.clear();
338-
return nullptr;
328+
switch (lRet) {
329+
case static_cast<std::int64_t>(fair::mq::TransferCode::timeout):
330+
mMessages.clear();
331+
return nullptr;
332+
break;
333+
case static_cast<std::int64_t>(fair::mq::TransferCode::interrupted):
334+
if (pLogError) {
335+
IDDLOG_RL(1000, "STF receive failed. what=fair::mq::TransferCode::interrupted");
336+
}
337+
mMessages.clear();
338+
return nullptr;
339+
break;
340+
case static_cast<std::int64_t>(fair::mq::TransferCode::error):
341+
EDDLOG_RL(1000, "STF receive failed. what=fair::mq::TransferCode::error err={} errno={} error={}",
342+
int(lRet), errno, std::string(strerror(errno)));
343+
mMessages.clear();
344+
return nullptr;
345+
break;
346+
default: // data or zero
347+
if (lRet > 0) {
348+
return deserialize_impl();
349+
} else {
350+
WDDLOG_RL(1000, "STF receive failed. what=zero_size");
351+
mMessages.clear();
352+
return nullptr;
353+
}
354+
break;
339355
}
340356

341-
return deserialize_impl();
357+
assert (false);
358+
return nullptr;
342359
}
343360

344-
}
345361
} /* o2::DataDistribution */

src/common/SubTimeFrameDPL.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919

2020
#include <Headers/DataHeader.h>
2121

22-
namespace o2
23-
{
24-
namespace DataDistribution
22+
namespace o2::DataDistribution
2523
{
2624

2725
////////////////////////////////////////////////////////////////////////////////
@@ -65,7 +63,7 @@ class DplToStfAdapter : public ISubTimeFrameVisitor
6563
DplToStfAdapter() = default;
6664
virtual ~DplToStfAdapter() = default;
6765

68-
std::unique_ptr<SubTimeFrame> deserialize(FairMQChannel& pChan);
66+
std::unique_ptr<SubTimeFrame> deserialize(FairMQChannel& pChan, bool logError = false);
6967
std::unique_ptr<SubTimeFrame> deserialize(FairMQParts& pMsgs);
7068

7169
protected:
@@ -76,7 +74,6 @@ class DplToStfAdapter : public ISubTimeFrameVisitor
7674
std::vector<FairMQMessagePtr> mMessages;
7775
};
7876

79-
}
8077
} /* o2::DataDistribution */
8178

8279
#endif /* ALICEO2_SUBTIMEFRAME_DPL_H_ */

src/common/SubTimeFrameVisitors.cxx

Lines changed: 65 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@
2424
#include <memory>
2525
#include <algorithm>
2626

27-
namespace o2
28-
{
29-
namespace DataDistribution
27+
namespace o2::DataDistribution
3028
{
3129

3230
using namespace o2::header;
@@ -126,22 +124,42 @@ void InterleavedHdrDataDeserializer::visit(SubTimeFrame& pStf)
126124
}
127125
}
128126

129-
std::unique_ptr<SubTimeFrame> InterleavedHdrDataDeserializer::deserialize(FairMQChannel& pChan)
127+
std::unique_ptr<SubTimeFrame> InterleavedHdrDataDeserializer::deserialize(FairMQChannel& pChan, bool pLogError)
130128
{
131-
const std::int64_t ret = pChan.Receive(mMessages, 500 /* ms */);
132-
133-
// timeout ?
134-
if (ret == -2) {
135-
return nullptr;
136-
}
137-
138-
if (ret < 0) {
139-
EDDLOG_RL(1000, "STF receive failed err={} errno={} error={}", ret, errno, std::string(strerror(errno)));
140-
mMessages.clear();
141-
return nullptr;
129+
mMessages.clear();
130+
const std::int64_t lRet = pChan.Receive(mMessages, 500 /* ms */);
131+
132+
switch (lRet) {
133+
case static_cast<std::int64_t>(fair::mq::TransferCode::timeout):
134+
mMessages.clear();
135+
return nullptr;
136+
break;
137+
case static_cast<std::int64_t>(fair::mq::TransferCode::interrupted):
138+
if (pLogError) {
139+
IDDLOG_RL(1000, "STF receive failed. what=fair::mq::TransferCode::interrupted");
140+
}
141+
mMessages.clear();
142+
return nullptr;
143+
break;
144+
case static_cast<std::int64_t>(fair::mq::TransferCode::error):
145+
EDDLOG_RL(1000, "STF receive failed. what=fair::mq::TransferCode::error err={} errno={} error={}",
146+
int(lRet), errno, std::string(strerror(errno)));
147+
mMessages.clear();
148+
return nullptr;
149+
break;
150+
default: // data or zero
151+
if (lRet > 0) {
152+
return deserialize_impl();
153+
} else {
154+
WDDLOG_RL(1000, "STF receive failed. what=zero_size");
155+
mMessages.clear();
156+
return nullptr;
157+
}
158+
break;
142159
}
143160

144-
return deserialize_impl();
161+
assert (false);
162+
return nullptr;
145163
}
146164

147165
std::unique_ptr<SubTimeFrame> InterleavedHdrDataDeserializer::deserialize(FairMQParts& pMsgs)
@@ -305,28 +323,44 @@ void CoalescedHdrDataDeserializer::visit(SubTimeFrame& pStf)
305323
}
306324
}
307325

308-
std::unique_ptr<SubTimeFrame> CoalescedHdrDataDeserializer::deserialize(FairMQChannel& pChan)
326+
std::unique_ptr<SubTimeFrame> CoalescedHdrDataDeserializer::deserialize(FairMQChannel& pChan, bool pLogError)
309327
{
310328
mHdrs.clear();
311329
mData.clear();
312330

313-
const std::int64_t ret = pChan.Receive(mData, 500 /* ms */);
331+
const std::int64_t lRet = pChan.Receive(mData, 500 /* ms */);
314332

315-
// timeout ?
316-
if (ret == -2) {
317-
mData.clear();
318-
return nullptr;
319-
}
320-
321-
if (ret < 0) {
322-
DDLOGF_GRL(1000, DataDistSeverity::error, "STF receive failed err={} errno={} error={}", ret, errno,
323-
std::string(strerror(errno)));
324-
325-
mData.clear();
326-
return nullptr;
333+
switch (lRet) {
334+
case static_cast<std::int64_t>(fair::mq::TransferCode::timeout):
335+
mData.clear();
336+
return nullptr;
337+
break;
338+
case static_cast<std::int64_t>(fair::mq::TransferCode::interrupted):
339+
if (pLogError) {
340+
IDDLOG_RL(1000, "STF receive failed. what=fair::mq::TransferCode::interrupted");
341+
}
342+
mData.clear();
343+
return nullptr;
344+
break;
345+
case static_cast<std::int64_t>(fair::mq::TransferCode::error):
346+
EDDLOG_RL(1000, "STF receive failed. what=fair::mq::TransferCode::error err={} errno={} error={}",
347+
int(lRet), errno, std::string(strerror(errno)));
348+
mData.clear();
349+
return nullptr;
350+
break;
351+
default: // data or zero
352+
if (lRet > 0) {
353+
return deserialize_impl();
354+
} else {
355+
WDDLOG_RL(1000, "STF receive failed. what=zero_size");
356+
mData.clear();
357+
return nullptr;
358+
}
359+
break;
327360
}
328361

329-
return deserialize_impl();
362+
assert (false);
363+
return nullptr;
330364
}
331365

332366
std::unique_ptr<SubTimeFrame> CoalescedHdrDataDeserializer::deserialize(std::vector<FairMQMessagePtr>& pMsgs)
@@ -413,5 +447,4 @@ std::unique_ptr<SubTimeFrame> CoalescedHdrDataDeserializer::deserialize_impl()
413447
return lStf;
414448
}
415449

416-
}
417450
} /* o2::DataDistribution */

src/common/SubTimeFrameVisitors.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class InterleavedHdrDataDeserializer : public ISubTimeFrameVisitor
6464
InterleavedHdrDataDeserializer() = default;
6565
virtual ~InterleavedHdrDataDeserializer() = default;
6666

67-
std::unique_ptr<SubTimeFrame> deserialize(FairMQChannel& pChan);
67+
std::unique_ptr<SubTimeFrame> deserialize(FairMQChannel& pChan, bool pLogError = false);
6868
std::unique_ptr<SubTimeFrame> deserialize(FairMQParts& pMsgs);
6969

7070
protected:
@@ -122,7 +122,7 @@ class CoalescedHdrDataDeserializer : public ISubTimeFrameVisitor
122122
: mTfBld(pTfBld) { }
123123
virtual ~CoalescedHdrDataDeserializer() = default;
124124

125-
std::unique_ptr<SubTimeFrame> deserialize(FairMQChannel& pChan);
125+
std::unique_ptr<SubTimeFrame> deserialize(FairMQChannel& pChan, bool pLogError = false);
126126
std::unique_ptr<SubTimeFrame> deserialize(std::vector<FairMQMessagePtr>& pMsgs);
127127

128128
protected:

0 commit comments

Comments
 (0)