Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ CC ?= gcc
CFLAGS += -Wall -Werror -O2 -g -Ilibs

CXX ?= g++
CXXFLAGS += -std=c++11 -Wall -Wno-psabi -Werror -O2 -g -Ilibs
CXXFLAGS += -std=c++11 -Wall -Wno-psabi -Werror -O2 -g -Ilibs -DBOOST_ASIO_NO_DEPRECATED

LIBS=-lboost_system -lboost_program_options -lboost_regex -lboost_filesystem -lpthread -lstdc++ -lm
LIBS=-lboost_program_options -lboost_regex -lboost_filesystem -lpthread -lstdc++ -lm
LIBS_SDR=-lSoapySDR

all: dump978-fa skyaware978
Expand Down
2 changes: 1 addition & 1 deletion debian/control
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ Source: dump978-fa
Section: embedded
Priority: extra
Maintainer: Oliver Jowett <oliver.jowett@flightaware.com>
Build-Depends: debhelper(>=12), libboost-system-dev, libboost-program-options-dev, libboost-regex-dev, libboost-filesystem-dev, libsoapysdr-dev
Build-Depends: debhelper(>=12), libboost-program-options-dev, libboost-regex-dev, libboost-filesystem-dev, libsoapysdr-dev
Standards-Version: 3.9.3
Homepage: http://www.flightaware.com/
Vcs-Git: https://github.com/flightaware/dump978.git
Expand Down
64 changes: 33 additions & 31 deletions dump978_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ namespace flightaware {
#define EXIT_NO_RESTART (64)

static int realmain(int argc, char **argv) {
boost::asio::io_service io_service;
boost::asio::io_context io_context;

// clang-format off
po::options_description desc("Allowed options");
Expand Down Expand Up @@ -130,24 +130,24 @@ static int realmain(int argc, char **argv) {
SampleSource::Pointer sample_source;
MessageSource::Pointer message_source;

tcp::resolver resolver(io_service);
tcp::resolver resolver(io_context);

if (opts.count("stdin") + opts.count("file") + opts.count("sdr") + opts.count("stratuxv3") != 1) {
std::cerr << "Exactly one of --stdin, --file, --sdr, or --stratuxv3 must be used" << std::endl;
return EXIT_NO_RESTART;
}

if (opts.count("stdin")) {
sample_source = StdinSampleSource::Create(io_service, opts);
sample_source = StdinSampleSource::Create(io_context, opts);
} else if (opts.count("file")) {
boost::filesystem::path path(opts["file"].as<std::string>());
sample_source = FileSampleSource::Create(io_service, path, opts);
sample_source = FileSampleSource::Create(io_context, path, opts);
} else if (opts.count("sdr")) {
auto device = opts["sdr"].as<std::string>();
sample_source = SoapySampleSource::Create(io_service, device, opts);
sample_source = SoapySampleSource::Create(io_context, device, opts);
} else if (opts.count("stratuxv3")) {
auto path = opts["stratuxv3"].as<std::string>();
message_source = StratuxSerial::Create(io_service, path);
message_source = StratuxSerial::Create(io_context, path);
} else {
assert("impossible case" && false);
}
Expand All @@ -159,27 +159,29 @@ static int realmain(int argc, char **argv) {

bool ok = true;
for (auto l : opts[option].as<std::vector<listen_option>>()) {
tcp::resolver::query query(l.host, l.port, tcp::resolver::query::passive);
boost::system::error_code ec;

bool success = false;
tcp::resolver::iterator end;
for (auto i = resolver.resolve(query, ec); i != end; ++i) {
const auto &endpoint = i->endpoint();

try {
auto listener = SocketListener::Create(io_service, endpoint, dispatch, factory);
listener->Start();
std::cerr << option << ": listening for connections on " << endpoint << std::endl;
success = true;
} catch (boost::system::system_error &err) {
std::cerr << option << ": could not listen on " << endpoint << ": " << err.what() << std::endl;
ec = err.code();
auto range = resolver.resolve(tcp::v4(), l.host, l.port, tcp::resolver::passive);
if (!ec) {
bool success = false;
for (auto i = range.begin(); i != range.end(); ++i) {
const auto &endpoint = i->endpoint();

try {
auto listener = SocketListener::Create(io_context, endpoint, dispatch, factory);
listener->Start();
std::cerr << option << ": listening for connections on " << endpoint << std::endl;
success = true;
} catch (boost::system::system_error &err) {
std::cerr << option << ": could not listen on " << endpoint << ": " << err.what() << std::endl;
ec = err.code();
}
}
}

if (!success) {
std::cerr << option << ": no available listening addresses" << std::endl;
if (!success) {
std::cerr << option << ": no available listening addresses" << std::endl;
ok = false;
}
} else {
std::cerr << option << ": failed to resolve " << l.host << ":" << l.port << ": " << ec.message() << std::endl;
ok = false;
}
}
Expand Down Expand Up @@ -247,29 +249,29 @@ static int realmain(int argc, char **argv) {
}

message_source->SetConsumer(std::bind(&MessageDispatch::Dispatch, &dispatch, std::placeholders::_1));
message_source->SetErrorHandler([&io_service, &saw_error](const boost::system::error_code &ec) {
message_source->SetErrorHandler([&io_context, &saw_error](const boost::system::error_code &ec) {
if (ec == boost::asio::error::eof) {
std::cerr << "Message source reports EOF" << std::endl;
} else {
std::cerr << "Message source reports error: " << ec.message() << std::endl;
saw_error = true;
}
io_service.stop();
io_context.stop();
});

boost::asio::signal_set signals(io_service, SIGINT, SIGTERM);
signals.async_wait([&io_service, &saw_error](const boost::system::error_code &ec, int signum) {
boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait([&io_context, &saw_error](const boost::system::error_code &ec, int signum) {
std::cerr << "Caught signal " << signum << ", exiting" << std::endl;
saw_error = true;
io_service.stop();
io_context.stop();
});

message_source->Start();
if (sample_source) {
sample_source->Start();
}

io_service.run();
io_context.run();

if (sample_source) {
sample_source->Stop();
Expand Down
12 changes: 6 additions & 6 deletions faup978_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void validate(boost::any &v, const std::vector<std::string> &values, connect_opt
#define EXIT_NO_RESTART (64)

static int realmain(int argc, char **argv) {
boost::asio::io_service io_service;
boost::asio::io_context io_context;

// clang-format off
po::options_description desc("Allowed options");
Expand Down Expand Up @@ -80,19 +80,19 @@ static int realmain(int argc, char **argv) {
}

auto connect = opts["connect"].as<connect_option>();
auto input = RawInput::Create(io_service, connect.host, connect.port);
auto reporter = Reporter::Create(io_service);
auto input = RawInput::Create(io_context, connect.host, connect.port);
auto reporter = Reporter::Create(io_context);

input->SetConsumer(std::bind(&Reporter::HandleMessages, reporter, std::placeholders::_1));
input->SetErrorHandler([&io_service](const boost::system::error_code &ec) {
input->SetErrorHandler([&io_context](const boost::system::error_code &ec) {
std::cerr << "Connection failed: " << ec.message() << std::endl;
io_service.stop();
io_context.stop();
});

reporter->Start();
input->Start();

io_service.run();
io_context.run();

input->Stop();
reporter->Stop();
Expand Down
10 changes: 6 additions & 4 deletions faup978_reporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <iostream>
#include <sstream>

#include <boost/asio/bind_executor.hpp>

using namespace flightaware::uat;
using namespace flightaware::faup978;

Expand Down Expand Up @@ -59,8 +61,8 @@ void Reporter::PurgeOld() {
}

auto self(shared_from_this());
purge_timer_.expires_from_now(timeout_ / 4);
purge_timer_.async_wait(strand_.wrap([this, self](const boost::system::error_code &ec) {
purge_timer_.expires_after(timeout_ / 4);
purge_timer_.async_wait(boost::asio::bind_executor(strand_, [this, self](const boost::system::error_code &ec) {
if (!ec) {
PurgeOld();
}
Expand Down Expand Up @@ -88,8 +90,8 @@ void Reporter::PeriodicReport() {
}

auto self(shared_from_this());
report_timer_.expires_from_now(interval_);
report_timer_.async_wait(strand_.wrap([this, self](const boost::system::error_code &ec) {
report_timer_.expires_after(interval_);
report_timer_.async_wait(boost::asio::bind_executor(strand_, [this, self](const boost::system::error_code &ec) {
if (!ec) {
PeriodicReport();
}
Expand Down
8 changes: 4 additions & 4 deletions faup978_reporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <chrono>
#include <memory>

#include <boost/asio/io_service.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>

Expand All @@ -32,23 +32,23 @@ namespace flightaware {
static constexpr const char *TSV_VERSION_8U = "8U";
static constexpr const char *TSV_VERSION_8U_FIX = "8UF";

static Pointer Create(boost::asio::io_service &service, std::chrono::milliseconds interval = std::chrono::milliseconds(500), std::chrono::milliseconds timeout = std::chrono::seconds(300)) { return Pointer(new Reporter(service, interval, timeout)); }
static Pointer Create(boost::asio::io_context &service, std::chrono::milliseconds interval = std::chrono::milliseconds(500), std::chrono::milliseconds timeout = std::chrono::seconds(300)) { return Pointer(new Reporter(service, interval, timeout)); }

void Start();
void Stop();

void HandleMessages(flightaware::uat::SharedMessageVector messages);

private:
Reporter(boost::asio::io_service &service, std::chrono::milliseconds interval, std::chrono::milliseconds timeout) : strand_(service), report_timer_(service), purge_timer_(service), interval_(interval), timeout_(timeout) { tracker_ = flightaware::uat::Tracker::Create(service, timeout); }
Reporter(boost::asio::io_context &service, std::chrono::milliseconds interval, std::chrono::milliseconds timeout) : strand_(service), report_timer_(service), purge_timer_(service), interval_(interval), timeout_(timeout) { tracker_ = flightaware::uat::Tracker::Create(service, timeout); }

void PeriodicReport();
void PurgeOld();
void ReportOneAircraft(const flightaware::uat::Tracker::AddressKey &key, const flightaware::uat::AircraftState &aircraft, std::uint64_t now);

const char *TSVVersion() const { return fecfix_ ? TSV_VERSION_8U_FIX : TSV_VERSION_8U; }

boost::asio::io_service::strand strand_;
boost::asio::io_context::strand strand_;
boost::asio::steady_timer report_timer_;
boost::asio::steady_timer purge_timer_;
std::chrono::milliseconds interval_;
Expand Down
8 changes: 6 additions & 2 deletions sample_source.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include "sample_source.h"

#include <boost/asio/post.hpp>

#include <chrono>
#include <iostream>

Expand All @@ -22,7 +24,8 @@ void FileSampleSource::Start() {
timestamp_ = 1; // always use synthetic timestamps for file sources

auto self = std::static_pointer_cast<FileSampleSource>(shared_from_this());
service_.post(std::bind(&FileSampleSource::ReadBlock, self, boost::system::error_code()));
boost::asio::post(service_,
std::bind(&FileSampleSource::ReadBlock, self, boost::system::error_code()));
}

void FileSampleSource::Stop() {
Expand Down Expand Up @@ -77,7 +80,8 @@ void FileSampleSource::ReadBlock(const boost::system::error_code &ec) {
timer_.expires_at(next_block_);
timer_.async_wait(std::bind(&FileSampleSource::ReadBlock, self, std::placeholders::_1));
} else {
service_.post(std::bind(&FileSampleSource::ReadBlock, self, boost::system::error_code()));
boost::asio::post(service_,
std::bind(&FileSampleSource::ReadBlock, self, boost::system::error_code()));
}
}

Expand Down
12 changes: 6 additions & 6 deletions sample_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include <functional>
#include <memory>

#include <boost/asio/io_service.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/filesystem.hpp>
Expand Down Expand Up @@ -61,15 +61,15 @@ namespace flightaware {

class FileSampleSource : public SampleSource {
public:
static SampleSource::Pointer Create(boost::asio::io_service &service, const boost::filesystem::path &path, const boost::program_options::variables_map &options = boost::program_options::variables_map(), std::size_t samples_per_second = 2083333, std::size_t samples_per_block = 524288) { return Pointer(new FileSampleSource(service, path, options, samples_per_second, samples_per_block)); }
static SampleSource::Pointer Create(boost::asio::io_context &service, const boost::filesystem::path &path, const boost::program_options::variables_map &options = boost::program_options::variables_map(), std::size_t samples_per_second = 2083333, std::size_t samples_per_block = 524288) { return Pointer(new FileSampleSource(service, path, options, samples_per_second, samples_per_block)); }

void Init() override {}
void Start() override;
void Stop() override;
SampleFormat Format() override { return format_; }

private:
FileSampleSource(boost::asio::io_service &service, const boost::filesystem::path &path, const boost::program_options::variables_map &options, std::size_t samples_per_second, std::size_t samples_per_block) : service_(service), path_(path), timer_(service) {
FileSampleSource(boost::asio::io_context &service, const boost::filesystem::path &path, const boost::program_options::variables_map &options, std::size_t samples_per_second, std::size_t samples_per_block) : service_(service), path_(path), timer_(service) {
if (!options.count("format")) {
throw std::runtime_error("--format must be specified when using a file input");
}
Expand All @@ -84,7 +84,7 @@ namespace flightaware {

void ReadBlock(const boost::system::error_code &ec);

boost::asio::io_service &service_;
boost::asio::io_context &service_;
boost::filesystem::path path_;
SampleFormat format_;
unsigned alignment_;
Expand All @@ -100,15 +100,15 @@ namespace flightaware {

class StdinSampleSource : public SampleSource {
public:
static SampleSource::Pointer Create(boost::asio::io_service &service, const boost::program_options::variables_map &options, std::size_t samples_per_second = 2083333, std::size_t samples_per_block = 524288) { return Pointer(new StdinSampleSource(service, options, samples_per_second, samples_per_block)); }
static SampleSource::Pointer Create(boost::asio::io_context &service, const boost::program_options::variables_map &options, std::size_t samples_per_second = 2083333, std::size_t samples_per_block = 524288) { return Pointer(new StdinSampleSource(service, options, samples_per_second, samples_per_block)); }

void Init() override {}
void Start() override;
void Stop() override;
SampleFormat Format() override { return format_; }

private:
StdinSampleSource(boost::asio::io_service &service, const boost::program_options::variables_map &options, std::size_t samples_per_second, std::size_t samples_per_block) : samples_per_second_(samples_per_second), stream_(service), used_(0) {
StdinSampleSource(boost::asio::io_context &service, const boost::program_options::variables_map &options, std::size_t samples_per_second, std::size_t samples_per_block) : samples_per_second_(samples_per_second), stream_(service), used_(0) {
if (!options.count("format")) {
throw std::runtime_error("--format must be specified when using a file input");
}
Expand Down
14 changes: 7 additions & 7 deletions skyaware978_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void validate(boost::any &v, const std::vector<std::string> &values, connect_opt
#define EXIT_NO_RESTART (64)

static int realmain(int argc, char **argv) {
boost::asio::io_service io_service;
boost::asio::io_context io_context;

// clang-format off
po::options_description desc("Allowed options");
Expand Down Expand Up @@ -92,14 +92,14 @@ static int realmain(int argc, char **argv) {

auto connect = opts["connect"].as<connect_option>();
auto reconnect_interval = opts["reconnect-interval"].as<unsigned>();
auto input = RawInput::Create(io_service, connect.host, connect.port, std::chrono::milliseconds(reconnect_interval * 1000));
auto input = RawInput::Create(io_context, connect.host, connect.port, std::chrono::milliseconds(reconnect_interval * 1000));

auto tracker = Tracker::Create(io_service);
auto tracker = Tracker::Create(io_context);
input->SetConsumer(std::bind(&Tracker::HandleMessages, tracker, std::placeholders::_1));
input->SetErrorHandler([&io_service, reconnect_interval](const boost::system::error_code &ec) {
input->SetErrorHandler([&io_context, reconnect_interval](const boost::system::error_code &ec) {
std::cerr << "Connection failed: " << ec.message() << std::endl;
if (!reconnect_interval) {
io_service.stop();
io_context.stop();
}
});

Expand All @@ -109,13 +109,13 @@ static int realmain(int argc, char **argv) {
}

auto dir = opts["json-dir"].as<std::string>();
auto writer = SkyAwareWriter::Create(io_service, tracker, dir, std::chrono::milliseconds(1000), opts["history-count"].as<unsigned>(), std::chrono::milliseconds(opts["history-interval"].as<unsigned>() * 1000), location);
auto writer = SkyAwareWriter::Create(io_context, tracker, dir, std::chrono::milliseconds(1000), opts["history-count"].as<unsigned>(), std::chrono::milliseconds(opts["history-interval"].as<unsigned>() * 1000), location);

writer->Start();
tracker->Start();
input->Start();

io_service.run();
io_context.run();

input->Stop();
tracker->Stop();
Expand Down
6 changes: 4 additions & 2 deletions skyaware_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <iostream>
#include <sstream>

#include <boost/asio/bind_executor.hpp>

#include "json.hpp"

#include "track.h"
Expand Down Expand Up @@ -218,8 +220,8 @@ void SkyAwareWriter::PeriodicWrite() {
}

auto self(shared_from_this());
timer_.expires_from_now(interval_);
timer_.async_wait(strand_.wrap([this, self](const boost::system::error_code &ec) {
timer_.expires_after(interval_);
timer_.async_wait(boost::asio::bind_executor(strand_, [this, self](const boost::system::error_code &ec) {
if (!ec) {
PeriodicWrite();
}
Expand Down
Loading