diff --git a/ice-rest-catalog/pom.xml b/ice-rest-catalog/pom.xml index 5459aac8..33c51b72 100644 --- a/ice-rest-catalog/pom.xml +++ b/ice-rest-catalog/pom.xml @@ -565,6 +565,7 @@ <exclude>**/DockerScenarioBasedIT.java</exclude> <exclude>**/DockerLocalFileIOClickHouseIT.java</exclude> + <exclude>**/DockerLocalFileIOClickHouseAllTypesIT.java</exclude> diff --git a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 0a5ac042..e9ca5cc5 100644 --- a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; +import org.apache.iceberg.BaseMetastoreOperations.CommitStatus; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; @@ -294,20 +295,39 @@ public long newSnapshotId() { * were attempting to set. This is used as a last resort when we are dealing with exceptions that * may indicate the commit has failed but are not proof that this is the case. Past locations must * also be searched on the chance that a second committer was able to successfully commit on top - * of our commit. + * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link + * CommitStatus#UNKNOWN}. * * @param newMetadataLocation the path of the new commit file * @param config metadata to use for configuration - * @return Commit Status of Success, Failure or Unknown + * @return Commit Status of Success or Unknown */ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) { - return CommitStatus.valueOf( - checkCommitStatus( - tableName(), - newMetadataLocation, - config.properties(), - () -> checkCurrentMetadataLocation(newMetadataLocation)) - .name()); + return checkCommitStatus( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); + } + + /** + * Attempt to load the table and see if any current or past metadata location matches the one we + * were attempting to set. This is used as a last resort when we are dealing with exceptions that + * may indicate the commit has failed but are not proof that this is the case. Past locations must + * also be searched on the chance that a second committer was able to successfully commit on top + * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link + * CommitStatus#FAILURE}. 
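+ * Prefer the lenient {@code checkCommitStatus} above unless the absence of {@code newMetadataLocation} is proof that the commit did not land (for example, when the underlying store commits metadata atomically); only under that guarantee is acting on {@link CommitStatus#FAILURE} safe.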
+ * + * @param newMetadataLocation the path of the new commit file + * @param config metadata to use for configuration + * @return Commit Status of Success, Failure or Unknown + */ + protected CommitStatus checkCommitStatusStrict(String newMetadataLocation, TableMetadata config) { + return checkCommitStatusStrict( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); } /** diff --git a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/DockerLocalFileIOClickHouseAllTypesIT.java b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/DockerLocalFileIOClickHouseAllTypesIT.java new file mode 100644 index 00000000..c319402d --- /dev/null +++ b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/DockerLocalFileIOClickHouseAllTypesIT.java @@ -0,0 +1,349 @@ +/* + * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package com.altinity.ice.rest.catalog; + +import java.io.File; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFilePermissions; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.hadoop.HadoopOutputFile; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.Container.ExecResult; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.MountableFile; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Docker integration test: same topology as {@link DockerLocalFileIOClickHouseIT} (REST catalog + + * {@code file:///warehouse} shared with ClickHouse). Writes a one-row Parquet with int, string, + * date, and timestamp columns, inserts via {@code ice}, then asserts ClickHouse reads the same + * values and {@code count() = 1}. + * + *

Requires Docker. Excluded from default Failsafe runs (see {@code pom.xml}); run explicitly, + * e.g. {@code mvn -pl ice-rest-catalog verify -Dit.test=DockerLocalFileIOClickHouseAllTypesIT}. + */ +public class DockerLocalFileIOClickHouseAllTypesIT { + + private static final Logger logger = + LoggerFactory.getLogger(DockerLocalFileIOClickHouseAllTypesIT.class); + + private static final String DEFAULT_CATALOG_IMAGE = + "altinity/ice-rest-catalog:debug-with-ice-0.12.0"; + + private static final String DEFAULT_CLICKHOUSE_IMAGE = + "altinity/clickhouse-server:25.8.16.20002.altinityantalya"; + + private static final String CH_DB = "ice_localfileio"; + private static final String NAMESPACE = "ch_test"; + private static final String TABLE = NAMESPACE + ".basictypes"; + + private Network network; + + private Path hostWarehouseDir; + + private GenericContainer catalog; + + private GenericContainer clickhouse; + + @BeforeClass + @SuppressWarnings("resource") + public void setUp() throws Exception { + String dockerImage = System.getProperty("docker.image", DEFAULT_CATALOG_IMAGE); + logger.info("Using catalog Docker image: {}", dockerImage); + + String clickhouseImage = System.getProperty("clickhouse.image", DEFAULT_CLICKHOUSE_IMAGE); + logger.info("Using ClickHouse Docker image: {}", clickhouseImage); + + hostWarehouseDir = Files.createTempDirectory("ice-warehouse-basictypes-"); + Files.setPosixFilePermissions(hostWarehouseDir, PosixFilePermissions.fromString("rwxr-xr-x")); + + URL configResource = + getClass().getClassLoader().getResource("docker-catalog-localfileio-config.yaml"); + if (configResource == null) { + throw new IllegalStateException("docker-catalog-localfileio-config.yaml not on classpath"); + } + String catalogConfig = Files.readString(Paths.get(configResource.toURI())); + + network = Network.newNetwork(); + + catalog = + new GenericContainer<>(dockerImage) + .withNetwork(network) + .withNetworkAliases("catalog") + .withExposedPorts(5000) + .withFileSystemBind(hostWarehouseDir.toString(), "/warehouse", BindMode.READ_WRITE) + .withEnv("ICE_REST_CATALOG_CONFIG", "") + .withEnv("ICE_REST_CATALOG_CONFIG_YAML", catalogConfig) + .waitingFor(Wait.forHttp("/v1/config").forPort(5000).forStatusCode(200)); + + try { + catalog.start(); + } catch (Exception e) { + if (catalog != null) { + logger.error("Catalog container logs: {}", catalog.getLogs()); + } + throw e; + } + + File cliConfigHost = File.createTempFile("ice-docker-cli-", ".yaml"); + try { + Files.write( + cliConfigHost.toPath(), + ("uri: http://localhost:5000\n" + "warehouse: file:///warehouse\n").getBytes()); + catalog.copyFileToContainer( + MountableFile.forHostPath(cliConfigHost.toPath()), "/tmp/ice-cli.yaml"); + } finally { + cliConfigHost.delete(); + } + + clickhouse = + new GenericContainer<>(clickhouseImage) + .withNetwork(network) + .withNetworkAliases("clickhouse") + .withExposedPorts(8123, 9000) + .withFileSystemBind(hostWarehouseDir.toString(), "/warehouse", BindMode.READ_ONLY) + .waitingFor(Wait.forHttp("/ping").forPort(8123).forStatusCode(200)); + + try { + clickhouse.start(); + } catch (Exception e) { + if (clickhouse != null) { + logger.error("ClickHouse container logs: {}", clickhouse.getLogs()); + } + throw e; + } + + logger.info( + "Catalog at {}:{}, ClickHouse at {}:{}", + catalog.getHost(), + catalog.getMappedPort(5000), + clickhouse.getHost(), + clickhouse.getMappedPort(8123)); + } + + @AfterClass + public void tearDown() { + try { + if (clickhouse != null && clickhouse.isRunning()) { + clickhouse.execInContainer( + 
"clickhouse-client", "--query", "DROP DATABASE IF EXISTS `" + CH_DB + "` SYNC"); + } + } catch (Exception e) { + logger.warn("ClickHouse cleanup failed: {}", e.getMessage()); + } + try { + if (catalog != null && catalog.isRunning()) { + ExecResult r1 = + catalog.execInContainer( + "ice", "--config", "/tmp/ice-cli.yaml", "delete-table", TABLE, "-p"); + if (r1.getExitCode() != 0) { + logger.warn("delete-table stderr: {}", r1.getStderr()); + } + ExecResult r2 = + catalog.execInContainer( + "ice", "--config", "/tmp/ice-cli.yaml", "delete-namespace", NAMESPACE, "-p"); + if (r2.getExitCode() != 0) { + logger.warn("delete-namespace stderr: {}", r2.getStderr()); + } + } + } catch (Exception e) { + logger.warn("Ice CLI cleanup failed: {}", e.getMessage()); + } + if (clickhouse != null) { + clickhouse.close(); + } + if (catalog != null) { + catalog.close(); + } + if (network != null) { + network.close(); + } + if (hostWarehouseDir != null) { + try { + try (var walk = Files.walk(hostWarehouseDir)) { + walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + } + } catch (Exception e) { + logger.warn("Failed to delete warehouse dir {}: {}", hostWarehouseDir, e.getMessage()); + } + } + } + + @Test + public void testClickHouseReadsBasicTypes() throws Exception { + Schema schema = basicTypesSchema(); + Record row = GenericRecord.create(schema); + row.setField("id", 1); + row.setField("b_int", 40); + row.setField("b_string", "hello"); + row.setField("b_date", LocalDate.of(2024, 6, 15)); + row.setField("b_ts", LocalDateTime.of(2024, 6, 15, 12, 30, 45)); + + Path parquetFile = Files.createTempFile("basic-", ".parquet"); + parquetFile.toFile().deleteOnExit(); + writeParquet(schema, List.of(row), parquetFile); + + catalog.copyFileToContainer(MountableFile.forHostPath(parquetFile), "/tmp/basic.parquet"); + + iceExecOrThrow("create-namespace", NAMESPACE); + iceExecOrThrow("insert", "--create-table", TABLE, "file:///tmp/basic.parquet"); + + String createDb = + "SET allow_experimental_database_iceberg = 1; " + + "DROP DATABASE IF EXISTS `" + + CH_DB + + "`; " + + "CREATE DATABASE `" + + CH_DB + + "` ENGINE = DataLakeCatalog('http://catalog:5000') " + + "SETTINGS catalog_type='rest', vended_credentials=false, warehouse='warehouse'"; + chExecOrThrow(createDb); + + String countSql = "SELECT count() FROM `" + CH_DB + "`.`" + TABLE + "` FORMAT TabSeparated"; + String count = chQueryOne(countSql); + if (!"1".equals(count)) { + throw new AssertionError("Expected count()=1, got: " + count); + } + + String valuesSql = + "SELECT b_int, b_string, toString(b_date), formatDateTime(b_ts, '%Y-%m-%d %H:%m:%S') FROM `" + + CH_DB + + "`.`" + + TABLE + + "` FORMAT TabSeparated"; + String line = chQueryOne(valuesSql); + String[] cells = line.split("\t", -1); + if (cells.length != 4) { + throw new AssertionError("Expected 4 columns, got " + cells.length + ": " + line); + } + if (!"40".equals(cells[0])) { + throw new AssertionError("b_int: expected 40, got " + cells[0]); + } + if (!"hello".equals(cells[1])) { + throw new AssertionError("b_string: expected hello, got " + cells[1]); + } + if (!"2024-06-15".equals(cells[2])) { + throw new AssertionError("b_date: expected 2024-06-15, got " + cells[2]); + } + if (!"2024-06-15 12:06:45".equals(cells[3])) { + throw new AssertionError("b_ts: expected 2024-06-15 12:06:45, got " + cells[3]); + } + } + + private static Schema basicTypesSchema() { + return new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "b_int", 
Types.IntegerType.get()), + Types.NestedField.optional(3, "b_string", Types.StringType.get()), + Types.NestedField.optional(4, "b_date", Types.DateType.get()), + Types.NestedField.optional(5, "b_ts", Types.TimestampType.withoutZone())); + } + + private static void writeParquet(Schema schema, List records, Path parquetPath) + throws Exception { + org.apache.iceberg.io.OutputFile outputFile = + HadoopOutputFile.fromPath( + new org.apache.hadoop.fs.Path(parquetPath.toUri()), new Configuration()); + try (FileAppender writer = + Parquet.write(outputFile) + .schema(schema) + .setAll(java.util.Map.of()) + .createWriterFunc(GenericParquetWriter::buildWriter) + .metricsConfig(MetricsConfig.getDefault()) + .overwrite() + .build()) { + for (Record rec : records) { + writer.add(rec); + } + } + } + + private void iceExecOrThrow(String... args) throws Exception { + List cmd = new ArrayList<>(); + cmd.add("ice"); + cmd.add("--config"); + cmd.add("/tmp/ice-cli.yaml"); + for (String a : args) { + cmd.add(a); + } + ExecResult result = catalog.execInContainer(cmd.toArray(new String[0])); + if (result.getExitCode() != 0) { + throw new IllegalStateException( + "ice " + + String.join(" ", args) + + " failed: exit=" + + result.getExitCode() + + "\nstdout:\n" + + result.getStdout() + + "\nstderr:\n" + + result.getStderr() + + "\ncatalog logs:\n" + + catalog.getLogs()); + } + } + + private void chExecOrThrow(String multiQuery) throws Exception { + ExecResult result = + clickhouse.execInContainer("clickhouse-client", "--multiquery", "--query", multiQuery); + if (result.getExitCode() != 0) { + throw new IllegalStateException( + "clickhouse-client failed: exit=" + + result.getExitCode() + + "\nstdout:\n" + + result.getStdout() + + "\nstderr:\n" + + result.getStderr() + + "\nclickhouse logs:\n" + + clickhouse.getLogs()); + } + } + + private String chQueryOne(String sql) throws Exception { + String prelude = "SET session_timezone='UTC'; SET allow_experimental_database_iceberg=1; "; + ExecResult r = + clickhouse.execInContainer("clickhouse-client", "--multiquery", "--query", prelude + sql); + if (r.getExitCode() != 0) { + throw new IllegalStateException( + "clickhouse-client SELECT failed: exit=" + + r.getExitCode() + + "\nquery:\n" + + sql + + "\nstdout:\n" + + r.getStdout() + + "\nstderr:\n" + + r.getStderr() + + "\nclickhouse logs:\n" + + clickhouse.getLogs()); + } + return r.getStdout().trim(); + } +} diff --git a/ice/README.md b/ice/README.md index 69687120..3e6f8416 100644 --- a/ice/README.md +++ b/ice/README.md @@ -98,6 +98,9 @@ Supported codecs: `zstd`, `snappy`, `gzip`, `lz4`. 
# add a column to an existing table ice alter-table flowers.iris '[{"op":"add_column","name":"extra","type":"string"}]' +# add a NOT NULL column (`required: true` enables Iceberg's allowIncompatibleChanges for that alter) +ice alter-table flowers.iris '[{"op":"add_column","name":"extra_req","type":"string","required":true}]' + # verify the schema change ice describe -s flowers.iris ``` diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java index 782682f2..d39a8e9f 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java @@ -13,7 +13,11 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.UUID; import javax.annotation.Nullable; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; @@ -22,6 +26,7 @@ import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.slf4j.Logger; @@ -53,6 +58,7 @@ public static class AddColumn extends Update { @Nullable private final String after; @Nullable private final String before; private final boolean first; + private final boolean required; public AddColumn( @JsonProperty(value = "name", required = true) String name, @@ -60,16 +66,83 @@ public AddColumn( @JsonProperty("doc") @Nullable String doc, @JsonProperty("after") @Nullable String after, @JsonProperty("before") @Nullable String before, - @JsonProperty("first") @Nullable Boolean first) { + @JsonProperty("first") @Nullable Boolean first, + @JsonProperty("required") @Nullable Boolean required) { this.name = name; this.type = Types.fromPrimitiveString(type); this.doc = doc; this.after = after; this.before = before; this.first = first != null && first; + this.required = required != null && required; } } + // Coerce a JSON literal (number / boolean / string) into an Iceberg Literal of the requested + // type. For numeric, boolean and string types we build the Literal directly; for complex + // primitives (date/time/timestamp/decimal/uuid/binary) we accept a string form and let + // Iceberg parse it via Literal.to(type). 
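+ // For example: coerceDefault(Types.DateType.get(), "2024-06-15") parses through + // Literal.to(type), while coerceDefault(Types.LongType.get(), 42) unboxes the JSON number + // directly; how a "default" key in the update JSON reaches this helper is not shown here.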
+ @Nullable + private static Literal coerceDefault(Type type, @Nullable Object jsonValue) { + if (jsonValue == null) { + return null; + } + try { + switch (type.typeId()) { + case BOOLEAN: + if (jsonValue instanceof Boolean b) { + return Literal.of(b); + } + return Literal.of(Boolean.parseBoolean(jsonValue.toString())); + case INTEGER: + return Literal.of(((Number) numberOrParse(jsonValue)).intValue()); + case LONG: + return Literal.of(((Number) numberOrParse(jsonValue)).longValue()); + case FLOAT: + return Literal.of(((Number) numberOrParse(jsonValue)).floatValue()); + case DOUBLE: + return Literal.of(((Number) numberOrParse(jsonValue)).doubleValue()); + case STRING: + return Literal.of(jsonValue.toString()); + case BINARY: + case FIXED: + return Literal.of(ByteBuffer.wrap(jsonValue.toString().getBytes(StandardCharsets.UTF_8))); + case DECIMAL: + return Literal.of(new BigDecimal(jsonValue.toString())).to(type); + case UUID: + return Literal.of(UUID.fromString(jsonValue.toString())); + case DATE: + case TIME: + case TIMESTAMP: + case TIMESTAMP_NANO: + // Let Iceberg parse the canonical string form (e.g. "2025-01-01", + // "2025-01-01T00:00:00", "2025-01-01T00:00:00+00:00"). + Literal parsed = Literal.of(jsonValue.toString()).to(type); + if (parsed == null) { + throw new IllegalArgumentException( + String.format("Cannot parse default value '%s' as %s", jsonValue, type)); + } + return parsed; + default: + throw new IllegalArgumentException( + String.format("Defaults not supported for type %s", type)); + } + } catch (ClassCastException | IllegalArgumentException e) { + throw new IllegalArgumentException( + String.format("Cannot coerce default value '%s' to type %s", jsonValue, type), e); + } + } + + // Accept either a JSON number (already a Number) or a string parseable as a number. + private static Number numberOrParse(Object jsonValue) { + if (jsonValue instanceof Number n) { + return n; + } + String s = jsonValue.toString(); + // Use BigDecimal to accept any numeric form; downstream casts narrow to int/long/float/double. + return new BigDecimal(s); + } + public static class AlterColumn extends Update { private final String name; private final Type.PrimitiveType type; @@ -148,7 +221,18 @@ public static void run(Catalog catalog, TableIdentifier tableId, List up case AddColumn up -> { // TODO: support nested columns UpdateSchema us = schemaUpdates.getValue(); - us.addColumn(up.name, up.type, up.doc); + if (up.required) { + // Iceberg rejects required adds without an initial default unless incompatible + // changes are explicitly allowed (even for empty tables with a snapshot). 
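+ // (Existing data files carry no values for the new column, so readers would surface null + // in a required field; that risk is why Iceberg gates this behind an explicit opt-in.)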
+ us.allowIncompatibleChanges(); + if (up.doc != null) { + us.addRequiredColumn(up.name, up.type, up.doc); + } else { + us.addRequiredColumn(up.name, up.type); + } + } else { + us.addColumn(up.name, up.type, up.doc); + } if (up.after != null) { us.moveAfter(up.name, up.after); } else if (up.before != null) { diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 7f237b72..620c226d 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -18,15 +18,17 @@ */ package org.apache.iceberg.data.parquet; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.parquet.ParquetVariantVisitor; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VariantReaderBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -50,11 +52,7 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -/** - * @deprecated since 1.8.0, will be made package-private in 1.9.0 - */ -@Deprecated -public abstract class BaseParquetReaders { +abstract class BaseParquetReaders { protected BaseParquetReaders() {} protected ParquetValueReader createReader(Schema expectedSchema, MessageType fileSchema) { @@ -78,56 +76,16 @@ protected ParquetValueReader createReader( } protected abstract ParquetValueReader createStructReader( - List types, List> fieldReaders, Types.StructType structType); + List> fieldReaders, Types.StructType structType); - protected ParquetValueReader fixedReader(ColumnDescriptor desc) { - return new GenericParquetReaders.FixedReader(desc); - } + protected abstract ParquetValueReader fixedReader(ColumnDescriptor desc); - protected ParquetValueReader dateReader(ColumnDescriptor desc) { - return new GenericParquetReaders.DateReader(desc); - } + protected abstract ParquetValueReader dateReader(ColumnDescriptor desc); - protected ParquetValueReader timeReader(ColumnDescriptor desc) { - LogicalTypeAnnotation time = desc.getPrimitiveType().getLogicalTypeAnnotation(); - Preconditions.checkArgument( - time instanceof TimeLogicalTypeAnnotation, "Invalid time logical type: " + time); - - LogicalTypeAnnotation.TimeUnit unit = ((TimeLogicalTypeAnnotation) time).getUnit(); - switch (unit) { - case MICROS: - return new GenericParquetReaders.TimeReader(desc); - case MILLIS: - return new GenericParquetReaders.TimeMillisReader(desc); - default: - throw new UnsupportedOperationException("Unsupported unit for time: " + unit); - } - } + protected abstract ParquetValueReader timeReader(ColumnDescriptor desc); - protected ParquetValueReader timestampReader(ColumnDescriptor desc, boolean isAdjustedToUTC) { - if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) { - return new GenericParquetReaders.TimestampInt96Reader(desc); - } - - LogicalTypeAnnotation timestamp = desc.getPrimitiveType().getLogicalTypeAnnotation(); - 
Preconditions.checkArgument( - timestamp instanceof TimestampLogicalTypeAnnotation, - "Invalid timestamp logical type: " + timestamp); - - LogicalTypeAnnotation.TimeUnit unit = ((TimestampLogicalTypeAnnotation) timestamp).getUnit(); - switch (unit) { - case MICROS: - return isAdjustedToUTC - ? new GenericParquetReaders.TimestamptzReader(desc) - : new GenericParquetReaders.TimestampReader(desc); - case MILLIS: - return isAdjustedToUTC - ? new GenericParquetReaders.TimestamptzMillisReader(desc) - : new GenericParquetReaders.TimestampMillisReader(desc); - default: - throw new UnsupportedOperationException("Unsupported unit for timestamp: " + unit); - } - } + protected abstract ParquetValueReader timestampReader( + ColumnDescriptor desc, boolean isAdjustedToUTC); protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { return value; @@ -151,7 +109,6 @@ public ParquetValueReader struct( // the expected struct is ignored because nested fields are never found when the List> newFields = Lists.newArrayListWithExpectedSize(fieldReaders.size()); - List types = Lists.newArrayListWithExpectedSize(fieldReaders.size()); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -159,11 +116,10 @@ public ParquetValueReader struct( Type fieldType = fields.get(i); int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 1; newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReader)); - types.add(fieldType); } } - return createStructReader(types, newFields, expected); + return createStructReader(newFields, expected); } } @@ -207,8 +163,7 @@ public Optional> visit(TimeLogicalTypeAnnotation timeLogic @Override public Optional> visit( TimestampLogicalTypeAnnotation timestampLogicalType) { - return Optional.of( - timestampReader(desc, ((Types.TimestampType) expected).shouldAdjustToUTC())); + return Optional.of(timestampReader(desc, timestampLogicalType.isAdjustedToUTC())); } @Override @@ -218,6 +173,7 @@ public Optional> visit(IntLogicalTypeAnnotation intLogical Preconditions.checkArgument( intLogicalType.isSigned(), "Cannot read UINT64 as a long value"); */ + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); } @@ -268,10 +224,12 @@ public ParquetValueReader message( @Override public ParquetValueReader struct( Types.StructType expected, GroupType struct, List> fieldReaders) { + if (null == expected) { + return createStructReader(ImmutableList.of(), null); + } + // match the expected struct's order Map> readersById = Maps.newHashMap(); - Map typesById = Maps.newHashMap(); - Map maxDefinitionLevelsById = Maps.newHashMap(); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -280,55 +238,37 @@ public ParquetValueReader struct( int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1; int id = fieldType.getId().intValue(); readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader)); - typesById.put(id, fieldType); - if (idToConstant.containsKey(id)) { - maxDefinitionLevelsById.put(id, fieldD); - } } } - List expectedFields = - expected != null ? 
expected.fields() : ImmutableList.of(); + int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + List expectedFields = expected.fields(); List> reorderedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); - List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); - // Defaulting to parent max definition level - int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + for (Types.NestedField field : expectedFields) { int id = field.fieldId(); - ParquetValueReader reader = readersById.get(id); - if (idToConstant.containsKey(id)) { - // containsKey is used because the constant may be null - int fieldMaxDefinitionLevel = - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel); - reorderedFields.add( - ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel)); - types.add(null); - } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { - reorderedFields.add(ParquetValueReaders.position()); - types.add(null); - } else if (id == MetadataColumns.IS_DELETED.fieldId()) { - reorderedFields.add(ParquetValueReaders.constant(false)); - types.add(null); - } else if (reader != null) { - reorderedFields.add(reader); - types.add(typesById.get(id)); - } else if (field.initialDefault() != null) { - reorderedFields.add( - ParquetValueReaders.constant( - convertConstant(field.type(), field.initialDefault()), - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel))); - types.add(typesById.get(id)); - } else if (field.isOptional()) { - reorderedFields.add(ParquetValueReaders.nulls()); - types.add(null); - } else { - throw new IllegalArgumentException( - String.format("Missing required field: %s", field.name())); - } + ParquetValueReader reader = + ParquetValueReaders.replaceWithMetadataReader( + id, readersById.get(id), idToConstant, constantDefinitionLevel); + reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel)); + } + + return createStructReader(reorderedFields, expected); + } + + private ParquetValueReader defaultReader( + Types.NestedField field, ParquetValueReader reader, int constantDL) { + if (reader != null) { + return reader; + } else if (field.initialDefault() != null) { + return ParquetValueReaders.constant( + convertConstant(field.type(), field.initialDefault()), constantDL); + } else if (field.isOptional()) { + return ParquetValueReaders.nulls(); } - return createStructReader(types, reorderedFields, expected); + throw new IllegalArgumentException(String.format("Missing required field: %s", field.name())); } @Override @@ -432,6 +372,17 @@ public ParquetValueReader primitive( } } + @Override + public ParquetValueReader variant( + Types.VariantType iVariant, GroupType variant, ParquetValueReader reader) { + return reader; + } + + @Override + public ParquetVariantVisitor> variantVisitor() { + return new VariantReaderBuilder(type, Arrays.asList(currentPath())); + } + MessageType type() { return type; } diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java index 8199f698..6304a41e 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java @@ -18,13 +18,17 @@ */ package org.apache.iceberg.data.parquet; +import java.util.Arrays; import java.util.List; import java.util.Optional; -import org.apache.iceberg.parquet.ParquetTypeVisitor; import 
org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.iceberg.parquet.ParquetVariantVisitor; +import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VariantWriterBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -32,41 +36,31 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -/** - * @deprecated since 1.8.0, will be made package-private in 1.9.0 - */ -@Deprecated -public abstract class BaseParquetWriter { +abstract class BaseParquetWriter { - @SuppressWarnings("unchecked") protected ParquetValueWriter createWriter(MessageType type) { - return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type)); + return createWriter(null, type); + } + + @SuppressWarnings("unchecked") + protected ParquetValueWriter createWriter(Types.StructType struct, MessageType type) { + return (ParquetValueWriter) + TypeWithSchemaVisitor.visit(struct, type, new WriteBuilder(type)); } protected abstract ParquetValueWriters.StructWriter createStructWriter( - List> writers); + Types.StructType struct, List> writers); - protected ParquetValueWriter fixedWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.FixedWriter(desc); - } + protected abstract ParquetValueWriter fixedWriter(ColumnDescriptor desc); - protected ParquetValueWriter dateWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.DateWriter(desc); - } + protected abstract ParquetValueWriter dateWriter(ColumnDescriptor desc); - protected ParquetValueWriter timeWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.TimeWriter(desc); - } + protected abstract ParquetValueWriter timeWriter(ColumnDescriptor desc); - protected ParquetValueWriter timestampWriter(ColumnDescriptor desc, boolean isAdjustedToUTC) { - if (isAdjustedToUTC) { - return new GenericParquetWriter.TimestamptzWriter(desc); - } else { - return new GenericParquetWriter.TimestampWriter(desc); - } - } + protected abstract ParquetValueWriter timestampWriter( + ColumnDescriptor desc, boolean isAdjustedToUTC); - private class WriteBuilder extends ParquetTypeVisitor> { + private class WriteBuilder extends TypeWithSchemaVisitor> { private final MessageType type; private WriteBuilder(MessageType type) { @@ -75,14 +69,14 @@ private WriteBuilder(MessageType type) { @Override public ParquetValueWriter message( - MessageType message, List> fieldWriters) { + Types.StructType struct, MessageType message, List> fieldWriters) { - return struct(message.asGroupType(), fieldWriters); + return struct(struct, message.asGroupType(), fieldWriters); } @Override public ParquetValueWriter struct( - GroupType struct, List> fieldWriters) { + Types.StructType iceberg, GroupType struct, List> fieldWriters) { List fields = struct.getFields(); List> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size()); for (int i = 0; i < fields.size(); i += 1) { @@ -91,11 +85,12 @@ public ParquetValueWriter struct( writers.add(ParquetValueWriters.option(fieldType, fieldD, fieldWriters.get(i))); } - return createStructWriter(writers); + return createStructWriter(iceberg, writers); } @Override - public ParquetValueWriter list(GroupType array, 
ParquetValueWriter elementWriter) { + public ParquetValueWriter list( + Types.ListType iceberg, GroupType array, ParquetValueWriter elementWriter) { GroupType repeated = array.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -111,7 +106,10 @@ public ParquetValueWriter list(GroupType array, ParquetValueWriter element @Override public ParquetValueWriter map( - GroupType map, ParquetValueWriter keyWriter, ParquetValueWriter valueWriter) { + Types.MapType iceberg, + GroupType map, + ParquetValueWriter keyWriter, + ParquetValueWriter valueWriter) { GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -131,7 +129,8 @@ public ParquetValueWriter map( } @Override - public ParquetValueWriter primitive(PrimitiveType primitive) { + public ParquetValueWriter primitive( + org.apache.iceberg.types.Type.PrimitiveType iceberg, PrimitiveType primitive) { ColumnDescriptor desc = type.getColumnDescription(currentPath()); LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation(); if (logicalType != null) { @@ -161,6 +160,17 @@ public ParquetValueWriter primitive(PrimitiveType primitive) { throw new UnsupportedOperationException("Unsupported type: " + primitive); } } + + @Override + public ParquetValueWriter variant( + Types.VariantType iVariant, GroupType variant, ParquetValueWriter result) { + return result; + } + + @Override + public ParquetVariantVisitor> variantVisitor() { + return new VariantWriterBuilder(type, Arrays.asList(currentPath())); + } } private class LogicalTypeWriterVisitor @@ -224,8 +234,8 @@ public Optional> visit( public Optional> visit( LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { Preconditions.checkArgument( - LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()), - "Cannot write timestamp in %s, only MICROS is supported", + !LogicalTypeAnnotation.TimeUnit.MILLIS.equals(timestampType.getUnit()), + "Cannot write timestamp in %s, only MICROS and NANOS are supported", timestampType.getUnit()); return Optional.of(timestampWriter(desc, timestampType.isAdjustedToUTC())); } diff --git a/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java index a00c1176..a003e1af 100644 --- a/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java +++ b/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java @@ -27,6 +27,7 @@ import java.util.function.Function; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.UUIDUtil; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; @@ -77,6 +78,55 @@ static Literal fromParquetPrimitive(Type type, PrimitiveType parquetType, } } + // Iceberg 1.9 introduced this API in upstream ParquetConversions and uses it from + // ParquetMetrics. Provide a compatible shim here so callers compiled against 1.9 + // continue to resolve against this shadowed class. The TIME/TIMESTAMP branch keeps + // the local millis -> micros normalization used by fromParquetPrimitive above. 
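+ // For example, a Parquet TIMESTAMP(MILLIS) statistic value of 1_718_454_645_000 is widened + // to 1_718_454_645_000_000 micros before being returned to the caller.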
+ @SuppressWarnings("unchecked") + static T convertValue(Type type, PrimitiveType parquetType, Object value) { + switch (type.typeId()) { + case BOOLEAN: + case INTEGER: + case DATE: + case LONG: + case TIMESTAMP_NANO: + case FLOAT: + case DOUBLE: + return (T) value; + case TIME: + case TIMESTAMP: + if (parquetType.getLogicalTypeAnnotation() + instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation tsAnno + && tsAnno.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { + return (T) Long.valueOf(((Number) value).longValue() * 1000L); + } + return (T) value; + case STRING: + return (T) ((Binary) value).toStringUsingUTF8(); + case UUID: + return (T) UUIDUtil.convert(((Binary) value).toByteBuffer()); + case FIXED: + case BINARY: + return (T) ((Binary) value).toByteBuffer(); + case DECIMAL: + int scale = + ((DecimalLogicalTypeAnnotation) parquetType.getLogicalTypeAnnotation()).getScale(); + switch (parquetType.getPrimitiveTypeName()) { + case INT32: + case INT64: + return (T) BigDecimal.valueOf(((Number) value).longValue(), scale); + case FIXED_LEN_BYTE_ARRAY: + case BINARY: + return (T) new BigDecimal(new BigInteger(((Binary) value).getBytes()), scale); + default: + throw new IllegalArgumentException( + "Unsupported primitive type for decimal: " + parquetType.getPrimitiveTypeName()); + } + default: + throw new IllegalArgumentException("Unsupported primitive type: " + type); + } + } + static Function converterFromParquet( PrimitiveType parquetType, Type icebergType) { Function fromParquet = converterFromParquet(parquetType); diff --git a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java index a939cdf0..c771c83c 100644 --- a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java +++ b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java @@ -41,7 +41,6 @@ import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.HttpHost; -import org.apache.hc.core5.http.HttpRequestInterceptor; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.ParseException; import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog; @@ -49,8 +48,6 @@ import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.io.CloseMode; import org.apache.iceberg.IcebergBuild; -import org.apache.iceberg.common.DynConstructors; -import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -66,9 +63,6 @@ public class HTTPClient extends BaseHTTPClient { private static final Logger LOG = LoggerFactory.getLogger(HTTPClient.class); - private static final String SIGV4_ENABLED = "rest.sigv4-enabled"; - private static final String SIGV4_REQUEST_INTERCEPTOR_IMPL = - "org.apache.iceberg.aws.RESTSigV4Signer"; @VisibleForTesting static final String CLIENT_VERSION_HEADER = "X-Client-Version"; @VisibleForTesting @@ -90,6 +84,7 @@ public class HTTPClient extends BaseHTTPClient { private final Map baseHeaders; private final ObjectMapper mapper; private final AuthSession authSession; + private final boolean isRootClient; private HTTPClient( URI baseUri, @@ -97,7 +92,6 @@ private HTTPClient( CredentialsProvider proxyCredsProvider, Map baseHeaders, ObjectMapper objectMapper, - HttpRequestInterceptor requestInterceptor, Map properties, HttpClientConnectionManager 
connectionManager, AuthSession session) { @@ -110,10 +104,6 @@ private HTTPClient( clientBuilder.setConnectionManager(connectionManager); - if (requestInterceptor != null) { - clientBuilder.addRequestInterceptorLast(requestInterceptor); - } - int maxRetries = PropertyUtil.propertyAsInt(properties, REST_MAX_RETRIES, 5); clientBuilder.setRetryStrategy(new ExponentialHttpRequestRetryStrategy(maxRetries)); @@ -126,6 +116,7 @@ private HTTPClient( } this.httpClient = clientBuilder.build(); + this.isRootClient = true; } /** @@ -139,6 +130,7 @@ private HTTPClient(HTTPClient parent, AuthSession authSession) { this.mapper = parent.mapper; this.baseHeaders = parent.baseHeaders; this.authSession = authSession; + this.isRootClient = false; } @Override @@ -160,7 +152,6 @@ private static String extractResponseBodyAsString(CloseableHttpResponse response } } - // Per the spec, the only currently defined / used "success" responses are 200 and 202. private static boolean isSuccessful(CloseableHttpResponse response) { int code = response.getCode(); return code == HttpStatus.SC_OK @@ -331,50 +322,13 @@ protected T execute( @Override public void close() throws IOException { - try { - if (authSession != null) { - authSession.close(); - } - } finally { + // Do not close the AuthSession as it's managed by the owner of this HTTPClient. + // Only close the underlying Apache HTTP client if this is a root HTTPClient. + if (isRootClient) { httpClient.close(CloseMode.GRACEFUL); } } - @VisibleForTesting - static HttpRequestInterceptor loadInterceptorDynamically( - String impl, Map properties) { - HttpRequestInterceptor instance; - - DynConstructors.Ctor ctor; - try { - ctor = - DynConstructors.builder(HttpRequestInterceptor.class) - .loader(HTTPClient.class.getClassLoader()) - .impl(impl) - .buildChecked(); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException( - String.format( - "Cannot initialize RequestInterceptor, missing no-arg constructor: %s", impl), - e); - } - - try { - instance = ctor.newInstance(); - } catch (ClassCastException e) { - throw new IllegalArgumentException( - String.format("Cannot initialize, %s does not implement RequestInterceptor", impl), e); - } - - DynMethods.builder("initialize") - .hiddenImpl(impl, Map.class) - .orNoop() - .build(instance) - .invoke(properties); - - return instance; - } - static HttpClientConnectionManager configureConnectionManager( Map properties, TlsSocketStrategy tlsSocketStrategy) { PoolingHttpClientConnectionManagerBuilder connectionManagerBuilder = @@ -501,12 +455,6 @@ public HTTPClient build() { withHeader(CLIENT_VERSION_HEADER, IcebergBuild.fullVersion()); withHeader(CLIENT_GIT_COMMIT_SHORT_HEADER, IcebergBuild.gitCommitShortId()); - HttpRequestInterceptor interceptor = null; - - if (PropertyUtil.propertyAsBoolean(properties, SIGV4_ENABLED, false)) { - interceptor = loadInterceptorDynamically(SIGV4_REQUEST_INTERCEPTOR_IMPL, properties); - } - if (this.proxyCredentialsProvider != null) { Preconditions.checkNotNull( proxy, "Invalid http client proxy for proxy credentials provider: null"); @@ -518,7 +466,6 @@ public HTTPClient build() { proxyCredentialsProvider, baseHeaders, mapper, - interceptor, properties, configureConnectionManager(properties, tlsSocketStrategy), authSession); diff --git a/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java b/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java index a087acee..bd290fb5 100644 --- a/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java 
+++ b/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java @@ -122,7 +122,7 @@ public void testAddColumnAfter() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("age", "long", null, "name", null, null)); + Arrays.asList(new AlterTable.AddColumn("age", "long", null, "name", null, null, null)); AlterTable.run(catalog, tableId, updates); @@ -136,7 +136,8 @@ public void testAddColumnBefore() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, "timestamp_col", null)); + Arrays.asList( + new AlterTable.AddColumn("age", "long", null, null, "timestamp_col", null, null)); AlterTable.run(catalog, tableId, updates); @@ -150,7 +151,7 @@ public void testAddColumnFirst() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, true)); + Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, true, null)); AlterTable.run(catalog, tableId, updates); @@ -165,7 +166,7 @@ public void testAddColumnAfterWinsWhenBothAfterAndBeforeSet() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("bad", "string", null, "name", "id", null)); + Arrays.asList(new AlterTable.AddColumn("bad", "string", null, "name", "id", null, null)); AlterTable.run(catalog, tableId, updates); @@ -173,4 +174,30 @@ public void testAddColumnAfterWinsWhenBothAfterAndBeforeSet() throws Exception { assertThat(table.schema().columns().stream().map(Types.NestedField::name).toList()) .containsExactly("id", "name", "bad", "timestamp_col", "date_col"); } + + @Test + public void testAddRequiredColumnOnEmptyTable() throws Exception { + catalog.buildTable(tableId, schema).create(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, null, true)); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + assertThat(table.schema().findField("age").isRequired()).isTrue(); + } + + @Test + public void testAddOptionalColumnByDefault() throws Exception { + catalog.buildTable(tableId, schema).create(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, null, null)); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + assertThat(table.schema().findField("age").isOptional()).isTrue(); + } } diff --git a/pom.xml b/pom.xml index e84e6369..c6ed84cc 100644 --- a/pom.xml +++ b/pom.xml @@ -14,12 +14,12 @@ 21 21 - 1.8.1 + 1.9.2 3.4.1 1.15.1 2.31.13 4.7.6 - 2.18.2 + 2.18.3 2.0.17 1.5.18 1.3.6
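For context on the first hunk above: a minimal sketch of how the lenient and strict commit-status checks are meant to be chosen on a commit error path. This is illustrative only; `persistMetadata` and `commitIsAtomic` are hypothetical stand-ins for store-specific logic, not part of this diff.

```java
import org.apache.iceberg.BaseMetastoreOperations.CommitStatus;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;

// Hypothetical excerpt of a BaseMetastoreTableOperations subclass.
void commitWithStatusProbe(String newMetadataLocation, TableMetadata metadata) {
  try {
    persistMetadata(newMetadataLocation); // store-specific write; may fail ambiguously
  } catch (RuntimeException e) {
    CommitStatus status =
        commitIsAtomic()
            ? checkCommitStatusStrict(newMetadataLocation, metadata) // absence => FAILURE
            : checkCommitStatus(newMetadataLocation, metadata); // absence => UNKNOWN
    if (status == CommitStatus.SUCCESS) {
      return; // the probe found our metadata location; the commit actually landed
    }
    if (status == CommitStatus.FAILURE) {
      throw new CommitFailedException(e, "Commit failed for %s", newMetadataLocation);
    }
    throw new CommitStateUnknownException(e);
  }
}
```

The strict variant is only safe when a missing metadata location proves the write never happened; for stores without that guarantee, the lenient variant keeps the state UNKNOWN so cleanup does not delete data files that may still be referenced.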