diff --git a/Makefile b/Makefile index f219063..47532a6 100644 --- a/Makefile +++ b/Makefile @@ -7,9 +7,9 @@ CC ?= gcc CFLAGS += -Wall -Werror -O2 -g -Ilibs CXX ?= g++ -CXXFLAGS += -std=c++11 -Wall -Wno-psabi -Werror -O2 -g -Ilibs +CXXFLAGS += -std=c++11 -Wall -Wno-psabi -Werror -O2 -g -Ilibs -DBOOST_ASIO_NO_DEPRECATED -LIBS=-lboost_system -lboost_program_options -lboost_regex -lboost_filesystem -lpthread -lstdc++ -lm +LIBS=-lboost_program_options -lboost_regex -lboost_filesystem -lpthread -lstdc++ -lm LIBS_SDR=-lSoapySDR all: dump978-fa skyaware978 diff --git a/debian/control b/debian/control index af67d68..d90bcc0 100644 --- a/debian/control +++ b/debian/control @@ -2,7 +2,7 @@ Source: dump978-fa Section: embedded Priority: extra Maintainer: Oliver Jowett -Build-Depends: debhelper(>=12), libboost-system-dev, libboost-program-options-dev, libboost-regex-dev, libboost-filesystem-dev, libsoapysdr-dev +Build-Depends: debhelper(>=12), libboost-program-options-dev, libboost-regex-dev, libboost-filesystem-dev, libsoapysdr-dev Standards-Version: 3.9.3 Homepage: http://www.flightaware.com/ Vcs-Git: https://github.com/flightaware/dump978.git diff --git a/dump978_main.cc b/dump978_main.cc index a1f0f64..5e08dfd 100644 --- a/dump978_main.cc +++ b/dump978_main.cc @@ -78,7 +78,7 @@ namespace flightaware { #define EXIT_NO_RESTART (64) static int realmain(int argc, char **argv) { - boost::asio::io_service io_service; + boost::asio::io_context io_context; // clang-format off po::options_description desc("Allowed options"); @@ -130,7 +130,7 @@ static int realmain(int argc, char **argv) { SampleSource::Pointer sample_source; MessageSource::Pointer message_source; - tcp::resolver resolver(io_service); + tcp::resolver resolver(io_context); if (opts.count("stdin") + opts.count("file") + opts.count("sdr") + opts.count("stratuxv3") != 1) { std::cerr << "Exactly one of --stdin, --file, --sdr, or --stratuxv3 must be used" << std::endl; @@ -138,16 +138,16 @@ static int realmain(int argc, char **argv) { } if (opts.count("stdin")) { - sample_source = StdinSampleSource::Create(io_service, opts); + sample_source = StdinSampleSource::Create(io_context, opts); } else if (opts.count("file")) { boost::filesystem::path path(opts["file"].as()); - sample_source = FileSampleSource::Create(io_service, path, opts); + sample_source = FileSampleSource::Create(io_context, path, opts); } else if (opts.count("sdr")) { auto device = opts["sdr"].as(); - sample_source = SoapySampleSource::Create(io_service, device, opts); + sample_source = SoapySampleSource::Create(io_context, device, opts); } else if (opts.count("stratuxv3")) { auto path = opts["stratuxv3"].as(); - message_source = StratuxSerial::Create(io_service, path); + message_source = StratuxSerial::Create(io_context, path); } else { assert("impossible case" && false); } @@ -159,27 +159,29 @@ static int realmain(int argc, char **argv) { bool ok = true; for (auto l : opts[option].as>()) { - tcp::resolver::query query(l.host, l.port, tcp::resolver::query::passive); boost::system::error_code ec; - - bool success = false; - tcp::resolver::iterator end; - for (auto i = resolver.resolve(query, ec); i != end; ++i) { - const auto &endpoint = i->endpoint(); - - try { - auto listener = SocketListener::Create(io_service, endpoint, dispatch, factory); - listener->Start(); - std::cerr << option << ": listening for connections on " << endpoint << std::endl; - success = true; - } catch (boost::system::system_error &err) { - std::cerr << option << ": could not listen on " << endpoint << ": " << err.what() << std::endl; - ec = err.code(); + auto range = resolver.resolve(tcp::v4(), l.host, l.port, tcp::resolver::passive); + if (!ec) { + bool success = false; + for (auto i = range.begin(); i != range.end(); ++i) { + const auto &endpoint = i->endpoint(); + + try { + auto listener = SocketListener::Create(io_context, endpoint, dispatch, factory); + listener->Start(); + std::cerr << option << ": listening for connections on " << endpoint << std::endl; + success = true; + } catch (boost::system::system_error &err) { + std::cerr << option << ": could not listen on " << endpoint << ": " << err.what() << std::endl; + ec = err.code(); + } } - } - - if (!success) { - std::cerr << option << ": no available listening addresses" << std::endl; + if (!success) { + std::cerr << option << ": no available listening addresses" << std::endl; + ok = false; + } + } else { + std::cerr << option << ": failed to resolve " << l.host << ":" << l.port << ": " << ec.message() << std::endl; ok = false; } } @@ -247,21 +249,21 @@ static int realmain(int argc, char **argv) { } message_source->SetConsumer(std::bind(&MessageDispatch::Dispatch, &dispatch, std::placeholders::_1)); - message_source->SetErrorHandler([&io_service, &saw_error](const boost::system::error_code &ec) { + message_source->SetErrorHandler([&io_context, &saw_error](const boost::system::error_code &ec) { if (ec == boost::asio::error::eof) { std::cerr << "Message source reports EOF" << std::endl; } else { std::cerr << "Message source reports error: " << ec.message() << std::endl; saw_error = true; } - io_service.stop(); + io_context.stop(); }); - boost::asio::signal_set signals(io_service, SIGINT, SIGTERM); - signals.async_wait([&io_service, &saw_error](const boost::system::error_code &ec, int signum) { + boost::asio::signal_set signals(io_context, SIGINT, SIGTERM); + signals.async_wait([&io_context, &saw_error](const boost::system::error_code &ec, int signum) { std::cerr << "Caught signal " << signum << ", exiting" << std::endl; saw_error = true; - io_service.stop(); + io_context.stop(); }); message_source->Start(); @@ -269,7 +271,7 @@ static int realmain(int argc, char **argv) { sample_source->Start(); } - io_service.run(); + io_context.run(); if (sample_source) { sample_source->Stop(); diff --git a/faup978_main.cc b/faup978_main.cc index 9636e0b..f9bbbc0 100644 --- a/faup978_main.cc +++ b/faup978_main.cc @@ -42,7 +42,7 @@ void validate(boost::any &v, const std::vector &values, connect_opt #define EXIT_NO_RESTART (64) static int realmain(int argc, char **argv) { - boost::asio::io_service io_service; + boost::asio::io_context io_context; // clang-format off po::options_description desc("Allowed options"); @@ -80,19 +80,19 @@ static int realmain(int argc, char **argv) { } auto connect = opts["connect"].as(); - auto input = RawInput::Create(io_service, connect.host, connect.port); - auto reporter = Reporter::Create(io_service); + auto input = RawInput::Create(io_context, connect.host, connect.port); + auto reporter = Reporter::Create(io_context); input->SetConsumer(std::bind(&Reporter::HandleMessages, reporter, std::placeholders::_1)); - input->SetErrorHandler([&io_service](const boost::system::error_code &ec) { + input->SetErrorHandler([&io_context](const boost::system::error_code &ec) { std::cerr << "Connection failed: " << ec.message() << std::endl; - io_service.stop(); + io_context.stop(); }); reporter->Start(); input->Start(); - io_service.run(); + io_context.run(); input->Stop(); reporter->Stop(); diff --git a/faup978_reporter.cc b/faup978_reporter.cc index 0e0131f..b024ef6 100644 --- a/faup978_reporter.cc +++ b/faup978_reporter.cc @@ -9,6 +9,8 @@ #include #include +#include + using namespace flightaware::uat; using namespace flightaware::faup978; @@ -59,8 +61,8 @@ void Reporter::PurgeOld() { } auto self(shared_from_this()); - purge_timer_.expires_from_now(timeout_ / 4); - purge_timer_.async_wait(strand_.wrap([this, self](const boost::system::error_code &ec) { + purge_timer_.expires_after(timeout_ / 4); + purge_timer_.async_wait(boost::asio::bind_executor(strand_, [this, self](const boost::system::error_code &ec) { if (!ec) { PurgeOld(); } @@ -88,8 +90,8 @@ void Reporter::PeriodicReport() { } auto self(shared_from_this()); - report_timer_.expires_from_now(interval_); - report_timer_.async_wait(strand_.wrap([this, self](const boost::system::error_code &ec) { + report_timer_.expires_after(interval_); + report_timer_.async_wait(boost::asio::bind_executor(strand_, [this, self](const boost::system::error_code &ec) { if (!ec) { PeriodicReport(); } diff --git a/faup978_reporter.h b/faup978_reporter.h index 02825c2..067b7fa 100644 --- a/faup978_reporter.h +++ b/faup978_reporter.h @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include @@ -32,7 +32,7 @@ namespace flightaware { static constexpr const char *TSV_VERSION_8U = "8U"; static constexpr const char *TSV_VERSION_8U_FIX = "8UF"; - static Pointer Create(boost::asio::io_service &service, std::chrono::milliseconds interval = std::chrono::milliseconds(500), std::chrono::milliseconds timeout = std::chrono::seconds(300)) { return Pointer(new Reporter(service, interval, timeout)); } + static Pointer Create(boost::asio::io_context &service, std::chrono::milliseconds interval = std::chrono::milliseconds(500), std::chrono::milliseconds timeout = std::chrono::seconds(300)) { return Pointer(new Reporter(service, interval, timeout)); } void Start(); void Stop(); @@ -40,7 +40,7 @@ namespace flightaware { void HandleMessages(flightaware::uat::SharedMessageVector messages); private: - Reporter(boost::asio::io_service &service, std::chrono::milliseconds interval, std::chrono::milliseconds timeout) : strand_(service), report_timer_(service), purge_timer_(service), interval_(interval), timeout_(timeout) { tracker_ = flightaware::uat::Tracker::Create(service, timeout); } + Reporter(boost::asio::io_context &service, std::chrono::milliseconds interval, std::chrono::milliseconds timeout) : strand_(service), report_timer_(service), purge_timer_(service), interval_(interval), timeout_(timeout) { tracker_ = flightaware::uat::Tracker::Create(service, timeout); } void PeriodicReport(); void PurgeOld(); @@ -48,7 +48,7 @@ namespace flightaware { const char *TSVVersion() const { return fecfix_ ? TSV_VERSION_8U_FIX : TSV_VERSION_8U; } - boost::asio::io_service::strand strand_; + boost::asio::io_context::strand strand_; boost::asio::steady_timer report_timer_; boost::asio::steady_timer purge_timer_; std::chrono::milliseconds interval_; diff --git a/sample_source.cc b/sample_source.cc index 8f187e8..9f870df 100644 --- a/sample_source.cc +++ b/sample_source.cc @@ -4,6 +4,8 @@ #include "sample_source.h" +#include + #include #include @@ -22,7 +24,8 @@ void FileSampleSource::Start() { timestamp_ = 1; // always use synthetic timestamps for file sources auto self = std::static_pointer_cast(shared_from_this()); - service_.post(std::bind(&FileSampleSource::ReadBlock, self, boost::system::error_code())); + boost::asio::post(service_, + std::bind(&FileSampleSource::ReadBlock, self, boost::system::error_code())); } void FileSampleSource::Stop() { @@ -77,7 +80,8 @@ void FileSampleSource::ReadBlock(const boost::system::error_code &ec) { timer_.expires_at(next_block_); timer_.async_wait(std::bind(&FileSampleSource::ReadBlock, self, std::placeholders::_1)); } else { - service_.post(std::bind(&FileSampleSource::ReadBlock, self, boost::system::error_code())); + boost::asio::post(service_, + std::bind(&FileSampleSource::ReadBlock, self, boost::system::error_code())); } } diff --git a/sample_source.h b/sample_source.h index 8a94858..5eb6ef7 100644 --- a/sample_source.h +++ b/sample_source.h @@ -12,7 +12,7 @@ #include #include -#include +#include #include #include #include @@ -61,7 +61,7 @@ namespace flightaware { class FileSampleSource : public SampleSource { public: - static SampleSource::Pointer Create(boost::asio::io_service &service, const boost::filesystem::path &path, const boost::program_options::variables_map &options = boost::program_options::variables_map(), std::size_t samples_per_second = 2083333, std::size_t samples_per_block = 524288) { return Pointer(new FileSampleSource(service, path, options, samples_per_second, samples_per_block)); } + static SampleSource::Pointer Create(boost::asio::io_context &service, const boost::filesystem::path &path, const boost::program_options::variables_map &options = boost::program_options::variables_map(), std::size_t samples_per_second = 2083333, std::size_t samples_per_block = 524288) { return Pointer(new FileSampleSource(service, path, options, samples_per_second, samples_per_block)); } void Init() override {} void Start() override; @@ -69,7 +69,7 @@ namespace flightaware { SampleFormat Format() override { return format_; } private: - FileSampleSource(boost::asio::io_service &service, const boost::filesystem::path &path, const boost::program_options::variables_map &options, std::size_t samples_per_second, std::size_t samples_per_block) : service_(service), path_(path), timer_(service) { + FileSampleSource(boost::asio::io_context &service, const boost::filesystem::path &path, const boost::program_options::variables_map &options, std::size_t samples_per_second, std::size_t samples_per_block) : service_(service), path_(path), timer_(service) { if (!options.count("format")) { throw std::runtime_error("--format must be specified when using a file input"); } @@ -84,7 +84,7 @@ namespace flightaware { void ReadBlock(const boost::system::error_code &ec); - boost::asio::io_service &service_; + boost::asio::io_context &service_; boost::filesystem::path path_; SampleFormat format_; unsigned alignment_; @@ -100,7 +100,7 @@ namespace flightaware { class StdinSampleSource : public SampleSource { public: - static SampleSource::Pointer Create(boost::asio::io_service &service, const boost::program_options::variables_map &options, std::size_t samples_per_second = 2083333, std::size_t samples_per_block = 524288) { return Pointer(new StdinSampleSource(service, options, samples_per_second, samples_per_block)); } + static SampleSource::Pointer Create(boost::asio::io_context &service, const boost::program_options::variables_map &options, std::size_t samples_per_second = 2083333, std::size_t samples_per_block = 524288) { return Pointer(new StdinSampleSource(service, options, samples_per_second, samples_per_block)); } void Init() override {} void Start() override; @@ -108,7 +108,7 @@ namespace flightaware { SampleFormat Format() override { return format_; } private: - StdinSampleSource(boost::asio::io_service &service, const boost::program_options::variables_map &options, std::size_t samples_per_second, std::size_t samples_per_block) : samples_per_second_(samples_per_second), stream_(service), used_(0) { + StdinSampleSource(boost::asio::io_context &service, const boost::program_options::variables_map &options, std::size_t samples_per_second, std::size_t samples_per_block) : samples_per_second_(samples_per_second), stream_(service), used_(0) { if (!options.count("format")) { throw std::runtime_error("--format must be specified when using a file input"); } diff --git a/skyaware978_main.cc b/skyaware978_main.cc index 0f05b43..5395b6d 100644 --- a/skyaware978_main.cc +++ b/skyaware978_main.cc @@ -42,7 +42,7 @@ void validate(boost::any &v, const std::vector &values, connect_opt #define EXIT_NO_RESTART (64) static int realmain(int argc, char **argv) { - boost::asio::io_service io_service; + boost::asio::io_context io_context; // clang-format off po::options_description desc("Allowed options"); @@ -92,14 +92,14 @@ static int realmain(int argc, char **argv) { auto connect = opts["connect"].as(); auto reconnect_interval = opts["reconnect-interval"].as(); - auto input = RawInput::Create(io_service, connect.host, connect.port, std::chrono::milliseconds(reconnect_interval * 1000)); + auto input = RawInput::Create(io_context, connect.host, connect.port, std::chrono::milliseconds(reconnect_interval * 1000)); - auto tracker = Tracker::Create(io_service); + auto tracker = Tracker::Create(io_context); input->SetConsumer(std::bind(&Tracker::HandleMessages, tracker, std::placeholders::_1)); - input->SetErrorHandler([&io_service, reconnect_interval](const boost::system::error_code &ec) { + input->SetErrorHandler([&io_context, reconnect_interval](const boost::system::error_code &ec) { std::cerr << "Connection failed: " << ec.message() << std::endl; if (!reconnect_interval) { - io_service.stop(); + io_context.stop(); } }); @@ -109,13 +109,13 @@ static int realmain(int argc, char **argv) { } auto dir = opts["json-dir"].as(); - auto writer = SkyAwareWriter::Create(io_service, tracker, dir, std::chrono::milliseconds(1000), opts["history-count"].as(), std::chrono::milliseconds(opts["history-interval"].as() * 1000), location); + auto writer = SkyAwareWriter::Create(io_context, tracker, dir, std::chrono::milliseconds(1000), opts["history-count"].as(), std::chrono::milliseconds(opts["history-interval"].as() * 1000), location); writer->Start(); tracker->Start(); input->Start(); - io_service.run(); + io_context.run(); input->Stop(); tracker->Stop(); diff --git a/skyaware_writer.cc b/skyaware_writer.cc index 620eee4..ab4b780 100644 --- a/skyaware_writer.cc +++ b/skyaware_writer.cc @@ -9,6 +9,8 @@ #include #include +#include + #include "json.hpp" #include "track.h" @@ -218,8 +220,8 @@ void SkyAwareWriter::PeriodicWrite() { } auto self(shared_from_this()); - timer_.expires_from_now(interval_); - timer_.async_wait(strand_.wrap([this, self](const boost::system::error_code &ec) { + timer_.expires_after(interval_); + timer_.async_wait(boost::asio::bind_executor(strand_, [this, self](const boost::system::error_code &ec) { if (!ec) { PeriodicWrite(); } diff --git a/skyaware_writer.h b/skyaware_writer.h index 889b51d..b24dd6b 100644 --- a/skyaware_writer.h +++ b/skyaware_writer.h @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include #include @@ -24,17 +24,17 @@ namespace flightaware { public: typedef std::shared_ptr Pointer; - static Pointer Create(boost::asio::io_service &service, flightaware::uat::Tracker::Pointer tracker, const boost::filesystem::path &dir, std::chrono::milliseconds interval, unsigned history_count, std::chrono::milliseconds history_interval, boost::optional> location) { return Pointer(new SkyAwareWriter(service, tracker, dir, interval, history_count, history_interval, location)); } + static Pointer Create(boost::asio::io_context &service, flightaware::uat::Tracker::Pointer tracker, const boost::filesystem::path &dir, std::chrono::milliseconds interval, unsigned history_count, std::chrono::milliseconds history_interval, boost::optional> location) { return Pointer(new SkyAwareWriter(service, tracker, dir, interval, history_count, history_interval, location)); } void Start(); void Stop(); private: - SkyAwareWriter(boost::asio::io_service &service, flightaware::uat::Tracker::Pointer tracker, const boost::filesystem::path &dir, std::chrono::milliseconds interval, unsigned history_count, std::chrono::milliseconds history_interval, boost::optional> location) : strand_(service), timer_(service), tracker_(tracker), dir_(dir), interval_(interval), history_count_(history_count), history_interval_(history_interval), location_(location) {} + SkyAwareWriter(boost::asio::io_context &service, flightaware::uat::Tracker::Pointer tracker, const boost::filesystem::path &dir, std::chrono::milliseconds interval, unsigned history_count, std::chrono::milliseconds history_interval, boost::optional> location) : strand_(service), timer_(service), tracker_(tracker), dir_(dir), interval_(interval), history_count_(history_count), history_interval_(history_interval), location_(location) {} void PeriodicWrite(); - boost::asio::io_service::strand strand_; + boost::asio::io_context::strand strand_; boost::asio::steady_timer timer_; flightaware::uat::Tracker::Pointer tracker_; boost::filesystem::path dir_; diff --git a/soapy_source.cc b/soapy_source.cc index b427b8e..7b5221e 100644 --- a/soapy_source.cc +++ b/soapy_source.cc @@ -141,7 +141,7 @@ class SoapySDRCategory : public boost::system::error_category { static SoapySDRCategory soapysdr_category; -SoapySampleSource::SoapySampleSource(boost::asio::io_service &service, const std::string &device_name, const boost::program_options::variables_map &options) : timer_(service), device_name_(device_name), options_(options) { +SoapySampleSource::SoapySampleSource(boost::asio::io_context &service, const std::string &device_name, const boost::program_options::variables_map &options) : timer_(service), device_name_(device_name), options_(options) { if (!log_handler_registered_.exchange(true)) { SoapySDR::registerLogHandler(SoapyLogger); SoapySDR::setLogLevel(SOAPY_SDR_NOTICE); @@ -277,9 +277,9 @@ void SoapySampleSource::Start() { void SoapySampleSource::Keepalive() { if (rx_thread_ && rx_thread_->joinable()) { - // Keep the io_service alive while the rx_thread is active + // Keep the io_context alive while the rx_thread is active auto self(shared_from_this()); - timer_.expires_from_now(std::chrono::milliseconds(1000)); + timer_.expires_after(std::chrono::milliseconds(1000)); timer_.async_wait([self, this](const boost::system::error_code &ec) { if (!ec) { Keepalive(); diff --git a/soapy_source.h b/soapy_source.h index d448e67..ee0278f 100644 --- a/soapy_source.h +++ b/soapy_source.h @@ -19,7 +19,7 @@ namespace flightaware { namespace uat { class SoapySampleSource : public SampleSource { public: - static SampleSource::Pointer Create(boost::asio::io_service &service, const std::string &device_name, const boost::program_options::variables_map &options) { return Pointer(new SoapySampleSource(service, device_name, options)); } + static SampleSource::Pointer Create(boost::asio::io_context &service, const std::string &device_name, const boost::program_options::variables_map &options) { return Pointer(new SoapySampleSource(service, device_name, options)); } virtual ~SoapySampleSource(); @@ -29,7 +29,7 @@ namespace flightaware { SampleFormat Format() override { return format_; } private: - SoapySampleSource(boost::asio::io_service &service, const std::string &device_name, const boost::program_options::variables_map &options); + SoapySampleSource(boost::asio::io_context &service, const std::string &device_name, const boost::program_options::variables_map &options); void Run(); void Keepalive(); diff --git a/socket_input.cc b/socket_input.cc index 4faced5..8b94c44 100644 --- a/socket_input.cc +++ b/socket_input.cc @@ -9,24 +9,24 @@ using boost::asio::ip::tcp; #include -RawInput::RawInput(boost::asio::io_service &service, const std::string &host, const std::string &port_or_service, std::chrono::milliseconds reconnect_interval) : host_(host), port_or_service_(port_or_service), reconnect_interval_(reconnect_interval), resolver_(service), socket_(service), reconnect_timer_(service), used_(0) { readbuf_.resize(8192); } +RawInput::RawInput(boost::asio::io_context &service, const std::string &host, const std::string &port_or_service, std::chrono::milliseconds reconnect_interval) : host_(host), port_or_service_(port_or_service), reconnect_interval_(reconnect_interval), resolver_(service), socket_(service), reconnect_timer_(service), used_(0) { readbuf_.resize(8192); } void RawInput::Start() { auto self(shared_from_this()); std::cerr << "Connecting to " << host_ << ":" << port_or_service_ << std::endl; - tcp::resolver::query query(host_, port_or_service_); - resolver_.async_resolve(query, [this, self](const boost::system::error_code &ec, tcp::resolver::iterator it) { - if (!ec) { - next_endpoint_ = it; - TryNextEndpoint(boost::asio::error::make_error_code(boost::asio::error::host_not_found)); - } else if (ec == boost::asio::error::operation_aborted) { - return; - } else { - HandleError(ec); - return; - } - }); + + resolver_.async_resolve(tcp::v4(), host_, port_or_service_, + [this, self](const boost::system::error_code &ec, tcp::resolver::results_type range) { + if (!ec) { + TryNextEndpoint(range.begin(), range.end(), boost::asio::error::make_error_code(boost::asio::error::host_not_found)); + } else if (ec == boost::asio::error::operation_aborted) { + return; + } else { + HandleError(ec); + return; + } + }); } void RawInput::Stop() { @@ -34,16 +34,18 @@ void RawInput::Stop() { socket_.close(); } -void RawInput::TryNextEndpoint(const boost::system::error_code &last_error) { - if (next_endpoint_ == tcp::resolver::iterator()) { +void RawInput::TryNextEndpoint(resolver::results_type::iterator next, + resolver::results_type::iterator end, + const boost::system::error_code &last_error) { + if (next == end) { // No more addresses to try HandleError(last_error); return; } - tcp::endpoint endpoint = *next_endpoint_++; + tcp::endpoint endpoint = *next++; auto self(shared_from_this()); - socket_.async_connect(endpoint, [this, self, endpoint](const boost::system::error_code &ec) { + socket_.async_connect(endpoint, [this, self, endpoint, next, end](const boost::system::error_code &ec) { if (!ec) { std::cerr << "Connected to " << endpoint << std::endl; ScheduleRead(); @@ -52,7 +54,7 @@ void RawInput::TryNextEndpoint(const boost::system::error_code &last_error) { } else { std::cerr << "connection to " << endpoint << " failed: " << ec.message() << std::endl; socket_.close(); - TryNextEndpoint(ec); + TryNextEndpoint(next, end, ec); } }); } @@ -92,7 +94,7 @@ void RawInput::HandleError(const boost::system::error_code &ec) { auto self(shared_from_this()); - reconnect_timer_.expires_from_now(reconnect_interval_); + reconnect_timer_.expires_after(reconnect_interval_); reconnect_timer_.async_wait([this, self](const boost::system::error_code &ec) { if (!ec) Start(); diff --git a/socket_input.h b/socket_input.h index eb684e3..4c5f3c6 100644 --- a/socket_input.h +++ b/socket_input.h @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include @@ -23,7 +23,7 @@ namespace flightaware { typedef std::shared_ptr Pointer; typedef std::function ErrorHandler; - static Pointer Create(boost::asio::io_service &service, const std::string &host, const std::string &port_or_service, std::chrono::milliseconds reconnect_interval = std::chrono::milliseconds(0)) { return Pointer(new RawInput(service, host, port_or_service, reconnect_interval)); } + static Pointer Create(boost::asio::io_context &service, const std::string &host, const std::string &port_or_service, std::chrono::milliseconds reconnect_interval = std::chrono::milliseconds(0)) { return Pointer(new RawInput(service, host, port_or_service, reconnect_interval)); } void Start(); void Stop(); @@ -31,9 +31,12 @@ namespace flightaware { void SetErrorHandler(ErrorHandler handler) { error_handler_ = handler; } private: - RawInput(boost::asio::io_service &service, const std::string &host, const std::string &port_or_service, std::chrono::milliseconds reconnect_interval); + RawInput(boost::asio::io_context &service, const std::string &host, const std::string &port_or_service, std::chrono::milliseconds reconnect_interval); - void TryNextEndpoint(const boost::system::error_code &last_error); + using resolver = boost::asio::ip::tcp::resolver; + void TryNextEndpoint(resolver::results_type::iterator next, + resolver::results_type::iterator end, + const boost::system::error_code &last_error); void ScheduleRead(); void ParseBuffer(); boost::optional ParseLine(const std::string &line); @@ -44,9 +47,8 @@ namespace flightaware { std::string port_or_service_; std::chrono::milliseconds reconnect_interval_; - boost::asio::ip::tcp::resolver resolver_; + resolver resolver_; boost::asio::ip::tcp::socket socket_; - boost::asio::ip::tcp::resolver::iterator next_endpoint_; boost::asio::steady_timer reconnect_timer_; ErrorHandler error_handler_; diff --git a/socket_output.cc b/socket_output.cc index d22f4a4..0748dc1 100644 --- a/socket_output.cc +++ b/socket_output.cc @@ -16,30 +16,34 @@ using boost::asio::ip::tcp; using namespace flightaware::uat; -SocketOutput::SocketOutput(asio::io_service &service, tcp::socket &&socket) : strand_(service), socket_(std::move(socket)), peer_(socket_.remote_endpoint()), flush_pending_(false) {} +SocketOutput::SocketOutput(asio::io_context &service, tcp::socket &&socket) : strand_(service), socket_(std::move(socket)), peer_(socket_.remote_endpoint()), flush_pending_(false) {} void SocketOutput::Start() { ReadAndDiscard(); } void SocketOutput::ReadAndDiscard() { auto self(shared_from_this()); auto buf = std::make_shared(512); - socket_.async_read_some(asio::buffer(*buf), strand_.wrap([this, self, buf](const boost::system::error_code &ec, std::size_t len) { + + auto completion = ([this, self, buf](const boost::system::error_code &ec, std::size_t len) { if (ec) { HandleError(ec); } else { ReadAndDiscard(); } - })); + }); + socket_.async_read_some(asio::buffer(*buf), + boost::asio::bind_executor(strand_, completion)); } void SocketOutput::Write(SharedMessageVector messages) { auto self(shared_from_this()); - strand_.dispatch([this, self, messages]() { - if (IsOpen()) { - InternalWrite(messages); - Flush(); - } - }); + boost::asio::dispatch(strand_, + [this, self, messages]() { + if (IsOpen()) { + InternalWrite(messages); + Flush(); + } + }); } void SocketOutput::Flush() { @@ -54,7 +58,7 @@ void SocketOutput::Flush() { outbuf_.str(std::string()); auto self(shared_from_this()); - async_write(socket_, boost::asio::buffer(*writebuf), strand_.wrap([this, self, writebuf](const boost::system::error_code &ec, size_t len) { + auto completion = ([this, self, writebuf](const boost::system::error_code &ec, size_t len) { flush_pending_ = false; if (ec) { HandleError(ec); @@ -62,7 +66,10 @@ void SocketOutput::Flush() { } Flush(); // maybe some more data arrived - })); + }); + async_write(socket_, + boost::asio::buffer(*writebuf), + boost::asio::bind_executor(strand_, completion)); } void SocketOutput::HandleError(const boost::system::error_code &ec) { @@ -108,7 +115,7 @@ void JsonOutput::InternalWrite(SharedMessageVector messages) { ////////////// -SocketListener::SocketListener(asio::io_service &service, const tcp::endpoint &endpoint, MessageDispatch &dispatch, ConnectionFactory factory) : service_(service), acceptor_(service), endpoint_(endpoint), socket_(service), dispatch_(dispatch), factory_(factory) {} +SocketListener::SocketListener(asio::io_context &service, const tcp::endpoint &endpoint, MessageDispatch &dispatch, ConnectionFactory factory) : service_(service), acceptor_(service), endpoint_(endpoint), socket_(service), dispatch_(dispatch), factory_(factory) {} void SocketListener::Start() { acceptor_.open(endpoint_.protocol()); diff --git a/socket_output.h b/socket_output.h index b7c2e53..932d246 100644 --- a/socket_output.h +++ b/socket_output.h @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include @@ -34,7 +34,7 @@ namespace flightaware { virtual ~SocketOutput() {}; protected: - SocketOutput(boost::asio::io_service &service_, boost::asio::ip::tcp::socket &&socket_); + SocketOutput(boost::asio::io_context &service_, boost::asio::ip::tcp::socket &&socket_); std::ostringstream &Buf() { return outbuf_; } virtual void InternalWrite(SharedMessageVector messages) = 0; @@ -44,7 +44,7 @@ namespace flightaware { void Flush(); void ReadAndDiscard(); - boost::asio::io_service::strand strand_; + boost::asio::io_context::strand strand_; boost::asio::ip::tcp::socket socket_; boost::asio::ip::tcp::endpoint peer_; @@ -57,7 +57,7 @@ namespace flightaware { class RawOutput : public SocketOutput { public: // factory method, this class must always be constructed via make_shared - static Pointer Create(boost::asio::io_service &service, boost::asio::ip::tcp::socket &&socket, SharedMessageVector header) { return Pointer(new RawOutput(service, std::move(socket), header)); } + static Pointer Create(boost::asio::io_context &service, boost::asio::ip::tcp::socket &&socket, SharedMessageVector header) { return Pointer(new RawOutput(service, std::move(socket), header)); } void Start() override; @@ -65,7 +65,7 @@ namespace flightaware { void InternalWrite(SharedMessageVector messages) override; private: - RawOutput(boost::asio::io_service &service_, boost::asio::ip::tcp::socket &&socket_, SharedMessageVector header) : SocketOutput(service_, std::move(socket_)) { header_ = header; } + RawOutput(boost::asio::io_context &service_, boost::asio::ip::tcp::socket &&socket_, SharedMessageVector header) : SocketOutput(service_, std::move(socket_)) { header_ = header; } SharedMessageVector header_; }; @@ -73,32 +73,32 @@ namespace flightaware { class JsonOutput : public SocketOutput { public: // factory method, this class must always be constructed via make_shared - static Pointer Create(boost::asio::io_service &service, boost::asio::ip::tcp::socket &&socket) { return Pointer(new JsonOutput(service, std::move(socket))); } + static Pointer Create(boost::asio::io_context &service, boost::asio::ip::tcp::socket &&socket) { return Pointer(new JsonOutput(service, std::move(socket))); } protected: void InternalWrite(SharedMessageVector messages) override; private: - JsonOutput(boost::asio::io_service &service_, boost::asio::ip::tcp::socket &&socket_) : SocketOutput(service_, std::move(socket_)) {} + JsonOutput(boost::asio::io_context &service_, boost::asio::ip::tcp::socket &&socket_) : SocketOutput(service_, std::move(socket_)) {} }; class SocketListener : public std::enable_shared_from_this { public: typedef std::shared_ptr Pointer; - typedef std::function ConnectionFactory; + typedef std::function ConnectionFactory; // factory method, this class must always be constructed via make_shared - static Pointer Create(boost::asio::io_service &service, const boost::asio::ip::tcp::endpoint &endpoint, MessageDispatch &dispatch, ConnectionFactory factory) { return Pointer(new SocketListener(service, endpoint, dispatch, factory)); } + static Pointer Create(boost::asio::io_context &service, const boost::asio::ip::tcp::endpoint &endpoint, MessageDispatch &dispatch, ConnectionFactory factory) { return Pointer(new SocketListener(service, endpoint, dispatch, factory)); } void Start(); void Close(); private: - SocketListener(boost::asio::io_service &service, const boost::asio::ip::tcp::endpoint &endpoint, MessageDispatch &dispatch, ConnectionFactory factory); + SocketListener(boost::asio::io_context &service, const boost::asio::ip::tcp::endpoint &endpoint, MessageDispatch &dispatch, ConnectionFactory factory); void Accept(); - boost::asio::io_service &service_; + boost::asio::io_context &service_; boost::asio::ip::tcp::acceptor acceptor_; boost::asio::ip::tcp::endpoint endpoint_; boost::asio::ip::tcp::socket socket_; diff --git a/stratux_serial.cc b/stratux_serial.cc index 186268a..083368c 100644 --- a/stratux_serial.cc +++ b/stratux_serial.cc @@ -34,7 +34,7 @@ enum class StratuxSerial::ParserState { MESSAGE // reading rssi / timestamp / payload }; -StratuxSerial::StratuxSerial(boost::asio::io_service &io_service, const std::string &path) : path_(path), port_(io_service), read_timer_(io_service), parser_state_(ParserState::PREAMBLE), preamble_index_(0) {} +StratuxSerial::StratuxSerial(boost::asio::io_context &io_context, const std::string &path) : path_(path), port_(io_context), read_timer_(io_context), parser_state_(ParserState::PREAMBLE), preamble_index_(0) {} void StratuxSerial::Start() { try { @@ -95,7 +95,7 @@ void StratuxSerial::StartReading() { // little more data arrives, but at least we don't have to do a bunch of work on every one of // those) if (len < read_buffer_size * 3 / 4) { - read_timer_.expires_from_now(read_interval); + read_timer_.expires_after(read_interval); read_timer_.async_wait([this, self](const boost::system::error_code &ec) { if (!ec) { StartReading(); diff --git a/stratux_serial.h b/stratux_serial.h index f8c545e..dd2b298 100644 --- a/stratux_serial.h +++ b/stratux_serial.h @@ -12,7 +12,7 @@ #include #include -#include +#include #include #include #include @@ -33,10 +33,10 @@ namespace flightaware { virtual void Start(); virtual void Stop(); - static Pointer Create(boost::asio::io_service &io_service, const std::string &path) { return Pointer(new StratuxSerial(io_service, path)); } + static Pointer Create(boost::asio::io_context &io_context, const std::string &path) { return Pointer(new StratuxSerial(io_context, path)); } protected: - StratuxSerial(boost::asio::io_service &io_service, const std::string &path); + StratuxSerial(boost::asio::io_context &io_context, const std::string &path); private: void StartReading(); diff --git a/track.cc b/track.cc index f2e319e..42fadb0 100644 --- a/track.cc +++ b/track.cc @@ -7,6 +7,8 @@ #include #include +#include + using namespace flightaware::uat; void AircraftState::UpdateFromMessage(const AdsbMessage &message) { @@ -121,8 +123,8 @@ void Tracker::PurgeOld() { } } auto self(shared_from_this()); - timer_.expires_from_now(timeout_ / 4); - timer_.async_wait(strand_.wrap([this, self](const boost::system::error_code &ec) { + timer_.expires_after(timeout_ / 4); + timer_.async_wait(boost::asio::bind_executor(strand_, [this, self](const boost::system::error_code &ec) { if (!ec) { PurgeOld(); } @@ -134,7 +136,7 @@ void Tracker::HandleMessages(SharedMessageVector messages) { const std::uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now() - unix_epoch).count(); auto self(shared_from_this()); - strand_.dispatch([this, self, now, messages]() { + boost::asio::dispatch(strand_, [this, self, now, messages]() { const std::uint64_t PAST_FUZZ = 15000; const std::uint64_t FUTURE_FUZZ = 1000; diff --git a/track.h b/track.h index e30d4dd..f23a18b 100644 --- a/track.h +++ b/track.h @@ -12,7 +12,7 @@ #include #include -#include +#include #include #include @@ -147,7 +147,7 @@ namespace flightaware { typedef std::shared_ptr Pointer; typedef std::map MapType; - static Pointer Create(boost::asio::io_service &service, std::chrono::milliseconds timeout = std::chrono::seconds(300)) { return Pointer(new Tracker(service, timeout)); } + static Pointer Create(boost::asio::io_context &service, std::chrono::milliseconds timeout = std::chrono::seconds(300)) { return Pointer(new Tracker(service, timeout)); } void Start(); void Stop(); @@ -159,11 +159,11 @@ namespace flightaware { void PurgeOld(); private: - Tracker(boost::asio::io_service &service, std::chrono::milliseconds timeout) : strand_(service), timer_(service), timeout_(timeout) {} + Tracker(boost::asio::io_context &service, std::chrono::milliseconds timeout) : strand_(service), timer_(service), timeout_(timeout) {} void HandleMessage(const AdsbMessage &message); - boost::asio::io_service::strand strand_; + boost::asio::io_context::strand strand_; boost::asio::steady_timer timer_; std::chrono::milliseconds timeout_; MapType aircraft_;