[MEDIUM] Patch libarrow for CVE-2026-25087 (#16145)
durgajagadeesh wants to merge 5 commits into microsoft:3.0-dev
Conversation
Buddy build has a license warning. I am working to resolve this issue.
Force-pushed from 0c425be to 43a2202
🔒 CVE Patch Review: CVE-2026-25087
PR #16145 — [MEDIUM] Patch libarrow for CVE-2026-25087
Spec File Validation
Build Verification
🤖 AI Build Log Analysis
🧪 Test Log Analysis
No test log found (package may not have a %check section).
Patch Analysis
Raw diff (upstream vs PR)
--- upstream
+++ pr
@@ -1,771 +1,848 @@
-From a4ae90929d6e959e9a1fb29f3907bbbf2799472e Mon Sep 17 00:00:00 2001
-From: Antoine Pitrou <antoine@python.org>
-Date: Wed, 21 Jan 2026 17:54:00 +0100
-Subject: [PATCH] GH-48924: [C++][CI] Fuzz IPC file metadata pre-buffering
-
----
- ci/scripts/cpp_test.sh | 9 +
- cpp/src/arrow/ipc/read_write_test.cc | 75 +++++----
- cpp/src/arrow/ipc/reader.cc | 222 ++++++++++++++++---------
- cpp/src/arrow/ipc/test_common.cc | 47 +++---
- cpp/src/arrow/type.h | 10 ++
- cpp/src/arrow/util/int_util_overflow.h | 33 ++++
- cpp/src/arrow/util/int_util_test.cc | 18 ++
- 7 files changed, 286 insertions(+), 128 deletions(-)
-
-diff --git a/ci/scripts/cpp_test.sh b/ci/scripts/cpp_test.sh
-index 0ad59bc308f..5d6d5e099ab 100755
---- a/ci/scripts/cpp_test.sh
-+++ b/ci/scripts/cpp_test.sh
-@@ -182,6 +182,15 @@ if [ "${ARROW_FUZZING}" == "ON" ]; then
- # Some fuzz regression files may trigger huge memory allocations,
- # let the allocator return null instead of aborting.
- export ASAN_OPTIONS="$ASAN_OPTIONS allocator_may_return_null=1"
-+ export ARROW_FUZZING_VERBOSITY=1
-+ # Run golden IPC integration files: these should ideally load without errors,
-+ # though some very old ones carry invalid data (such as decimal values
-+ # larger than their advertised precision).
-+ # shellcheck disable=SC2046
-+ "${binary_output_dir}/arrow-ipc-stream-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.stream")
-+ # shellcheck disable=SC2046
-+ "${binary_output_dir}/arrow-ipc-file-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.arrow_file")
-+ # Run known crash files
- "${binary_output_dir}/arrow-ipc-stream-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-stream/crash-*
- "${binary_output_dir}/arrow-ipc-stream-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-stream/*-testcase-*
- "${binary_output_dir}/arrow-ipc-file-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-file/*-testcase-*
-diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
-index 315d8bd07d9..9f7df541bd7 100644
---- a/cpp/src/arrow/ipc/read_write_test.cc
-+++ b/cpp/src/arrow/ipc/read_write_test.cc
-@@ -1252,40 +1252,55 @@ struct FileGeneratorWriterHelper : public FileWriterHelper {
- Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
- ReadStats* out_stats = nullptr,
- MetadataVector* out_metadata_list = nullptr) override {
-- std::shared_ptr<io::RandomAccessFile> buf_reader;
-- if (kCoalesce) {
-- // Use a non-zero-copy enabled BufferReader so we can test paths properly
-- buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
-- } else {
-- buf_reader = std::make_shared<io::BufferReader>(buffer_);
-- }
-- AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
-+ // The generator doesn't track stats.
-+ EXPECT_EQ(nullptr, out_stats);
-
-- {
-- auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
-- // Do NOT assert OK since some tests check whether this fails properly
-- EXPECT_FINISHES(fut);
-- ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
-- EXPECT_EQ(num_batches_written_, reader->num_record_batches());
-- // Generator will keep reader alive internally
-- ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
-- }
-+ auto read_batches = [&](bool pre_buffer) -> Result<RecordBatchVector> {
-+ std::shared_ptr<io::RandomAccessFile> buf_reader;
-+ if (kCoalesce) {
-+ // Use a non-zero-copy enabled BufferReader so we can test paths properly
-+ buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
-+ } else {
-+ buf_reader = std::make_shared<io::BufferReader>(buffer_);
-+ }
-+ AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+diff --git a/SPECS/libarrow/CVE-2026-25087.patch b/SPECS/libarrow/CVE-2026-25087.patch
+new file mode 100644
+index 00000000000..6dc36806b10
+--- /dev/null
++++ b/SPECS/libarrow/CVE-2026-25087.patch
+@@ -0,0 +1,842 @@
++From a4ae90929d6e959e9a1fb29f3907bbbf2799472e Mon Sep 17 00:00:00 2001
++From: Antoine Pitrou <antoine@python.org>
++Date: Wed, 21 Jan 2026 17:54:00 +0100
++Subject: [PATCH] GH-48924: [C++][CI] Fuzz IPC file metadata pre-buffering
+
-+ {
-+ auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
-+ ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
-+ EXPECT_EQ(num_batches_written_, reader->num_record_batches());
-+ if (pre_buffer) {
-+ RETURN_NOT_OK(reader->PreBufferMetadata(/*indices=*/{}));
-+ }
-+ // Generator will keep reader alive internally
-+ ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
-+ }
-
-- // Generator is async-reentrant
-- std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
-+ // Generator is async-reentrant
-+ std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
-+ for (int i = 0; i < num_batches_written_; ++i) {
-+ futures.push_back(generator());
-+ }
-+ auto fut = generator();
-+ ARROW_ASSIGN_OR_RAISE(auto final_batch, fut.result());
-+ EXPECT_EQ(nullptr, final_batch);
++Upstream Patch reference: https://patch-diff.githubusercontent.com/raw/apache/arrow/pull/48925.patch
+
-+ RecordBatchVector batches;
-+ for (auto& future : futures) {
-+ ARROW_ASSIGN_OR_RAISE(auto batch, future.result());
-+ EXPECT_NE(nullptr, batch);
-+ batches.push_back(batch);
-+ }
-+ return batches;
-+ };
++---
++ ci/scripts/cpp_test.sh | 12 ++
++ cpp/src/arrow/ipc/read_write_test.cc | 75 +++++---
++ cpp/src/arrow/ipc/reader.cc | 252 +++++++++++++++++--------
++ cpp/src/arrow/ipc/test_common.cc | 47 +++--
++ cpp/src/arrow/type.h | 10 +
++ cpp/src/arrow/util/int_util_overflow.h | 33 ++++
++ cpp/src/arrow/util/int_util_test.cc | 18 ++
++ 7 files changed, 316 insertions(+), 131 deletions(-)
+
-+ ARROW_ASSIGN_OR_RAISE(*out_batches, read_batches(/*pre_buffer=*/false));
-+ // Also read with pre-buffered metadata, and check the results are equal
-+ ARROW_ASSIGN_OR_RAISE(auto batches_pre_buffered, read_batches(/*pre_buffer=*/true));
- for (int i = 0; i < num_batches_written_; ++i) {
-- futures.push_back(generator());
-- }
-- auto fut = generator();
-- EXPECT_FINISHES_OK_AND_EQ(nullptr, fut);
-- for (auto& future : futures) {
-- EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future);
-- out_batches->push_back(batch);
-+ AssertBatchesEqual(*batches_pre_buffered[i], *(*out_batches)[i],
-+ /*check_metadata=*/true);
- }
--
-- // The generator doesn't track stats.
-- EXPECT_EQ(nullptr, out_stats);
--
- return Status::OK();
- }
- };
-diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
-index 8e125fc5ede..f1571f76c24 100644
---- a/cpp/src/arrow/ipc/reader.cc
-+++ b/cpp/src/arrow/ipc/reader.cc
-@@ -54,6 +54,7 @@
- #include "arrow/util/compression.h"
- #include "arrow/util/endian.h"
- #include "arrow/util/fuzz_internal.h"
-+#include "arrow/util/int_util_overflow.h"
- #include "arrow/util/key_value_metadata.h"
- #include "arrow/util/logging_internal.h"
- #include "arrow/util/parallel.h"
-@@ -72,6 +73,7 @@ namespace arrow {
-
- namespace flatbuf = org::apache::arrow::flatbuf;
-
-+using internal::AddWithOverflow;
- using internal::checked_cast;
- using internal::checked_pointer_cast;
-
-@@ -177,14 +179,16 @@ class ArrayLoader {
-
- explicit ArrayLoader(const flatbuf::RecordBatch* metadata,
- MetadataVersion metadata_version, const IpcReadOptions& options,
-- int64_t file_offset)
-+ int64_t file_offset, int64_t file_length)
- : metadata_(metadata),
- metadata_version_(metadata_version),
- file_(nullptr),
- file_offset_(file_offset),
-+ file_length_(file_length),
- max_recursion_depth_(options.max_recursion_depth) {}
-
- Status ReadBuffer(int64_t offset, int64_t length, std::shared_ptr<Buffer>* out) {
-+ // This construct permits overriding GetBuffer at compile time
- if (skip_io_) {
- return Status::OK();
- }
-@@ -194,7 +198,10 @@ class ArrayLoader {
- if (length < 0) {
- return Status::Invalid("Negative length for reading buffer ", buffer_index_);
- }
-- // This construct permits overriding GetBuffer at compile time
-+ auto read_end = AddWithOverflow({offset, length});
-+ if (!read_end.has_value() || (file_length_.has_value() && read_end > file_length_)) {
-+ return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
-+ }
- if (!bit_util::IsMultipleOf8(offset)) {
- return Status::Invalid("Buffer ", buffer_index_,
- " did not start on 8-byte aligned offset: ", offset);
-@@ -202,6 +209,9 @@ class ArrayLoader {
- if (file_) {
- return file_->ReadAt(offset, length).Value(out);
- } else {
-+ if (!AddWithOverflow({read_end.value(), file_offset_}).has_value()) {
-+ return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
-+ }
- read_request_.RequestRange(offset + file_offset_, length, out);
- return Status::OK();
- }
-@@ -292,6 +302,16 @@ class ArrayLoader {
- // we can skip that buffer without reading from shared memory
- RETURN_NOT_OK(GetFieldMetadata(field_index_++, out_));
-
-+ if (::arrow::internal::has_variadic_buffers(type_id)) {
-+ ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
-+ GetVariadicCount(variadic_count_index_++));
-+ const int64_t start = static_cast<int64_t>(out_->buffers.size());
-+ // NOTE: this must be done before any other call to `GetBuffer` because
-+ // BatchDataReadRequest will keep pointers to `std::shared_ptr<Buffer>`
-+ // objects.
-+ out_->buffers.resize(start + data_buffer_count);
-+ }
++diff --git a/ci/scripts/cpp_test.sh b/ci/scripts/cpp_test.sh
++index 0c6e1c6..1110378 100755
++--- a/ci/scripts/cpp_test.sh
+++++ b/ci/scripts/cpp_test.sh
++@@ -107,6 +107,18 @@ fi
++
++ if [ "${ARROW_FUZZING}" == "ON" ]; then
++ # Fuzzing regression tests
+++ # Some fuzz regression files may trigger huge memory allocations,
+++ # let the allocator return null instead of aborting.
+++ export ASAN_OPTIONS="$ASAN_OPTIONS allocator_may_return_null=1"
+++ export ARROW_FUZZING_VERBOSITY=1
+++ # Run golden IPC integration files: these should ideally load without errors,
+++ # though some very old ones carry invalid data (such as decimal values
+++ # larger than their advertised precision).
+++ # shellcheck disable=SC2046
+++ "${binary_output_dir}/arrow-ipc-stream-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.stream")
+++ # shellcheck disable=SC2046
+++ "${binary_output_dir}/arrow-ipc-file-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.arrow_file")
+++ # Run known crash files
++ ${binary_output_dir}/arrow-ipc-stream-fuzz ${ARROW_TEST_DATA}/arrow-ipc-stream/crash-*
++ ${binary_output_dir}/arrow-ipc-stream-fuzz ${ARROW_TEST_DATA}/arrow-ipc-stream/*-testcase-*
++ ${binary_output_dir}/arrow-ipc-file-fuzz ${ARROW_TEST_DATA}/arrow-ipc-file/*-testcase-*
++diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
++index bd2c2b7..af749ec 100644
++--- a/cpp/src/arrow/ipc/read_write_test.cc
+++++ b/cpp/src/arrow/ipc/read_write_test.cc
++@@ -1220,40 +1220,55 @@ struct FileGeneratorWriterHelper : public FileWriterHelper {
++ Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
++ ReadStats* out_stats = nullptr,
++ MetadataVector* out_metadata_list = nullptr) override {
++- std::shared_ptr<io::RandomAccessFile> buf_reader;
++- if (kCoalesce) {
++- // Use a non-zero-copy enabled BufferReader so we can test paths properly
++- buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
++- } else {
++- buf_reader = std::make_shared<io::BufferReader>(buffer_);
++- }
++- AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+++ // The generator doesn't track stats.
+++ EXPECT_EQ(nullptr, out_stats);
++
++- {
++- auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
++- // Do NOT assert OK since some tests check whether this fails properly
++- EXPECT_FINISHES(fut);
++- ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
++- EXPECT_EQ(num_batches_written_, reader->num_record_batches());
++- // Generator will keep reader alive internally
++- ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
++- }
+++ auto read_batches = [&](bool pre_buffer) -> Result<RecordBatchVector> {
+++ std::shared_ptr<io::RandomAccessFile> buf_reader;
+++ if (kCoalesce) {
+++ // Use a non-zero-copy enabled BufferReader so we can test paths properly
+++ buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
+++ } else {
+++ buf_reader = std::make_shared<io::BufferReader>(buffer_);
+++ }
+++ AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+++
+++ {
+++ auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
+++ ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
+++ EXPECT_EQ(num_batches_written_, reader->num_record_batches());
+++ if (pre_buffer) {
+++ RETURN_NOT_OK(reader->PreBufferMetadata(/*indices=*/{}));
+++ }
+++ // Generator will keep reader alive internally
+++ ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
+++ }
++
++- // Generator is async-reentrant
++- std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
+++ // Generator is async-reentrant
+++ std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
+++ for (int i = 0; i < num_batches_written_; ++i) {
+++ futures.push_back(generator());
+++ }
+++ auto fut = generator();
+++ ARROW_ASSIGN_OR_RAISE(auto final_batch, fut.result());
+++ EXPECT_EQ(nullptr, final_batch);
+++
+++ RecordBatchVector batches;
+++ for (auto& future : futures) {
+++ ARROW_ASSIGN_OR_RAISE(auto batch, future.result());
+++ EXPECT_NE(nullptr, batch);
+++ batches.push_back(batch);
+++ }
+++ return batches;
+++ };
+++
+++ ARROW_ASSIGN_OR_RAISE(*out_batches, read_batches(/*pre_buffer=*/false));
+++ // Also read with pre-buffered metadata, and check the results are equal
+++ ARROW_ASSIGN_OR_RAISE(auto batches_pre_buffered, read_batches(/*pre_buffer=*/true));
++ for (int i = 0; i < num_batches_written_; ++i) {
++- futures.push_back(generator());
++- }
++- auto fut = generator();
++- EXPECT_FINISHES_OK_AND_EQ(nullptr, fut);
++- for (auto& future : futures) {
++- EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future);
++- out_batches->push_back(batch);
+++ AssertBatchesEqual(*batches_pre_buffered[i], *(*out_batches)[i],
+++ /*check_metadata=*/true);
++ }
++-
++- // The generator doesn't track stats.
++- EXPECT_EQ(nullptr, out_stats);
++-
++ return Status::OK();
++ }
++ };
++diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
++index d272c78..3a2987b 100644
++--- a/cpp/src/arrow/ipc/reader.cc
+++++ b/cpp/src/arrow/ipc/reader.cc
++@@ -52,6 +52,7 @@
++ #include "arrow/util/checked_cast.h"
++ #include "arrow/util/compression.h"
++ #include "arrow/util/endian.h"
+++#include "arrow/util/int_util_overflow.h"
++ #include "arrow/util/key_value_metadata.h"
++ #include "arrow/util/logging.h"
++ #include "arrow/util/parallel.h"
++@@ -73,6 +74,8 @@ namespace flatbuf = org::apache::arrow::flatbuf;
++ using internal::checked_cast;
++ using internal::checked_pointer_cast;
++
+++using internal::AddWithOverflow;
+++
++ namespace ipc {
++
++ using internal::FileBlock;
++@@ -166,23 +169,26 @@ class ArrayLoader {
++ public:
++ explicit ArrayLoader(const flatbuf::RecordBatch* metadata,
++ MetadataVersion metadata_version, const IpcReadOptions& options,
++- io::RandomAccessFile* file)
+++ io::RandomAccessFile* file, std::optional<int64_t> file_length)
++ : metadata_(metadata),
++ metadata_version_(metadata_version),
++ file_(file),
++ file_offset_(0),
+++ file_length_(file_length),
++ max_recursion_depth_(options.max_recursion_depth) {}
++
++ explicit ArrayLoader(const flatbuf::RecordBatch* metadata,
++ MetadataVersion metadata_version, const IpcReadOptions& options,
++- int64_t file_offset)
+++ int64_t file_offset, std::optional<int64_t> file_length)
++ : metadata_(metadata),
++ metadata_version_(metadata_version),
++ file_(nullptr),
++ file_offset_(file_offset),
+++ file_length_(file_length),
++ max_recursion_depth_(options.max_recursion_depth) {}
++
++ Status ReadBuffer(int64_t offset, int64_t length, std::shared_ptr<Buffer>* out) {
+++ // This construct permits overriding GetBuffer at compile time
++ if (skip_io_) {
++ return Status::OK();
++ }
++@@ -192,7 +198,10 @@ class ArrayLoader {
++ if (length < 0) {
++ return Status::Invalid("Negative length for reading buffer ", buffer_index_);
++ }
++- // This construct permits overriding GetBuffer at compile time
+++ auto read_end = AddWithOverflow({offset, length});
+++ if (!read_end.has_value() || (file_length_.has_value() && read_end > file_length_)) {
+++ return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
+++ }
++ if (!bit_util::IsMultipleOf8(offset)) {
++ return Status::Invalid("Buffer ", buffer_index_,
++ " did not start on 8-byte aligned offset: ", offset);
++@@ -200,6 +209,9 @@ class ArrayLoader {
++ if (file_) {
++ return file_->ReadAt(offset, length).Value(out);
++ } else {
+++ if (!AddWithOverflow({read_end.value(), file_offset_}).has_value()) {
+++ return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
+++ }
++ read_request_.RequestRange(offset + file_offset_, length, out);
++ return Status::OK();
++ }
++@@ -284,6 +296,16 @@ class ArrayLoader {
++ // we can skip that buffer without reading from shared memory
++ RETURN_NOT_OK(GetFieldMetadata(field_index_++, out_));
++
+++ if (::arrow::internal::has_variadic_buffers(type_id)) {
+++ ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
+++ GetVariadicCount(variadic_count_index_++));
+++ const int64_t start = static_cast<int64_t>(out_->buffers.size());
+++ // NOTE: this must be done before any other call to `GetBuffer` because
+++ // BatchDataReadRequest will keep pointers to `std::shared_ptr<Buffer>`
+++ // objects.
+++ out_->buffers.resize(start + data_buffer_count);
+++ }
+++
++ if (internal::HasValidityBitmap(type_id, metadata_version_)) {
++ // Extract null_bitmap which is common to all arrays except for unions
++ // and nulls.
++@@ -292,6 +314,7 @@ class ArrayLoader {
++ }
++ buffer_index_++;
++ }
+++
++ return Status::OK();
++ }
++
++@@ -390,14 +413,9 @@ class ArrayLoader {
++ Status Visit(const BinaryViewType& type) {
++ out_->buffers.resize(2);
++
++- RETURN_NOT_OK(LoadCommon(type.id()));
++- RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[1]));
++-
++- ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
++- GetVariadicCount(variadic_count_index_++));
++- out_->buffers.resize(data_buffer_count + 2);
++- for (size_t i = 0; i < data_buffer_count; ++i) {
++- RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i + 2]));
+++ RETURN_NOT_OK(LoadCommon(type.id())); // also initializes variadic buffers
+++ for (int64_t i = 1; i < static_cast<int64_t>(out_->buffers.size()); ++i) {
+++ RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i]));
++ }
++ return Status::OK();
++ }
++@@ -495,6 +513,7 @@ class ArrayLoader {
++ const MetadataVersion metadata_version_;
++ io::RandomAccessFile* file_;
++ int64_t file_offset_;
+++ std::optional<int64_t> file_length_;
++ int max_recursion_depth_;
++ int buffer_index_ = 0;
++ int field_index_ = 0;
++@@ -583,7 +602,12 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
++ const flatbuf::RecordBatch* metadata, const std::shared_ptr<Schema>& schema,
++ const std::vector<bool>* inclusion_mask, const IpcReadContext& context,
++ io::RandomAccessFile* file) {
++- ArrayLoader loader(metadata, context.metadata_version, context.options, file);
+++ std::optional<int64_t> file_length;
+++ if (file) {
+++ ARROW_ASSIGN_OR_RAISE(file_length, file->GetSize());
+++ }
+++ ArrayLoader loader(metadata, context.metadata_version, context.options, file,
+++ file_length);
++
++ ArrayDataVector columns(schema->num_fields());
++ ArrayDataVector filtered_columns;
++@@ -832,8 +856,12 @@ Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context,
++ ARROW_ASSIGN_OR_RAISE(auto value_type, context.dictionary_memo->GetDictionaryType(id));
++
++ // Load the dictionary data from the dictionary batch
+++ std::optional<int64_t> file_length;
+++ if (file) {
+++ ARROW_ASSIGN_OR_RAISE(file_length, file->GetSize());
+++ }
++ ArrayLoader loader(batch_meta, internal::GetMetadataVersion(message->version()),
++- context.options, file);
+++ context.options, file, file_length);
++ auto dict_data = std::make_shared<ArrayData>();
++ const Field dummy_field("", value_type);
++ RETURN_NOT_OK(loader.Load(&dummy_field, dict_data.get()));
++@@ -1152,8 +1180,19 @@ Result<std::shared_ptr<RecordBatchStreamReader>> RecordBatchStreamReader::Open(
++
++ // Common functions used in both the random-access file reader and the
++ // asynchronous generator
++-static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
++- return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()};
+++static inline Result<FileBlock> FileBlockFromFlatbuffer(const flatbuf::Block* fb_block,
+++ int64_t max_offset) {
+++ auto block =
+++ FileBlock{fb_block->offset(), fb_block->metaDataLength(), fb_block->bodyLength()};
+++ if (block.metadata_length < 0 || block.body_length < 0 || block.offset < 0) {
+++ return Status::IOError("Invalid Block in IPC file footer");
+++ }
+++ auto block_end =
+++ AddWithOverflow<int64_t>({block.offset, block.metadata_length, block.body_length});
+++ if (!block_end.has_value() || block_end > max_offset) {
+++ return Status::IOError("Invalid Block in IPC file footer");
+++ }
+++ return block;
++ }
++
++ Status CheckAligned(const FileBlock& block) {
++@@ -1267,7 +1306,11 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ const std::shared_ptr<Schema>& schema,
++ const std::vector<bool>* inclusion_mask,
++ MetadataVersion metadata_version = MetadataVersion::V5) {
++- ArrayLoader loader(metadata, metadata_version, options, file);
+++ std::optional<int64_t> file_length;
+++ if (file) {
+++ ARROW_ASSIGN_OR_RAISE(file_length, file->GetSize());
+++ }
+++ ArrayLoader loader(metadata, metadata_version, options, file, file_length);
++ for (int i = 0; i < schema->num_fields(); ++i) {
++ const Field& field = *schema->field(i);
++ if (!inclusion_mask || (*inclusion_mask)[i]) {
++@@ -1336,8 +1379,9 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ read_options, file, schema, &inclusion_mask);
++ };
++ }
+++ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
++ ARROW_ASSIGN_OR_RAISE(auto message,
++- ReadMessageFromBlock(GetRecordBatchBlock(i), fields_loader));
+++ ReadMessageFromBlock(block, fields_loader));
++
++ CHECK_HAS_BODY(*message);
++ ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
++@@ -1353,8 +1397,9 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ Result<int64_t> CountRows() override {
++ int64_t total = 0;
++ for (int i = 0; i < num_record_batches(); i++) {
+++ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
++ ARROW_ASSIGN_OR_RAISE(auto outer_message,
++- ReadMessageFromBlock(GetRecordBatchBlock(i)));
+++ ReadMessageFromBlock(block));
++ auto metadata = outer_message->metadata();
++ const flatbuf::Message* message = nullptr;
++ RETURN_NOT_OK(
++@@ -1468,13 +1513,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++
++ Status DoPreBufferMetadata(const std::vector<int>& indices) {
++ RETURN_NOT_OK(CacheMetadata(indices));
++- EnsureDictionaryReadStarted();
+++ RETURN_NOT_OK(EnsureDictionaryReadStarted());
++ Future<> all_metadata_ready = WaitForMetadatas(indices);
++ for (int index : indices) {
++ Future<std::shared_ptr<Message>> metadata_loaded =
++ all_metadata_ready.Then([this, index]() -> Result<std::shared_ptr<Message>> {
++ stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
++- FileBlock block = GetRecordBatchBlock(index);
+++ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
++ ARROW_ASSIGN_OR_RAISE(
++ std::shared_ptr<Buffer> metadata,
++ metadata_cache_->Read({block.offset, block.metadata_length}));
++@@ -1523,12 +1568,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ }
++ };
++
++- FileBlock GetRecordBatchBlock(int i) const {
++- return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
+++ Result<FileBlock> GetRecordBatchBlock(int i) const {
+++ return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i), footer_offset_);
++ }
++
++- FileBlock GetDictionaryBlock(int i) const {
++- return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
+++ Result<FileBlock> GetDictionaryBlock(int i) const {
+++ return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i), footer_offset_);
++ }
++
++ Result<std::unique_ptr<Message>> ReadMessageFromBlock(
++@@ -1541,16 +1586,26 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++
++ Status ReadDictionaries() {
++ // Read all the dictionaries
+++ std::vector<std::shared_ptr<Message>> messages(num_dictionaries());
+++ for (int i = 0; i < num_dictionaries(); ++i) {
+++ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
+++ ARROW_ASSIGN_OR_RAISE(messages[i], ReadMessageFromBlock(block));
+++ }
+++ return ReadDictionaries(messages);
+++ }
+++
+++ Status ReadDictionaries(
+++ const std::vector<std::shared_ptr<Message>>& dictionary_messages) {
+++ DCHECK_EQ(dictionary_messages.size(), static_cast<size_t>(num_dictionaries()));
++ IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
++ for (int i = 0; i < num_dictionaries(); ++i) {
++- ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(GetDictionaryBlock(i)));
++- RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
++- stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
+++ RETURN_NOT_OK(ReadOneDictionary(i, dictionary_messages[i].get(), context));
++ }
++ return Status::OK();
++ }
++
++- Status ReadOneDictionary(Message* message, const IpcReadContext& context) {
+++ Status ReadOneDictionary(int dict_index, Message* message,
+++ const IpcReadContext& context) {
++ CHECK_HAS_BODY(*message);
++ ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
++ DictionaryKind kind;
++@@ -1560,44 +1615,48 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ } else if (kind == DictionaryKind::Delta) {
++ stats_.num_dictionary_deltas.fetch_add(1, std::memory_order_relaxed);
++ }
+++ stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
++ return Status::OK();
++ }
++
++- void AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
+++ Status AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
++ // Adds all dictionaries to the range cache
++ for (int i = 0; i < num_dictionaries(); ++i) {
++- FileBlock block = GetDictionaryBlock(i);
+++ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
++ ranges->push_back({block.offset, block.metadata_length + block.body_length});
++ }
+++ return Status::OK();
++ }
++
++- void AddMetadataRanges(const std::vector<int>& indices,
++- std::vector<io::ReadRange>* ranges) {
+++ Status AddMetadataRanges(const std::vector<int>& indices,
+++ std::vector<io::ReadRange>* ranges) {
++ for (int index : indices) {
++- FileBlock block = GetRecordBatchBlock(static_cast<int>(index));
+++ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
++ ranges->push_back({block.offset, block.metadata_length});
++ }
+++ return Status::OK();
++ }
++
++ Status CacheMetadata(const std::vector<int>& indices) {
++ std::vector<io::ReadRange> ranges;
++ if (!read_dictionaries_) {
++- AddDictionaryRanges(&ranges);
+++ RETURN_NOT_OK(AddDictionaryRanges(&ranges));
++ }
++- AddMetadataRanges(indices, &ranges);
+++ RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
++ return metadata_cache_->Cache(std::move(ranges));
++ }
++
++- void EnsureDictionaryReadStarted() {
+++ Status EnsureDictionaryReadStarted() {
++ if (!dictionary_load_finished_.is_valid()) {
++ read_dictionaries_ = true;
++ std::vector<io::ReadRange> ranges;
++- AddDictionaryRanges(&ranges);
+++ RETURN_NOT_OK(AddDictionaryRanges(&ranges));
++ dictionary_load_finished_ =
++ metadata_cache_->WaitFor(std::move(ranges)).Then([this] {
++ return ReadDictionaries();
++ });
++ }
+++ return Status::OK();
++ }
++
++ Status WaitForDictionaryReadFinished() {
++@@ -1615,7 +1674,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++
++ Future<> WaitForMetadatas(const std::vector<int>& indices) {
++ std::vector<io::ReadRange> ranges;
++- AddMetadataRanges(indices, &ranges);
+++ RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
++ return metadata_cache_->WaitFor(std::move(ranges));
++ }
++
++@@ -1659,12 +1718,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ const flatbuf::RecordBatch* batch,
++ IpcReadContext context, io::RandomAccessFile* file,
++ std::shared_ptr<io::RandomAccessFile> owned_file,
++- int64_t block_data_offset)
+++ int64_t block_data_offset, int64_t block_data_length)
++ : schema(std::move(sch)),
++ context(std::move(context)),
++ file(file),
++ owned_file(std::move(owned_file)),
++- loader(batch, context.metadata_version, context.options, block_data_offset),
+++ loader(batch, context.metadata_version, context.options, block_data_offset,
+++ block_data_length),
++ columns(schema->num_fields()),
++ cache(file, file->io_context(), io::CacheOptions::LazyDefaults()),
++ length(batch->length()) {}
++@@ -1763,14 +1823,15 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ return dictionary_load_finished_.Then([message_fut] { return message_fut; })
++ .Then([this, index](const std::shared_ptr<Message>& message_obj)
++ -> Future<std::shared_ptr<RecordBatch>> {
++- FileBlock block = GetRecordBatchBlock(index);
+++ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(index));
++ ARROW_ASSIGN_OR_RAISE(auto message, GetFlatbufMessage(message_obj));
++ ARROW_ASSIGN_OR_RAISE(auto batch, GetBatchFromMessage(message));
++ ARROW_ASSIGN_OR_RAISE(auto context, GetIpcReadContext(message, batch));
++
++ auto read_context = std::make_shared<CachedRecordBatchReadContext>(
++ schema_, batch, std::move(context), file_, owned_file_,
++- block.offset + static_cast<int64_t>(block.metadata_length));
+++ block.offset + static_cast<int64_t>(block.metadata_length),
+++ block.body_length);
++ RETURN_NOT_OK(read_context->CalculateLoadRequest());
++ return read_context->ReadAsync().Then(
++ [read_context] { return read_context->CreateRecordBatch(); });
++@@ -1958,25 +2019,31 @@ Future<WholeIpcFileRecordBatchGenerator::Item>
++ WholeIpcFileRecordBatchGenerator::operator()() {
++ auto state = state_;
++ if (!read_dictionaries_.is_valid()) {
++- std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
++- for (int i = 0; i < state->num_dictionaries(); i++) {
++- auto block = FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
++- messages[i] = ReadBlock(block);
++- }
++- auto read_messages = All(std::move(messages));
++- if (executor_) read_messages = executor_->Transfer(read_messages);
++- read_dictionaries_ = read_messages.Then(
++- [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
++- -> Status {
++- ARROW_ASSIGN_OR_RAISE(auto messages,
++- arrow::internal::UnwrapOrRaise(maybe_messages));
++- return ReadDictionaries(state.get(), std::move(messages));
++- });
+++ if (state->dictionary_load_finished_.is_valid()) {
+++ // PreBufferMetadata has started reading dictionaries in the background
+++ read_dictionaries_ = state->dictionary_load_finished_;
+++ } else {
+++ // Start reading dictionaries
+++ std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
+++ for (int i = 0; i < state->num_dictionaries(); i++) {
+++ ARROW_ASSIGN_OR_RAISE(auto block, state->GetDictionaryBlock(i));
+++ messages[i] = ReadBlock(block);
+++ }
+++ auto read_messages = All(std::move(messages));
+++ if (executor_) read_messages = executor_->Transfer(read_messages);
+++ read_dictionaries_ = read_messages.Then(
+++ [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
+++ -> Status {
+++ ARROW_ASSIGN_OR_RAISE(auto messages,
+++ arrow::internal::UnwrapOrRaise(maybe_messages));
+++ return state->ReadDictionaries(messages);
+++ });
+++ }
++ }
++ if (index_ >= state_->num_record_batches()) {
++ return Future<Item>::MakeFinished(IterationTraits<Item>::End());
++ }
++- auto block = FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++));
+++ ARROW_ASSIGN_OR_RAISE(auto block, state->GetRecordBatchBlock(index_++));
++ auto read_message = ReadBlock(block);
++ auto read_messages = read_dictionaries_.Then([read_message]() { return read_message; });
++ // Force transfer. This may be wasteful in some cases, but ensures we get off the
++@@ -2012,16 +2079,6 @@ Future<std::shared_ptr<Message>> WholeIpcFileRecordBatchGenerator::ReadBlock(
++ }
++ }
++
++-Status WholeIpcFileRecordBatchGenerator::ReadDictionaries(
++- RecordBatchFileReaderImpl* state,
++- std::vector<std::shared_ptr<Message>> dictionary_messages) {
++- IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_);
++- for (const auto& message : dictionary_messages) {
++- RETURN_NOT_OK(state->ReadOneDictionary(message.get(), context));
++- }
++- return Status::OK();
++-}
++-
++ Result<std::shared_ptr<RecordBatch>> WholeIpcFileRecordBatchGenerator::ReadRecordBatch(
++ RecordBatchFileReaderImpl* state, Message* message) {
++ CHECK_HAS_BODY(*message);
++@@ -2598,23 +2655,37 @@ Status ValidateFuzzBatch(const RecordBatch& batch) {
++ return st;
++ }
++
+++Status ValidateFuzzBatch(const RecordBatchWithMetadata& batch) {
+++ if (batch.batch) {
+++ RETURN_NOT_OK(ValidateFuzzBatch(*batch.batch));
+++ }
+++ // XXX do something with custom metadata?
+++ return Status::OK();
+++}
+++
++ } // namespace
++
+++IpcReadOptions FuzzingOptions() {
+++ IpcReadOptions options;
+++ options.memory_pool = default_memory_pool();
+++ options.max_recursion_depth = 256;
+++ return options;
+++}
+++
++ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
++ auto buffer = std::make_shared<Buffer>(data, size);
++ io::BufferReader buffer_reader(buffer);
++
++- std::shared_ptr<RecordBatchReader> batch_reader;
++- ARROW_ASSIGN_OR_RAISE(batch_reader, RecordBatchStreamReader::Open(&buffer_reader));
+++ ARROW_ASSIGN_OR_RAISE(auto batch_reader, RecordBatchStreamReader::Open(&buffer_reader));
++ Status st;
++
++ while (true) {
++- std::shared_ptr<arrow::RecordBatch> batch;
++- RETURN_NOT_OK(batch_reader->ReadNext(&batch));
++- if (batch == nullptr) {
+++ ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadNext());
+++ if (!batch.batch && !batch.custom_metadata) {
+++ // EOS
++ break;
++ }
++- st &= ValidateFuzzBatch(*batch);
+++ st &= ValidateFuzzBatch(batch);
++ }
++
++ return st;
++@@ -2622,19 +2693,36 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
++
++ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
++ auto buffer = std::make_shared<Buffer>(data, size);
++- io::BufferReader buffer_reader(buffer);
++
++- std::shared_ptr<RecordBatchFileReader> batch_reader;
++- ARROW_ASSIGN_OR_RAISE(batch_reader, RecordBatchFileReader::Open(&buffer_reader));
++- Status st;
+++ Status final_status;
+++
+++ auto do_read = [&](bool pre_buffer) {
+++ io::BufferReader buffer_reader(buffer);
+++ ARROW_ASSIGN_OR_RAISE(auto batch_reader,
+++ RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
+++ if (pre_buffer) {
+++ // Pre-buffer all record batches
+++ RETURN_NOT_OK(batch_reader->PreBufferMetadata(/*indices=*/{}));
+++ }
++
++- const int n_batches = batch_reader->num_record_batches();
++- for (int i = 0; i < n_batches; ++i) {
++- ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadRecordBatch(i));
++- st &= ValidateFuzzBatch(*batch);
+++ const int n_batches = batch_reader->num_record_batches();
+++ for (int i = 0; i < n_batches; ++i) {
+++ RecordBatchWithMetadata batch;
+++ auto st = batch_reader->ReadRecordBatchWithCustomMetadata(i).Value(&batch);
+++ final_status &= st;
+++ if (!st.ok()) {
+++ continue;
+++ }
+++ final_status &= ValidateFuzzBatch(batch);
+++ }
+++ return Status::OK();
+++ };
+++
+++ for (const bool pre_buffer : {false, true}) {
+++ final_status &= do_read(pre_buffer);
++ }
++
++- return st;
+++ return final_status;
++ }
++
++ Status FuzzIpcTensorStream(const uint8_t* data, int64_t size) {
++diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc
++index 87c02e2..3a632fe 100644
++--- a/cpp/src/arrow/ipc/test_common.cc
+++++ b/cpp/src/arrow/ipc/test_common.cc
++@@ -16,6 +16,7 @@
++ // under the License.
++
++ #include <algorithm>
+++#include <concepts>
++ #include <cstdint>
++ #include <functional>
++ #include <memory>
++@@ -362,19 +363,27 @@ Status MakeRandomStringArray(int64_t length, bool include_nulls, MemoryPool* poo
++ return builder.Finish(out);
++ }
++
++-template <class BuilderType>
++-static Status MakeBinaryArrayWithUniqueValues(int64_t length, bool include_nulls,
++- MemoryPool* pool,
++- std::shared_ptr<Array>* out) {
++- BuilderType builder(pool);
+++template <std::derived_from<ArrayBuilder> BuilderType>
+++static Result<std::shared_ptr<Array>> MakeBinaryArrayWithUniqueValues(
+++ BuilderType builder, int64_t length, bool include_nulls) {
+++ if constexpr (std::is_base_of_v<BinaryViewBuilder, BuilderType>) {
+++ // Try to emit several variadic buffers by choosing a small block size.
+++ builder.SetBlockSize(512);
+++ }
++ for (int64_t i = 0; i < length; ++i) {
++ if (include_nulls && (i % 7 == 0)) {
++ RETURN_NOT_OK(builder.AppendNull());
++ } else {
++- RETURN_NOT_OK(builder.Append(std::to_string(i)));
+++ // Make sure that some strings are long enough to have non-inline binary views
+++ const auto base = std::to_string(i);
+++ std::string value;
+++ for (int64_t j = 0; j < 3 * (i % 10); ++j) {
+++ value += base;
+++ }
+++ RETURN_NOT_OK(builder.Append(value));
++ }
++ }
++- return builder.Finish(out);
+++ return builder.Finish();
++ }
++
++ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_nulls,
++@@ -384,22 +393,22 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_n
++ ArrayVector arrays;
++ FieldVector fields;
++
++- auto AppendColumn = [&](auto& MakeArray) {
++- arrays.emplace_back();
++- RETURN_NOT_OK(MakeArray(length, with_nulls, default_memory_pool(), &arrays.back()));
++-
++- const auto& type = arrays.back()->type();
++- fields.push_back(field(type->ToString(), type));
+++ auto AppendColumn = [&](auto builder) {
+++ ARROW_ASSIGN_OR_RAISE(auto array, MakeBinaryArrayWithUniqueValues(
+++ std::move(builder), length, with_nulls));
+++ arrays.push_back(array);
+++ fields.push_back(field(array->type()->ToString(), array->type()));
++ return Status::OK();
++ };
++
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringBuilder>));
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryBuilder>));
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeStringBuilder>));
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeBinaryBuilder>));
+++ auto pool = default_memory_pool();
+++ RETURN_NOT_OK(AppendColumn(StringBuilder(pool)));
+++ RETURN_NOT_OK(AppendColumn(BinaryBuilder(pool)));
+++ RETURN_NOT_OK(AppendColumn(LargeStringBuilder(pool)));
+++ RETURN_NOT_OK(AppendColumn(LargeBinaryBuilder(pool)));
++ if (with_view_types) {
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringViewBuilder>));
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryViewBuilder>));
+++ RETURN_NOT_OK(AppendColumn(StringViewBuilder(pool)));
+++ RETURN_NOT_OK(AppendColumn(BinaryViewBuilder(pool)));
++ }
++
++ *out = RecordBatch::Make(schema(std::move(fields)), length, std::move(arrays));
++diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
++index 5b1331a..42e83f6 100644
++--- a/cpp/src/arrow/type.h
+++++ b/cpp/src/arrow/type.h
++@@ -2494,6 +2494,16 @@ constexpr bool HasValidityBitmap(Type::type id) {
++ }
++ }
++
+++constexpr bool has_variadic_buffers(Type::type id) {
+++ switch (id) {
+++ case Type::BINARY_VIEW:
+++ case Type::STRING_VIEW:
+++ return true;
+++ default:
+++ return false;
+++ }
+++}
+++
++ ARROW_EXPORT
++ std::string ToString(Type::type id);
++
++diff --git a/cpp/src/arrow/util/int_util_overflow.h b/cpp/src/arrow/util/int_util_overflow.h
++index ffe78be..841d503 100644
++--- a/cpp/src/arrow/util/int_util_overflow.h
+++++ b/cpp/src/arrow/util/int_util_overflow.h
++@@ -18,7 +18,9 @@
++ #pragma once
++
++ #include <cstdint>
+++#include <initializer_list>
++ #include <limits>
+++#include <optional>
++ #include <type_traits>
++
++ #include "arrow/status.h"
++@@ -114,5 +116,36 @@ SignedInt SafeLeftShift(SignedInt u, Shift shift) {
++ return static_cast<SignedInt>(static_cast<UnsignedInt>(u) << shift);
++ }
++
+++// Convenience functions over an arbitrary number of arguments
+++template <typename Int>
+++std::optional<Int> AddWithOverflow(std::initializer_list<Int> vs) {
+++ if (vs.size() == 0) {
+++ return {};
+++ }
+++ auto it = vs.begin();
+++ Int v = *it++;
+++ while (it != vs.end()) {
+++ if (ARROW_PREDICT_FALSE(AddWithOverflow(v, *it++, &v))) {
+++ return {};
+++ }
+++ }
+++ return v;
+++}
+++
+++template <typename Int>
+++std::optional<Int> MultiplyWithOverflow(std::initializer_list<Int> vs) {
+++ if (vs.size() == 0) {
+++ return {};
+++ }
+++ auto it = vs.begin();
+++ Int v = *it++;
+++ while (it != vs.end()) {
+++ if (ARROW_PREDICT_FALSE(MultiplyWithOverflow(v, *it++, &v))) {
+++ return {};
+++ }
+++ }
+++ return v;
+++}
+++
++ } // namespace internal
++ } // namespace arrow
++diff --git a/cpp/src/arrow/util/int_util_test.cc b/cpp/src/arrow/util/int_util_test.cc
++index 7217c10..cffa4e9 100644
++--- a/cpp/src/arrow/util/int_util_test.cc
+++++ b/cpp/src/arrow/util/int_util_test.cc
++@@ -649,5 +649,23 @@ TYPED_TEST(TestAddWithOverflow, Basics) {
++ this->CheckOk(almost_min, almost_max + T{2}, T{1});
++ }
++
+++TEST(AddWithOverflow, Variadic) {
+++ ASSERT_EQ(AddWithOverflow<int>({}), std::nullopt);
+++ ASSERT_EQ(AddWithOverflow({1, 2, 3}), 6);
+++ ASSERT_EQ(AddWithOverflow<int8_t>({1, 2, 125}), std::nullopt);
+++ ASSERT_EQ(AddWithOverflow<int8_t>({125, 2, 1}), std::nullopt);
+++ ASSERT_EQ(AddWithOverflow<int16_t>({1, 2, 125}), 128);
+++ ASSERT_EQ(AddWithOverflow<int16_t>({125, 2, 1}), 128);
+++}
+++
+++TEST(MultiplyWithOverflow, Variadic) {
+++ ASSERT_EQ(MultiplyWithOverflow<int>({}), std::nullopt);
+++ ASSERT_EQ(MultiplyWithOverflow({1, 2, 3, 4}), 24);
+++ ASSERT_EQ(MultiplyWithOverflow<int8_t>({2, 2, 32}), std::nullopt);
+++ ASSERT_EQ(MultiplyWithOverflow<int8_t>({32, 4, 1}), std::nullopt);
+++ ASSERT_EQ(MultiplyWithOverflow<int16_t>({2, 2, 32}), 128);
+++ ASSERT_EQ(MultiplyWithOverflow<int16_t>({32, 4, 1}), 128);
+++}
+++
++ } // namespace internal
++ } // namespace arrow
++--
++2.45.4
+
- if (internal::HasValidityBitmap(type_id, metadata_version_)) {
- // Extract null_bitmap which is common to all arrays except for unions
- // and nulls.
-@@ -300,6 +320,7 @@ class ArrayLoader {
- }
- buffer_index_++;
- }
-+
- return Status::OK();
- }
-
-@@ -398,14 +419,9 @@ class ArrayLoader {
- Status Visit(const BinaryViewType& type) {
- out_->buffers.resize(2);
-
-- RETURN_NOT_OK(LoadCommon(type.id()));
-- RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[1]));
--
-- ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
-- GetVariadicCount(variadic_count_index_++));
-- out_->buffers.resize(data_buffer_count + 2);
-- for (int64_t i = 0; i < data_buffer_count; ++i) {
-- RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i + 2]));
-+ RETURN_NOT_OK(LoadCommon(type.id())); // also initializes variadic buffers
-+ for (int64_t i = 1; i < static_cast<int64_t>(out_->buffers.size()); ++i) {
-+ RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i]));
- }
- return Status::OK();
- }
-@@ -503,6 +519,7 @@ class ArrayLoader {
- const MetadataVersion metadata_version_;
- io::RandomAccessFile* file_;
- int64_t file_offset_;
-+ std::optional<int64_t> file_length_;
- int max_recursion_depth_;
- int buffer_index_ = 0;
- int field_index_ = 0;
-@@ -1173,8 +1190,19 @@ namespace {
-
- // Common functions used in both the random-access file reader and the
- // asynchronous generator
--inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
-- return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()};
-+Result<FileBlock> FileBlockFromFlatbuffer(const flatbuf::Block* fb_block,
-+ int64_t max_offset) {
-+ auto block =
-+ FileBlock{fb_block->offset(), fb_block->metaDataLength(), fb_block->bodyLength()};
-+ if (block.metadata_length < 0 || block.body_length < 0 || block.offset < 0) {
-+ return Status::IOError("Invalid Block in IPC file footer");
-+ }
-+ auto block_end =
-+ AddWithOverflow<int64_t>({block.offset, block.metadata_length, block.body_length});
-+ if (!block_end.has_value() || block_end > max_offset) {
-+ return Status::IOError("Invalid Block in IPC file footer");
-+ }
-+ return block;
- }
-
- Status CheckAligned(const FileBlock& block) {
-@@ -1362,8 +1390,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- read_options, file, schema, &inclusion_mask);
- };
- }
-- ARROW_ASSIGN_OR_RAISE(auto message,
-- ReadMessageFromBlock(GetRecordBatchBlock(i), fields_loader));
-+ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
-+ ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(block, fields_loader));
-
- CHECK_HAS_BODY(*message);
- ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
-@@ -1379,8 +1407,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- Result<int64_t> CountRows() override {
- int64_t total = 0;
- for (int i = 0; i < num_record_batches(); i++) {
-- ARROW_ASSIGN_OR_RAISE(auto outer_message,
-- ReadMessageFromBlock(GetRecordBatchBlock(i)));
-+ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
-+ ARROW_ASSIGN_OR_RAISE(auto outer_message, ReadMessageFromBlock(block));
- auto metadata = outer_message->metadata();
- const flatbuf::Message* message = nullptr;
- RETURN_NOT_OK(
-@@ -1494,13 +1522,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-
- Status DoPreBufferMetadata(const std::vector<int>& indices) {
- RETURN_NOT_OK(CacheMetadata(indices));
-- EnsureDictionaryReadStarted();
-+ RETURN_NOT_OK(EnsureDictionaryReadStarted());
- Future<> all_metadata_ready = WaitForMetadatas(indices);
- for (int index : indices) {
- Future<std::shared_ptr<Message>> metadata_loaded =
- all_metadata_ready.Then([this, index]() -> Result<std::shared_ptr<Message>> {
- stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
-- FileBlock block = GetRecordBatchBlock(index);
-+ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
- ARROW_ASSIGN_OR_RAISE(
- std::shared_ptr<Buffer> metadata,
- metadata_cache_->Read({block.offset, block.metadata_length}));
-@@ -1549,12 +1577,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- }
- };
-
-- FileBlock GetRecordBatchBlock(int i) const {
-- return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
-+ Result<FileBlock> GetRecordBatchBlock(int i) const {
-+ return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i), footer_offset_);
- }
-
-- FileBlock GetDictionaryBlock(int i) const {
-- return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
-+ Result<FileBlock> GetDictionaryBlock(int i) const {
-+ return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i), footer_offset_);
- }
-
- Result<std::unique_ptr<Message>> ReadMessageFromBlock(
-@@ -1567,16 +1595,26 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-
- Status ReadDictionaries() {
- // Read all the dictionaries
-+ std::vector<std::shared_ptr<Message>> messages(num_dictionaries());
-+ for (int i = 0; i < num_dictionaries(); ++i) {
-+ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
-+ ARROW_ASSIGN_OR_RAISE(messages[i], ReadMessageFromBlock(block));
-+ }
-+ return ReadDictionaries(messages);
-+ }
-+
-+ Status ReadDictionaries(
-+ const std::vector<std::shared_ptr<Message>>& dictionary_messages) {
-+ DCHECK_EQ(dictionary_messages.size(), static_cast<size_t>(num_dictionaries()));
- IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
- for (int i = 0; i < num_dictionaries(); ++i) {
-- ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(GetDictionaryBlock(i)));
-- RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
-- stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
-+ RETURN_NOT_OK(ReadOneDictionary(i, dictionary_messages[i].get(), context));
- }
- return Status::OK();
- }
-
-- Status ReadOneDictionary(Message* message, const IpcReadContext& context) {
-+ Status ReadOneDictionary(int dict_index, Message* message,
-+ const IpcReadContext& context) {
- CHECK_HAS_BODY(*message);
- ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
- DictionaryKind kind;
-@@ -1586,44 +1624,48 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- } else if (kind == DictionaryKind::Delta) {
- stats_.num_dictionary_deltas.fetch_add(1, std::memory_order_relaxed);
- }
-+ stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
- return Status::OK();
- }
-
-- void AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
-+ Status AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
- // Adds all dictionaries to the range cache
- for (int i = 0; i < num_dictionaries(); ++i) {
-- FileBlock block = GetDictionaryBlock(i);
-+ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
- ranges->push_back({block.offset, block.metadata_length + block.body_length});
- }
-+ return Status::OK();
- }
-
-- void AddMetadataRanges(const std::vector<int>& indices,
-- std::vector<io::ReadRange>* ranges) {
-+ Status AddMetadataRanges(const std::vector<int>& indices,
-+ std::vector<io::ReadRange>* ranges) {
- for (int index : indices) {
-- FileBlock block = GetRecordBatchBlock(static_cast<int>(index));
-+ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
- ranges->push_back({block.offset, block.metadata_length});
- }
-+ return Status::OK();
- }
-
- Status CacheMetadata(const std::vector<int>& indices) {
- std::vector<io::ReadRange> ranges;
- if (!read_dictionaries_) {
-- AddDictionaryRanges(&ranges);
-+ RETURN_NOT_OK(AddDictionaryRanges(&ranges));
- }
-- AddMetadataRanges(indices, &ranges);
-+ RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
- return metadata_cache_->Cache(std::move(ranges));
- }
-
-- void EnsureDictionaryReadStarted() {
-+ Status EnsureDictionaryReadStarted() {
- if (!dictionary_load_finished_.is_valid()) {
- read_dictionaries_ = true;
- std::vector<io::ReadRange> ranges;
-- AddDictionaryRanges(&ranges);
-+ RETURN_NOT_OK(AddDictionaryRanges(&ranges));
- dictionary_load_finished_ =
- metadata_cache_->WaitFor(std::move(ranges)).Then([this] {
- return ReadDictionaries();
- });
- }
-+ return Status::OK();
- }
-
- Status WaitForDictionaryReadFinished() {
-@@ -1641,7 +1683,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-
- Future<> WaitForMetadatas(const std::vector<int>& indices) {
- std::vector<io::ReadRange> ranges;
-- AddMetadataRanges(indices, &ranges);
-+ RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
- return metadata_cache_->WaitFor(std::move(ranges));
- }
-
-@@ -1685,12 +1727,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- const flatbuf::RecordBatch* batch,
- IpcReadContext context, io::RandomAccessFile* file,
- std::shared_ptr<io::RandomAccessFile> owned_file,
-- int64_t block_data_offset)
-+ int64_t block_data_offset, int64_t block_data_length)
- : schema(std::move(sch)),
- context(std::move(context)),
- file(file),
- owned_file(std::move(owned_file)),
-- loader(batch, context.metadata_version, context.options, block_data_offset),
-+ loader(batch, context.metadata_version, context.options, block_data_offset,
-+ block_data_length),
- columns(schema->num_fields()),
- cache(file, file->io_context(), io::CacheOptions::LazyDefaults()),
- length(batch->length()) {}
-@@ -1789,14 +1832,15 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- return dictionary_load_finished_.Then([message_fut] { return message_fut; })
- .Then([this, index](const std::shared_ptr<Message>& message_obj)
- -> Future<std::shared_ptr<RecordBatch>> {
-- FileBlock block = GetRecordBatchBlock(index);
-+ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(index));
- ARROW_ASSIGN_OR_RAISE(auto message, GetFlatbufMessage(message_obj));
- ARROW_ASSIGN_OR_RAISE(auto batch, GetBatchFromMessage(message));
- ARROW_ASSIGN_OR_RAISE(auto context, GetIpcReadContext(message, batch));
-
- auto read_context = std::make_shared<CachedRecordBatchReadContext>(
- schema_, batch, std::move(context), file_, owned_file_,
-- block.offset + static_cast<int64_t>(block.metadata_length));
-+ block.offset + static_cast<int64_t>(block.metadata_length),
-+ block.body_length);
- RETURN_NOT_OK(read_context->CalculateLoadRequest());
- return read_context->ReadAsync().Then(
- [read_context] { return read_context->CreateRecordBatch(); });
-@@ -1915,25 +1959,31 @@ Future<WholeIpcFileRecordBatchGenerator::Item>
- WholeIpcFileRecordBatchGenerator::operator()() {
- auto state = state_;
- if (!read_dictionaries_.is_valid()) {
-- std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
-- for (int i = 0; i < state->num_dictionaries(); i++) {
-- auto block = FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
-- messages[i] = ReadBlock(block);
-- }
-- auto read_messages = All(std::move(messages));
-- if (executor_) read_messages = executor_->Transfer(read_messages);
-- read_dictionaries_ = read_messages.Then(
-- [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
-- -> Status {
-- ARROW_ASSIGN_OR_RAISE(auto messages,
-- arrow::internal::UnwrapOrRaise(maybe_messages));
-- return ReadDictionaries(state.get(), std::move(messages));
-- });
-+ if (state->dictionary_load_finished_.is_valid()) {
-+ // PreBufferMetadata has started reading dictionaries in the background
-+ read_dictionaries_ = state->dictionary_load_finished_;
-+ } else {
-+ // Start reading dictionaries
-+ std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
-+ for (int i = 0; i < state->num_dictionaries(); i++) {
-+ ARROW_ASSIGN_OR_RAISE(auto block, state->GetDictionaryBlock(i));
-+ messages[i] = ReadBlock(block);
-+ }
-+ auto read_messages = All(std::move(messages));
-+ if (executor_) read_messages = executor_->Transfer(read_messages);
-+ read_dictionaries_ = read_messages.Then(
-+ [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
-+ -> Status {
-+ ARROW_ASSIGN_OR_RAISE(auto messages,
-+ arrow::internal::UnwrapOrRaise(maybe_messages));
-+ return state->ReadDictionaries(messages);
-+ });
-+ }
- }
- if (index_ >= state_->num_record_batches()) {
- return Future<Item>::MakeFinished(IterationTraits<Item>::End());
- }
-- auto block = FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++));
-+ ARROW_ASSIGN_OR_RAISE(auto block, state->GetRecordBatchBlock(index_++));
- auto read_message = ReadBlock(block);
- auto read_messages = read_dictionaries_.Then([read_message]() { return read_message; });
- // Force transfer. This may be wasteful in some cases, but ensures we get off the
-@@ -1969,16 +2019,6 @@ Future<std::shared_ptr<Message>> WholeIpcFileRecordBatchGenerator::ReadBlock(
- }
- }
-
--Status WholeIpcFileRecordBatchGenerator::ReadDictionaries(
-- RecordBatchFileReaderImpl* state,
-- std::vector<std::shared_ptr<Message>> dictionary_messages) {
-- IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_);
-- for (const auto& message : dictionary_messages) {
-- RETURN_NOT_OK(state->ReadOneDictionary(message.get(), context));
-- }
-- return Status::OK();
--}
--
- Result<std::shared_ptr<RecordBatch>> WholeIpcFileRecordBatchGenerator::ReadRecordBatch(
- RecordBatchFileReaderImpl* state, Message* message) {
- CHECK_HAS_BODY(*message);
-@@ -2630,6 +2670,14 @@ Status ValidateFuzzBatch(const RecordBatch& batch) {
- return st;
- }
-
-+Status ValidateFuzzBatch(const RecordBatchWithMetadata& batch) {
-+ if (batch.batch) {
-+ RETURN_NOT_OK(ValidateFuzzBatch(*batch.batch));
-+ }
-+ // XXX do something with custom metadata?
-+ return Status::OK();
-+}
-+
- IpcReadOptions FuzzingOptions() {
- IpcReadOptions options;
- options.memory_pool = ::arrow::internal::fuzzing_memory_pool();
-@@ -2648,12 +2696,12 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
- Status st;
-
- while (true) {
-- std::shared_ptr<arrow::RecordBatch> batch;
-- RETURN_NOT_OK(batch_reader->ReadNext(&batch));
-- if (batch == nullptr) {
-+ ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadNext());
-+ if (!batch.batch && !batch.custom_metadata) {
-+ // EOS
- break;
- }
-- st &= ValidateFuzzBatch(*batch);
-+ st &= ValidateFuzzBatch(batch);
- }
-
- return st;
-@@ -2661,20 +2709,36 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
-
- Status FuzzIpcFile(const uint8_t* data, int64_t size) {
- auto buffer = std::make_shared<Buffer>(data, size);
-- io::BufferReader buffer_reader(buffer);
-
-- std::shared_ptr<RecordBatchFileReader> batch_reader;
-- ARROW_ASSIGN_OR_RAISE(batch_reader,
-- RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
-- Status st;
-+ Status final_status;
-
-- const int n_batches = batch_reader->num_record_batches();
-- for (int i = 0; i < n_batches; ++i) {
-- ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadRecordBatch(i));
-- st &= ValidateFuzzBatch(*batch);
-+ auto do_read = [&](bool pre_buffer) {
-+ io::BufferReader buffer_reader(buffer);
-+ ARROW_ASSIGN_OR_RAISE(auto batch_reader,
-+ RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
-+ if (pre_buffer) {
-+ // Pre-buffer all record batches
-+ RETURN_NOT_OK(batch_reader->PreBufferMetadata(/*indices=*/{}));
-+ }
-+
-+ const int n_batches = batch_reader->num_record_batches();
-+ for (int i = 0; i < n_batches; ++i) {
-+ RecordBatchWithMetadata batch;
-+ auto st = batch_reader->ReadRecordBatchWithCustomMetadata(i).Value(&batch);
-+ final_status &= st;
-+ if (!st.ok()) {
-+ continue;
-+ }
-+ final_status &= ValidateFuzzBatch(batch);
-+ }
-+ return Status::OK();
-+ };
-+
-+ for (const bool pre_buffer : {false, true}) {
-+ final_status &= do_read(pre_buffer);
- }
-
-- return st;
-+ return final_status;
- }
-
- Status FuzzIpcTensorStream(const uint8_t* data, int64_t size) {
-diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc
-index 02e6b816c0b..ceca6d9e434 100644
---- a/cpp/src/arrow/ipc/test_common.cc
-+++ b/cpp/src/arrow/ipc/test_common.cc
-@@ -16,6 +16,7 @@
- // under the License.
-
- #include <algorithm>
-+#include <concepts>
- #include <cstdint>
- #include <functional>
- #include <memory>
-@@ -368,19 +369,27 @@ Status MakeRandomStringArray(int64_t length, bool include_nulls, MemoryPool* poo
- return builder.Finish(out);
- }
-
--template <class BuilderType>
--static Status MakeBinaryArrayWithUniqueValues(int64_t length, bool include_nulls,
-- MemoryPool* pool,
-- std::shared_ptr<Array>* out) {
-- BuilderType builder(pool);
-+template <std::derived_from<ArrayBuilder> BuilderType>
-+static Result<std::shared_ptr<Array>> MakeBinaryArrayWithUniqueValues(
-+ BuilderType builder, int64_t length, bool include_nulls) {
-+ if constexpr (std::is_base_of_v<BinaryViewBuilder, BuilderType>) {
-+ // Try to emit several variadic buffers by choosing a small block size.
-+ builder.SetBlockSize(512);
-+ }
- for (int64_t i = 0; i < length; ++i) {
- if (include_nulls && (i % 7 == 0)) {
- RETURN_NOT_OK(builder.AppendNull());
- } else {
-- RETURN_NOT_OK(builder.Append(std::to_string(i)));
-+ // Make sure that some strings are long enough to have non-inline binary views
-+ const auto base = std::to_string(i);
-+ std::string value;
-+ for (int64_t j = 0; j < 3 * (i % 10); ++j) {
-+ value += base;
-+ }
-+ RETURN_NOT_OK(builder.Append(value));
- }
- }
-- return builder.Finish(out);
-+ return builder.Finish();
- }
-
- Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_nulls,
-@@ -390,22 +399,22 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_n
- ArrayVector arrays;
- FieldVector fields;
-
-- auto AppendColumn = [&](auto& MakeArray) {
-- arrays.emplace_back();
-- RETURN_NOT_OK(MakeArray(length, with_nulls, default_memory_pool(), &arrays.back()));
--
-- const auto& type = arrays.back()->type();
-- fields.push_back(field(type->ToString(), type));
-+ auto AppendColumn = [&](auto builder) {
-+ ARROW_ASSIGN_OR_RAISE(auto array, MakeBinaryArrayWithUniqueValues(
-+ std::move(builder), length, with_nulls));
-+ arrays.push_back(array);
-+ fields.push_back(field(array->type()->ToString(), array->type()));
- return Status::OK();
- };
-
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringBuilder>));
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryBuilder>));
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeStringBuilder>));
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeBinaryBuilder>));
-+ auto pool = default_memory_pool();
-+ RETURN_NOT_OK(AppendColumn(StringBuilder(pool)));
-+ RETURN_NOT_OK(AppendColumn(BinaryBuilder(pool)));
-+ RETURN_NOT_OK(AppendColumn(LargeStringBuilder(pool)));
-+ RETURN_NOT_OK(AppendColumn(LargeBinaryBuilder(pool)));
- if (with_view_types) {
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringViewBuilder>));
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryViewBuilder>));
-+ RETURN_NOT_OK(AppendColumn(StringViewBuilder(pool)));
-+ RETURN_NOT_OK(AppendColumn(BinaryViewBuilder(pool)));
- }
-
- *out = RecordBatch::Make(schema(std::move(fields)), length, std::move(arrays));
-diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
-index f68d2dcb619..e3582056ead 100644
---- a/cpp/src/arrow/type.h
-+++ b/cpp/src/arrow/type.h
-@@ -2575,6 +2575,16 @@ constexpr bool may_have_validity_bitmap(Type::type id) {
- }
- }
-
-+constexpr bool has_variadic_buffers(Type::type id) {
-+ switch (id) {
-+ case Type::BINARY_VIEW:
-+ case Type::STRING_VIEW:
-+ return true;
-+ default:
-+ return false;
-+ }
-+}
-+
- ARROW_DEPRECATED("Deprecated in 17.0.0. Use may_have_validity_bitmap() instead.")
- constexpr bool HasValidityBitmap(Type::type id) { return may_have_validity_bitmap(id); }
-
-diff --git a/cpp/src/arrow/util/int_util_overflow.h b/cpp/src/arrow/util/int_util_overflow.h
-index 93066fecafa..69714a935a4 100644
---- a/cpp/src/arrow/util/int_util_overflow.h
-+++ b/cpp/src/arrow/util/int_util_overflow.h
-@@ -18,7 +18,9 @@
- #pragma once
-
- #include <cstdint>
-+#include <initializer_list>
- #include <limits>
-+#include <optional>
- #include <type_traits>
-
- #include "arrow/status.h"
-@@ -162,6 +164,37 @@ NON_GENERIC_OPS_WITH_OVERFLOW(DivideWithOverflow)
- #undef NON_GENERIC_OPS_WITH_OVERFLOW
- #undef NON_GENERIC_OP_WITH_OVERFLOW
-
-+// Convenience functions over an arbitrary number of arguments
-+template <typename Int>
-+std::optional<Int> AddWithOverflow(std::initializer_list<Int> vs) {
-+ if (vs.size() == 0) {
-+ return {};
-+ }
-+ auto it = vs.begin();
-+ Int v = *it++;
-+ while (it != vs.end()) {
-+ if (ARROW_PREDICT_FALSE(AddWithOverflowGeneric(v, *it++, &v))) {
-+ return {};
-+ }
-+ }
-+ return v;
-+}
-+
-+template <typename Int>
-+std::optional<Int> MultiplyWithOverflow(std::initializer_list<Int> vs) {
-+ if (vs.size() == 0) {
-+ return {};
-+ }
-+ auto it = vs.begin();
-+ Int v = *it++;
-+ while (it != vs.end()) {
-+ if (ARROW_PREDICT_FALSE(MultiplyWithOverflowGeneric(v, *it++, &v))) {
-+ return {};
-+ }
-+ }
-+ return v;
-+}
-+
- // Define function NegateWithOverflow with the signature `bool(T u, T* out)`
- // where T is a signed integer type. On overflow, these functions return true.
- // Otherwise, false is returned and `out` is updated with the result of the
-diff --git a/cpp/src/arrow/util/int_util_test.cc b/cpp/src/arrow/util/int_util_test.cc
-index 7217c1097e4..cffa4e9d15e 100644
---- a/cpp/src/arrow/util/int_util_test.cc
-+++ b/cpp/src/arrow/util/int_util_test.cc
-@@ -649,5 +649,23 @@ TYPED_TEST(TestAddWithOverflow, Basics) {
- this->CheckOk(almost_min, almost_max + T{2}, T{1});
- }
-
-+TEST(AddWithOverflow, Variadic) {
-+ ASSERT_EQ(AddWithOverflow<int>({}), std::nullopt);
-+ ASSERT_EQ(AddWithOverflow({1, 2, 3}), 6);
-+ ASSERT_EQ(AddWithOverflow<int8_t>({1, 2, 125}), std::nullopt);
-+ ASSERT_EQ(AddWithOverflow<int8_t>({125, 2, 1}), std::nullopt);
-+ ASSERT_EQ(AddWithOverflow<int16_t>({1, 2, 125}), 128);
-+ ASSERT_EQ(AddWithOverflow<int16_t>({125, 2, 1}), 128);
-+}
-+
-+TEST(MultiplyWithOverflow, Variadic) {
-+ ASSERT_EQ(MultiplyWithOverflow<int>({}), std::nullopt);
-+ ASSERT_EQ(MultiplyWithOverflow({1, 2, 3, 4}), 24);
-+ ASSERT_EQ(MultiplyWithOverflow<int8_t>({2, 2, 32}), std::nullopt);
-+ ASSERT_EQ(MultiplyWithOverflow<int8_t>({32, 4, 1}), std::nullopt);
-+ ASSERT_EQ(MultiplyWithOverflow<int16_t>({2, 2, 32}), 128);
-+ ASSERT_EQ(MultiplyWithOverflow<int16_t>({32, 4, 1}), 128);
-+}
-+
- } // namespace internal
- } // namespace arrow
Verdict: ❌ CHANGES REQUESTED — Please address the issues flagged above.
Patch has been applied cleanly.
🔒 CVE Patch Review: CVE-2026-25087
PR #16145 — [MEDIUM] Patch libarrow for CVE-2026-25087
Spec File Validation
Build Verification
🤖 AI Build Log Analysis
🧪 Test Log Analysis
No test log found (package may not have a %check section).
Patch Analysis
Raw diff (upstream vs PR)
--- upstream
+++ pr
@@ -1,771 +1,848 @@
-From a4ae90929d6e959e9a1fb29f3907bbbf2799472e Mon Sep 17 00:00:00 2001
-From: Antoine Pitrou <antoine@python.org>
-Date: Wed, 21 Jan 2026 17:54:00 +0100
-Subject: [PATCH] GH-48924: [C++][CI] Fuzz IPC file metadata pre-buffering
-
----
- ci/scripts/cpp_test.sh | 9 +
- cpp/src/arrow/ipc/read_write_test.cc | 75 +++++----
- cpp/src/arrow/ipc/reader.cc | 222 ++++++++++++++++---------
- cpp/src/arrow/ipc/test_common.cc | 47 +++---
- cpp/src/arrow/type.h | 10 ++
- cpp/src/arrow/util/int_util_overflow.h | 33 ++++
- cpp/src/arrow/util/int_util_test.cc | 18 ++
- 7 files changed, 286 insertions(+), 128 deletions(-)
-
-diff --git a/ci/scripts/cpp_test.sh b/ci/scripts/cpp_test.sh
-index 0ad59bc308f..5d6d5e099ab 100755
---- a/ci/scripts/cpp_test.sh
-+++ b/ci/scripts/cpp_test.sh
-@@ -182,6 +182,15 @@ if [ "${ARROW_FUZZING}" == "ON" ]; then
- # Some fuzz regression files may trigger huge memory allocations,
- # let the allocator return null instead of aborting.
- export ASAN_OPTIONS="$ASAN_OPTIONS allocator_may_return_null=1"
-+ export ARROW_FUZZING_VERBOSITY=1
-+ # Run golden IPC integration files: these should ideally load without errors,
-+ # though some very old ones carry invalid data (such as decimal values
-+ # larger than their advertised precision).
-+ # shellcheck disable=SC2046
-+ "${binary_output_dir}/arrow-ipc-stream-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.stream")
-+ # shellcheck disable=SC2046
-+ "${binary_output_dir}/arrow-ipc-file-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.arrow_file")
-+ # Run known crash files
- "${binary_output_dir}/arrow-ipc-stream-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-stream/crash-*
- "${binary_output_dir}/arrow-ipc-stream-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-stream/*-testcase-*
- "${binary_output_dir}/arrow-ipc-file-fuzz" "${ARROW_TEST_DATA}"/arrow-ipc-file/*-testcase-*
-diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
-index 315d8bd07d9..9f7df541bd7 100644
---- a/cpp/src/arrow/ipc/read_write_test.cc
-+++ b/cpp/src/arrow/ipc/read_write_test.cc
-@@ -1252,40 +1252,55 @@ struct FileGeneratorWriterHelper : public FileWriterHelper {
- Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
- ReadStats* out_stats = nullptr,
- MetadataVector* out_metadata_list = nullptr) override {
-- std::shared_ptr<io::RandomAccessFile> buf_reader;
-- if (kCoalesce) {
-- // Use a non-zero-copy enabled BufferReader so we can test paths properly
-- buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
-- } else {
-- buf_reader = std::make_shared<io::BufferReader>(buffer_);
-- }
-- AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
-+ // The generator doesn't track stats.
-+ EXPECT_EQ(nullptr, out_stats);
-
-- {
-- auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
-- // Do NOT assert OK since some tests check whether this fails properly
-- EXPECT_FINISHES(fut);
-- ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
-- EXPECT_EQ(num_batches_written_, reader->num_record_batches());
-- // Generator will keep reader alive internally
-- ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
-- }
-+ auto read_batches = [&](bool pre_buffer) -> Result<RecordBatchVector> {
-+ std::shared_ptr<io::RandomAccessFile> buf_reader;
-+ if (kCoalesce) {
-+ // Use a non-zero-copy enabled BufferReader so we can test paths properly
-+ buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
-+ } else {
-+ buf_reader = std::make_shared<io::BufferReader>(buffer_);
-+ }
-+ AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+diff --git a/SPECS/libarrow/CVE-2026-25087.patch b/SPECS/libarrow/CVE-2026-25087.patch
+new file mode 100644
+index 00000000000..6dc36806b10
+--- /dev/null
++++ b/SPECS/libarrow/CVE-2026-25087.patch
+@@ -0,0 +1,842 @@
++From a4ae90929d6e959e9a1fb29f3907bbbf2799472e Mon Sep 17 00:00:00 2001
++From: Antoine Pitrou <antoine@python.org>
++Date: Wed, 21 Jan 2026 17:54:00 +0100
++Subject: [PATCH] GH-48924: [C++][CI] Fuzz IPC file metadata pre-buffering
+
-+ {
-+ auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
-+ ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
-+ EXPECT_EQ(num_batches_written_, reader->num_record_batches());
-+ if (pre_buffer) {
-+ RETURN_NOT_OK(reader->PreBufferMetadata(/*indices=*/{}));
-+ }
-+ // Generator will keep reader alive internally
-+ ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
-+ }
-
-- // Generator is async-reentrant
-- std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
-+ // Generator is async-reentrant
-+ std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
-+ for (int i = 0; i < num_batches_written_; ++i) {
-+ futures.push_back(generator());
-+ }
-+ auto fut = generator();
-+ ARROW_ASSIGN_OR_RAISE(auto final_batch, fut.result());
-+ EXPECT_EQ(nullptr, final_batch);
++Upstream Patch reference: https://patch-diff.githubusercontent.com/raw/apache/arrow/pull/48925.patch
+
-+ RecordBatchVector batches;
-+ for (auto& future : futures) {
-+ ARROW_ASSIGN_OR_RAISE(auto batch, future.result());
-+ EXPECT_NE(nullptr, batch);
-+ batches.push_back(batch);
-+ }
-+ return batches;
-+ };
++---
++ ci/scripts/cpp_test.sh | 12 ++
++ cpp/src/arrow/ipc/read_write_test.cc | 75 +++++---
++ cpp/src/arrow/ipc/reader.cc | 252 +++++++++++++++++--------
++ cpp/src/arrow/ipc/test_common.cc | 47 +++--
++ cpp/src/arrow/type.h | 10 +
++ cpp/src/arrow/util/int_util_overflow.h | 33 ++++
++ cpp/src/arrow/util/int_util_test.cc | 18 ++
++ 7 files changed, 316 insertions(+), 131 deletions(-)
+
-+ ARROW_ASSIGN_OR_RAISE(*out_batches, read_batches(/*pre_buffer=*/false));
-+ // Also read with pre-buffered metadata, and check the results are equal
-+ ARROW_ASSIGN_OR_RAISE(auto batches_pre_buffered, read_batches(/*pre_buffer=*/true));
- for (int i = 0; i < num_batches_written_; ++i) {
-- futures.push_back(generator());
-- }
-- auto fut = generator();
-- EXPECT_FINISHES_OK_AND_EQ(nullptr, fut);
-- for (auto& future : futures) {
-- EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future);
-- out_batches->push_back(batch);
-+ AssertBatchesEqual(*batches_pre_buffered[i], *(*out_batches)[i],
-+ /*check_metadata=*/true);
- }
--
-- // The generator doesn't track stats.
-- EXPECT_EQ(nullptr, out_stats);
--
- return Status::OK();
- }
- };
-diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
-index 8e125fc5ede..f1571f76c24 100644
---- a/cpp/src/arrow/ipc/reader.cc
-+++ b/cpp/src/arrow/ipc/reader.cc
-@@ -54,6 +54,7 @@
- #include "arrow/util/compression.h"
- #include "arrow/util/endian.h"
- #include "arrow/util/fuzz_internal.h"
-+#include "arrow/util/int_util_overflow.h"
- #include "arrow/util/key_value_metadata.h"
- #include "arrow/util/logging_internal.h"
- #include "arrow/util/parallel.h"
-@@ -72,6 +73,7 @@ namespace arrow {
-
- namespace flatbuf = org::apache::arrow::flatbuf;
-
-+using internal::AddWithOverflow;
- using internal::checked_cast;
- using internal::checked_pointer_cast;
-
-@@ -177,14 +179,16 @@ class ArrayLoader {
-
- explicit ArrayLoader(const flatbuf::RecordBatch* metadata,
- MetadataVersion metadata_version, const IpcReadOptions& options,
-- int64_t file_offset)
-+ int64_t file_offset, int64_t file_length)
- : metadata_(metadata),
- metadata_version_(metadata_version),
- file_(nullptr),
- file_offset_(file_offset),
-+ file_length_(file_length),
- max_recursion_depth_(options.max_recursion_depth) {}
-
- Status ReadBuffer(int64_t offset, int64_t length, std::shared_ptr<Buffer>* out) {
-+ // This construct permits overriding GetBuffer at compile time
- if (skip_io_) {
- return Status::OK();
- }
-@@ -194,7 +198,10 @@ class ArrayLoader {
- if (length < 0) {
- return Status::Invalid("Negative length for reading buffer ", buffer_index_);
- }
-- // This construct permits overriding GetBuffer at compile time
-+ auto read_end = AddWithOverflow({offset, length});
-+ if (!read_end.has_value() || (file_length_.has_value() && read_end > file_length_)) {
-+ return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
-+ }
- if (!bit_util::IsMultipleOf8(offset)) {
- return Status::Invalid("Buffer ", buffer_index_,
- " did not start on 8-byte aligned offset: ", offset);
-@@ -202,6 +209,9 @@ class ArrayLoader {
- if (file_) {
- return file_->ReadAt(offset, length).Value(out);
- } else {
-+ if (!AddWithOverflow({read_end.value(), file_offset_}).has_value()) {
-+ return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
-+ }
- read_request_.RequestRange(offset + file_offset_, length, out);
- return Status::OK();
- }
-@@ -292,6 +302,16 @@ class ArrayLoader {
- // we can skip that buffer without reading from shared memory
- RETURN_NOT_OK(GetFieldMetadata(field_index_++, out_));
-
-+ if (::arrow::internal::has_variadic_buffers(type_id)) {
-+ ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
-+ GetVariadicCount(variadic_count_index_++));
-+ const int64_t start = static_cast<int64_t>(out_->buffers.size());
-+ // NOTE: this must be done before any other call to `GetBuffer` because
-+ // BatchDataReadRequest will keep pointers to `std::shared_ptr<Buffer>`
-+ // objects.
-+ out_->buffers.resize(start + data_buffer_count);
-+ }
++diff --git a/ci/scripts/cpp_test.sh b/ci/scripts/cpp_test.sh
++index 0c6e1c6..1110378 100755
++--- a/ci/scripts/cpp_test.sh
+++++ b/ci/scripts/cpp_test.sh
++@@ -107,6 +107,18 @@ fi
++
++ if [ "${ARROW_FUZZING}" == "ON" ]; then
++ # Fuzzing regression tests
+++ # Some fuzz regression files may trigger huge memory allocations,
+++ # let the allocator return null instead of aborting.
+++ export ASAN_OPTIONS="$ASAN_OPTIONS allocator_may_return_null=1"
+++ export ARROW_FUZZING_VERBOSITY=1
+++ # Run golden IPC integration files: these should ideally load without errors,
+++ # though some very old ones carry invalid data (such as decimal values
+++ # larger than their advertised precision).
+++ # shellcheck disable=SC2046
+++ "${binary_output_dir}/arrow-ipc-stream-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.stream")
+++ # shellcheck disable=SC2046
+++ "${binary_output_dir}/arrow-ipc-file-fuzz" $(find "${ARROW_TEST_DATA}"/arrow-ipc-stream/integration -name "*.arrow_file")
+++ # Run known crash files
++ ${binary_output_dir}/arrow-ipc-stream-fuzz ${ARROW_TEST_DATA}/arrow-ipc-stream/crash-*
++ ${binary_output_dir}/arrow-ipc-stream-fuzz ${ARROW_TEST_DATA}/arrow-ipc-stream/*-testcase-*
++ ${binary_output_dir}/arrow-ipc-file-fuzz ${ARROW_TEST_DATA}/arrow-ipc-file/*-testcase-*
++diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
++index bd2c2b7..af749ec 100644
++--- a/cpp/src/arrow/ipc/read_write_test.cc
+++++ b/cpp/src/arrow/ipc/read_write_test.cc
++@@ -1220,40 +1220,55 @@ struct FileGeneratorWriterHelper : public FileWriterHelper {
++ Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
++ ReadStats* out_stats = nullptr,
++ MetadataVector* out_metadata_list = nullptr) override {
++- std::shared_ptr<io::RandomAccessFile> buf_reader;
++- if (kCoalesce) {
++- // Use a non-zero-copy enabled BufferReader so we can test paths properly
++- buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
++- } else {
++- buf_reader = std::make_shared<io::BufferReader>(buffer_);
++- }
++- AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+++ // The generator doesn't track stats.
+++ EXPECT_EQ(nullptr, out_stats);
++
++- {
++- auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
++- // Do NOT assert OK since some tests check whether this fails properly
++- EXPECT_FINISHES(fut);
++- ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
++- EXPECT_EQ(num_batches_written_, reader->num_record_batches());
++- // Generator will keep reader alive internally
++- ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
++- }
+++ auto read_batches = [&](bool pre_buffer) -> Result<RecordBatchVector> {
+++ std::shared_ptr<io::RandomAccessFile> buf_reader;
+++ if (kCoalesce) {
+++ // Use a non-zero-copy enabled BufferReader so we can test paths properly
+++ buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
+++ } else {
+++ buf_reader = std::make_shared<io::BufferReader>(buffer_);
+++ }
+++ AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+++
+++ {
+++ auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
+++ ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
+++ EXPECT_EQ(num_batches_written_, reader->num_record_batches());
+++ if (pre_buffer) {
+++ RETURN_NOT_OK(reader->PreBufferMetadata(/*indices=*/{}));
+++ }
+++ // Generator will keep reader alive internally
+++ ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
+++ }
++
++- // Generator is async-reentrant
++- std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
+++ // Generator is async-reentrant
+++ std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
+++ for (int i = 0; i < num_batches_written_; ++i) {
+++ futures.push_back(generator());
+++ }
+++ auto fut = generator();
+++ ARROW_ASSIGN_OR_RAISE(auto final_batch, fut.result());
+++ EXPECT_EQ(nullptr, final_batch);
+++
+++ RecordBatchVector batches;
+++ for (auto& future : futures) {
+++ ARROW_ASSIGN_OR_RAISE(auto batch, future.result());
+++ EXPECT_NE(nullptr, batch);
+++ batches.push_back(batch);
+++ }
+++ return batches;
+++ };
+++
+++ ARROW_ASSIGN_OR_RAISE(*out_batches, read_batches(/*pre_buffer=*/false));
+++ // Also read with pre-buffered metadata, and check the results are equal
+++ ARROW_ASSIGN_OR_RAISE(auto batches_pre_buffered, read_batches(/*pre_buffer=*/true));
++ for (int i = 0; i < num_batches_written_; ++i) {
++- futures.push_back(generator());
++- }
++- auto fut = generator();
++- EXPECT_FINISHES_OK_AND_EQ(nullptr, fut);
++- for (auto& future : futures) {
++- EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future);
++- out_batches->push_back(batch);
+++ AssertBatchesEqual(*batches_pre_buffered[i], *(*out_batches)[i],
+++ /*check_metadata=*/true);
++ }
++-
++- // The generator doesn't track stats.
++- EXPECT_EQ(nullptr, out_stats);
++-
++ return Status::OK();
++ }
++ };
++diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
++index d272c78..3a2987b 100644
++--- a/cpp/src/arrow/ipc/reader.cc
+++++ b/cpp/src/arrow/ipc/reader.cc
++@@ -52,6 +52,7 @@
++ #include "arrow/util/checked_cast.h"
++ #include "arrow/util/compression.h"
++ #include "arrow/util/endian.h"
+++#include "arrow/util/int_util_overflow.h"
++ #include "arrow/util/key_value_metadata.h"
++ #include "arrow/util/logging.h"
++ #include "arrow/util/parallel.h"
++@@ -73,6 +74,8 @@ namespace flatbuf = org::apache::arrow::flatbuf;
++ using internal::checked_cast;
++ using internal::checked_pointer_cast;
++
+++using internal::AddWithOverflow;
+++
++ namespace ipc {
++
++ using internal::FileBlock;
++@@ -166,23 +169,26 @@ class ArrayLoader {
++ public:
++ explicit ArrayLoader(const flatbuf::RecordBatch* metadata,
++ MetadataVersion metadata_version, const IpcReadOptions& options,
++- io::RandomAccessFile* file)
+++ io::RandomAccessFile* file, std::optional<int64_t> file_length)
++ : metadata_(metadata),
++ metadata_version_(metadata_version),
++ file_(file),
++ file_offset_(0),
+++ file_length_(file_length),
++ max_recursion_depth_(options.max_recursion_depth) {}
++
++ explicit ArrayLoader(const flatbuf::RecordBatch* metadata,
++ MetadataVersion metadata_version, const IpcReadOptions& options,
++- int64_t file_offset)
+++ int64_t file_offset, std::optional<int64_t> file_length)
++ : metadata_(metadata),
++ metadata_version_(metadata_version),
++ file_(nullptr),
++ file_offset_(file_offset),
+++ file_length_(file_length),
++ max_recursion_depth_(options.max_recursion_depth) {}
++
++ Status ReadBuffer(int64_t offset, int64_t length, std::shared_ptr<Buffer>* out) {
+++ // This construct permits overriding GetBuffer at compile time
++ if (skip_io_) {
++ return Status::OK();
++ }
++@@ -192,7 +198,10 @@ class ArrayLoader {
++ if (length < 0) {
++ return Status::Invalid("Negative length for reading buffer ", buffer_index_);
++ }
++- // This construct permits overriding GetBuffer at compile time
+++ auto read_end = AddWithOverflow({offset, length});
+++ if (!read_end.has_value() || (file_length_.has_value() && read_end > file_length_)) {
+++ return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
+++ }
++ if (!bit_util::IsMultipleOf8(offset)) {
++ return Status::Invalid("Buffer ", buffer_index_,
++ " did not start on 8-byte aligned offset: ", offset);
++@@ -200,6 +209,9 @@ class ArrayLoader {
++ if (file_) {
++ return file_->ReadAt(offset, length).Value(out);
++ } else {
+++ if (!AddWithOverflow({read_end.value(), file_offset_}).has_value()) {
+++ return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file area");
+++ }
++ read_request_.RequestRange(offset + file_offset_, length, out);
++ return Status::OK();
++ }
++@@ -284,6 +296,16 @@ class ArrayLoader {
++ // we can skip that buffer without reading from shared memory
++ RETURN_NOT_OK(GetFieldMetadata(field_index_++, out_));
++
+++ if (::arrow::internal::has_variadic_buffers(type_id)) {
+++ ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
+++ GetVariadicCount(variadic_count_index_++));
+++ const int64_t start = static_cast<int64_t>(out_->buffers.size());
+++ // NOTE: this must be done before any other call to `GetBuffer` because
+++ // BatchDataReadRequest will keep pointers to `std::shared_ptr<Buffer>`
+++ // objects.
+++ out_->buffers.resize(start + data_buffer_count);
+++ }
+++
++ if (internal::HasValidityBitmap(type_id, metadata_version_)) {
++ // Extract null_bitmap which is common to all arrays except for unions
++ // and nulls.
++@@ -292,6 +314,7 @@ class ArrayLoader {
++ }
++ buffer_index_++;
++ }
+++
++ return Status::OK();
++ }
++
++@@ -390,14 +413,9 @@ class ArrayLoader {
++ Status Visit(const BinaryViewType& type) {
++ out_->buffers.resize(2);
++
++- RETURN_NOT_OK(LoadCommon(type.id()));
++- RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[1]));
++-
++- ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
++- GetVariadicCount(variadic_count_index_++));
++- out_->buffers.resize(data_buffer_count + 2);
++- for (size_t i = 0; i < data_buffer_count; ++i) {
++- RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i + 2]));
+++ RETURN_NOT_OK(LoadCommon(type.id())); // also initializes variadic buffers
+++ for (int64_t i = 1; i < static_cast<int64_t>(out_->buffers.size()); ++i) {
+++ RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i]));
++ }
++ return Status::OK();
++ }
++@@ -495,6 +513,7 @@ class ArrayLoader {
++ const MetadataVersion metadata_version_;
++ io::RandomAccessFile* file_;
++ int64_t file_offset_;
+++ std::optional<int64_t> file_length_;
++ int max_recursion_depth_;
++ int buffer_index_ = 0;
++ int field_index_ = 0;
++@@ -583,7 +602,12 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
++ const flatbuf::RecordBatch* metadata, const std::shared_ptr<Schema>& schema,
++ const std::vector<bool>* inclusion_mask, const IpcReadContext& context,
++ io::RandomAccessFile* file) {
++- ArrayLoader loader(metadata, context.metadata_version, context.options, file);
+++ std::optional<int64_t> file_length;
+++ if (file) {
+++ ARROW_ASSIGN_OR_RAISE(file_length, file->GetSize());
+++ }
+++ ArrayLoader loader(metadata, context.metadata_version, context.options, file,
+++ file_length);
++
++ ArrayDataVector columns(schema->num_fields());
++ ArrayDataVector filtered_columns;
++@@ -832,8 +856,12 @@ Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context,
++ ARROW_ASSIGN_OR_RAISE(auto value_type, context.dictionary_memo->GetDictionaryType(id));
++
++ // Load the dictionary data from the dictionary batch
+++ std::optional<int64_t> file_length;
+++ if (file) {
+++ ARROW_ASSIGN_OR_RAISE(file_length, file->GetSize());
+++ }
++ ArrayLoader loader(batch_meta, internal::GetMetadataVersion(message->version()),
++- context.options, file);
+++ context.options, file, file_length);
++ auto dict_data = std::make_shared<ArrayData>();
++ const Field dummy_field("", value_type);
++ RETURN_NOT_OK(loader.Load(&dummy_field, dict_data.get()));
++@@ -1152,8 +1180,19 @@ Result<std::shared_ptr<RecordBatchStreamReader>> RecordBatchStreamReader::Open(
++
++ // Common functions used in both the random-access file reader and the
++ // asynchronous generator
++-static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
++- return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()};
+++static inline Result<FileBlock> FileBlockFromFlatbuffer(const flatbuf::Block* fb_block,
+++ int64_t max_offset) {
+++ auto block =
+++ FileBlock{fb_block->offset(), fb_block->metaDataLength(), fb_block->bodyLength()};
+++ if (block.metadata_length < 0 || block.body_length < 0 || block.offset < 0) {
+++ return Status::IOError("Invalid Block in IPC file footer");
+++ }
+++ auto block_end =
+++ AddWithOverflow<int64_t>({block.offset, block.metadata_length, block.body_length});
+++ if (!block_end.has_value() || block_end > max_offset) {
+++ return Status::IOError("Invalid Block in IPC file footer");
+++ }
+++ return block;
++ }
++
++ Status CheckAligned(const FileBlock& block) {
++@@ -1267,7 +1306,11 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ const std::shared_ptr<Schema>& schema,
++ const std::vector<bool>* inclusion_mask,
++ MetadataVersion metadata_version = MetadataVersion::V5) {
++- ArrayLoader loader(metadata, metadata_version, options, file);
+++ std::optional<int64_t> file_length;
+++ if (file) {
+++ ARROW_ASSIGN_OR_RAISE(file_length, file->GetSize());
+++ }
+++ ArrayLoader loader(metadata, metadata_version, options, file, file_length);
++ for (int i = 0; i < schema->num_fields(); ++i) {
++ const Field& field = *schema->field(i);
++ if (!inclusion_mask || (*inclusion_mask)[i]) {
++@@ -1336,8 +1379,9 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ read_options, file, schema, &inclusion_mask);
++ };
++ }
+++ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
++ ARROW_ASSIGN_OR_RAISE(auto message,
++- ReadMessageFromBlock(GetRecordBatchBlock(i), fields_loader));
+++ ReadMessageFromBlock(block, fields_loader));
++
++ CHECK_HAS_BODY(*message);
++ ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
++@@ -1353,8 +1397,9 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ Result<int64_t> CountRows() override {
++ int64_t total = 0;
++ for (int i = 0; i < num_record_batches(); i++) {
+++ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
++ ARROW_ASSIGN_OR_RAISE(auto outer_message,
++- ReadMessageFromBlock(GetRecordBatchBlock(i)));
+++ ReadMessageFromBlock(block));
++ auto metadata = outer_message->metadata();
++ const flatbuf::Message* message = nullptr;
++ RETURN_NOT_OK(
++@@ -1468,13 +1513,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++
++ Status DoPreBufferMetadata(const std::vector<int>& indices) {
++ RETURN_NOT_OK(CacheMetadata(indices));
++- EnsureDictionaryReadStarted();
+++ RETURN_NOT_OK(EnsureDictionaryReadStarted());
++ Future<> all_metadata_ready = WaitForMetadatas(indices);
++ for (int index : indices) {
++ Future<std::shared_ptr<Message>> metadata_loaded =
++ all_metadata_ready.Then([this, index]() -> Result<std::shared_ptr<Message>> {
++ stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
++- FileBlock block = GetRecordBatchBlock(index);
+++ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
++ ARROW_ASSIGN_OR_RAISE(
++ std::shared_ptr<Buffer> metadata,
++ metadata_cache_->Read({block.offset, block.metadata_length}));
++@@ -1523,12 +1568,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ }
++ };
++
++- FileBlock GetRecordBatchBlock(int i) const {
++- return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
+++ Result<FileBlock> GetRecordBatchBlock(int i) const {
+++ return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i), footer_offset_);
++ }
++
++- FileBlock GetDictionaryBlock(int i) const {
++- return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
+++ Result<FileBlock> GetDictionaryBlock(int i) const {
+++ return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i), footer_offset_);
++ }
++
++ Result<std::unique_ptr<Message>> ReadMessageFromBlock(
++@@ -1541,16 +1586,26 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++
++ Status ReadDictionaries() {
++ // Read all the dictionaries
+++ std::vector<std::shared_ptr<Message>> messages(num_dictionaries());
+++ for (int i = 0; i < num_dictionaries(); ++i) {
+++ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
+++ ARROW_ASSIGN_OR_RAISE(messages[i], ReadMessageFromBlock(block));
+++ }
+++ return ReadDictionaries(messages);
+++ }
+++
+++ Status ReadDictionaries(
+++ const std::vector<std::shared_ptr<Message>>& dictionary_messages) {
+++ DCHECK_EQ(dictionary_messages.size(), static_cast<size_t>(num_dictionaries()));
++ IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
++ for (int i = 0; i < num_dictionaries(); ++i) {
++- ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(GetDictionaryBlock(i)));
++- RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
++- stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
+++ RETURN_NOT_OK(ReadOneDictionary(i, dictionary_messages[i].get(), context));
++ }
++ return Status::OK();
++ }
++
++- Status ReadOneDictionary(Message* message, const IpcReadContext& context) {
+++ Status ReadOneDictionary(int dict_index, Message* message,
+++ const IpcReadContext& context) {
++ CHECK_HAS_BODY(*message);
++ ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
++ DictionaryKind kind;
++@@ -1560,44 +1615,48 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ } else if (kind == DictionaryKind::Delta) {
++ stats_.num_dictionary_deltas.fetch_add(1, std::memory_order_relaxed);
++ }
+++ stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
++ return Status::OK();
++ }
++
++- void AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
+++ Status AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
++ // Adds all dictionaries to the range cache
++ for (int i = 0; i < num_dictionaries(); ++i) {
++- FileBlock block = GetDictionaryBlock(i);
+++ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
++ ranges->push_back({block.offset, block.metadata_length + block.body_length});
++ }
+++ return Status::OK();
++ }
++
++- void AddMetadataRanges(const std::vector<int>& indices,
++- std::vector<io::ReadRange>* ranges) {
+++ Status AddMetadataRanges(const std::vector<int>& indices,
+++ std::vector<io::ReadRange>* ranges) {
++ for (int index : indices) {
++- FileBlock block = GetRecordBatchBlock(static_cast<int>(index));
+++ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
++ ranges->push_back({block.offset, block.metadata_length});
++ }
+++ return Status::OK();
++ }
++
++ Status CacheMetadata(const std::vector<int>& indices) {
++ std::vector<io::ReadRange> ranges;
++ if (!read_dictionaries_) {
++- AddDictionaryRanges(&ranges);
+++ RETURN_NOT_OK(AddDictionaryRanges(&ranges));
++ }
++- AddMetadataRanges(indices, &ranges);
+++ RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
++ return metadata_cache_->Cache(std::move(ranges));
++ }
++
++- void EnsureDictionaryReadStarted() {
+++ Status EnsureDictionaryReadStarted() {
++ if (!dictionary_load_finished_.is_valid()) {
++ read_dictionaries_ = true;
++ std::vector<io::ReadRange> ranges;
++- AddDictionaryRanges(&ranges);
+++ RETURN_NOT_OK(AddDictionaryRanges(&ranges));
++ dictionary_load_finished_ =
++ metadata_cache_->WaitFor(std::move(ranges)).Then([this] {
++ return ReadDictionaries();
++ });
++ }
+++ return Status::OK();
++ }
++
++ Status WaitForDictionaryReadFinished() {
++@@ -1615,7 +1674,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++
++ Future<> WaitForMetadatas(const std::vector<int>& indices) {
++ std::vector<io::ReadRange> ranges;
++- AddMetadataRanges(indices, &ranges);
+++ RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
++ return metadata_cache_->WaitFor(std::move(ranges));
++ }
++
++@@ -1659,12 +1718,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ const flatbuf::RecordBatch* batch,
++ IpcReadContext context, io::RandomAccessFile* file,
++ std::shared_ptr<io::RandomAccessFile> owned_file,
++- int64_t block_data_offset)
+++ int64_t block_data_offset, int64_t block_data_length)
++ : schema(std::move(sch)),
++ context(std::move(context)),
++ file(file),
++ owned_file(std::move(owned_file)),
++- loader(batch, context.metadata_version, context.options, block_data_offset),
+++ loader(batch, context.metadata_version, context.options, block_data_offset,
+++ block_data_length),
++ columns(schema->num_fields()),
++ cache(file, file->io_context(), io::CacheOptions::LazyDefaults()),
++ length(batch->length()) {}
++@@ -1763,14 +1823,15 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
++ return dictionary_load_finished_.Then([message_fut] { return message_fut; })
++ .Then([this, index](const std::shared_ptr<Message>& message_obj)
++ -> Future<std::shared_ptr<RecordBatch>> {
++- FileBlock block = GetRecordBatchBlock(index);
+++ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(index));
++ ARROW_ASSIGN_OR_RAISE(auto message, GetFlatbufMessage(message_obj));
++ ARROW_ASSIGN_OR_RAISE(auto batch, GetBatchFromMessage(message));
++ ARROW_ASSIGN_OR_RAISE(auto context, GetIpcReadContext(message, batch));
++
++ auto read_context = std::make_shared<CachedRecordBatchReadContext>(
++ schema_, batch, std::move(context), file_, owned_file_,
++- block.offset + static_cast<int64_t>(block.metadata_length));
+++ block.offset + static_cast<int64_t>(block.metadata_length),
+++ block.body_length);
++ RETURN_NOT_OK(read_context->CalculateLoadRequest());
++ return read_context->ReadAsync().Then(
++ [read_context] { return read_context->CreateRecordBatch(); });
++@@ -1958,25 +2019,31 @@ Future<WholeIpcFileRecordBatchGenerator::Item>
++ WholeIpcFileRecordBatchGenerator::operator()() {
++ auto state = state_;
++ if (!read_dictionaries_.is_valid()) {
++- std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
++- for (int i = 0; i < state->num_dictionaries(); i++) {
++- auto block = FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
++- messages[i] = ReadBlock(block);
++- }
++- auto read_messages = All(std::move(messages));
++- if (executor_) read_messages = executor_->Transfer(read_messages);
++- read_dictionaries_ = read_messages.Then(
++- [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
++- -> Status {
++- ARROW_ASSIGN_OR_RAISE(auto messages,
++- arrow::internal::UnwrapOrRaise(maybe_messages));
++- return ReadDictionaries(state.get(), std::move(messages));
++- });
+++ if (state->dictionary_load_finished_.is_valid()) {
+++ // PreBufferMetadata has started reading dictionaries in the background
+++ read_dictionaries_ = state->dictionary_load_finished_;
+++ } else {
+++ // Start reading dictionaries
+++ std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
+++ for (int i = 0; i < state->num_dictionaries(); i++) {
+++ ARROW_ASSIGN_OR_RAISE(auto block, state->GetDictionaryBlock(i));
+++ messages[i] = ReadBlock(block);
+++ }
+++ auto read_messages = All(std::move(messages));
+++ if (executor_) read_messages = executor_->Transfer(read_messages);
+++ read_dictionaries_ = read_messages.Then(
+++ [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
+++ -> Status {
+++ ARROW_ASSIGN_OR_RAISE(auto messages,
+++ arrow::internal::UnwrapOrRaise(maybe_messages));
+++ return state->ReadDictionaries(messages);
+++ });
+++ }
++ }
++ if (index_ >= state_->num_record_batches()) {
++ return Future<Item>::MakeFinished(IterationTraits<Item>::End());
++ }
++- auto block = FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++));
+++ ARROW_ASSIGN_OR_RAISE(auto block, state->GetRecordBatchBlock(index_++));
++ auto read_message = ReadBlock(block);
++ auto read_messages = read_dictionaries_.Then([read_message]() { return read_message; });
++ // Force transfer. This may be wasteful in some cases, but ensures we get off the
++@@ -2012,16 +2079,6 @@ Future<std::shared_ptr<Message>> WholeIpcFileRecordBatchGenerator::ReadBlock(
++ }
++ }
++
++-Status WholeIpcFileRecordBatchGenerator::ReadDictionaries(
++- RecordBatchFileReaderImpl* state,
++- std::vector<std::shared_ptr<Message>> dictionary_messages) {
++- IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_);
++- for (const auto& message : dictionary_messages) {
++- RETURN_NOT_OK(state->ReadOneDictionary(message.get(), context));
++- }
++- return Status::OK();
++-}
++-
++ Result<std::shared_ptr<RecordBatch>> WholeIpcFileRecordBatchGenerator::ReadRecordBatch(
++ RecordBatchFileReaderImpl* state, Message* message) {
++ CHECK_HAS_BODY(*message);
++@@ -2598,23 +2655,37 @@ Status ValidateFuzzBatch(const RecordBatch& batch) {
++ return st;
++ }
++
+++Status ValidateFuzzBatch(const RecordBatchWithMetadata& batch) {
+++ if (batch.batch) {
+++ RETURN_NOT_OK(ValidateFuzzBatch(*batch.batch));
+++ }
+++ // XXX do something with custom metadata?
+++ return Status::OK();
+++}
+++
++ } // namespace
++
+++IpcReadOptions FuzzingOptions() {
+++ IpcReadOptions options;
+++ options.memory_pool = default_memory_pool();
+++ options.max_recursion_depth = 256;
+++ return options;
+++}
+++
++ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
++ auto buffer = std::make_shared<Buffer>(data, size);
++ io::BufferReader buffer_reader(buffer);
++
++- std::shared_ptr<RecordBatchReader> batch_reader;
++- ARROW_ASSIGN_OR_RAISE(batch_reader, RecordBatchStreamReader::Open(&buffer_reader));
+++ ARROW_ASSIGN_OR_RAISE(auto batch_reader, RecordBatchStreamReader::Open(&buffer_reader));
++ Status st;
++
++ while (true) {
++- std::shared_ptr<arrow::RecordBatch> batch;
++- RETURN_NOT_OK(batch_reader->ReadNext(&batch));
++- if (batch == nullptr) {
+++ ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadNext());
+++ if (!batch.batch && !batch.custom_metadata) {
+++ // EOS
++ break;
++ }
++- st &= ValidateFuzzBatch(*batch);
+++ st &= ValidateFuzzBatch(batch);
++ }
++
++ return st;
++@@ -2622,19 +2693,36 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
++
++ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
++ auto buffer = std::make_shared<Buffer>(data, size);
++- io::BufferReader buffer_reader(buffer);
++
++- std::shared_ptr<RecordBatchFileReader> batch_reader;
++- ARROW_ASSIGN_OR_RAISE(batch_reader, RecordBatchFileReader::Open(&buffer_reader));
++- Status st;
+++ Status final_status;
+++
+++ auto do_read = [&](bool pre_buffer) {
+++ io::BufferReader buffer_reader(buffer);
+++ ARROW_ASSIGN_OR_RAISE(auto batch_reader,
+++ RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
+++ if (pre_buffer) {
+++ // Pre-buffer all record batches
+++ RETURN_NOT_OK(batch_reader->PreBufferMetadata(/*indices=*/{}));
+++ }
++
++- const int n_batches = batch_reader->num_record_batches();
++- for (int i = 0; i < n_batches; ++i) {
++- ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadRecordBatch(i));
++- st &= ValidateFuzzBatch(*batch);
+++ const int n_batches = batch_reader->num_record_batches();
+++ for (int i = 0; i < n_batches; ++i) {
+++ RecordBatchWithMetadata batch;
+++ auto st = batch_reader->ReadRecordBatchWithCustomMetadata(i).Value(&batch);
+++ final_status &= st;
+++ if (!st.ok()) {
+++ continue;
+++ }
+++ final_status &= ValidateFuzzBatch(batch);
+++ }
+++ return Status::OK();
+++ };
+++
+++ for (const bool pre_buffer : {false, true}) {
+++ final_status &= do_read(pre_buffer);
++ }
++
++- return st;
+++ return final_status;
++ }
++
++ Status FuzzIpcTensorStream(const uint8_t* data, int64_t size) {
++diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc
++index 87c02e2..3a632fe 100644
++--- a/cpp/src/arrow/ipc/test_common.cc
+++++ b/cpp/src/arrow/ipc/test_common.cc
++@@ -16,6 +16,7 @@
++ // under the License.
++
++ #include <algorithm>
+++#include <concepts>
++ #include <cstdint>
++ #include <functional>
++ #include <memory>
++@@ -362,19 +363,27 @@ Status MakeRandomStringArray(int64_t length, bool include_nulls, MemoryPool* poo
++ return builder.Finish(out);
++ }
++
++-template <class BuilderType>
++-static Status MakeBinaryArrayWithUniqueValues(int64_t length, bool include_nulls,
++- MemoryPool* pool,
++- std::shared_ptr<Array>* out) {
++- BuilderType builder(pool);
+++template <std::derived_from<ArrayBuilder> BuilderType>
+++static Result<std::shared_ptr<Array>> MakeBinaryArrayWithUniqueValues(
+++ BuilderType builder, int64_t length, bool include_nulls) {
+++ if constexpr (std::is_base_of_v<BinaryViewBuilder, BuilderType>) {
+++ // Try to emit several variadic buffers by choosing a small block size.
+++ builder.SetBlockSize(512);
+++ }
++ for (int64_t i = 0; i < length; ++i) {
++ if (include_nulls && (i % 7 == 0)) {
++ RETURN_NOT_OK(builder.AppendNull());
++ } else {
++- RETURN_NOT_OK(builder.Append(std::to_string(i)));
+++ // Make sure that some strings are long enough to have non-inline binary views
+++ const auto base = std::to_string(i);
+++ std::string value;
+++ for (int64_t j = 0; j < 3 * (i % 10); ++j) {
+++ value += base;
+++ }
+++ RETURN_NOT_OK(builder.Append(value));
++ }
++ }
++- return builder.Finish(out);
+++ return builder.Finish();
++ }
++
++ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_nulls,
++@@ -384,22 +393,22 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_n
++ ArrayVector arrays;
++ FieldVector fields;
++
++- auto AppendColumn = [&](auto& MakeArray) {
++- arrays.emplace_back();
++- RETURN_NOT_OK(MakeArray(length, with_nulls, default_memory_pool(), &arrays.back()));
++-
++- const auto& type = arrays.back()->type();
++- fields.push_back(field(type->ToString(), type));
+++ auto AppendColumn = [&](auto builder) {
+++ ARROW_ASSIGN_OR_RAISE(auto array, MakeBinaryArrayWithUniqueValues(
+++ std::move(builder), length, with_nulls));
+++ arrays.push_back(array);
+++ fields.push_back(field(array->type()->ToString(), array->type()));
++ return Status::OK();
++ };
++
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringBuilder>));
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryBuilder>));
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeStringBuilder>));
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeBinaryBuilder>));
+++ auto pool = default_memory_pool();
+++ RETURN_NOT_OK(AppendColumn(StringBuilder(pool)));
+++ RETURN_NOT_OK(AppendColumn(BinaryBuilder(pool)));
+++ RETURN_NOT_OK(AppendColumn(LargeStringBuilder(pool)));
+++ RETURN_NOT_OK(AppendColumn(LargeBinaryBuilder(pool)));
++ if (with_view_types) {
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringViewBuilder>));
++- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryViewBuilder>));
+++ RETURN_NOT_OK(AppendColumn(StringViewBuilder(pool)));
+++ RETURN_NOT_OK(AppendColumn(BinaryViewBuilder(pool)));
++ }
++
++ *out = RecordBatch::Make(schema(std::move(fields)), length, std::move(arrays));
++diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
++index 5b1331a..42e83f6 100644
++--- a/cpp/src/arrow/type.h
+++++ b/cpp/src/arrow/type.h
++@@ -2494,6 +2494,16 @@ constexpr bool HasValidityBitmap(Type::type id) {
++ }
++ }
++
+++constexpr bool has_variadic_buffers(Type::type id) {
+++ switch (id) {
+++ case Type::BINARY_VIEW:
+++ case Type::STRING_VIEW:
+++ return true;
+++ default:
+++ return false;
+++ }
+++}
+++
++ ARROW_EXPORT
++ std::string ToString(Type::type id);
++
++diff --git a/cpp/src/arrow/util/int_util_overflow.h b/cpp/src/arrow/util/int_util_overflow.h
++index ffe78be..841d503 100644
++--- a/cpp/src/arrow/util/int_util_overflow.h
+++++ b/cpp/src/arrow/util/int_util_overflow.h
++@@ -18,7 +18,9 @@
++ #pragma once
++
++ #include <cstdint>
+++#include <initializer_list>
++ #include <limits>
+++#include <optional>
++ #include <type_traits>
++
++ #include "arrow/status.h"
++@@ -114,5 +116,36 @@ SignedInt SafeLeftShift(SignedInt u, Shift shift) {
++ return static_cast<SignedInt>(static_cast<UnsignedInt>(u) << shift);
++ }
++
+++// Convenience functions over an arbitrary number of arguments
+++template <typename Int>
+++std::optional<Int> AddWithOverflow(std::initializer_list<Int> vs) {
+++ if (vs.size() == 0) {
+++ return {};
+++ }
+++ auto it = vs.begin();
+++ Int v = *it++;
+++ while (it != vs.end()) {
+++ if (ARROW_PREDICT_FALSE(AddWithOverflow(v, *it++, &v))) {
+++ return {};
+++ }
+++ }
+++ return v;
+++}
+++
+++template <typename Int>
+++std::optional<Int> MultiplyWithOverflow(std::initializer_list<Int> vs) {
+++ if (vs.size() == 0) {
+++ return {};
+++ }
+++ auto it = vs.begin();
+++ Int v = *it++;
+++ while (it != vs.end()) {
+++ if (ARROW_PREDICT_FALSE(MultiplyWithOverflow(v, *it++, &v))) {
+++ return {};
+++ }
+++ }
+++ return v;
+++}
+++
++ } // namespace internal
++ } // namespace arrow
++diff --git a/cpp/src/arrow/util/int_util_test.cc b/cpp/src/arrow/util/int_util_test.cc
++index 7217c10..cffa4e9 100644
++--- a/cpp/src/arrow/util/int_util_test.cc
+++++ b/cpp/src/arrow/util/int_util_test.cc
++@@ -649,5 +649,23 @@ TYPED_TEST(TestAddWithOverflow, Basics) {
++ this->CheckOk(almost_min, almost_max + T{2}, T{1});
++ }
++
+++TEST(AddWithOverflow, Variadic) {
+++ ASSERT_EQ(AddWithOverflow<int>({}), std::nullopt);
+++ ASSERT_EQ(AddWithOverflow({1, 2, 3}), 6);
+++ ASSERT_EQ(AddWithOverflow<int8_t>({1, 2, 125}), std::nullopt);
+++ ASSERT_EQ(AddWithOverflow<int8_t>({125, 2, 1}), std::nullopt);
+++ ASSERT_EQ(AddWithOverflow<int16_t>({1, 2, 125}), 128);
+++ ASSERT_EQ(AddWithOverflow<int16_t>({125, 2, 1}), 128);
+++}
+++
+++TEST(MultiplyWithOverflow, Variadic) {
+++ ASSERT_EQ(MultiplyWithOverflow<int>({}), std::nullopt);
+++ ASSERT_EQ(MultiplyWithOverflow({1, 2, 3, 4}), 24);
+++ ASSERT_EQ(MultiplyWithOverflow<int8_t>({2, 2, 32}), std::nullopt);
+++ ASSERT_EQ(MultiplyWithOverflow<int8_t>({32, 4, 1}), std::nullopt);
+++ ASSERT_EQ(MultiplyWithOverflow<int16_t>({2, 2, 32}), 128);
+++ ASSERT_EQ(MultiplyWithOverflow<int16_t>({32, 4, 1}), 128);
+++}
+++
++ } // namespace internal
++ } // namespace arrow
++--
++2.45.4
+
- if (internal::HasValidityBitmap(type_id, metadata_version_)) {
- // Extract null_bitmap which is common to all arrays except for unions
- // and nulls.
-@@ -300,6 +320,7 @@ class ArrayLoader {
- }
- buffer_index_++;
- }
-+
- return Status::OK();
- }
-
-@@ -398,14 +419,9 @@ class ArrayLoader {
- Status Visit(const BinaryViewType& type) {
- out_->buffers.resize(2);
-
-- RETURN_NOT_OK(LoadCommon(type.id()));
-- RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[1]));
--
-- ARROW_ASSIGN_OR_RAISE(auto data_buffer_count,
-- GetVariadicCount(variadic_count_index_++));
-- out_->buffers.resize(data_buffer_count + 2);
-- for (int64_t i = 0; i < data_buffer_count; ++i) {
-- RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i + 2]));
-+ RETURN_NOT_OK(LoadCommon(type.id())); // also initializes variadic buffers
-+ for (int64_t i = 1; i < static_cast<int64_t>(out_->buffers.size()); ++i) {
-+ RETURN_NOT_OK(GetBuffer(buffer_index_++, &out_->buffers[i]));
- }
- return Status::OK();
- }
-@@ -503,6 +519,7 @@ class ArrayLoader {
- const MetadataVersion metadata_version_;
- io::RandomAccessFile* file_;
- int64_t file_offset_;
-+ std::optional<int64_t> file_length_;
- int max_recursion_depth_;
- int buffer_index_ = 0;
- int field_index_ = 0;
-@@ -1173,8 +1190,19 @@ namespace {
-
- // Common functions used in both the random-access file reader and the
- // asynchronous generator
--inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
-- return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()};
-+Result<FileBlock> FileBlockFromFlatbuffer(const flatbuf::Block* fb_block,
-+ int64_t max_offset) {
-+ auto block =
-+ FileBlock{fb_block->offset(), fb_block->metaDataLength(), fb_block->bodyLength()};
-+ if (block.metadata_length < 0 || block.body_length < 0 || block.offset < 0) {
-+ return Status::IOError("Invalid Block in IPC file footer");
-+ }
-+ auto block_end =
-+ AddWithOverflow<int64_t>({block.offset, block.metadata_length, block.body_length});
-+ if (!block_end.has_value() || block_end > max_offset) {
-+ return Status::IOError("Invalid Block in IPC file footer");
-+ }
-+ return block;
- }
-
- Status CheckAligned(const FileBlock& block) {
-@@ -1362,8 +1390,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- read_options, file, schema, &inclusion_mask);
- };
- }
-- ARROW_ASSIGN_OR_RAISE(auto message,
-- ReadMessageFromBlock(GetRecordBatchBlock(i), fields_loader));
-+ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
-+ ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(block, fields_loader));
-
- CHECK_HAS_BODY(*message);
- ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
-@@ -1379,8 +1407,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- Result<int64_t> CountRows() override {
- int64_t total = 0;
- for (int i = 0; i < num_record_batches(); i++) {
-- ARROW_ASSIGN_OR_RAISE(auto outer_message,
-- ReadMessageFromBlock(GetRecordBatchBlock(i)));
-+ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(i));
-+ ARROW_ASSIGN_OR_RAISE(auto outer_message, ReadMessageFromBlock(block));
- auto metadata = outer_message->metadata();
- const flatbuf::Message* message = nullptr;
- RETURN_NOT_OK(
-@@ -1494,13 +1522,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-
- Status DoPreBufferMetadata(const std::vector<int>& indices) {
- RETURN_NOT_OK(CacheMetadata(indices));
-- EnsureDictionaryReadStarted();
-+ RETURN_NOT_OK(EnsureDictionaryReadStarted());
- Future<> all_metadata_ready = WaitForMetadatas(indices);
- for (int index : indices) {
- Future<std::shared_ptr<Message>> metadata_loaded =
- all_metadata_ready.Then([this, index]() -> Result<std::shared_ptr<Message>> {
- stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
-- FileBlock block = GetRecordBatchBlock(index);
-+ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
- ARROW_ASSIGN_OR_RAISE(
- std::shared_ptr<Buffer> metadata,
- metadata_cache_->Read({block.offset, block.metadata_length}));
-@@ -1549,12 +1577,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- }
- };
-
-- FileBlock GetRecordBatchBlock(int i) const {
-- return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
-+ Result<FileBlock> GetRecordBatchBlock(int i) const {
-+ return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i), footer_offset_);
- }
-
-- FileBlock GetDictionaryBlock(int i) const {
-- return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
-+ Result<FileBlock> GetDictionaryBlock(int i) const {
-+ return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i), footer_offset_);
- }
-
- Result<std::unique_ptr<Message>> ReadMessageFromBlock(
-@@ -1567,16 +1595,26 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-
- Status ReadDictionaries() {
- // Read all the dictionaries
-+ std::vector<std::shared_ptr<Message>> messages(num_dictionaries());
-+ for (int i = 0; i < num_dictionaries(); ++i) {
-+ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
-+ ARROW_ASSIGN_OR_RAISE(messages[i], ReadMessageFromBlock(block));
-+ }
-+ return ReadDictionaries(messages);
-+ }
-+
-+ Status ReadDictionaries(
-+ const std::vector<std::shared_ptr<Message>>& dictionary_messages) {
-+ DCHECK_EQ(dictionary_messages.size(), static_cast<size_t>(num_dictionaries()));
- IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
- for (int i = 0; i < num_dictionaries(); ++i) {
-- ARROW_ASSIGN_OR_RAISE(auto message, ReadMessageFromBlock(GetDictionaryBlock(i)));
-- RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
-- stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
-+ RETURN_NOT_OK(ReadOneDictionary(i, dictionary_messages[i].get(), context));
- }
- return Status::OK();
- }
-
-- Status ReadOneDictionary(Message* message, const IpcReadContext& context) {
-+ Status ReadOneDictionary(int dict_index, Message* message,
-+ const IpcReadContext& context) {
- CHECK_HAS_BODY(*message);
- ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
- DictionaryKind kind;
-@@ -1586,44 +1624,48 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- } else if (kind == DictionaryKind::Delta) {
- stats_.num_dictionary_deltas.fetch_add(1, std::memory_order_relaxed);
- }
-+ stats_.num_dictionary_batches.fetch_add(1, std::memory_order_relaxed);
- return Status::OK();
- }
-
-- void AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
-+ Status AddDictionaryRanges(std::vector<io::ReadRange>* ranges) const {
- // Adds all dictionaries to the range cache
- for (int i = 0; i < num_dictionaries(); ++i) {
-- FileBlock block = GetDictionaryBlock(i);
-+ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetDictionaryBlock(i));
- ranges->push_back({block.offset, block.metadata_length + block.body_length});
- }
-+ return Status::OK();
- }
-
-- void AddMetadataRanges(const std::vector<int>& indices,
-- std::vector<io::ReadRange>* ranges) {
-+ Status AddMetadataRanges(const std::vector<int>& indices,
-+ std::vector<io::ReadRange>* ranges) {
- for (int index : indices) {
-- FileBlock block = GetRecordBatchBlock(static_cast<int>(index));
-+ ARROW_ASSIGN_OR_RAISE(FileBlock block, GetRecordBatchBlock(index));
- ranges->push_back({block.offset, block.metadata_length});
- }
-+ return Status::OK();
- }
-
- Status CacheMetadata(const std::vector<int>& indices) {
- std::vector<io::ReadRange> ranges;
- if (!read_dictionaries_) {
-- AddDictionaryRanges(&ranges);
-+ RETURN_NOT_OK(AddDictionaryRanges(&ranges));
- }
-- AddMetadataRanges(indices, &ranges);
-+ RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
- return metadata_cache_->Cache(std::move(ranges));
- }
-
-- void EnsureDictionaryReadStarted() {
-+ Status EnsureDictionaryReadStarted() {
- if (!dictionary_load_finished_.is_valid()) {
- read_dictionaries_ = true;
- std::vector<io::ReadRange> ranges;
-- AddDictionaryRanges(&ranges);
-+ RETURN_NOT_OK(AddDictionaryRanges(&ranges));
- dictionary_load_finished_ =
- metadata_cache_->WaitFor(std::move(ranges)).Then([this] {
- return ReadDictionaries();
- });
- }
-+ return Status::OK();
- }
-
- Status WaitForDictionaryReadFinished() {
-@@ -1641,7 +1683,7 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
-
- Future<> WaitForMetadatas(const std::vector<int>& indices) {
- std::vector<io::ReadRange> ranges;
-- AddMetadataRanges(indices, &ranges);
-+ RETURN_NOT_OK(AddMetadataRanges(indices, &ranges));
- return metadata_cache_->WaitFor(std::move(ranges));
- }
-
-@@ -1685,12 +1727,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- const flatbuf::RecordBatch* batch,
- IpcReadContext context, io::RandomAccessFile* file,
- std::shared_ptr<io::RandomAccessFile> owned_file,
-- int64_t block_data_offset)
-+ int64_t block_data_offset, int64_t block_data_length)
- : schema(std::move(sch)),
- context(std::move(context)),
- file(file),
- owned_file(std::move(owned_file)),
-- loader(batch, context.metadata_version, context.options, block_data_offset),
-+ loader(batch, context.metadata_version, context.options, block_data_offset,
-+ block_data_length),
- columns(schema->num_fields()),
- cache(file, file->io_context(), io::CacheOptions::LazyDefaults()),
- length(batch->length()) {}
-@@ -1789,14 +1832,15 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
- return dictionary_load_finished_.Then([message_fut] { return message_fut; })
- .Then([this, index](const std::shared_ptr<Message>& message_obj)
- -> Future<std::shared_ptr<RecordBatch>> {
-- FileBlock block = GetRecordBatchBlock(index);
-+ ARROW_ASSIGN_OR_RAISE(auto block, GetRecordBatchBlock(index));
- ARROW_ASSIGN_OR_RAISE(auto message, GetFlatbufMessage(message_obj));
- ARROW_ASSIGN_OR_RAISE(auto batch, GetBatchFromMessage(message));
- ARROW_ASSIGN_OR_RAISE(auto context, GetIpcReadContext(message, batch));
-
- auto read_context = std::make_shared<CachedRecordBatchReadContext>(
- schema_, batch, std::move(context), file_, owned_file_,
-- block.offset + static_cast<int64_t>(block.metadata_length));
-+ block.offset + static_cast<int64_t>(block.metadata_length),
-+ block.body_length);
- RETURN_NOT_OK(read_context->CalculateLoadRequest());
- return read_context->ReadAsync().Then(
- [read_context] { return read_context->CreateRecordBatch(); });
-@@ -1915,25 +1959,31 @@ Future<WholeIpcFileRecordBatchGenerator::Item>
- WholeIpcFileRecordBatchGenerator::operator()() {
- auto state = state_;
- if (!read_dictionaries_.is_valid()) {
-- std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
-- for (int i = 0; i < state->num_dictionaries(); i++) {
-- auto block = FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
-- messages[i] = ReadBlock(block);
-- }
-- auto read_messages = All(std::move(messages));
-- if (executor_) read_messages = executor_->Transfer(read_messages);
-- read_dictionaries_ = read_messages.Then(
-- [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
-- -> Status {
-- ARROW_ASSIGN_OR_RAISE(auto messages,
-- arrow::internal::UnwrapOrRaise(maybe_messages));
-- return ReadDictionaries(state.get(), std::move(messages));
-- });
-+ if (state->dictionary_load_finished_.is_valid()) {
-+ // PreBufferMetadata has started reading dictionaries in the background
-+ read_dictionaries_ = state->dictionary_load_finished_;
-+ } else {
-+ // Start reading dictionaries
-+ std::vector<Future<std::shared_ptr<Message>>> messages(state->num_dictionaries());
-+ for (int i = 0; i < state->num_dictionaries(); i++) {
-+ ARROW_ASSIGN_OR_RAISE(auto block, state->GetDictionaryBlock(i));
-+ messages[i] = ReadBlock(block);
-+ }
-+ auto read_messages = All(std::move(messages));
-+ if (executor_) read_messages = executor_->Transfer(read_messages);
-+ read_dictionaries_ = read_messages.Then(
-+ [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
-+ -> Status {
-+ ARROW_ASSIGN_OR_RAISE(auto messages,
-+ arrow::internal::UnwrapOrRaise(maybe_messages));
-+ return state->ReadDictionaries(messages);
-+ });
-+ }
- }
- if (index_ >= state_->num_record_batches()) {
- return Future<Item>::MakeFinished(IterationTraits<Item>::End());
- }
-- auto block = FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++));
-+ ARROW_ASSIGN_OR_RAISE(auto block, state->GetRecordBatchBlock(index_++));
- auto read_message = ReadBlock(block);
- auto read_messages = read_dictionaries_.Then([read_message]() { return read_message; });
- // Force transfer. This may be wasteful in some cases, but ensures we get off the
-@@ -1969,16 +2019,6 @@ Future<std::shared_ptr<Message>> WholeIpcFileRecordBatchGenerator::ReadBlock(
- }
- }
-
--Status WholeIpcFileRecordBatchGenerator::ReadDictionaries(
-- RecordBatchFileReaderImpl* state,
-- std::vector<std::shared_ptr<Message>> dictionary_messages) {
-- IpcReadContext context(&state->dictionary_memo_, state->options_, state->swap_endian_);
-- for (const auto& message : dictionary_messages) {
-- RETURN_NOT_OK(state->ReadOneDictionary(message.get(), context));
-- }
-- return Status::OK();
--}
--
- Result<std::shared_ptr<RecordBatch>> WholeIpcFileRecordBatchGenerator::ReadRecordBatch(
- RecordBatchFileReaderImpl* state, Message* message) {
- CHECK_HAS_BODY(*message);
-@@ -2630,6 +2670,14 @@ Status ValidateFuzzBatch(const RecordBatch& batch) {
- return st;
- }
-
-+Status ValidateFuzzBatch(const RecordBatchWithMetadata& batch) {
-+ if (batch.batch) {
-+ RETURN_NOT_OK(ValidateFuzzBatch(*batch.batch));
-+ }
-+ // XXX do something with custom metadata?
-+ return Status::OK();
-+}
-+
- IpcReadOptions FuzzingOptions() {
- IpcReadOptions options;
- options.memory_pool = ::arrow::internal::fuzzing_memory_pool();
-@@ -2648,12 +2696,12 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
- Status st;
-
- while (true) {
-- std::shared_ptr<arrow::RecordBatch> batch;
-- RETURN_NOT_OK(batch_reader->ReadNext(&batch));
-- if (batch == nullptr) {
-+ ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadNext());
-+ if (!batch.batch && !batch.custom_metadata) {
-+ // EOS
- break;
- }
-- st &= ValidateFuzzBatch(*batch);
-+ st &= ValidateFuzzBatch(batch);
- }
-
- return st;
-@@ -2661,20 +2709,36 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
-
- Status FuzzIpcFile(const uint8_t* data, int64_t size) {
- auto buffer = std::make_shared<Buffer>(data, size);
-- io::BufferReader buffer_reader(buffer);
-
-- std::shared_ptr<RecordBatchFileReader> batch_reader;
-- ARROW_ASSIGN_OR_RAISE(batch_reader,
-- RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
-- Status st;
-+ Status final_status;
-
-- const int n_batches = batch_reader->num_record_batches();
-- for (int i = 0; i < n_batches; ++i) {
-- ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->ReadRecordBatch(i));
-- st &= ValidateFuzzBatch(*batch);
-+ auto do_read = [&](bool pre_buffer) {
-+ io::BufferReader buffer_reader(buffer);
-+ ARROW_ASSIGN_OR_RAISE(auto batch_reader,
-+ RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
-+ if (pre_buffer) {
-+ // Pre-buffer all record batches
-+ RETURN_NOT_OK(batch_reader->PreBufferMetadata(/*indices=*/{}));
-+ }
-+
-+ const int n_batches = batch_reader->num_record_batches();
-+ for (int i = 0; i < n_batches; ++i) {
-+ RecordBatchWithMetadata batch;
-+ auto st = batch_reader->ReadRecordBatchWithCustomMetadata(i).Value(&batch);
-+ final_status &= st;
-+ if (!st.ok()) {
-+ continue;
-+ }
-+ final_status &= ValidateFuzzBatch(batch);
-+ }
-+ return Status::OK();
-+ };
-+
-+ for (const bool pre_buffer : {false, true}) {
-+ final_status &= do_read(pre_buffer);
- }
-
-- return st;
-+ return final_status;
- }
-
- Status FuzzIpcTensorStream(const uint8_t* data, int64_t size) {
-diff --git a/cpp/src/arrow/ipc/test_common.cc b/cpp/src/arrow/ipc/test_common.cc
-index 02e6b816c0b..ceca6d9e434 100644
---- a/cpp/src/arrow/ipc/test_common.cc
-+++ b/cpp/src/arrow/ipc/test_common.cc
-@@ -16,6 +16,7 @@
- // under the License.
-
- #include <algorithm>
-+#include <concepts>
- #include <cstdint>
- #include <functional>
- #include <memory>
-@@ -368,19 +369,27 @@ Status MakeRandomStringArray(int64_t length, bool include_nulls, MemoryPool* poo
- return builder.Finish(out);
- }
-
--template <class BuilderType>
--static Status MakeBinaryArrayWithUniqueValues(int64_t length, bool include_nulls,
-- MemoryPool* pool,
-- std::shared_ptr<Array>* out) {
-- BuilderType builder(pool);
-+template <std::derived_from<ArrayBuilder> BuilderType>
-+static Result<std::shared_ptr<Array>> MakeBinaryArrayWithUniqueValues(
-+ BuilderType builder, int64_t length, bool include_nulls) {
-+ if constexpr (std::is_base_of_v<BinaryViewBuilder, BuilderType>) {
-+ // Try to emit several variadic buffers by choosing a small block size.
-+ builder.SetBlockSize(512);
-+ }
- for (int64_t i = 0; i < length; ++i) {
- if (include_nulls && (i % 7 == 0)) {
- RETURN_NOT_OK(builder.AppendNull());
- } else {
-- RETURN_NOT_OK(builder.Append(std::to_string(i)));
-+ // Make sure that some strings are long enough to have non-inline binary views
-+ const auto base = std::to_string(i);
-+ std::string value;
-+ for (int64_t j = 0; j < 3 * (i % 10); ++j) {
-+ value += base;
-+ }
-+ RETURN_NOT_OK(builder.Append(value));
- }
- }
-- return builder.Finish(out);
-+ return builder.Finish();
- }
-
- Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_nulls,
-@@ -390,22 +399,22 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out, bool with_n
- ArrayVector arrays;
- FieldVector fields;
-
-- auto AppendColumn = [&](auto& MakeArray) {
-- arrays.emplace_back();
-- RETURN_NOT_OK(MakeArray(length, with_nulls, default_memory_pool(), &arrays.back()));
--
-- const auto& type = arrays.back()->type();
-- fields.push_back(field(type->ToString(), type));
-+ auto AppendColumn = [&](auto builder) {
-+ ARROW_ASSIGN_OR_RAISE(auto array, MakeBinaryArrayWithUniqueValues(
-+ std::move(builder), length, with_nulls));
-+ arrays.push_back(array);
-+ fields.push_back(field(array->type()->ToString(), array->type()));
- return Status::OK();
- };
-
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringBuilder>));
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryBuilder>));
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeStringBuilder>));
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<LargeBinaryBuilder>));
-+ auto pool = default_memory_pool();
-+ RETURN_NOT_OK(AppendColumn(StringBuilder(pool)));
-+ RETURN_NOT_OK(AppendColumn(BinaryBuilder(pool)));
-+ RETURN_NOT_OK(AppendColumn(LargeStringBuilder(pool)));
-+ RETURN_NOT_OK(AppendColumn(LargeBinaryBuilder(pool)));
- if (with_view_types) {
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<StringViewBuilder>));
-- RETURN_NOT_OK(AppendColumn(MakeBinaryArrayWithUniqueValues<BinaryViewBuilder>));
-+ RETURN_NOT_OK(AppendColumn(StringViewBuilder(pool)));
-+ RETURN_NOT_OK(AppendColumn(BinaryViewBuilder(pool)));
- }
-
- *out = RecordBatch::Make(schema(std::move(fields)), length, std::move(arrays));
-diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
-index f68d2dcb619..e3582056ead 100644
---- a/cpp/src/arrow/type.h
-+++ b/cpp/src/arrow/type.h
-@@ -2575,6 +2575,16 @@ constexpr bool may_have_validity_bitmap(Type::type id) {
- }
- }
-
-+constexpr bool has_variadic_buffers(Type::type id) {
-+ switch (id) {
-+ case Type::BINARY_VIEW:
-+ case Type::STRING_VIEW:
-+ return true;
-+ default:
-+ return false;
-+ }
-+}
-+
- ARROW_DEPRECATED("Deprecated in 17.0.0. Use may_have_validity_bitmap() instead.")
- constexpr bool HasValidityBitmap(Type::type id) { return may_have_validity_bitmap(id); }
-
-diff --git a/cpp/src/arrow/util/int_util_overflow.h b/cpp/src/arrow/util/int_util_overflow.h
-index 93066fecafa..69714a935a4 100644
---- a/cpp/src/arrow/util/int_util_overflow.h
-+++ b/cpp/src/arrow/util/int_util_overflow.h
-@@ -18,7 +18,9 @@
- #pragma once
-
- #include <cstdint>
-+#include <initializer_list>
- #include <limits>
-+#include <optional>
- #include <type_traits>
-
- #include "arrow/status.h"
-@@ -162,6 +164,37 @@ NON_GENERIC_OPS_WITH_OVERFLOW(DivideWithOverflow)
- #undef NON_GENERIC_OPS_WITH_OVERFLOW
- #undef NON_GENERIC_OP_WITH_OVERFLOW
-
-+// Convenience functions over an arbitrary number of arguments
-+template <typename Int>
-+std::optional<Int> AddWithOverflow(std::initializer_list<Int> vs) {
-+ if (vs.size() == 0) {
-+ return {};
-+ }
-+ auto it = vs.begin();
-+ Int v = *it++;
-+ while (it != vs.end()) {
-+ if (ARROW_PREDICT_FALSE(AddWithOverflowGeneric(v, *it++, &v))) {
-+ return {};
-+ }
-+ }
-+ return v;
-+}
-+
-+template <typename Int>
-+std::optional<Int> MultiplyWithOverflow(std::initializer_list<Int> vs) {
-+ if (vs.size() == 0) {
-+ return {};
-+ }
-+ auto it = vs.begin();
-+ Int v = *it++;
-+ while (it != vs.end()) {
-+ if (ARROW_PREDICT_FALSE(MultiplyWithOverflowGeneric(v, *it++, &v))) {
-+ return {};
-+ }
-+ }
-+ return v;
-+}
-+
- // Define function NegateWithOverflow with the signature `bool(T u, T* out)`
- // where T is a signed integer type. On overflow, these functions return true.
- // Otherwise, false is returned and `out` is updated with the result of the
-diff --git a/cpp/src/arrow/util/int_util_test.cc b/cpp/src/arrow/util/int_util_test.cc
-index 7217c1097e4..cffa4e9d15e 100644
---- a/cpp/src/arrow/util/int_util_test.cc
-+++ b/cpp/src/arrow/util/int_util_test.cc
-@@ -649,5 +649,23 @@ TYPED_TEST(TestAddWithOverflow, Basics) {
- this->CheckOk(almost_min, almost_max + T{2}, T{1});
- }
-
-+TEST(AddWithOverflow, Variadic) {
-+ ASSERT_EQ(AddWithOverflow<int>({}), std::nullopt);
-+ ASSERT_EQ(AddWithOverflow({1, 2, 3}), 6);
-+ ASSERT_EQ(AddWithOverflow<int8_t>({1, 2, 125}), std::nullopt);
-+ ASSERT_EQ(AddWithOverflow<int8_t>({125, 2, 1}), std::nullopt);
-+ ASSERT_EQ(AddWithOverflow<int16_t>({1, 2, 125}), 128);
-+ ASSERT_EQ(AddWithOverflow<int16_t>({125, 2, 1}), 128);
-+}
-+
-+TEST(MultiplyWithOverflow, Variadic) {
-+ ASSERT_EQ(MultiplyWithOverflow<int>({}), std::nullopt);
-+ ASSERT_EQ(MultiplyWithOverflow({1, 2, 3, 4}), 24);
-+ ASSERT_EQ(MultiplyWithOverflow<int8_t>({2, 2, 32}), std::nullopt);
-+ ASSERT_EQ(MultiplyWithOverflow<int8_t>({32, 4, 1}), std::nullopt);
-+ ASSERT_EQ(MultiplyWithOverflow<int16_t>({2, 2, 32}), 128);
-+ ASSERT_EQ(MultiplyWithOverflow<int16_t>({32, 4, 1}), 128);
-+}
-+
- } // namespace internal
- } // namespace arrow
Verdict: ❌ CHANGES REQUESTED — Please address the issues flagged above.
mfrw left a comment:

