@@ -52,13 +52,28 @@ bool TfBuilderInput::start(std::shared_ptr<ConsulTfBuilder> pConfig)
5252
5353 mNumStfSenders = lNumStfSenders;
5454
55- IDDLOG (" Creating input channels. num_channels={} partition={}" ,
56- mNumStfSenders , lStatus.partition ().partition_id ());
55+ IDDLOG (" Creating input channels. num_channels={} partition={}" , mNumStfSenders , lStatus.partition ().partition_id ());
5756
5857 const auto &lAaddress = lStatus.info ().ip_address ();
5958
6059 auto &lSocketMap = *(lStatus.mutable_sockets ()->mutable_map ());
6160
61+ // ZEROMQ high watermark on receive
62+ int lRcvBufSize = 50 ;
63+ {
64+ const auto lTfBZmqHwmVar = getenv (" DATADIST_TF_ZMQ_RCVHWM" );
65+ if (lTfBZmqHwmVar) {
66+ try {
67+ const int lTfBZmqHwm = std::stoi (lTfBZmqHwmVar);
68+ lRcvBufSize = std::max (lTfBZmqHwm, 4 );
69+ IDDLOG (" TfBuilderInput: RcvBufSize is set to {}. DATADIST_TF_ZMQ_RCVHWM={}" , lRcvBufSize, lRcvBufSize);
70+ } catch (...) {
71+ EDDLOG (" (Sub)TimeFrame source: DATADIST_TF_ZMQ_RCVHWM must be greater than 3. DATADIST_TF_ZMQ_RCVHWM={}" ,
72+ lTfBZmqHwmVar);
73+ }
74+ }
75+ }
76+
6277 for (std::uint32_t lSocketIdx = 0 ; lSocketIdx < mNumStfSenders ; lSocketIdx++) {
6378
6479 std::string lAddress = " tcp://" + lAaddress + " :" + std::to_string (10000 + lSocketIdx);
@@ -73,7 +88,7 @@ bool TfBuilderInput::start(std::shared_ptr<ConsulTfBuilder> pConfig)
7388
7489 lNewChannel->UpdateRateLogging (1 ); // log each second
7590 lNewChannel->UpdateAutoBind (true ); // make sure bind succeeds
76- lNewChannel->UpdateRcvBufSize (200 ); // make sure one sender does not advance too much
91+ lNewChannel->UpdateRcvBufSize (lRcvBufSize ); // make sure one sender does not advance too much
7792 lNewChannel->UpdateRcvKernelSize (2 << 20 );
7893 lNewChannel->Init ();
7994
0 commit comments