diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 35c312f60..6e80adde1 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -92,6 +92,7 @@ set(ICEBERG_SOURCES update/update_properties.cc update/update_schema.cc update/update_sort_order.cc + update/update_statistics.cc util/bucket_util.cc util/content_file_util.cc util/conversions.cc diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc index 7e2652d6c..91e64b37a 100644 --- a/src/iceberg/json_internal.cc +++ b/src/iceberg/json_internal.cc @@ -191,6 +191,8 @@ constexpr std::string_view kActionSetSnapshotRef = "set-snapshot-ref"; constexpr std::string_view kActionSetProperties = "set-properties"; constexpr std::string_view kActionRemoveProperties = "remove-properties"; constexpr std::string_view kActionSetLocation = "set-location"; +constexpr std::string_view kActionSetStatistics = "set-statistics"; +constexpr std::string_view kActionRemoveStatistics = "remove-statistics"; // TableUpdate field constants constexpr std::string_view kUUID = "uuid"; @@ -1399,6 +1401,18 @@ nlohmann::json ToJson(const TableUpdate& update) { json[kLocation] = u.location(); break; } + case TableUpdate::Kind::kSetStatistics: { + const auto& u = internal::checked_cast(update); + json[kAction] = kActionSetStatistics; + json[kStatistics] = ToJson(*u.statistics_file()); + break; + } + case TableUpdate::Kind::kRemoveStatistics: { + const auto& u = internal::checked_cast(update); + json[kAction] = kActionRemoveStatistics; + json[kSnapshotId] = u.snapshot_id(); + break; + } } return json; } @@ -1579,6 +1593,17 @@ Result> TableUpdateFromJson(const nlohmann::json& j ICEBERG_ASSIGN_OR_RAISE(auto location, GetJsonValue(json, kLocation)); return std::make_unique(std::move(location)); } + if (action == kActionSetStatistics) { + ICEBERG_ASSIGN_OR_RAISE(auto statistics_json, + GetJsonValue(json, kStatistics)); + ICEBERG_ASSIGN_OR_RAISE(auto statistics_file, + StatisticsFileFromJson(statistics_json)); + return std::make_unique(std::move(statistics_file)); + } + if (action == kActionRemoveStatistics) { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue(json, kSnapshotId)); + return std::make_unique(snapshot_id); + } return JsonParseError("Unknown table update action: {}", action); } diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 317b4fa9e..f7307e822 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -110,6 +110,7 @@ iceberg_sources = files( 'update/update_properties.cc', 'update/update_schema.cc', 'update/update_sort_order.cc', + 'update/update_statistics.cc', 'util/bucket_util.cc', 'util/content_file_util.cc', 'util/conversions.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 5c406debc..c87bf1816 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -35,6 +35,7 @@ #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" #include "iceberg/update/update_schema.h" +#include "iceberg/update/update_statistics.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -199,6 +200,13 @@ Result> Table::NewUpdateLocation() { return transaction->NewUpdateLocation(); } +Result> Table::NewUpdateStatistics() { + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return transaction->NewUpdateStatistics(); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index fd346e15a..750ab39df 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -152,6 +152,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// changes. virtual Result> NewExpireSnapshots(); + /// \brief Create a new UpdateStatistics to update the table statistics and commit the + /// changes. + virtual Result> NewUpdateStatistics(); + /// \brief Create a new UpdateLocation to update the table location and commit the /// changes. virtual Result> NewUpdateLocation(); diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 7e357bbae..f51826fdf 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -45,6 +45,7 @@ #include "iceberg/schema.h" #include "iceberg/snapshot.h" #include "iceberg/sort_order.h" +#include "iceberg/statistics_file.h" #include "iceberg/table_properties.h" #include "iceberg/table_update.h" #include "iceberg/util/checked_cast.h" @@ -620,6 +621,8 @@ class TableMetadataBuilder::Impl { Status RemoveRef(const std::string& name); Status RemoveSnapshots(const std::vector& snapshot_ids); Status RemovePartitionSpecs(const std::vector& spec_ids); + Status SetStatistics(const std::shared_ptr& statistics_file); + Status RemoveStatistics(int64_t snapshot_id); Result> Build(); @@ -1038,6 +1041,7 @@ void TableMetadataBuilder::Impl::SetLocation(std::string_view location) { } metadata_.location = std::string(location); changes_.push_back(std::make_unique(std::string(location))); + return; } Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr snapshot) { @@ -1173,6 +1177,40 @@ Status TableMetadataBuilder::Impl::SetRef(const std::string& name, return {}; } +Status TableMetadataBuilder::Impl::SetStatistics( + const std::shared_ptr& statistics_file) { + ICEBERG_PRECHECK(statistics_file != nullptr, "Cannot set null statistics file"); + + // Find and replace existing statistics for the same snapshot_id, or add new one + auto it = std::ranges::find_if( + metadata_.statistics, + [snapshot_id = statistics_file->snapshot_id](const auto& stat) { + return stat && stat->snapshot_id == snapshot_id; + }); + + if (it != metadata_.statistics.end()) { + *it = statistics_file; + } else { + metadata_.statistics.push_back(statistics_file); + } + + changes_.push_back(std::make_unique(std::move(statistics_file))); + return {}; +} + +Status TableMetadataBuilder::Impl::RemoveStatistics(int64_t snapshot_id) { + auto removed_count = + std::erase_if(metadata_.statistics, [snapshot_id](const auto& stat) { + return stat && stat->snapshot_id == snapshot_id; + }); + if (removed_count == 0) { + return {}; + } + + changes_.push_back(std::make_unique(snapshot_id)); + return {}; +} + std::unordered_set TableMetadataBuilder::Impl::IntermediateSnapshotIdSet( int64_t current_snapshot_id) const { std::unordered_set added_snapshot_ids; @@ -1589,12 +1627,14 @@ TableMetadataBuilder& TableMetadataBuilder::SuppressHistoricalSnapshots() { } TableMetadataBuilder& TableMetadataBuilder::SetStatistics( - const std::shared_ptr& statistics_file) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + std::shared_ptr statistics_file) { + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetStatistics(statistics_file)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::RemoveStatistics(int64_t snapshot_id) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveStatistics(snapshot_id)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetPartitionStatistics( diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index a4165b814..6473d7cf3 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -399,8 +399,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// /// \param statistics_file The statistics file to set /// \return Reference to this builder for method chaining - TableMetadataBuilder& SetStatistics( - const std::shared_ptr& statistics_file); + TableMetadataBuilder& SetStatistics(std::shared_ptr statistics_file); /// \brief Remove table statistics by snapshot ID /// diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc index 7a01bdee7..c797ce147 100644 --- a/src/iceberg/table_update.cc +++ b/src/iceberg/table_update.cc @@ -22,6 +22,7 @@ #include "iceberg/exception.h" #include "iceberg/schema.h" #include "iceberg/sort_order.h" +#include "iceberg/statistics_file.h" #include "iceberg/table_metadata.h" #include "iceberg/table_requirements.h" @@ -446,4 +447,56 @@ std::unique_ptr SetLocation::Clone() const { return std::make_unique(location_); } +// SetStatistics + +int64_t SetStatistics::snapshot_id() const { return statistics_file_->snapshot_id; } + +void SetStatistics::ApplyTo(TableMetadataBuilder& builder) const { + builder.SetStatistics(statistics_file_); +} + +void SetStatistics::GenerateRequirements(TableUpdateContext& context) const { + // SetStatistics doesn't generate any requirements +} + +bool SetStatistics::Equals(const TableUpdate& other) const { + if (other.kind() != Kind::kSetStatistics) { + return false; + } + const auto& other_set = static_cast(other); + if (!statistics_file_ != !other_set.statistics_file_) { + return false; + } + if (statistics_file_ && !(*statistics_file_ == *other_set.statistics_file_)) { + return false; + } + return true; +} + +std::unique_ptr SetStatistics::Clone() const { + return std::make_unique(statistics_file_); +} + +// RemoveStatistics + +void RemoveStatistics::ApplyTo(TableMetadataBuilder& builder) const { + builder.RemoveStatistics(snapshot_id_); +} + +void RemoveStatistics::GenerateRequirements(TableUpdateContext& context) const { + // RemoveStatistics doesn't generate any requirements +} + +bool RemoveStatistics::Equals(const TableUpdate& other) const { + if (other.kind() != Kind::kRemoveStatistics) { + return false; + } + const auto& other_remove = static_cast(other); + return snapshot_id_ == other_remove.snapshot_id_; +} + +std::unique_ptr RemoveStatistics::Clone() const { + return std::make_unique(snapshot_id_); +} + } // namespace iceberg::table diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h index 3c9c9dbbc..5bbc243ef 100644 --- a/src/iceberg/table_update.h +++ b/src/iceberg/table_update.h @@ -59,6 +59,8 @@ class ICEBERG_EXPORT TableUpdate { kSetProperties, kRemoveProperties, kSetLocation, + kSetStatistics, + kRemoveStatistics, }; virtual ~TableUpdate(); @@ -509,6 +511,53 @@ class ICEBERG_EXPORT SetLocation : public TableUpdate { std::string location_; }; +/// \brief Represents setting statistics for a snapshot +class ICEBERG_EXPORT SetStatistics : public TableUpdate { + public: + explicit SetStatistics(std::shared_ptr statistics_file) + : statistics_file_(std::move(statistics_file)) {} + + int64_t snapshot_id() const; + + const std::shared_ptr& statistics_file() const { + return statistics_file_; + } + + void ApplyTo(TableMetadataBuilder& builder) const override; + + void GenerateRequirements(TableUpdateContext& context) const override; + + Kind kind() const override { return Kind::kSetStatistics; } + + bool Equals(const TableUpdate& other) const override; + + std::unique_ptr Clone() const override; + + private: + std::shared_ptr statistics_file_; +}; + +/// \brief Represents removing statistics for a snapshot +class ICEBERG_EXPORT RemoveStatistics : public TableUpdate { + public: + explicit RemoveStatistics(int64_t snapshot_id) : snapshot_id_(snapshot_id) {} + + int64_t snapshot_id() const { return snapshot_id_; } + + void ApplyTo(TableMetadataBuilder& builder) const override; + + void GenerateRequirements(TableUpdateContext& context) const override; + + Kind kind() const override { return Kind::kRemoveStatistics; } + + bool Equals(const TableUpdate& other) const override; + + std::unique_ptr Clone() const override; + + private: + int64_t snapshot_id_; +}; + } // namespace table } // namespace iceberg diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index d243a48bf..2661360d3 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -177,7 +177,8 @@ if(ICEBERG_BUILD_BUNDLE) update_partition_spec_test.cc update_properties_test.cc update_schema_test.cc - update_sort_order_test.cc) + update_sort_order_test.cc + update_statistics_test.cc) add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc) diff --git a/src/iceberg/test/json_internal_test.cc b/src/iceberg/test/json_internal_test.cc index 0b5f4f59b..c285afaba 100644 --- a/src/iceberg/test/json_internal_test.cc +++ b/src/iceberg/test/json_internal_test.cc @@ -31,6 +31,7 @@ #include "iceberg/snapshot.h" #include "iceberg/sort_field.h" #include "iceberg/sort_order.h" +#include "iceberg/statistics_file.h" #include "iceberg/table_requirement.h" #include "iceberg/table_update.h" #include "iceberg/test/matchers.h" @@ -508,6 +509,54 @@ TEST(JsonInternalTest, TableUpdateSetLocation) { EXPECT_EQ(*internal::checked_cast(parsed.value().get()), update); } +TEST(JsonInternalTest, TableUpdateSetStatistics) { + auto stats_file = std::make_shared(); + stats_file->snapshot_id = 123456789; + stats_file->path = "s3://bucket/warehouse/table/metadata/stats-123456789.puffin"; + stats_file->file_size_in_bytes = 1024; + stats_file->file_footer_size_in_bytes = 128; + stats_file->blob_metadata = {BlobMetadata{.type = "ndv", + .source_snapshot_id = 123456789, + .source_snapshot_sequence_number = 1, + .fields = {1, 2}, + .properties = {{"prop1", "value1"}}}}; + + table::SetStatistics update(stats_file); + nlohmann::json expected = R"({ + "action": "set-statistics", + "statistics": { + "snapshot-id": 123456789, + "statistics-path": "s3://bucket/warehouse/table/metadata/stats-123456789.puffin", + "file-size-in-bytes": 1024, + "file-footer-size-in-bytes": 128, + "blob-metadata": [{ + "type": "ndv", + "snapshot-id": 123456789, + "sequence-number": 1, + "fields": [1, 2], + "properties": {"prop1": "value1"} + }] + } + })"_json; + + EXPECT_EQ(ToJson(update), expected); + auto parsed = TableUpdateFromJson(expected); + ASSERT_THAT(parsed, IsOk()); + EXPECT_EQ(*internal::checked_cast(parsed.value().get()), update); +} + +TEST(JsonInternalTest, TableUpdateRemoveStatistics) { + table::RemoveStatistics update(123456789); + nlohmann::json expected = + R"({"action":"remove-statistics","snapshot-id":123456789})"_json; + + EXPECT_EQ(ToJson(update), expected); + auto parsed = TableUpdateFromJson(expected); + ASSERT_THAT(parsed, IsOk()); + EXPECT_EQ(*internal::checked_cast(parsed.value().get()), + update); +} + TEST(JsonInternalTest, TableUpdateUnknownAction) { nlohmann::json json = R"({"action":"unknown-action"})"_json; auto result = TableUpdateFromJson(json); diff --git a/src/iceberg/test/update_statistics_test.cc b/src/iceberg/test/update_statistics_test.cc new file mode 100644 index 000000000..7b9d4f582 --- /dev/null +++ b/src/iceberg/test/update_statistics_test.cc @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_statistics.h" + +#include +#include + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/statistics_file.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" + +namespace iceberg { + +class UpdateStatisticsTest : public UpdateTestBase { + protected: + // Helper function to create a statistics file + std::shared_ptr MakeStatisticsFile(int64_t snapshot_id, + const std::string& path, + int64_t file_size = 1024, + int64_t footer_size = 128) { + auto stats_file = std::make_shared(); + stats_file->snapshot_id = snapshot_id; + stats_file->path = path; + stats_file->file_size_in_bytes = file_size; + stats_file->file_footer_size_in_bytes = footer_size; + + BlobMetadata blob; + blob.type = "apache-datasketches-theta-v1"; + blob.source_snapshot_id = snapshot_id; + blob.source_snapshot_sequence_number = 1; + blob.fields = {1, 2}; + blob.properties = {{"ndv", "100"}}; + stats_file->blob_metadata.push_back(blob); + + return stats_file; + } + + // Helper to find statistics file by snapshot_id in the result vector + std::shared_ptr FindStatistics( + const std::vector>>& to_set, + int64_t snapshot_id) { + auto it = std::ranges::find_if( + to_set, [snapshot_id](const auto& p) { return p.first == snapshot_id; }); + return it != to_set.end() ? it->second : nullptr; + } +}; + +TEST_F(UpdateStatisticsTest, EmptyUpdate) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_TRUE(result.to_set.empty()); + EXPECT_TRUE(result.to_remove.empty()); +} + +TEST_F(UpdateStatisticsTest, SetStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + auto stats_file = + MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin"); + update->SetStatistics(stats_file); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.to_set.size(), 1); + EXPECT_TRUE(result.to_remove.empty()); + EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file); +} + +TEST_F(UpdateStatisticsTest, SetMultipleStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + auto stats_file_1 = + MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin"); + auto stats_file_2 = + MakeStatisticsFile(2, "/warehouse/test_table/metadata/stats-2.puffin"); + + update->SetStatistics(stats_file_1).SetStatistics(stats_file_2); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.to_set.size(), 2); + EXPECT_TRUE(result.to_remove.empty()); + EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file_1); + EXPECT_EQ(FindStatistics(result.to_set, 2), stats_file_2); +} + +TEST_F(UpdateStatisticsTest, RemoveStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + update->RemoveStatistics(1); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_TRUE(result.to_set.empty()); + EXPECT_EQ(result.to_remove.size(), 1); + EXPECT_THAT(result.to_remove, ::testing::Contains(1)); +} + +TEST_F(UpdateStatisticsTest, RemoveMultipleStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + update->RemoveStatistics(1).RemoveStatistics(2).RemoveStatistics(3); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_TRUE(result.to_set.empty()); + EXPECT_EQ(result.to_remove.size(), 3); + EXPECT_THAT(result.to_remove, ::testing::UnorderedElementsAre(1, 2, 3)); +} + +TEST_F(UpdateStatisticsTest, SetAndRemoveDifferentSnapshots) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + auto stats_file = + MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin"); + + update->SetStatistics(stats_file).RemoveStatistics(2); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.to_set.size(), 1); + EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file); + EXPECT_EQ(result.to_remove.size(), 1); + EXPECT_THAT(result.to_remove, ::testing::Contains(2)); +} + +TEST_F(UpdateStatisticsTest, ReplaceStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + auto stats_file_1 = + MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1a.puffin"); + auto stats_file_2 = + MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1b.puffin", 2048, 256); + + // Set statistics for snapshot 1, then replace it + update->SetStatistics(stats_file_1).SetStatistics(stats_file_2); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.to_set.size(), 1); + EXPECT_TRUE(result.to_remove.empty()); + // Should have the second one (replacement) + EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file_2); + EXPECT_NE(FindStatistics(result.to_set, 1), stats_file_1); +} + +TEST_F(UpdateStatisticsTest, SetThenRemoveSameSnapshot) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + auto stats_file = + MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin"); + + // Set statistics for snapshot 1, then remove it + update->SetStatistics(stats_file).RemoveStatistics(1); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_TRUE(result.to_set.empty()); + EXPECT_EQ(result.to_remove.size(), 1); + EXPECT_THAT(result.to_remove, ::testing::Contains(1)); +} + +TEST_F(UpdateStatisticsTest, RemoveThenSetSameSnapshot) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + auto stats_file = + MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin"); + + // Remove statistics for snapshot 1, then set new ones + update->RemoveStatistics(1).SetStatistics(stats_file); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.to_set.size(), 1); + EXPECT_TRUE(result.to_remove.empty()); + EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file); +} + +TEST_F(UpdateStatisticsTest, SetNullStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + + update->SetStatistics(nullptr); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Statistics file cannot be null")); +} + +TEST_F(UpdateStatisticsTest, CommitSuccess) { + // Test empty commit + ICEBERG_UNWRAP_OR_FAIL(auto empty_update, table_->NewUpdateStatistics()); + EXPECT_THAT(empty_update->Commit(), IsOk()); + + // Reload table after first commit + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + + // Get a valid snapshot ID from the table + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + int64_t snapshot_id = current_snapshot->snapshot_id; + + // Test commit with statistics changes + ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateStatistics()); + auto stats_file = + MakeStatisticsFile(snapshot_id, "/warehouse/test_table/metadata/stats-1.puffin"); + update->SetStatistics(stats_file); + + EXPECT_THAT(update->Commit(), IsOk()); + + // Verify the statistics were committed and persisted + ICEBERG_UNWRAP_OR_FAIL(auto final_table, catalog_->LoadTable(table_ident_)); + const auto& statistics = final_table->metadata()->statistics; + EXPECT_EQ(statistics.size(), 1); + EXPECT_EQ(*statistics[0], *stats_file); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 10a87e653..e294c0378 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -24,6 +24,7 @@ #include "iceberg/catalog.h" #include "iceberg/schema.h" +#include "iceberg/statistics_file.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" @@ -38,6 +39,7 @@ #include "iceberg/update/update_properties.h" #include "iceberg/update/update_schema.h" #include "iceberg/update/update_sort_order.h" +#include "iceberg/update/update_statistics.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/location_util.h" #include "iceberg/util/macros.h" @@ -189,6 +191,17 @@ Status Transaction::Apply(PendingUpdate& update) { ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply()); metadata_builder_->SetLocation(location); } break; + case PendingUpdate::Kind::kUpdateStatistics: { + auto& update_statistics = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto result, update_statistics.Apply()); + // Apply statistics changes to the metadata builder + for (auto&& [_, stat_file] : result.to_set) { + metadata_builder_->SetStatistics(std::move(stat_file)); + } + for (const auto& snapshot_id : result.to_remove) { + metadata_builder_->RemoveStatistics(snapshot_id); + } + } break; default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); @@ -293,4 +306,11 @@ Result> Transaction::NewUpdateLocation() { return update_location; } +Result> Transaction::NewUpdateStatistics() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_statistics, + UpdateStatistics::Make(shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_statistics)); + return update_statistics; +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 7133a3b5d..e4cffc3a2 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -82,6 +82,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewExpireSnapshots(); + /// \brief Create a new UpdateStatistics to update table statistics and commit the + /// changes. + Result> NewUpdateStatistics(); + /// \brief Create a new UpdateLocation to update the table location and commit the /// changes. Result> NewUpdateLocation(); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 251334c14..53425d0d7 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -195,6 +195,7 @@ class UpdatePartitionSpec; class UpdateProperties; class UpdateSchema; class UpdateSortOrder; +class UpdateStatistics; /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 3387fd11a..76a9e038f 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -25,6 +25,7 @@ install_headers( 'update_schema.h', 'update_sort_order.h', 'update_properties.h', + 'update_statistics.h', ], subdir: 'iceberg/update', ) diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index 441d086a8..3ba25cca9 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -49,6 +49,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { kUpdateSchema, kUpdateSnapshot, kUpdateSortOrder, + kUpdateStatistics, }; /// \brief Return the kind of this pending update. diff --git a/src/iceberg/update/update_statistics.cc b/src/iceberg/update/update_statistics.cc new file mode 100644 index 000000000..461453369 --- /dev/null +++ b/src/iceberg/update/update_statistics.cc @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_statistics.h" + +#include +#include +#include +#include + +#include "iceberg/result.h" +#include "iceberg/statistics_file.h" +#include "iceberg/transaction.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> UpdateStatistics::Make( + std::shared_ptr transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create UpdateStatistics without a transaction"); + return std::shared_ptr(new UpdateStatistics(std::move(transaction))); +} + +UpdateStatistics::UpdateStatistics(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) {} + +UpdateStatistics::~UpdateStatistics() = default; + +UpdateStatistics& UpdateStatistics::SetStatistics( + std::shared_ptr statistics_file) { + ICEBERG_BUILDER_CHECK(statistics_file != nullptr, "Statistics file cannot be null"); + statistics_to_set_[statistics_file->snapshot_id] = std::move(statistics_file); + return *this; +} + +UpdateStatistics& UpdateStatistics::RemoveStatistics(int64_t snapshot_id) { + statistics_to_set_[snapshot_id] = nullptr; + return *this; +} + +Result UpdateStatistics::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + ApplyResult result; + for (const auto& [snapshot_id, stats] : statistics_to_set_) { + if (stats) { + result.to_set.emplace_back(snapshot_id, stats); + } else { + result.to_remove.push_back(snapshot_id); + } + } + return result; +} + +} // namespace iceberg diff --git a/src/iceberg/update/update_statistics.h b/src/iceberg/update/update_statistics.h new file mode 100644 index 000000000..55e50fb1b --- /dev/null +++ b/src/iceberg/update/update_statistics.h @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +/// \file iceberg/update/update_statistics.h +/// \brief Updates table statistics. + +namespace iceberg { + +/// \brief Updates table statistics. +class ICEBERG_EXPORT UpdateStatistics : public PendingUpdate { + public: + static Result> Make( + std::shared_ptr transaction); + + ~UpdateStatistics() override; + + /// \brief Set statistics file for a snapshot. + /// + /// Associates a statistics file with a snapshot ID. If statistics already exist + /// for this snapshot, they will be replaced. + /// + /// \param statistics_file The statistics file to set + /// \return Reference to this UpdateStatistics for chaining + UpdateStatistics& SetStatistics(std::shared_ptr statistics_file); + + /// \brief Remove statistics for a snapshot. + /// + /// Marks the statistics for the given snapshot ID for removal. + /// + /// \param snapshot_id The snapshot ID whose statistics to remove + /// \return Reference to this UpdateStatistics for chaining + UpdateStatistics& RemoveStatistics(int64_t snapshot_id); + + Kind kind() const final { return Kind::kUpdateStatistics; } + + struct ApplyResult { + std::vector>> to_set; + std::vector to_remove; + }; + + Result Apply(); + + private: + explicit UpdateStatistics(std::shared_ptr transaction); + + std::unordered_map> statistics_to_set_; +}; + +} // namespace iceberg