NAK - the patch has some delta w.r.t. upstream.
  explicit ArrayLoader(const flatbuf::RecordBatch* metadata,
                       MetadataVersion metadata_version, const IpcReadOptions& options,
-                      io::RandomAccessFile* file)
+                      io::RandomAccessFile* file, std::optional<int64_t> file_length)
In the reference patch, this constructor is not modified. Why are we doing this?
The warnings persist because they are false positives from GCC's conservative analysis of the nested `optional<shared_ptr<>>`. The fix ensures the code is correct and safe; the warnings are non-fatal and do not affect the build or functionality. This is expected behavior with GCC 13.2.0.
Merge Checklist

All boxes should be checked before merging the PR (just tick any boxes which don't apply to this PR):

- [ ] All affected packages (including `*-static` subpackages, etc.) have had their `Release` tag incremented.
- [ ] cgmanifest files updated if needed (`./cgmanifest.json`, `./toolkit/scripts/toolchain/cgmanifest.json`, `.github/workflows/cgmanifest.json`).
- [ ] License files updated if needed (`./LICENSES-AND-NOTICES/SPECS/data/licenses.json`, `./LICENSES-AND-NOTICES/SPECS/LICENSES-MAP.md`, `./LICENSES-AND-NOTICES/SPECS/LICENSE-EXCEPTIONS.PHOTON`).
- [ ] `*.signatures.json` files updated if needed.
- [ ] `sudo make go-tidy-all` and `sudo make go-test-coverage` pass.

Summary
What does the PR accomplish, why was it needed?
Patch libarrow for CVE-2026-25087
Astrolabe Patch Reference: apache/arrow#48925
Upstream patch reference: https://patch-diff.githubusercontent.com/raw/apache/arrow/pull/48925.patch
Patch Modified: Yes
The reader.cc file was modified to align with the upstream patch.
Change Log
Does this affect the toolchain?
NO
Associated issues
Links to CVEs
Test Methodology
Installation: verified by installing the rebuilt package (screenshot attached in the PR).

Uninstallation: verified by removing the package (screenshot attached in the PR).