diff --git a/core-framework/include/core/BufferedContentSession.h b/core-framework/include/core/BufferedContentSession.h index f989877350..c32fb1e5d3 100644 --- a/core-framework/include/core/BufferedContentSession.h +++ b/core-framework/include/core/BufferedContentSession.h @@ -44,6 +44,8 @@ class BufferedContentSession : public ContentSessionImpl { std::shared_ptr read(const std::shared_ptr& resource_id) override; + void remove(const std::shared_ptr& resource_id) override; + void commit() override; void rollback() override; diff --git a/core-framework/src/core/BufferedContentSession.cpp b/core-framework/src/core/BufferedContentSession.cpp index 418da5c161..7fbb90f842 100644 --- a/core-framework/src/core/BufferedContentSession.cpp +++ b/core-framework/src/core/BufferedContentSession.cpp @@ -95,6 +95,11 @@ void BufferedContentSession::commit() { append_state_.clear(); } +void BufferedContentSession::remove(const std::shared_ptr& resource_id) { + managed_resources_.erase(resource_id); + append_state_.erase(resource_id); +} + void BufferedContentSession::rollback() { managed_resources_.clear(); append_state_.clear(); diff --git a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp index 835487039b..0e63a6e64a 100644 --- a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp +++ b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp @@ -317,6 +317,12 @@ TEST_CASE("ProcessSession::read can read zero length flowfiles without crash (Ro ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared()); } +TEST_CASE("ProcessSession::write can be cancelled") { + ContentRepositoryDependentTests::testOkWrite(std::make_shared()); + ContentRepositoryDependentTests::testErrWrite(std::make_shared()); + ContentRepositoryDependentTests::testCancelWrite(std::make_shared()); +} + size_t getDbSize(const std::filesystem::path& dir) { auto db = minifi::internal::RocksDatabase::create({}, {}, dir.string(), {}); auto opendb = db->open(); diff --git a/libminifi/include/core/ForwardingContentSession.h b/libminifi/include/core/ForwardingContentSession.h index 620c71fa3c..2699ee7458 100644 --- a/libminifi/include/core/ForwardingContentSession.h +++ b/libminifi/include/core/ForwardingContentSession.h @@ -43,6 +43,8 @@ class ForwardingContentSession : public ContentSessionImpl { std::shared_ptr read(const std::shared_ptr& resource_id) override; + void remove(const std::shared_ptr& resource_id) override; + void commit() override; void rollback() override; diff --git a/libminifi/src/core/ForwardingContentSession.cpp b/libminifi/src/core/ForwardingContentSession.cpp index cc7999ee8c..17c715f37d 100644 --- a/libminifi/src/core/ForwardingContentSession.cpp +++ b/libminifi/src/core/ForwardingContentSession.cpp @@ -51,6 +51,12 @@ std::shared_ptr ForwardingContentSession::append(const std::shar return repository_->write(*resource_id, true); } +void ForwardingContentSession::remove(const std::shared_ptr& resource_id) { + created_claims_.erase(resource_id); + append_state_.erase(resource_id); + repository_->remove(*resource_id); +} + void ForwardingContentSession::commit() { created_claims_.clear(); append_state_.clear(); diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 576ed3a91a..edf13c9004 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -31,10 +31,11 @@ #include #include "core/ProcessSessionReadCallback.h" -#include "io/StreamSlice.h" +#include "core/Processor.h" #include "io/StreamPipe.h" +#include "io/StreamSlice.h" +#include "minifi-c/minifi-c.h" #include "minifi-cpp/utils/gsl.h" -#include "core/Processor.h" /* This implementation is only for native Windows systems. */ #if (defined _WIN32 || defined __WIN32__) && !defined __CYGWIN__ @@ -256,7 +257,15 @@ void ProcessSessionImpl::write(core::FlowFile &flow, const io::OutputStreamCallb if (nullptr == stream) { throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write"); } - if (callback(stream) < 0) { + const auto callback_result = callback(stream); + if (callback_result == MinifiIoStatus::MINIFI_IO_CANCEL) { + stream->close(); + content_session_->remove(claim); + claim.reset(); + return; + } + + if (callback_result < 0) { throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); } diff --git a/libminifi/test/libtest/unit/ContentRepositoryDependentTests.h b/libminifi/test/libtest/unit/ContentRepositoryDependentTests.h index f7d6bb7102..b15bc2f477 100644 --- a/libminifi/test/libtest/unit/ContentRepositoryDependentTests.h +++ b/libminifi/test/libtest/unit/ContentRepositoryDependentTests.h @@ -16,19 +16,19 @@ * limitations under the License. */ +#pragma once + #include #include #include -#include #include "catch2/catch_test_macros.hpp" -#include "core/Processor.h" #include "core/ProcessSession.h" -#include "core/Resource.h" -#include "unit/TestBase.h" -#include "unit/Catch.h" -#include "unit/DummyProcessor.h" +#include "core/Processor.h" #include "io/StreamPipe.h" +#include "minifi-c/minifi-c.h" +#include "unit/DummyProcessor.h" +#include "unit/TestBase.h" #pragma once @@ -42,11 +42,11 @@ struct ReadUntilItCan { std::array buffer{}; size_t bytes_read = 0; while (true) { - size_t read_result = stream->read(buffer); + const size_t read_result = stream->read(buffer); if (minifi::io::isError(read_result)) return -1; if (read_result == 0) - return bytes_read; + return gsl::narrow(bytes_read); bytes_read += read_result; const auto char_view = gsl::make_span(buffer).subspan(0, read_result).as_span(); value_.append(std::begin(char_view), std::end(char_view)); @@ -69,18 +69,18 @@ class Fixture { process_session_ = std::make_unique(context_); } - core::ProcessSession &processSession() { return *process_session_; } + [[nodiscard]] core::ProcessSession& processSession() const { return *process_session_; } - void transferAndCommit(const std::shared_ptr& flow_file) { + void transferAndCommit(const std::shared_ptr& flow_file) const { process_session_->transfer(flow_file, Success); process_session_->commit(); } - void writeToFlowFile(const std::shared_ptr& flow_file, const std::string content) { + void writeToFlowFile(const std::shared_ptr& flow_file, const std::string_view content) const { process_session_->writeBuffer(flow_file, content); } - void appendToFlowFile(const std::shared_ptr& flow_file, const std::string content_to_append) { + void appendToFlowFile(const std::shared_ptr& flow_file, const std::string_view content_to_append) const { process_session_->add(flow_file); process_session_->appendBuffer(flow_file, content_to_append); } @@ -94,8 +94,8 @@ class Fixture { std::unique_ptr process_session_; }; -void testReadOnSmallerClonedFlowFiles(std::shared_ptr content_repo) { - Fixture fixture = Fixture(std::move(content_repo)); +inline void testReadOnSmallerClonedFlowFiles(std::shared_ptr content_repo) { + auto fixture = Fixture(std::move(content_repo)); core::ProcessSession& process_session = fixture.processSession(); const auto original_ff = process_session.create(); fixture.writeToFlowFile(original_ff, "foobar"); @@ -123,8 +123,8 @@ void testReadOnSmallerClonedFlowFiles(std::shared_ptr c CHECK(read_until_it_can_callback.value_ == "bar"); } -void testAppendToUnmanagedFlowFile(std::shared_ptr content_repo) { - Fixture fixture = Fixture(std::move(content_repo)); +inline void testAppendToUnmanagedFlowFile(std::shared_ptr content_repo) { + auto fixture = Fixture(std::move(content_repo)); core::ProcessSession& process_session = fixture.processSession(); const auto flow_file = process_session.create(); REQUIRE(flow_file); @@ -142,8 +142,8 @@ void testAppendToUnmanagedFlowFile(std::shared_ptr cont CHECK(read_until_it_can_callback.value_ == "myfoobar"); } -void testAppendToManagedFlowFile(std::shared_ptr content_repo) { - Fixture fixture = Fixture(std::move(content_repo)); +inline void testAppendToManagedFlowFile(std::shared_ptr content_repo) { + auto fixture = Fixture(std::move(content_repo)); core::ProcessSession& process_session = fixture.processSession(); const auto flow_file = process_session.create(); REQUIRE(flow_file); @@ -160,8 +160,8 @@ void testAppendToManagedFlowFile(std::shared_ptr conten CHECK(read_until_it_can_callback.value_ == "myfoobar"); } -void testReadFromZeroLengthFlowFile(std::shared_ptr content_repo) { - Fixture fixture = Fixture(std::move(content_repo)); +inline void testReadFromZeroLengthFlowFile(std::shared_ptr content_repo) { + const auto fixture = Fixture(std::move(content_repo)); core::ProcessSession& process_session = fixture.processSession(); const auto flow_file = process_session.create(); REQUIRE(flow_file); @@ -171,4 +171,69 @@ void testReadFromZeroLengthFlowFile(std::shared_ptr con REQUIRE_NOTHROW(process_session.readBuffer(flow_file)); REQUIRE_NOTHROW(process_session.read(flow_file, ReadUntilItCan{})); } + +inline void testErrWrite(std::shared_ptr content_repo) { + const auto fixture = Fixture(std::move(content_repo)); + core::ProcessSession& process_session = fixture.processSession(); + const auto flow_file = process_session.create(); + fixture.writeToFlowFile(flow_file, "original_content"); + + REQUIRE_THROWS( + process_session.write(flow_file, [](const std::shared_ptr& output_stream) { + std::string str = "new_content"; + output_stream->write(as_bytes(std::span(str))); + return MinifiIoStatus::MINIFI_IO_ERROR; + })); + + fixture.transferAndCommit(flow_file); + + CHECK(flow_file->getSize() == 16); + ReadUntilItCan read_until_it_can_callback; + const auto read_result = process_session.readBuffer(flow_file); + process_session.read(flow_file, std::ref(read_until_it_can_callback)); + CHECK(to_string(read_result) == "original_content"); +} + +inline void testOkWrite(std::shared_ptr content_repo) { + const auto fixture = Fixture(std::move(content_repo)); + core::ProcessSession& process_session = fixture.processSession(); + const auto flow_file = process_session.create(); + fixture.writeToFlowFile(flow_file, "original_content"); + + CHECK(flow_file->getSize() == 16); + + process_session.write(flow_file, [](const std::shared_ptr& output_stream) { + std::string str = "new_content"; + return output_stream->write(as_bytes(std::span(str))); + }); + + fixture.transferAndCommit(flow_file); + + CHECK(flow_file->getSize() == 11); + ReadUntilItCan read_until_it_can_callback; + const auto read_result = process_session.readBuffer(flow_file); + process_session.read(flow_file, std::ref(read_until_it_can_callback)); + CHECK(to_string(read_result) == "new_content"); +} + +inline void testCancelWrite(std::shared_ptr content_repo) { + const auto fixture = Fixture(std::move(content_repo)); + core::ProcessSession& process_session = fixture.processSession(); + const auto flow_file = process_session.create(); + fixture.writeToFlowFile(flow_file, "original_content"); + + process_session.write(flow_file, [](const std::shared_ptr& output_stream) { + std::string str = "new_content"; + output_stream->write(as_bytes(std::span(str))); + return MinifiIoStatus::MINIFI_IO_CANCEL; + }); + + fixture.transferAndCommit(flow_file); + + CHECK(flow_file->getSize() == 16); + ReadUntilItCan read_until_it_can_callback; + const auto read_result = process_session.readBuffer(flow_file); + process_session.read(flow_file, std::ref(read_until_it_can_callback)); + CHECK(to_string(read_result) == "original_content"); +} } // namespace ContentRepositoryDependentTests diff --git a/libminifi/test/unit/ProcessSessionTests.cpp b/libminifi/test/unit/ProcessSessionTests.cpp index 1987b00d65..5bd4d08ff8 100644 --- a/libminifi/test/unit/ProcessSessionTests.cpp +++ b/libminifi/test/unit/ProcessSessionTests.cpp @@ -20,7 +20,6 @@ #include #include "core/ProcessSession.h" -#include "core/Resource.h" #include "unit/TestBase.h" #include "unit/Catch.h" #include "unit/ContentRepositoryDependentTests.h" @@ -34,13 +33,13 @@ class Fixture { public: explicit Fixture(TestController::PlanConfig config = {}): plan_config_(std::move(config)) {} - minifi::core::ProcessSession &processSession() { return *process_session_; } + [[nodiscard]] minifi::core::ProcessSession& processSession() const { return *process_session_; } private: TestController test_controller_; TestController::PlanConfig plan_config_; std::shared_ptr test_plan_ = test_controller_.createPlan(plan_config_); - minifi::core::Processor* dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor"); + [[maybe_unused]] minifi::core::Processor* dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor"); std::shared_ptr context_ = [this] { test_plan_->runNextProcessor(); return test_plan_->getCurrentContext(); }(); std::unique_ptr process_session_ = std::make_unique(context_); }; @@ -127,3 +126,13 @@ TEST_CASE("ProcessSession::read can read zero length flowfiles without crash", " ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared()); ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared()); } + +TEST_CASE("Test ProcessSession::write's possible outcomes") { + ContentRepositoryDependentTests::testOkWrite(std::make_shared()); + ContentRepositoryDependentTests::testErrWrite(std::make_shared()); + ContentRepositoryDependentTests::testCancelWrite(std::make_shared()); + + ContentRepositoryDependentTests::testOkWrite(std::make_shared()); + ContentRepositoryDependentTests::testErrWrite(std::make_shared()); + ContentRepositoryDependentTests::testCancelWrite(std::make_shared()); +} diff --git a/minifi-api/include/minifi-c/minifi-c.h b/minifi-api/include/minifi-c/minifi-c.h index ce47d30cd9..d42c53aee6 100644 --- a/minifi-api/include/minifi-c/minifi-c.h +++ b/minifi-api/include/minifi-c/minifi-c.h @@ -41,6 +41,11 @@ extern "C" { typedef bool MinifiBool; +typedef enum MinifiIoStatus : int64_t { + MINIFI_IO_ERROR = -1, + MINIFI_IO_CANCEL = -125 +} MinifiIoStatus; + typedef enum MinifiInputRequirement : uint32_t { MINIFI_INPUT_REQUIRED = 0, MINIFI_INPUT_ALLOWED = 1, diff --git a/minifi-api/include/minifi-cpp/core/ContentSession.h b/minifi-api/include/minifi-cpp/core/ContentSession.h index ce12eab9f7..6d2fdaa442 100644 --- a/minifi-api/include/minifi-cpp/core/ContentSession.h +++ b/minifi-api/include/minifi-cpp/core/ContentSession.h @@ -39,6 +39,8 @@ class ContentSession { virtual std::shared_ptr read(const std::shared_ptr& resource_id) = 0; + virtual void remove(const std::shared_ptr& resource_id) = 0; + virtual void commit() = 0; virtual void rollback() = 0;