diff --git a/ice-rest-catalog/pom.xml b/ice-rest-catalog/pom.xml
index 5459aac8..33c51b72 100644
--- a/ice-rest-catalog/pom.xml
+++ b/ice-rest-catalog/pom.xml
@@ -565,6 +565,7 @@
<exclude>**/DockerScenarioBasedIT.java</exclude>
<exclude>**/DockerLocalFileIOClickHouseIT.java</exclude>
+ <exclude>**/DockerLocalFileIOClickHouseAllTypesIT.java</exclude>
diff --git a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index 0a5ac042..e9ca5cc5 100644
--- a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
+import org.apache.iceberg.BaseMetastoreOperations.CommitStatus;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
@@ -294,20 +295,39 @@ public long newSnapshotId() {
* were attempting to set. This is used as a last resort when we are dealing with exceptions that
* may indicate the commit has failed but are not proof that this is the case. Past locations must
* also be searched on the chance that a second committer was able to successfully commit on top
- * of our commit.
+ * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link
+ * CommitStatus#UNKNOWN}.
*
* @param newMetadataLocation the path of the new commit file
* @param config metadata to use for configuration
- * @return Commit Status of Success, Failure or Unknown
+ * @return Commit Status of Success or Unknown
*/
protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
- return CommitStatus.valueOf(
- checkCommitStatus(
- tableName(),
- newMetadataLocation,
- config.properties(),
- () -> checkCurrentMetadataLocation(newMetadataLocation))
- .name());
+ return checkCommitStatus(
+ tableName(),
+ newMetadataLocation,
+ config.properties(),
+ () -> checkCurrentMetadataLocation(newMetadataLocation));
+ }
+
+ /**
+ * Attempt to load the table and see if any current or past metadata location matches the one we
+ * were attempting to set. This is used as a last resort when we are dealing with exceptions that
+ * may indicate the commit has failed but are not proof that this is the case. Past locations must
+ * also be searched on the chance that a second committer was able to successfully commit on top
+ * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link
+ * CommitStatus#FAILURE}.
+ *
+ * @param newMetadataLocation the path of the new commit file
+ * @param config metadata to use for configuration
+ * @return Commit Status of Success, Failure or Unknown
+ */
+ protected CommitStatus checkCommitStatusStrict(String newMetadataLocation, TableMetadata config) {
+ return checkCommitStatusStrict(
+ tableName(),
+ newMetadataLocation,
+ config.properties(),
+ () -> checkCurrentMetadataLocation(newMetadataLocation));
}
/**
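Reviewer note: the practical difference between the two variants is what an error handler may do with the result. A minimal sketch, assuming a hypothetical persistMetadata() commit step and a TableMetadata variable named metadata in scope (none of this is part of the patch):

try {
  persistMetadata(newMetadataLocation); // hypothetical commit step
} catch (RuntimeException e) {
  // Strict variant: an absent newMetadataLocation maps to FAILURE, so the caller
  // may delete the orphaned metadata file and surface a retryable commit failure.
  if (checkCommitStatusStrict(newMetadataLocation, metadata) == CommitStatus.FAILURE) {
    io().deleteFile(newMetadataLocation);
    throw new CommitFailedException(e, "Commit failed for %s", tableName());
  }
  // The lenient checkCommitStatus returns UNKNOWN in the same situation, in which
  // case the new metadata file must be left in place: a concurrent committer may
  // still have built on top of it.
}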
diff --git a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/DockerLocalFileIOClickHouseAllTypesIT.java b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/DockerLocalFileIOClickHouseAllTypesIT.java
new file mode 100644
index 00000000..c319402d
--- /dev/null
+++ b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/DockerLocalFileIOClickHouseAllTypesIT.java
@@ -0,0 +1,349 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog;
+
+import java.io.File;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.hadoop.HadoopOutputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.MountableFile;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Docker integration test: same topology as {@link DockerLocalFileIOClickHouseIT} (REST catalog +
+ * {@code file:///warehouse} shared with ClickHouse). Writes a one-row Parquet with int, string,
+ * date, and timestamp columns, inserts via {@code ice}, then asserts ClickHouse reads the same
+ * values and {@code count() = 1}.
+ *
+ *
+ * <p>Requires Docker. Excluded from default Failsafe runs (see {@code pom.xml}); run explicitly,
+ * e.g. {@code mvn -pl ice-rest-catalog verify -Dit.test=DockerLocalFileIOClickHouseAllTypesIT}.
+ */
+public class DockerLocalFileIOClickHouseAllTypesIT {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(DockerLocalFileIOClickHouseAllTypesIT.class);
+
+ private static final String DEFAULT_CATALOG_IMAGE =
+ "altinity/ice-rest-catalog:debug-with-ice-0.12.0";
+
+ private static final String DEFAULT_CLICKHOUSE_IMAGE =
+ "altinity/clickhouse-server:25.8.16.20002.altinityantalya";
+
+ private static final String CH_DB = "ice_localfileio";
+ private static final String NAMESPACE = "ch_test";
+ private static final String TABLE = NAMESPACE + ".basictypes";
+
+ private Network network;
+
+ private Path hostWarehouseDir;
+
+ private GenericContainer<?> catalog;
+
+ private GenericContainer<?> clickhouse;
+
+ @BeforeClass
+ @SuppressWarnings("resource")
+ public void setUp() throws Exception {
+ String dockerImage = System.getProperty("docker.image", DEFAULT_CATALOG_IMAGE);
+ logger.info("Using catalog Docker image: {}", dockerImage);
+
+ String clickhouseImage = System.getProperty("clickhouse.image", DEFAULT_CLICKHOUSE_IMAGE);
+ logger.info("Using ClickHouse Docker image: {}", clickhouseImage);
+
+ hostWarehouseDir = Files.createTempDirectory("ice-warehouse-basictypes-");
+ Files.setPosixFilePermissions(hostWarehouseDir, PosixFilePermissions.fromString("rwxr-xr-x"));
+
+ URL configResource =
+ getClass().getClassLoader().getResource("docker-catalog-localfileio-config.yaml");
+ if (configResource == null) {
+ throw new IllegalStateException("docker-catalog-localfileio-config.yaml not on classpath");
+ }
+ String catalogConfig = Files.readString(Paths.get(configResource.toURI()));
+
+ network = Network.newNetwork();
+
+ catalog =
+ new GenericContainer<>(dockerImage)
+ .withNetwork(network)
+ .withNetworkAliases("catalog")
+ .withExposedPorts(5000)
+ .withFileSystemBind(hostWarehouseDir.toString(), "/warehouse", BindMode.READ_WRITE)
+ .withEnv("ICE_REST_CATALOG_CONFIG", "")
+ .withEnv("ICE_REST_CATALOG_CONFIG_YAML", catalogConfig)
+ .waitingFor(Wait.forHttp("/v1/config").forPort(5000).forStatusCode(200));
+
+ try {
+ catalog.start();
+ } catch (Exception e) {
+ if (catalog != null) {
+ logger.error("Catalog container logs: {}", catalog.getLogs());
+ }
+ throw e;
+ }
+
+ File cliConfigHost = File.createTempFile("ice-docker-cli-", ".yaml");
+ try {
+ Files.write(
+ cliConfigHost.toPath(),
+ ("uri: http://localhost:5000\n" + "warehouse: file:///warehouse\n").getBytes());
+ catalog.copyFileToContainer(
+ MountableFile.forHostPath(cliConfigHost.toPath()), "/tmp/ice-cli.yaml");
+ } finally {
+ cliConfigHost.delete();
+ }
+
+ clickhouse =
+ new GenericContainer<>(clickhouseImage)
+ .withNetwork(network)
+ .withNetworkAliases("clickhouse")
+ .withExposedPorts(8123, 9000)
+ .withFileSystemBind(hostWarehouseDir.toString(), "/warehouse", BindMode.READ_ONLY)
+ .waitingFor(Wait.forHttp("/ping").forPort(8123).forStatusCode(200));
+
+ try {
+ clickhouse.start();
+ } catch (Exception e) {
+ if (clickhouse != null) {
+ logger.error("ClickHouse container logs: {}", clickhouse.getLogs());
+ }
+ throw e;
+ }
+
+ logger.info(
+ "Catalog at {}:{}, ClickHouse at {}:{}",
+ catalog.getHost(),
+ catalog.getMappedPort(5000),
+ clickhouse.getHost(),
+ clickhouse.getMappedPort(8123));
+ }
+
+ @AfterClass
+ public void tearDown() {
+ try {
+ if (clickhouse != null && clickhouse.isRunning()) {
+ clickhouse.execInContainer(
+ "clickhouse-client", "--query", "DROP DATABASE IF EXISTS `" + CH_DB + "` SYNC");
+ }
+ } catch (Exception e) {
+ logger.warn("ClickHouse cleanup failed: {}", e.getMessage());
+ }
+ try {
+ if (catalog != null && catalog.isRunning()) {
+ ExecResult r1 =
+ catalog.execInContainer(
+ "ice", "--config", "/tmp/ice-cli.yaml", "delete-table", TABLE, "-p");
+ if (r1.getExitCode() != 0) {
+ logger.warn("delete-table stderr: {}", r1.getStderr());
+ }
+ ExecResult r2 =
+ catalog.execInContainer(
+ "ice", "--config", "/tmp/ice-cli.yaml", "delete-namespace", NAMESPACE, "-p");
+ if (r2.getExitCode() != 0) {
+ logger.warn("delete-namespace stderr: {}", r2.getStderr());
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("Ice CLI cleanup failed: {}", e.getMessage());
+ }
+ if (clickhouse != null) {
+ clickhouse.close();
+ }
+ if (catalog != null) {
+ catalog.close();
+ }
+ if (network != null) {
+ network.close();
+ }
+ if (hostWarehouseDir != null) {
+ try {
+ try (var walk = Files.walk(hostWarehouseDir)) {
+ walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+ }
+ } catch (Exception e) {
+ logger.warn("Failed to delete warehouse dir {}: {}", hostWarehouseDir, e.getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void testClickHouseReadsBasicTypes() throws Exception {
+ Schema schema = basicTypesSchema();
+ Record row = GenericRecord.create(schema);
+ row.setField("id", 1);
+ row.setField("b_int", 40);
+ row.setField("b_string", "hello");
+ row.setField("b_date", LocalDate.of(2024, 6, 15));
+ row.setField("b_ts", LocalDateTime.of(2024, 6, 15, 12, 30, 45));
+
+ Path parquetFile = Files.createTempFile("basic-", ".parquet");
+ parquetFile.toFile().deleteOnExit();
+ writeParquet(schema, List.of(row), parquetFile);
+
+ catalog.copyFileToContainer(MountableFile.forHostPath(parquetFile), "/tmp/basic.parquet");
+
+ iceExecOrThrow("create-namespace", NAMESPACE);
+ iceExecOrThrow("insert", "--create-table", TABLE, "file:///tmp/basic.parquet");
+
+ String createDb =
+ "SET allow_experimental_database_iceberg = 1; "
+ + "DROP DATABASE IF EXISTS `"
+ + CH_DB
+ + "`; "
+ + "CREATE DATABASE `"
+ + CH_DB
+ + "` ENGINE = DataLakeCatalog('http://catalog:5000') "
+ + "SETTINGS catalog_type='rest', vended_credentials=false, warehouse='warehouse'";
+ chExecOrThrow(createDb);
+
+ String countSql = "SELECT count() FROM `" + CH_DB + "`.`" + TABLE + "` FORMAT TabSeparated";
+ String count = chQueryOne(countSql);
+ if (!"1".equals(count)) {
+ throw new AssertionError("Expected count()=1, got: " + count);
+ }
+
+ String valuesSql =
+ // ClickHouse formatDateTime is MySQL-style: %i is minutes (%m would be the month)
+ "SELECT b_int, b_string, toString(b_date), formatDateTime(b_ts, '%Y-%m-%d %H:%i:%S') FROM `"
+ + CH_DB
+ + "`.`"
+ + TABLE
+ + "` FORMAT TabSeparated";
+ String line = chQueryOne(valuesSql);
+ String[] cells = line.split("\t", -1);
+ if (cells.length != 4) {
+ throw new AssertionError("Expected 4 columns, got " + cells.length + ": " + line);
+ }
+ if (!"40".equals(cells[0])) {
+ throw new AssertionError("b_int: expected 40, got " + cells[0]);
+ }
+ if (!"hello".equals(cells[1])) {
+ throw new AssertionError("b_string: expected hello, got " + cells[1]);
+ }
+ if (!"2024-06-15".equals(cells[2])) {
+ throw new AssertionError("b_date: expected 2024-06-15, got " + cells[2]);
+ }
+ if (!"2024-06-15 12:06:45".equals(cells[3])) {
+ throw new AssertionError("b_ts: expected 2024-06-15 12:06:45, got " + cells[3]);
+ }
+ }
+
+ private static Schema basicTypesSchema() {
+ return new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "b_int", Types.IntegerType.get()),
+ Types.NestedField.optional(3, "b_string", Types.StringType.get()),
+ Types.NestedField.optional(4, "b_date", Types.DateType.get()),
+ Types.NestedField.optional(5, "b_ts", Types.TimestampType.withoutZone()));
+ }
+
+ private static void writeParquet(Schema schema, List<Record> records, Path parquetPath)
+ throws Exception {
+ org.apache.iceberg.io.OutputFile outputFile =
+ HadoopOutputFile.fromPath(
+ new org.apache.hadoop.fs.Path(parquetPath.toUri()), new Configuration());
+ try (FileAppender<Record> writer =
+ Parquet.write(outputFile)
+ .schema(schema)
+ .setAll(java.util.Map.of())
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .metricsConfig(MetricsConfig.getDefault())
+ .overwrite()
+ .build()) {
+ for (Record rec : records) {
+ writer.add(rec);
+ }
+ }
+ }
+
+ private void iceExecOrThrow(String... args) throws Exception {
+ List<String> cmd = new ArrayList<>();
+ cmd.add("ice");
+ cmd.add("--config");
+ cmd.add("/tmp/ice-cli.yaml");
+ for (String a : args) {
+ cmd.add(a);
+ }
+ ExecResult result = catalog.execInContainer(cmd.toArray(new String[0]));
+ if (result.getExitCode() != 0) {
+ throw new IllegalStateException(
+ "ice "
+ + String.join(" ", args)
+ + " failed: exit="
+ + result.getExitCode()
+ + "\nstdout:\n"
+ + result.getStdout()
+ + "\nstderr:\n"
+ + result.getStderr()
+ + "\ncatalog logs:\n"
+ + catalog.getLogs());
+ }
+ }
+
+ private void chExecOrThrow(String multiQuery) throws Exception {
+ ExecResult result =
+ clickhouse.execInContainer("clickhouse-client", "--multiquery", "--query", multiQuery);
+ if (result.getExitCode() != 0) {
+ throw new IllegalStateException(
+ "clickhouse-client failed: exit="
+ + result.getExitCode()
+ + "\nstdout:\n"
+ + result.getStdout()
+ + "\nstderr:\n"
+ + result.getStderr()
+ + "\nclickhouse logs:\n"
+ + clickhouse.getLogs());
+ }
+ }
+
+ private String chQueryOne(String sql) throws Exception {
+ String prelude = "SET session_timezone='UTC'; SET allow_experimental_database_iceberg=1; ";
+ ExecResult r =
+ clickhouse.execInContainer("clickhouse-client", "--multiquery", "--query", prelude + sql);
+ if (r.getExitCode() != 0) {
+ throw new IllegalStateException(
+ "clickhouse-client SELECT failed: exit="
+ + r.getExitCode()
+ + "\nquery:\n"
+ + sql
+ + "\nstdout:\n"
+ + r.getStdout()
+ + "\nstderr:\n"
+ + r.getStderr()
+ + "\nclickhouse logs:\n"
+ + clickhouse.getLogs());
+ }
+ return r.getStdout().trim();
+ }
+}
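Aside for reviewers: the staged Parquet can be sanity-checked host-side with Iceberg's generic reader before involving ClickHouse at all. A sketch reusing schema and parquetFile from the test above (assumes an extra import of org.apache.iceberg.data.parquet.GenericParquetReaders):

try (org.apache.iceberg.io.CloseableIterable<Record> rows =
    Parquet.read(
            org.apache.iceberg.hadoop.HadoopInputFile.fromPath(
                new org.apache.hadoop.fs.Path(parquetFile.toUri()), new Configuration()))
        .project(schema)
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
        .build()) {
  for (Record r : rows) {
    if (!Integer.valueOf(40).equals(r.getField("b_int"))) {
      throw new AssertionError("b_int mismatch before ClickHouse: " + r.getField("b_int"));
    }
  }
}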
diff --git a/ice/README.md b/ice/README.md
index 69687120..3e6f8416 100644
--- a/ice/README.md
+++ b/ice/README.md
@@ -98,6 +98,9 @@ Supported codecs: `zstd`, `snappy`, `gzip`, `lz4`.
# add a column to an existing table
ice alter-table flowers.iris '[{"op":"add_column","name":"extra","type":"string"}]'
+# add a NOT NULL column (`required: true`; uses Iceberg's `allowIncompatibleChanges` for that alter)
+ice alter-table flowers.iris '[{"op":"add_column","name":"extra","type":"string","required":true}]'
+
# verify the schema change
ice describe -s flowers.iris
```
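For reference, the `required: true` path corresponds to these Iceberg API calls (a sketch of what AlterTable does for that op; `table` is an org.apache.iceberg.Table):

UpdateSchema us = table.updateSchema();
us.allowIncompatibleChanges(); // required adds are rejected without this
us.addRequiredColumn("extra", Types.StringType.get());
us.commit();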
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java
index 782682f2..d39a8e9f 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java
@@ -13,7 +13,11 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
@@ -22,6 +26,7 @@
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
@@ -53,6 +58,7 @@ public static class AddColumn extends Update {
@Nullable private final String after;
@Nullable private final String before;
private final boolean first;
+ private final boolean required;
public AddColumn(
@JsonProperty(value = "name", required = true) String name,
@@ -60,16 +66,83 @@ public AddColumn(
@JsonProperty("doc") @Nullable String doc,
@JsonProperty("after") @Nullable String after,
@JsonProperty("before") @Nullable String before,
- @JsonProperty("first") @Nullable Boolean first) {
+ @JsonProperty("first") @Nullable Boolean first,
+ @JsonProperty("required") @Nullable Boolean required) {
this.name = name;
this.type = Types.fromPrimitiveString(type);
this.doc = doc;
this.after = after;
this.before = before;
this.first = first != null && first;
+ this.required = required != null && required;
}
}
+ // Coerce a JSON literal (number / boolean / string) into an Iceberg Literal of the requested
+ // type. Numeric, boolean, string, binary, decimal, and uuid values are built directly; for
+ // date/time/timestamp types we accept the canonical string form and let Iceberg parse it via
+ // Literal.to(type).
+ @Nullable
+ private static Literal<?> coerceDefault(Type type, @Nullable Object jsonValue) {
+ if (jsonValue == null) {
+ return null;
+ }
+ try {
+ switch (type.typeId()) {
+ case BOOLEAN:
+ if (jsonValue instanceof Boolean b) {
+ return Literal.of(b);
+ }
+ return Literal.of(Boolean.parseBoolean(jsonValue.toString()));
+ case INTEGER:
+ return Literal.of(((Number) numberOrParse(jsonValue)).intValue());
+ case LONG:
+ return Literal.of(((Number) numberOrParse(jsonValue)).longValue());
+ case FLOAT:
+ return Literal.of(((Number) numberOrParse(jsonValue)).floatValue());
+ case DOUBLE:
+ return Literal.of(((Number) numberOrParse(jsonValue)).doubleValue());
+ case STRING:
+ return Literal.of(jsonValue.toString());
+ case BINARY:
+ case FIXED:
+ return Literal.of(ByteBuffer.wrap(jsonValue.toString().getBytes(StandardCharsets.UTF_8)));
+ case DECIMAL:
+ return Literal.of(new BigDecimal(jsonValue.toString())).to(type);
+ case UUID:
+ return Literal.of(UUID.fromString(jsonValue.toString()));
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ case TIMESTAMP_NANO:
+ // Let Iceberg parse the canonical string form (e.g. "2025-01-01",
+ // "2025-01-01T00:00:00", "2025-01-01T00:00:00+00:00").
+ Literal<?> parsed = Literal.of(jsonValue.toString()).to(type);
+ if (parsed == null) {
+ throw new IllegalArgumentException(
+ String.format("Cannot parse default value '%s' as %s", jsonValue, type));
+ }
+ return parsed;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Defaults not supported for type %s", type));
+ }
+ } catch (ClassCastException | IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ String.format("Cannot coerce default value '%s' to type %s", jsonValue, type), e);
+ }
+ }
+
+ // Accept either a JSON number (already a Number) or a string parseable as a number.
+ private static Number numberOrParse(Object jsonValue) {
+ if (jsonValue instanceof Number n) {
+ return n;
+ }
+ String s = jsonValue.toString();
+ // Use BigDecimal to accept any numeric form; downstream casts narrow to int/long/float/double.
+ return new BigDecimal(s);
+ }
+
public static class AlterColumn extends Update {
private final String name;
private final Type.PrimitiveType type;
@@ -148,7 +221,18 @@ public static void run(Catalog catalog, TableIdentifier tableId, List<Update> up
case AddColumn up -> {
// TODO: support nested columns
UpdateSchema us = schemaUpdates.getValue();
- us.addColumn(up.name, up.type, up.doc);
+ if (up.required) {
+ // Iceberg rejects required adds without an initial default unless incompatible
+ // changes are explicitly allowed (even for empty tables with a snapshot).
+ us.allowIncompatibleChanges();
+ if (up.doc != null) {
+ us.addRequiredColumn(up.name, up.type, up.doc);
+ } else {
+ us.addRequiredColumn(up.name, up.type);
+ }
+ } else {
+ us.addColumn(up.name, up.type, up.doc);
+ }
if (up.after != null) {
us.moveAfter(up.name, up.after);
} else if (up.before != null) {
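For context, what the new coerceDefault helper yields for representative JSON inputs (illustrative values; the helper is not exercised by the hunks shown above):

Literal<?> i = coerceDefault(Types.IntegerType.get(), 42);          // Literal.of(42)
Literal<?> d = coerceDefault(Types.DateType.get(), "2025-01-01");   // parsed via Literal.to(DateType)
Literal<?> m = coerceDefault(Types.DecimalType.of(10, 2), "12.34"); // BigDecimal, scale checked by to()
Literal<?> u = coerceDefault(Types.UUIDType.get(), "123e4567-e89b-12d3-a456-426614174000");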
diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index 7f237b72..620c226d 100644
--- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
+++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -18,15 +18,17 @@
*/
package org.apache.iceberg.data.parquet;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.parquet.ParquetVariantVisitor;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.parquet.VariantReaderBuilder;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -50,11 +52,7 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-/**
- * @deprecated since 1.8.0, will be made package-private in 1.9.0
- */
-@Deprecated
-public abstract class BaseParquetReaders<T> {
+abstract class BaseParquetReaders<T> {
protected BaseParquetReaders() {}
protected ParquetValueReader<T> createReader(Schema expectedSchema, MessageType fileSchema) {
@@ -78,56 +76,16 @@ protected ParquetValueReader<T> createReader(
}
protected abstract ParquetValueReader<T> createStructReader(
- List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);
+ List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);
- protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
- return new GenericParquetReaders.FixedReader(desc);
- }
+ protected abstract ParquetValueReader<?> fixedReader(ColumnDescriptor desc);
- protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
- return new GenericParquetReaders.DateReader(desc);
- }
+ protected abstract ParquetValueReader<?> dateReader(ColumnDescriptor desc);
- protected ParquetValueReader<?> timeReader(ColumnDescriptor desc) {
- LogicalTypeAnnotation time = desc.getPrimitiveType().getLogicalTypeAnnotation();
- Preconditions.checkArgument(
- time instanceof TimeLogicalTypeAnnotation, "Invalid time logical type: " + time);
-
- LogicalTypeAnnotation.TimeUnit unit = ((TimeLogicalTypeAnnotation) time).getUnit();
- switch (unit) {
- case MICROS:
- return new GenericParquetReaders.TimeReader(desc);
- case MILLIS:
- return new GenericParquetReaders.TimeMillisReader(desc);
- default:
- throw new UnsupportedOperationException("Unsupported unit for time: " + unit);
- }
- }
+ protected abstract ParquetValueReader<?> timeReader(ColumnDescriptor desc);
- protected ParquetValueReader<?> timestampReader(ColumnDescriptor desc, boolean isAdjustedToUTC) {
- if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
- return new GenericParquetReaders.TimestampInt96Reader(desc);
- }
-
- LogicalTypeAnnotation timestamp = desc.getPrimitiveType().getLogicalTypeAnnotation();
- Preconditions.checkArgument(
- timestamp instanceof TimestampLogicalTypeAnnotation,
- "Invalid timestamp logical type: " + timestamp);
-
- LogicalTypeAnnotation.TimeUnit unit = ((TimestampLogicalTypeAnnotation) timestamp).getUnit();
- switch (unit) {
- case MICROS:
- return isAdjustedToUTC
- ? new GenericParquetReaders.TimestamptzReader(desc)
- : new GenericParquetReaders.TimestampReader(desc);
- case MILLIS:
- return isAdjustedToUTC
- ? new GenericParquetReaders.TimestamptzMillisReader(desc)
- : new GenericParquetReaders.TimestampMillisReader(desc);
- default:
- throw new UnsupportedOperationException("Unsupported unit for timestamp: " + unit);
- }
- }
+ protected abstract ParquetValueReader<?> timestampReader(
+ ColumnDescriptor desc, boolean isAdjustedToUTC);
protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) {
return value;
@@ -151,7 +109,6 @@ public ParquetValueReader<?> struct(
// the expected struct is ignored because nested fields are never found when the
List<ParquetValueReader<?>> newFields =
Lists.newArrayListWithExpectedSize(fieldReaders.size());
- List<Type> types = Lists.newArrayListWithExpectedSize(fieldReaders.size());
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
ParquetValueReader<?> fieldReader = fieldReaders.get(i);
@@ -159,11 +116,10 @@ public ParquetValueReader<?> struct(
Type fieldType = fields.get(i);
int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 1;
newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReader));
- types.add(fieldType);
}
}
- return createStructReader(types, newFields, expected);
+ return createStructReader(newFields, expected);
}
}
@@ -207,8 +163,7 @@ public Optional<ParquetValueReader<?>> visit(TimeLogicalTypeAnnotation timeLogic
@Override
public Optional<ParquetValueReader<?>> visit(
TimestampLogicalTypeAnnotation timestampLogicalType) {
- return Optional.of(
- timestampReader(desc, ((Types.TimestampType) expected).shouldAdjustToUTC()));
+ return Optional.of(timestampReader(desc, timestampLogicalType.isAdjustedToUTC()));
}
@Override
@@ -218,6 +173,7 @@ public Optional<ParquetValueReader<?>> visit(IntLogicalTypeAnnotation intLogical
Preconditions.checkArgument(
intLogicalType.isSigned(), "Cannot read UINT64 as a long value");
*/
+
return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
}
@@ -268,10 +224,12 @@ public ParquetValueReader<?> message(
@Override
public ParquetValueReader<?> struct(
Types.StructType expected, GroupType struct, List<ParquetValueReader<?>> fieldReaders) {
+ if (null == expected) {
+ return createStructReader(ImmutableList.of(), null);
+ }
+
// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
- Map<Integer, Type> typesById = Maps.newHashMap();
- Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
ParquetValueReader<?> fieldReader = fieldReaders.get(i);
@@ -280,55 +238,37 @@ public ParquetValueReader> struct(
int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader));
- typesById.put(id, fieldType);
- if (idToConstant.containsKey(id)) {
- maxDefinitionLevelsById.put(id, fieldD);
- }
}
}
- List<Types.NestedField> expectedFields =
- expected != null ? expected.fields() : ImmutableList.of();
+ int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
+ List<Types.NestedField> expectedFields = expected.fields();
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
- List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
- // Defaulting to parent max definition level
- int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
+
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
- ParquetValueReader<?> reader = readersById.get(id);
- if (idToConstant.containsKey(id)) {
- // containsKey is used because the constant may be null
- int fieldMaxDefinitionLevel =
- maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
- reorderedFields.add(
- ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
- types.add(null);
- } else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
- reorderedFields.add(ParquetValueReaders.position());
- types.add(null);
- } else if (id == MetadataColumns.IS_DELETED.fieldId()) {
- reorderedFields.add(ParquetValueReaders.constant(false));
- types.add(null);
- } else if (reader != null) {
- reorderedFields.add(reader);
- types.add(typesById.get(id));
- } else if (field.initialDefault() != null) {
- reorderedFields.add(
- ParquetValueReaders.constant(
- convertConstant(field.type(), field.initialDefault()),
- maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
- types.add(typesById.get(id));
- } else if (field.isOptional()) {
- reorderedFields.add(ParquetValueReaders.nulls());
- types.add(null);
- } else {
- throw new IllegalArgumentException(
- String.format("Missing required field: %s", field.name()));
- }
+ ParquetValueReader<?> reader =
+ ParquetValueReaders.replaceWithMetadataReader(
+ id, readersById.get(id), idToConstant, constantDefinitionLevel);
+ reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel));
+ }
+
+ return createStructReader(reorderedFields, expected);
+ }
+
+ private ParquetValueReader<?> defaultReader(
+ Types.NestedField field, ParquetValueReader<?> reader, int constantDL) {
+ if (reader != null) {
+ return reader;
+ } else if (field.initialDefault() != null) {
+ return ParquetValueReaders.constant(
+ convertConstant(field.type(), field.initialDefault()), constantDL);
+ } else if (field.isOptional()) {
+ return ParquetValueReaders.nulls();
}
- return createStructReader(types, reorderedFields, expected);
+ throw new IllegalArgumentException(String.format("Missing required field: %s", field.name()));
}
@Override
@@ -432,6 +372,17 @@ public ParquetValueReader<?> primitive(
}
}
+ @Override
+ public ParquetValueReader<?> variant(
+ Types.VariantType iVariant, GroupType variant, ParquetValueReader<?> reader) {
+ return reader;
+ }
+
+ @Override
+ public ParquetVariantVisitor<ParquetValueReader<?>> variantVisitor() {
+ return new VariantReaderBuilder(type, Arrays.asList(currentPath()));
+ }
+
MessageType type() {
return type;
}
diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java
index 8199f698..6304a41e 100644
--- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java
+++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java
@@ -18,13 +18,17 @@
*/
package org.apache.iceberg.data.parquet;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
-import org.apache.iceberg.parquet.ParquetTypeVisitor;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
+import org.apache.iceberg.parquet.ParquetVariantVisitor;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.parquet.VariantWriterBuilder;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -32,41 +36,31 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-/**
- * @deprecated since 1.8.0, will be made package-private in 1.9.0
- */
-@Deprecated
-public abstract class BaseParquetWriter<T> {
+abstract class BaseParquetWriter<T> {
- @SuppressWarnings("unchecked")
protected ParquetValueWriter<T> createWriter(MessageType type) {
- return (ParquetValueWriter<T>) ParquetTypeVisitor.visit(type, new WriteBuilder(type));
+ return createWriter(null, type);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected ParquetValueWriter<T> createWriter(Types.StructType struct, MessageType type) {
+ return (ParquetValueWriter<T>)
+ TypeWithSchemaVisitor.visit(struct, type, new WriteBuilder(type));
}
protected abstract ParquetValueWriters.StructWriter<T> createStructWriter(
- List<ParquetValueWriter<?>> writers);
+ Types.StructType struct, List<ParquetValueWriter<?>> writers);
- protected ParquetValueWriter<?> fixedWriter(ColumnDescriptor desc) {
- return new GenericParquetWriter.FixedWriter(desc);
- }
+ protected abstract ParquetValueWriter<?> fixedWriter(ColumnDescriptor desc);
- protected ParquetValueWriter<?> dateWriter(ColumnDescriptor desc) {
- return new GenericParquetWriter.DateWriter(desc);
- }
+ protected abstract ParquetValueWriter<?> dateWriter(ColumnDescriptor desc);
- protected ParquetValueWriter<?> timeWriter(ColumnDescriptor desc) {
- return new GenericParquetWriter.TimeWriter(desc);
- }
+ protected abstract ParquetValueWriter<?> timeWriter(ColumnDescriptor desc);
- protected ParquetValueWriter<?> timestampWriter(ColumnDescriptor desc, boolean isAdjustedToUTC) {
- if (isAdjustedToUTC) {
- return new GenericParquetWriter.TimestamptzWriter(desc);
- } else {
- return new GenericParquetWriter.TimestampWriter(desc);
- }
- }
+ protected abstract ParquetValueWriter<?> timestampWriter(
+ ColumnDescriptor desc, boolean isAdjustedToUTC);
- private class WriteBuilder extends ParquetTypeVisitor<ParquetValueWriter<?>> {
+ private class WriteBuilder extends TypeWithSchemaVisitor<ParquetValueWriter<?>> {
private final MessageType type;
private WriteBuilder(MessageType type) {
@@ -75,14 +69,14 @@ private WriteBuilder(MessageType type) {
@Override
public ParquetValueWriter<?> message(
- MessageType message, List<ParquetValueWriter<?>> fieldWriters) {
+ Types.StructType struct, MessageType message, List<ParquetValueWriter<?>> fieldWriters) {
- return struct(message.asGroupType(), fieldWriters);
+ return struct(struct, message.asGroupType(), fieldWriters);
}
@Override
public ParquetValueWriter<?> struct(
- GroupType struct, List<ParquetValueWriter<?>> fieldWriters) {
+ Types.StructType iceberg, GroupType struct, List<ParquetValueWriter<?>> fieldWriters) {
List<Type> fields = struct.getFields();
List<ParquetValueWriter<?>> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size());
for (int i = 0; i < fields.size(); i += 1) {
@@ -91,11 +85,12 @@ public ParquetValueWriter<?> struct(
writers.add(ParquetValueWriters.option(fieldType, fieldD, fieldWriters.get(i)));
}
- return createStructWriter(writers);
+ return createStructWriter(iceberg, writers);
}
@Override
- public ParquetValueWriter<?> list(GroupType array, ParquetValueWriter<?> elementWriter) {
+ public ParquetValueWriter<?> list(
+ Types.ListType iceberg, GroupType array, ParquetValueWriter<?> elementWriter) {
GroupType repeated = array.getFields().get(0).asGroupType();
String[] repeatedPath = currentPath();
@@ -111,7 +106,10 @@ public ParquetValueWriter<?> list(GroupType array, ParquetValueWriter<?> element
@Override
public ParquetValueWriter<?> map(
- GroupType map, ParquetValueWriter<?> keyWriter, ParquetValueWriter<?> valueWriter) {
+ Types.MapType iceberg,
+ GroupType map,
+ ParquetValueWriter<?> keyWriter,
+ ParquetValueWriter<?> valueWriter) {
GroupType repeatedKeyValue = map.getFields().get(0).asGroupType();
String[] repeatedPath = currentPath();
@@ -131,7 +129,8 @@ public ParquetValueWriter> map(
}
@Override
- public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
+ public ParquetValueWriter<?> primitive(
+ org.apache.iceberg.types.Type.PrimitiveType iceberg, PrimitiveType primitive) {
ColumnDescriptor desc = type.getColumnDescription(currentPath());
LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation();
if (logicalType != null) {
@@ -161,6 +160,17 @@ public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
throw new UnsupportedOperationException("Unsupported type: " + primitive);
}
}
+
+ @Override
+ public ParquetValueWriter<?> variant(
+ Types.VariantType iVariant, GroupType variant, ParquetValueWriter<?> result) {
+ return result;
+ }
+
+ @Override
+ public ParquetVariantVisitor<ParquetValueWriter<?>> variantVisitor() {
+ return new VariantWriterBuilder(type, Arrays.asList(currentPath()));
+ }
}
private class LogicalTypeWriterVisitor
@@ -224,8 +234,8 @@ public Optional<ParquetValueWriter<?>> visit(
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
Preconditions.checkArgument(
- LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()),
- "Cannot write timestamp in %s, only MICROS is supported",
+ !LogicalTypeAnnotation.TimeUnit.MILLIS.equals(timestampType.getUnit()),
+ "Cannot write timestamp in %s, only MICROS and NANOS are supported",
timestampType.getUnit());
return Optional.of(timestampWriter(desc, timestampType.isAdjustedToUTC()));
}
diff --git a/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
index a00c1176..a003e1af 100644
--- a/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
+++ b/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
@@ -27,6 +27,7 @@
import java.util.function.Function;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Type;
+import org.apache.iceberg.util.UUIDUtil;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
@@ -77,6 +78,55 @@ static <T> Literal<T> fromParquetPrimitive(Type type, PrimitiveType parquetType,
}
}
+ // Iceberg 1.9 introduced this API in upstream ParquetConversions and uses it from
+ // ParquetMetrics. Provide a compatible shim here so callers compiled against 1.9
+ // continue to resolve against this shadowed class. The TIME/TIMESTAMP branch keeps
+ // the local millis -> micros normalization used by fromParquetPrimitive above.
+ @SuppressWarnings("unchecked")
+ static <T> T convertValue(Type type, PrimitiveType parquetType, Object value) {
+ switch (type.typeId()) {
+ case BOOLEAN:
+ case INTEGER:
+ case DATE:
+ case LONG:
+ case TIMESTAMP_NANO:
+ case FLOAT:
+ case DOUBLE:
+ return (T) value;
+ case TIME:
+ case TIMESTAMP:
+ if (parquetType.getLogicalTypeAnnotation()
+ instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation tsAnno
+ && tsAnno.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+ return (T) Long.valueOf(((Number) value).longValue() * 1000L);
+ }
+ return (T) value;
+ case STRING:
+ return (T) ((Binary) value).toStringUsingUTF8();
+ case UUID:
+ return (T) UUIDUtil.convert(((Binary) value).toByteBuffer());
+ case FIXED:
+ case BINARY:
+ return (T) ((Binary) value).toByteBuffer();
+ case DECIMAL:
+ int scale =
+ ((DecimalLogicalTypeAnnotation) parquetType.getLogicalTypeAnnotation()).getScale();
+ switch (parquetType.getPrimitiveTypeName()) {
+ case INT32:
+ case INT64:
+ return (T) BigDecimal.valueOf(((Number) value).longValue(), scale);
+ case FIXED_LEN_BYTE_ARRAY:
+ case BINARY:
+ return (T) new BigDecimal(new BigInteger(((Binary) value).getBytes()), scale);
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported primitive type for decimal: " + parquetType.getPrimitiveTypeName());
+ }
+ default:
+ throw new IllegalArgumentException("Unsupported primitive type: " + type);
+ }
+ }
+
static Function<Object, Object> converterFromParquet(
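A worked example of the shim's MILLIS branch (illustrative values): a statistic read from a TIMESTAMP(MILLIS)-annotated column is widened to the microsecond scale Iceberg uses for TIMESTAMP values.

// 2024-06-15T12:30:45Z
long millis = 1_718_454_645_000L; // raw value from a MILLIS-annotated column
long micros = millis * 1000L;     // 1_718_454_645_000_000L, what convertValue returns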