diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 3414a862e..28e0b8d51 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -109,6 +109,7 @@ add_iceberg_test(util_test SOURCES bucket_util_test.cc config_test.cc + data_file_set_test.cc decimal_test.cc endian_test.cc formatter_test.cc diff --git a/src/iceberg/test/data_file_set_test.cc b/src/iceberg/test/data_file_set_test.cc new file mode 100644 index 000000000..60539adfa --- /dev/null +++ b/src/iceberg/test/data_file_set_test.cc @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/util/data_file_set.h" + +#include + +#include "iceberg/file_format.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/row/partition_values.h" + +namespace iceberg { + +class DataFileSetTest : public ::testing::Test { + protected: + std::shared_ptr CreateDataFile(const std::string& path, int64_t size = 100) { + auto file = std::make_shared(); + file->file_path = path; + file->file_format = FileFormatType::kParquet; + file->file_size_in_bytes = size; + file->record_count = 10; + file->content = DataFile::Content::kData; + return file; + } +}; + +TEST_F(DataFileSetTest, EmptySet) { + DataFileSet set; + EXPECT_TRUE(set.empty()); + EXPECT_EQ(set.size(), 0); + EXPECT_EQ(set.begin(), set.end()); + EXPECT_TRUE(set.as_span().empty()); +} + +TEST_F(DataFileSetTest, InsertSingleFile) { + DataFileSet set; + auto file = CreateDataFile("/path/to/file.parquet"); + + auto [iter, inserted] = set.insert(file); + EXPECT_TRUE(inserted); + EXPECT_EQ(*iter, file); + EXPECT_FALSE(set.empty()); + EXPECT_EQ(set.size(), 1); +} + +TEST_F(DataFileSetTest, InsertDuplicateFile) { + DataFileSet set; + auto file1 = CreateDataFile("/path/to/file.parquet"); + auto file2 = CreateDataFile("/path/to/file.parquet"); // Same path + + auto [iter1, inserted1] = set.insert(file1); + EXPECT_TRUE(inserted1); + + auto [iter2, inserted2] = set.insert(file2); + EXPECT_FALSE(inserted2); + EXPECT_EQ(iter1, iter2); // Should point to the same element + EXPECT_EQ(set.size(), 1); // Should still be size 1 +} + +TEST_F(DataFileSetTest, InsertDifferentFiles) { + DataFileSet set; + auto file1 = CreateDataFile("/path/to/file1.parquet"); + auto file2 = CreateDataFile("/path/to/file2.parquet"); + auto file3 = CreateDataFile("/path/to/file3.parquet"); + + set.insert(file1); + set.insert(file2); + set.insert(file3); + + EXPECT_EQ(set.size(), 3); + EXPECT_FALSE(set.empty()); +} + +TEST_F(DataFileSetTest, InsertionOrderPreserved) { + DataFileSet set; + auto file1 = CreateDataFile("/path/to/file1.parquet"); + auto file2 = CreateDataFile("/path/to/file2.parquet"); + auto file3 = CreateDataFile("/path/to/file3.parquet"); + + set.insert(file1); + set.insert(file2); + set.insert(file3); + + // Iterate and verify order + std::vector paths; + for (const auto& file : set) { + paths.push_back(file->file_path); + } + + EXPECT_EQ(paths.size(), 3); + EXPECT_EQ(paths[0], "/path/to/file1.parquet"); + EXPECT_EQ(paths[1], "/path/to/file2.parquet"); + EXPECT_EQ(paths[2], "/path/to/file3.parquet"); +} + +TEST_F(DataFileSetTest, AsSpan) { + DataFileSet set; + EXPECT_TRUE(set.as_span().empty()); + + // Single element + auto file0 = CreateDataFile("/path/to/file0.parquet"); + set.insert(file0); + { + auto span = set.as_span(); + EXPECT_EQ(span.size(), 1); + EXPECT_EQ(span[0]->file_path, "/path/to/file0.parquet"); + EXPECT_EQ(span[0], file0); // Same pointer, span is a view + } + + // Multiple elements + auto file1 = CreateDataFile("/path/to/file1.parquet"); + auto file2 = CreateDataFile("/path/to/file2.parquet"); + set.insert(file1); + set.insert(file2); + + auto span = set.as_span(); + EXPECT_EQ(span.size(), 3); + EXPECT_EQ(span[0]->file_path, "/path/to/file0.parquet"); + EXPECT_EQ(span[1]->file_path, "/path/to/file1.parquet"); + EXPECT_EQ(span[2]->file_path, "/path/to/file2.parquet"); + + // Span matches set iteration order and identity + size_t i = 0; + for (const auto& file : set) { + EXPECT_EQ(span[i], file) << "Span element " << i << " should match set iterator"; + ++i; + } + EXPECT_EQ(i, span.size()); + + // Span works with range-for + i = 0; + for (const auto& file : span) { + EXPECT_EQ(file->file_path, span[i]->file_path); + ++i; + } + EXPECT_EQ(i, 3); + + set.clear(); + EXPECT_TRUE(set.as_span().empty()); +} + +TEST_F(DataFileSetTest, InsertDuplicatePreservesOrder) { + DataFileSet set; + auto file1 = CreateDataFile("/path/to/file1.parquet"); + auto file2 = CreateDataFile("/path/to/file2.parquet"); + auto file3 = CreateDataFile("/path/to/file1.parquet"); // Duplicate of file1 + + set.insert(file1); + set.insert(file2); + set.insert(file3); // Should not insert, but order should be preserved + + EXPECT_EQ(set.size(), 2); + + std::vector paths; + for (const auto& file : set) { + paths.push_back(file->file_path); + } + + EXPECT_EQ(paths[0], "/path/to/file1.parquet"); + EXPECT_EQ(paths[1], "/path/to/file2.parquet"); +} + +TEST_F(DataFileSetTest, InsertNullFile) { + DataFileSet set; + std::shared_ptr null_file = nullptr; + + auto [iter, inserted] = set.insert(null_file); + EXPECT_FALSE(inserted); + EXPECT_EQ(iter, set.end()); + EXPECT_TRUE(set.empty()); + EXPECT_EQ(set.size(), 0); +} + +TEST_F(DataFileSetTest, InsertMoveSemantics) { + DataFileSet set; + auto file1 = CreateDataFile("/path/to/file1.parquet"); + auto file2 = CreateDataFile("/path/to/file2.parquet"); + + // Insert using move + auto [iter1, inserted1] = set.insert(std::move(file1)); + EXPECT_TRUE(inserted1); + EXPECT_EQ(file1, nullptr); // Should be moved + + // Insert using copy + auto [iter2, inserted2] = set.insert(file2); + EXPECT_TRUE(inserted2); + EXPECT_NE(file2, nullptr); // Should still be valid + + EXPECT_EQ(set.size(), 2); +} + +TEST_F(DataFileSetTest, Clear) { + DataFileSet set; + set.insert(CreateDataFile("/path/to/file1.parquet")); + set.insert(CreateDataFile("/path/to/file2.parquet")); + + EXPECT_EQ(set.size(), 2); + set.clear(); + EXPECT_TRUE(set.empty()); + EXPECT_EQ(set.size(), 0); + EXPECT_EQ(set.begin(), set.end()); +} + +TEST_F(DataFileSetTest, IteratorOperations) { + DataFileSet set; + auto file1 = CreateDataFile("/path/to/file1.parquet"); + auto file2 = CreateDataFile("/path/to/file2.parquet"); + auto file3 = CreateDataFile("/path/to/file3.parquet"); + + set.insert(file1); + set.insert(file2); + set.insert(file3); + + // Test const iterators + const auto& const_set = set; + EXPECT_NE(const_set.begin(), const_set.end()); + EXPECT_NE(const_set.cbegin(), const_set.cend()); + + // Test iterator increment + auto it = set.begin(); + EXPECT_EQ((*it)->file_path, "/path/to/file1.parquet"); + ++it; + EXPECT_EQ((*it)->file_path, "/path/to/file2.parquet"); + ++it; + EXPECT_EQ((*it)->file_path, "/path/to/file3.parquet"); + ++it; + EXPECT_EQ(it, set.end()); +} + +TEST_F(DataFileSetTest, RangeBasedForLoop) { + DataFileSet set; + set.insert(CreateDataFile("/path/to/file1.parquet")); + set.insert(CreateDataFile("/path/to/file2.parquet")); + set.insert(CreateDataFile("/path/to/file3.parquet")); + + int count = 0; + for (const auto& file : set) { + EXPECT_NE(file, nullptr); + ++count; + } + EXPECT_EQ(count, 3); +} + +TEST_F(DataFileSetTest, CaseSensitivePaths) { + DataFileSet set; + auto file1 = CreateDataFile("/path/to/file.parquet"); + auto file2 = CreateDataFile("/path/to/FILE.parquet"); // Different case + + set.insert(file1); + set.insert(file2); + + // Should be treated as different files + EXPECT_EQ(set.size(), 2); +} + +TEST_F(DataFileSetTest, MultipleInsertsSameFile) { + DataFileSet set; + auto file = CreateDataFile("/path/to/file.parquet"); + + // Insert the same file multiple times + set.insert(file); + set.insert(file); + set.insert(file); + + EXPECT_EQ(set.size(), 1); +} + +} // namespace iceberg diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index 791340be7..5e3007c4a 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -84,6 +84,7 @@ iceberg_tests = { 'sources': files( 'bucket_util_test.cc', 'config_test.cc', + 'data_file_set_test.cc', 'decimal_test.cc', 'endian_test.cc', 'formatter_test.cc', diff --git a/src/iceberg/update/fast_append.cc b/src/iceberg/update/fast_append.cc index c7f66f2fb..3c132a407 100644 --- a/src/iceberg/update/fast_append.cc +++ b/src/iceberg/update/fast_append.cc @@ -20,7 +20,6 @@ #include "iceberg/update/fast_append.h" #include -#include #include #include "iceberg/constants.h" @@ -198,10 +197,8 @@ Result> FastAppend::WriteNewManifests() { if (new_manifests_.empty() && !new_data_files_by_spec_.empty()) { for (const auto& [spec_id, data_files] : new_data_files_by_spec_) { ICEBERG_ASSIGN_OR_RAISE(auto spec, Spec(spec_id)); - std::vector> files; - files.reserve(data_files.size()); - std::ranges::copy(data_files, std::back_inserter(files)); - ICEBERG_ASSIGN_OR_RAISE(auto written_manifests, WriteDataManifests(files, spec)); + ICEBERG_ASSIGN_OR_RAISE(auto written_manifests, + WriteDataManifests(data_files.as_span(), spec)); new_manifests_.insert(new_manifests_.end(), std::make_move_iterator(written_manifests.begin()), std::make_move_iterator(written_manifests.end())); diff --git a/src/iceberg/update/fast_append.h b/src/iceberg/update/fast_append.h index 87887c74d..7f5cbb097 100644 --- a/src/iceberg/update/fast_append.h +++ b/src/iceberg/update/fast_append.h @@ -30,7 +30,7 @@ #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/update/snapshot_update.h" -#include "iceberg/util/content_file_util.h" +#include "iceberg/util/data_file_set.h" namespace iceberg { diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index 2bbb2d506..38c5129f4 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc @@ -30,7 +30,6 @@ #include "iceberg/manifest/manifest_writer.h" #include "iceberg/manifest/rolling_manifest_writer.h" #include "iceberg/partition_summary_internal.h" -#include "iceberg/snapshot.h" #include "iceberg/table.h" #include "iceberg/transaction.h" #include "iceberg/util/macros.h" @@ -166,10 +165,10 @@ SnapshotUpdate::SnapshotUpdate(std::shared_ptr transaction) // TODO(xxx): write manifests in parallel Result> SnapshotUpdate::WriteDataManifests( - const std::vector>& data_files, + std::span> files, const std::shared_ptr& spec, std::optional data_sequence_number) { - if (data_files.empty()) { + if (files.empty()) { return std::vector{}; } @@ -185,7 +184,7 @@ Result> SnapshotUpdate::WriteDataManifests( }, target_manifest_size_bytes_); - for (const auto& file : data_files) { + for (const auto& file : files) { ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file, data_sequence_number)); } ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); @@ -194,9 +193,9 @@ Result> SnapshotUpdate::WriteDataManifests( // TODO(xxx): write manifests in parallel Result> SnapshotUpdate::WriteDeleteManifests( - const std::vector>& delete_files, + std::span> files, const std::shared_ptr& spec) { - if (delete_files.empty()) { + if (files.empty()) { return std::vector{}; } @@ -211,9 +210,9 @@ Result> SnapshotUpdate::WriteDeleteManifests( }, target_manifest_size_bytes_); - for (const auto& file : delete_files) { - /// FIXME: Java impl wrap it with `PendingDeleteFile` and deals with - /// file->data_sequenece_number + for (const auto& file : files) { + // FIXME: Java impl wrap it with `PendingDeleteFile` and deals with + // file->data_sequence_number ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file)); } ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index f31327fcd..12c3b19dc 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -22,13 +22,13 @@ #include #include #include +#include #include #include #include #include #include "iceberg/iceberg_export.h" -#include "iceberg/manifest/manifest_list.h" #include "iceberg/result.h" #include "iceberg/snapshot.h" #include "iceberg/type_fwd.h" @@ -103,24 +103,22 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { /// \brief Write data manifests for the given data files /// - /// \param data_files The data files to write + /// \param files Data files to write /// \param spec The partition spec to use /// \param data_sequence_number Optional data sequence number for the files /// \return A vector of manifest files - /// TODO(xxx): Change signature to accept iterator begin/end instead of vector to avoid - /// intermediate vector allocations (e.g., from DataFileSet) Result> WriteDataManifests( - const std::vector>& data_files, + std::span> files, const std::shared_ptr& spec, std::optional data_sequence_number = std::nullopt); /// \brief Write delete manifests for the given delete files /// - /// \param delete_files The delete files to write + /// \param files Delete files to write /// \param spec The partition spec to use /// \return A vector of manifest files Result> WriteDeleteManifests( - const std::vector>& delete_files, + std::span> files, const std::shared_ptr& spec); Status SetTargetBranch(const std::string& branch); diff --git a/src/iceberg/util/content_file_util.h b/src/iceberg/util/content_file_util.h index 95a8d6343..f547716d2 100644 --- a/src/iceberg/util/content_file_util.h +++ b/src/iceberg/util/content_file_util.h @@ -22,12 +22,10 @@ /// \file iceberg/util/content_file_util.h /// Utility functions for content files (data files and delete files). -#include #include #include #include #include -#include #include "iceberg/iceberg_export.h" #include "iceberg/manifest/manifest_entry.h" @@ -36,72 +34,6 @@ namespace iceberg { -/// \brief A set of DataFile pointers with insertion order preserved and deduplicated by -/// file path. -class ICEBERG_EXPORT DataFileSet { - public: - using value_type = std::shared_ptr; - using iterator = typename std::vector::iterator; - using const_iterator = typename std::vector::const_iterator; - using difference_type = typename std::vector::difference_type; - - DataFileSet() = default; - - /// \brief Insert a data file into the set. - /// \param file The data file to insert - /// \return A pair with an iterator to the inserted element (or the existing one) and - /// a bool indicating whether insertion took place - std::pair insert(const value_type& file) { return InsertImpl(file); } - - /// \brief Insert a data file into the set (move version). - std::pair insert(value_type&& file) { - return InsertImpl(std::move(file)); - } - - /// \brief Get the number of elements in the set. - size_t size() const { return elements_.size(); } - - /// \brief Check if the set is empty. - bool empty() const { return elements_.empty(); } - - /// \brief Clear all elements from the set. - void clear() { - elements_.clear(); - index_by_path_.clear(); - } - - /// \brief Get iterator to the beginning. - iterator begin() { return elements_.begin(); } - const_iterator begin() const { return elements_.begin(); } - const_iterator cbegin() const { return elements_.cbegin(); } - - /// \brief Get iterator to the end. - iterator end() { return elements_.end(); } - const_iterator end() const { return elements_.end(); } - const_iterator cend() const { return elements_.cend(); } - - private: - std::pair InsertImpl(value_type file) { - if (!file) { - return {elements_.end(), false}; - } - - auto [index_iter, inserted] = - index_by_path_.try_emplace(file->file_path, elements_.size()); - if (!inserted) { - auto pos = static_cast(index_iter->second); - return {elements_.begin() + pos, false}; - } - - elements_.push_back(std::move(file)); - return {std::prev(elements_.end()), true}; - } - - // Vector to preserve insertion order - std::vector elements_; - std::unordered_map index_by_path_; -}; - /// \brief Utility functions for content files. struct ICEBERG_EXPORT ContentFileUtil { /// \brief Check if a delete file is a deletion vector (DV). diff --git a/src/iceberg/util/data_file_set.h b/src/iceberg/util/data_file_set.h new file mode 100644 index 000000000..741b34e56 --- /dev/null +++ b/src/iceberg/util/data_file_set.h @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/util/data_file_set.h +/// A set of DataFile pointers with insertion order preserved and deduplicated by file +/// path. + +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/util/string_util.h" + +namespace iceberg { + +/// \brief A set of DataFile pointers with insertion order preserved and deduplicated by +/// file path. +class ICEBERG_EXPORT DataFileSet { + public: + using value_type = std::shared_ptr; + using iterator = typename std::vector::iterator; + using const_iterator = typename std::vector::const_iterator; + using difference_type = typename std::vector::difference_type; + + DataFileSet() = default; + + /// \brief Insert a data file into the set. + /// \param file The data file to insert + /// \return A pair with an iterator to the inserted element (or the existing one) and + /// a bool indicating whether insertion took place + std::pair insert(const value_type& file) { return InsertImpl(file); } + + /// \brief Insert a data file into the set (move version). + std::pair insert(value_type&& file) { + return InsertImpl(std::move(file)); + } + + /// \brief Get the number of elements in the set. + size_t size() const { return elements_.size(); } + + /// \brief Check if the set is empty. + bool empty() const { return elements_.empty(); } + + /// \brief Clear all elements from the set. + void clear() { + elements_.clear(); + index_by_path_.clear(); + } + + /// \brief Get iterator to the beginning. + iterator begin() { return elements_.begin(); } + const_iterator begin() const { return elements_.begin(); } + const_iterator cbegin() const { return elements_.cbegin(); } + + /// \brief Get iterator to the end. + iterator end() { return elements_.end(); } + const_iterator end() const { return elements_.end(); } + const_iterator cend() const { return elements_.cend(); } + + /// \brief Get a non-owning view of the data files in insertion order. + std::span as_span() const { return elements_; } + + private: + std::pair InsertImpl(value_type file) { + if (!file) { + return {elements_.end(), false}; + } + + auto [index_iter, inserted] = + index_by_path_.try_emplace(file->file_path, elements_.size()); + if (!inserted) { + auto pos = static_cast(index_iter->second); + return {elements_.begin() + pos, false}; + } + + elements_.push_back(std::move(file)); + return {std::prev(elements_.end()), true}; + } + + // Vector to preserve insertion order + std::vector elements_; + std::unordered_map index_by_path_; +}; + +} // namespace iceberg diff --git a/src/iceberg/util/meson.build b/src/iceberg/util/meson.build index 95952bb8b..496a75758 100644 --- a/src/iceberg/util/meson.build +++ b/src/iceberg/util/meson.build @@ -22,6 +22,7 @@ install_headers( 'config.h', 'content_file_util.h', 'conversions.h', + 'data_file_set.h', 'decimal.h', 'endian.h', 'error_collector.h',