From e2b7ce1fa30c13f37c1b13892aa36453e7a5913b Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Thu, 23 Apr 2026 14:24:10 -0500 Subject: [PATCH 1/6] Upgraded iceberg-java to 1.9.2, upgrade overridden classes, added support for alter table add column with default, non/null columns. --- .../iceberg/BaseMetastoreTableOperations.java | 38 +++-- .../main/java/com/altinity/ice/cli/Main.java | 2 +- .../ice/cli/internal/cmd/AlterTable.java | 93 ++++++++++- .../data/parquet/BaseParquetReaders.java | 148 ++++++------------ .../data/parquet/BaseParquetWriter.java | 80 +++++----- .../iceberg/parquet/ParquetConversions.java | 50 ++++++ .../org/apache/iceberg/rest/HTTPClient.java | 65 +------- .../ice/cli/internal/cmd/AlterTableTest.java | 105 +++++++++++++ pom.xml | 4 +- 9 files changed, 378 insertions(+), 207 deletions(-) diff --git a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 0a5ac042..e9ca5cc5 100644 --- a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; +import org.apache.iceberg.BaseMetastoreOperations.CommitStatus; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; @@ -294,20 +295,39 @@ public long newSnapshotId() { * were attempting to set. This is used as a last resort when we are dealing with exceptions that * may indicate the commit has failed but are not proof that this is the case. Past locations must * also be searched on the chance that a second committer was able to successfully commit on top - * of our commit. + * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link + * CommitStatus#UNKNOWN}. * * @param newMetadataLocation the path of the new commit file * @param config metadata to use for configuration - * @return Commit Status of Success, Failure or Unknown + * @return Commit Status of Success, Unknown */ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) { - return CommitStatus.valueOf( - checkCommitStatus( - tableName(), - newMetadataLocation, - config.properties(), - () -> checkCurrentMetadataLocation(newMetadataLocation)) - .name()); + return checkCommitStatus( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); + } + + /** + * Attempt to load the table and see if any current or past metadata location matches the one we + * were attempting to set. This is used as a last resort when we are dealing with exceptions that + * may indicate the commit has failed but are not proof that this is the case. Past locations must + * also be searched on the chance that a second committer was able to successfully commit on top + * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link + * CommitStatus#FAILURE}. 
+ * + * @param newMetadataLocation the path of the new commit file + * @param config metadata to use for configuration + * @return Commit Status of Success, Failure or Unknown + */ + protected CommitStatus checkCommitStatusStrict(String newMetadataLocation, TableMetadata config) { + return checkCommitStatusStrict( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); } /** diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java index 644811b1..69d5e0e1 100644 --- a/ice/src/main/java/com/altinity/ice/cli/Main.java +++ b/ice/src/main/java/com/altinity/ice/cli/Main.java @@ -335,7 +335,7 @@ void alterTable( e.g. [{"op":"drop_column","name":"foo"}] Supported operations: - - add_column (params: "name", "type" (https://iceberg.apache.org/spec/#primitive-types), "doc" (optional)) + - add_column (params: "name", "type" (https://iceberg.apache.org/spec/#primitive-types), "doc" (optional), "nullable" (optional, default true), "default" (optional)) - alter_column (params: "name", "type" (https://iceberg.apache.org/spec/#primitive-types)) - rename_column (params: "name", "new_name") - drop_column (params: "name") diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java index 65b07ae7..942d0961 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java @@ -13,7 +13,11 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.UUID; import javax.annotation.Nullable; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; @@ -22,6 +26,7 @@ import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.slf4j.Logger; @@ -50,17 +55,88 @@ public static class AddColumn extends Update { private final String name; private final Type type; @Nullable private final String doc; + private final boolean nullable; + @Nullable private final Literal defaultValue; public AddColumn( @JsonProperty(value = "name", required = true) String name, @JsonProperty(value = "type", required = true) String type, - @JsonProperty("doc") @Nullable String doc) { + @JsonProperty("doc") @Nullable String doc, + @JsonProperty("nullable") @Nullable Boolean nullable, + @JsonProperty("default") @Nullable Object defaultValue) { this.name = name; this.type = Types.fromPrimitiveString(type); this.doc = doc; + this.nullable = nullable == null || nullable; + this.defaultValue = coerceDefault(this.type, defaultValue); } } + // Coerce a JSON literal (number / boolean / string) into an Iceberg Literal of the requested + // type. For numeric, boolean and string types we build the Literal directly; for complex + // primitives (date/time/timestamp/decimal/uuid/binary) we accept a string form and let + // Iceberg parse it via Literal.to(type). 
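+  // For example, a JSON default of 42 (or "42") for an int column becomes Literal.of(42), while
+  // "2025-01-01" for a date column is parsed through Literal.of("2025-01-01").to(DateType).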
+ @Nullable + private static Literal coerceDefault(Type type, @Nullable Object jsonValue) { + if (jsonValue == null) { + return null; + } + try { + switch (type.typeId()) { + case BOOLEAN: + if (jsonValue instanceof Boolean b) { + return Literal.of(b); + } + return Literal.of(Boolean.parseBoolean(jsonValue.toString())); + case INTEGER: + return Literal.of(((Number) numberOrParse(jsonValue)).intValue()); + case LONG: + return Literal.of(((Number) numberOrParse(jsonValue)).longValue()); + case FLOAT: + return Literal.of(((Number) numberOrParse(jsonValue)).floatValue()); + case DOUBLE: + return Literal.of(((Number) numberOrParse(jsonValue)).doubleValue()); + case STRING: + return Literal.of(jsonValue.toString()); + case BINARY: + case FIXED: + return Literal.of(ByteBuffer.wrap(jsonValue.toString().getBytes(StandardCharsets.UTF_8))); + case DECIMAL: + return Literal.of(new BigDecimal(jsonValue.toString())).to(type); + case UUID: + return Literal.of(UUID.fromString(jsonValue.toString())); + case DATE: + case TIME: + case TIMESTAMP: + case TIMESTAMP_NANO: + // Let Iceberg parse the canonical string form (e.g. "2025-01-01", + // "2025-01-01T00:00:00", "2025-01-01T00:00:00+00:00"). + Literal parsed = Literal.of(jsonValue.toString()).to(type); + if (parsed == null) { + throw new IllegalArgumentException( + String.format("Cannot parse default value '%s' as %s", jsonValue, type)); + } + return parsed; + default: + throw new IllegalArgumentException( + String.format("Defaults not supported for type %s", type)); + } + } catch (ClassCastException | IllegalArgumentException e) { + throw new IllegalArgumentException( + String.format("Cannot coerce default value '%s' to type %s", jsonValue, type), e); + } + } + + // Accept either a JSON number (already a Number) or a string parseable as a number. + private static Number numberOrParse(Object jsonValue) { + if (jsonValue instanceof Number n) { + return n; + } + String s = jsonValue.toString(); + // Use BigDecimal to accept any numeric form; downstream casts narrow to int/long/float/double. 
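+    // For example, "42" parses to BigDecimal 42 (a JSON number is returned unchanged); a
+    // fractional value such as "3.9" keeps its scale here and is truncated to 3 only when the
+    // target column type is int or long.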
+ return new BigDecimal(s); + } + public static class AlterColumn extends Update { private final String name; private final Type.PrimitiveType type; @@ -138,7 +214,20 @@ public static void run(Catalog catalog, TableIdentifier tableId, List up switch (update) { case AddColumn up -> { // TODO: support nested columns - schemaUpdates.getValue().addColumn(up.name, up.type, up.doc); + UpdateSchema us = schemaUpdates.getValue(); + if (up.nullable) { + if (up.defaultValue == null) { + us.addColumn(up.name, up.type, up.doc); + } else { + us.addColumn(up.name, up.type, up.doc, up.defaultValue); + } + } else { + if (up.defaultValue == null) { + us.addRequiredColumn(up.name, up.type, up.doc); + } else { + us.addRequiredColumn(up.name, up.type, up.doc, up.defaultValue); + } + } } case AlterColumn up -> { // TODO: support nested columns diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 7f237b72..8708fe28 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -18,15 +18,17 @@ */ package org.apache.iceberg.data.parquet; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.parquet.ParquetVariantVisitor; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VariantReaderBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -50,11 +52,7 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -/** - * @deprecated since 1.8.0, will be made package-private in 1.9.0 - */ -@Deprecated -public abstract class BaseParquetReaders { +abstract class BaseParquetReaders { protected BaseParquetReaders() {} protected ParquetValueReader createReader(Schema expectedSchema, MessageType fileSchema) { @@ -78,56 +76,16 @@ protected ParquetValueReader createReader( } protected abstract ParquetValueReader createStructReader( - List types, List> fieldReaders, Types.StructType structType); - - protected ParquetValueReader fixedReader(ColumnDescriptor desc) { - return new GenericParquetReaders.FixedReader(desc); - } + List> fieldReaders, Types.StructType structType); - protected ParquetValueReader dateReader(ColumnDescriptor desc) { - return new GenericParquetReaders.DateReader(desc); - } + protected abstract ParquetValueReader fixedReader(ColumnDescriptor desc); - protected ParquetValueReader timeReader(ColumnDescriptor desc) { - LogicalTypeAnnotation time = desc.getPrimitiveType().getLogicalTypeAnnotation(); - Preconditions.checkArgument( - time instanceof TimeLogicalTypeAnnotation, "Invalid time logical type: " + time); - - LogicalTypeAnnotation.TimeUnit unit = ((TimeLogicalTypeAnnotation) time).getUnit(); - switch (unit) { - case MICROS: - return new GenericParquetReaders.TimeReader(desc); - case MILLIS: - return new GenericParquetReaders.TimeMillisReader(desc); - default: - throw new UnsupportedOperationException("Unsupported unit for time: 
" + unit); - } - } + protected abstract ParquetValueReader dateReader(ColumnDescriptor desc); - protected ParquetValueReader timestampReader(ColumnDescriptor desc, boolean isAdjustedToUTC) { - if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) { - return new GenericParquetReaders.TimestampInt96Reader(desc); - } + protected abstract ParquetValueReader timeReader(ColumnDescriptor desc); - LogicalTypeAnnotation timestamp = desc.getPrimitiveType().getLogicalTypeAnnotation(); - Preconditions.checkArgument( - timestamp instanceof TimestampLogicalTypeAnnotation, - "Invalid timestamp logical type: " + timestamp); - - LogicalTypeAnnotation.TimeUnit unit = ((TimestampLogicalTypeAnnotation) timestamp).getUnit(); - switch (unit) { - case MICROS: - return isAdjustedToUTC - ? new GenericParquetReaders.TimestamptzReader(desc) - : new GenericParquetReaders.TimestampReader(desc); - case MILLIS: - return isAdjustedToUTC - ? new GenericParquetReaders.TimestamptzMillisReader(desc) - : new GenericParquetReaders.TimestampMillisReader(desc); - default: - throw new UnsupportedOperationException("Unsupported unit for timestamp: " + unit); - } - } + protected abstract ParquetValueReader timestampReader( + ColumnDescriptor desc, boolean isAdjustedToUTC); protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { return value; @@ -151,7 +109,6 @@ public ParquetValueReader struct( // the expected struct is ignored because nested fields are never found when the List> newFields = Lists.newArrayListWithExpectedSize(fieldReaders.size()); - List types = Lists.newArrayListWithExpectedSize(fieldReaders.size()); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -159,11 +116,10 @@ public ParquetValueReader struct( Type fieldType = fields.get(i); int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 1; newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReader)); - types.add(fieldType); } } - return createStructReader(types, newFields, expected); + return createStructReader(newFields, expected); } } @@ -207,8 +163,7 @@ public Optional> visit(TimeLogicalTypeAnnotation timeLogic @Override public Optional> visit( TimestampLogicalTypeAnnotation timestampLogicalType) { - return Optional.of( - timestampReader(desc, ((Types.TimestampType) expected).shouldAdjustToUTC())); + return Optional.of(timestampReader(desc, timestampLogicalType.isAdjustedToUTC())); } @Override @@ -268,10 +223,12 @@ public ParquetValueReader message( @Override public ParquetValueReader struct( Types.StructType expected, GroupType struct, List> fieldReaders) { + if (null == expected) { + return createStructReader(ImmutableList.of(), null); + } + // match the expected struct's order Map> readersById = Maps.newHashMap(); - Map typesById = Maps.newHashMap(); - Map maxDefinitionLevelsById = Maps.newHashMap(); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -280,55 +237,37 @@ public ParquetValueReader struct( int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1; int id = fieldType.getId().intValue(); readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader)); - typesById.put(id, fieldType); - if (idToConstant.containsKey(id)) { - maxDefinitionLevelsById.put(id, fieldD); - } } } - List expectedFields = - expected != null ? 
expected.fields() : ImmutableList.of(); + int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + List expectedFields = expected.fields(); List> reorderedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); - List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); - // Defaulting to parent max definition level - int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + for (Types.NestedField field : expectedFields) { int id = field.fieldId(); - ParquetValueReader reader = readersById.get(id); - if (idToConstant.containsKey(id)) { - // containsKey is used because the constant may be null - int fieldMaxDefinitionLevel = - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel); - reorderedFields.add( - ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel)); - types.add(null); - } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { - reorderedFields.add(ParquetValueReaders.position()); - types.add(null); - } else if (id == MetadataColumns.IS_DELETED.fieldId()) { - reorderedFields.add(ParquetValueReaders.constant(false)); - types.add(null); - } else if (reader != null) { - reorderedFields.add(reader); - types.add(typesById.get(id)); - } else if (field.initialDefault() != null) { - reorderedFields.add( - ParquetValueReaders.constant( - convertConstant(field.type(), field.initialDefault()), - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel))); - types.add(typesById.get(id)); - } else if (field.isOptional()) { - reorderedFields.add(ParquetValueReaders.nulls()); - types.add(null); - } else { - throw new IllegalArgumentException( - String.format("Missing required field: %s", field.name())); - } + ParquetValueReader reader = + ParquetValueReaders.replaceWithMetadataReader( + id, readersById.get(id), idToConstant, constantDefinitionLevel); + reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel)); + } + + return createStructReader(reorderedFields, expected); + } + + private ParquetValueReader defaultReader( + Types.NestedField field, ParquetValueReader reader, int constantDL) { + if (reader != null) { + return reader; + } else if (field.initialDefault() != null) { + return ParquetValueReaders.constant( + convertConstant(field.type(), field.initialDefault()), constantDL); + } else if (field.isOptional()) { + return ParquetValueReaders.nulls(); } - return createStructReader(types, reorderedFields, expected); + throw new IllegalArgumentException(String.format("Missing required field: %s", field.name())); } @Override @@ -432,6 +371,17 @@ public ParquetValueReader primitive( } } + @Override + public ParquetValueReader variant( + Types.VariantType iVariant, GroupType variant, ParquetValueReader reader) { + return reader; + } + + @Override + public ParquetVariantVisitor> variantVisitor() { + return new VariantReaderBuilder(type, Arrays.asList(currentPath())); + } + MessageType type() { return type; } diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java index 8199f698..6304a41e 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java @@ -18,13 +18,17 @@ */ package org.apache.iceberg.data.parquet; +import java.util.Arrays; import java.util.List; import java.util.Optional; -import org.apache.iceberg.parquet.ParquetTypeVisitor; import 
org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.iceberg.parquet.ParquetVariantVisitor; +import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VariantWriterBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -32,41 +36,31 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -/** - * @deprecated since 1.8.0, will be made package-private in 1.9.0 - */ -@Deprecated -public abstract class BaseParquetWriter { +abstract class BaseParquetWriter { - @SuppressWarnings("unchecked") protected ParquetValueWriter createWriter(MessageType type) { - return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type)); + return createWriter(null, type); + } + + @SuppressWarnings("unchecked") + protected ParquetValueWriter createWriter(Types.StructType struct, MessageType type) { + return (ParquetValueWriter) + TypeWithSchemaVisitor.visit(struct, type, new WriteBuilder(type)); } protected abstract ParquetValueWriters.StructWriter createStructWriter( - List> writers); + Types.StructType struct, List> writers); - protected ParquetValueWriter fixedWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.FixedWriter(desc); - } + protected abstract ParquetValueWriter fixedWriter(ColumnDescriptor desc); - protected ParquetValueWriter dateWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.DateWriter(desc); - } + protected abstract ParquetValueWriter dateWriter(ColumnDescriptor desc); - protected ParquetValueWriter timeWriter(ColumnDescriptor desc) { - return new GenericParquetWriter.TimeWriter(desc); - } + protected abstract ParquetValueWriter timeWriter(ColumnDescriptor desc); - protected ParquetValueWriter timestampWriter(ColumnDescriptor desc, boolean isAdjustedToUTC) { - if (isAdjustedToUTC) { - return new GenericParquetWriter.TimestamptzWriter(desc); - } else { - return new GenericParquetWriter.TimestampWriter(desc); - } - } + protected abstract ParquetValueWriter timestampWriter( + ColumnDescriptor desc, boolean isAdjustedToUTC); - private class WriteBuilder extends ParquetTypeVisitor> { + private class WriteBuilder extends TypeWithSchemaVisitor> { private final MessageType type; private WriteBuilder(MessageType type) { @@ -75,14 +69,14 @@ private WriteBuilder(MessageType type) { @Override public ParquetValueWriter message( - MessageType message, List> fieldWriters) { + Types.StructType struct, MessageType message, List> fieldWriters) { - return struct(message.asGroupType(), fieldWriters); + return struct(struct, message.asGroupType(), fieldWriters); } @Override public ParquetValueWriter struct( - GroupType struct, List> fieldWriters) { + Types.StructType iceberg, GroupType struct, List> fieldWriters) { List fields = struct.getFields(); List> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size()); for (int i = 0; i < fields.size(); i += 1) { @@ -91,11 +85,12 @@ public ParquetValueWriter struct( writers.add(ParquetValueWriters.option(fieldType, fieldD, fieldWriters.get(i))); } - return createStructWriter(writers); + return createStructWriter(iceberg, writers); } @Override - public ParquetValueWriter list(GroupType array, 
ParquetValueWriter elementWriter) { + public ParquetValueWriter list( + Types.ListType iceberg, GroupType array, ParquetValueWriter elementWriter) { GroupType repeated = array.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -111,7 +106,10 @@ public ParquetValueWriter list(GroupType array, ParquetValueWriter element @Override public ParquetValueWriter map( - GroupType map, ParquetValueWriter keyWriter, ParquetValueWriter valueWriter) { + Types.MapType iceberg, + GroupType map, + ParquetValueWriter keyWriter, + ParquetValueWriter valueWriter) { GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); String[] repeatedPath = currentPath(); @@ -131,7 +129,8 @@ public ParquetValueWriter map( } @Override - public ParquetValueWriter primitive(PrimitiveType primitive) { + public ParquetValueWriter primitive( + org.apache.iceberg.types.Type.PrimitiveType iceberg, PrimitiveType primitive) { ColumnDescriptor desc = type.getColumnDescription(currentPath()); LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation(); if (logicalType != null) { @@ -161,6 +160,17 @@ public ParquetValueWriter primitive(PrimitiveType primitive) { throw new UnsupportedOperationException("Unsupported type: " + primitive); } } + + @Override + public ParquetValueWriter variant( + Types.VariantType iVariant, GroupType variant, ParquetValueWriter result) { + return result; + } + + @Override + public ParquetVariantVisitor> variantVisitor() { + return new VariantWriterBuilder(type, Arrays.asList(currentPath())); + } } private class LogicalTypeWriterVisitor @@ -224,8 +234,8 @@ public Optional> visit( public Optional> visit( LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { Preconditions.checkArgument( - LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()), - "Cannot write timestamp in %s, only MICROS is supported", + !LogicalTypeAnnotation.TimeUnit.MILLIS.equals(timestampType.getUnit()), + "Cannot write timestamp in %s, only MICROS and NANOS are supported", timestampType.getUnit()); return Optional.of(timestampWriter(desc, timestampType.isAdjustedToUTC())); } diff --git a/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java index a00c1176..a003e1af 100644 --- a/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java +++ b/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java @@ -27,6 +27,7 @@ import java.util.function.Function; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.UUIDUtil; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; @@ -77,6 +78,55 @@ static Literal fromParquetPrimitive(Type type, PrimitiveType parquetType, } } + // Iceberg 1.9 introduced this API in upstream ParquetConversions and uses it from + // ParquetMetrics. Provide a compatible shim here so callers compiled against 1.9 + // continue to resolve against this shadowed class. The TIME/TIMESTAMP branch keeps + // the local millis -> micros normalization used by fromParquetPrimitive above. 
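+  // For example, a TIMESTAMP(MILLIS) value of 1_000 is returned as 1_000_000 so that timestamp
+  // values are always surfaced in microseconds, as Iceberg expects.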
+ @SuppressWarnings("unchecked") + static T convertValue(Type type, PrimitiveType parquetType, Object value) { + switch (type.typeId()) { + case BOOLEAN: + case INTEGER: + case DATE: + case LONG: + case TIMESTAMP_NANO: + case FLOAT: + case DOUBLE: + return (T) value; + case TIME: + case TIMESTAMP: + if (parquetType.getLogicalTypeAnnotation() + instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation tsAnno + && tsAnno.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { + return (T) Long.valueOf(((Number) value).longValue() * 1000L); + } + return (T) value; + case STRING: + return (T) ((Binary) value).toStringUsingUTF8(); + case UUID: + return (T) UUIDUtil.convert(((Binary) value).toByteBuffer()); + case FIXED: + case BINARY: + return (T) ((Binary) value).toByteBuffer(); + case DECIMAL: + int scale = + ((DecimalLogicalTypeAnnotation) parquetType.getLogicalTypeAnnotation()).getScale(); + switch (parquetType.getPrimitiveTypeName()) { + case INT32: + case INT64: + return (T) BigDecimal.valueOf(((Number) value).longValue(), scale); + case FIXED_LEN_BYTE_ARRAY: + case BINARY: + return (T) new BigDecimal(new BigInteger(((Binary) value).getBytes()), scale); + default: + throw new IllegalArgumentException( + "Unsupported primitive type for decimal: " + parquetType.getPrimitiveTypeName()); + } + default: + throw new IllegalArgumentException("Unsupported primitive type: " + type); + } + } + static Function converterFromParquet( PrimitiveType parquetType, Type icebergType) { Function fromParquet = converterFromParquet(parquetType); diff --git a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java index a939cdf0..c771c83c 100644 --- a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java +++ b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java @@ -41,7 +41,6 @@ import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.HttpHost; -import org.apache.hc.core5.http.HttpRequestInterceptor; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.ParseException; import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog; @@ -49,8 +48,6 @@ import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.io.CloseMode; import org.apache.iceberg.IcebergBuild; -import org.apache.iceberg.common.DynConstructors; -import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -66,9 +63,6 @@ public class HTTPClient extends BaseHTTPClient { private static final Logger LOG = LoggerFactory.getLogger(HTTPClient.class); - private static final String SIGV4_ENABLED = "rest.sigv4-enabled"; - private static final String SIGV4_REQUEST_INTERCEPTOR_IMPL = - "org.apache.iceberg.aws.RESTSigV4Signer"; @VisibleForTesting static final String CLIENT_VERSION_HEADER = "X-Client-Version"; @VisibleForTesting @@ -90,6 +84,7 @@ public class HTTPClient extends BaseHTTPClient { private final Map baseHeaders; private final ObjectMapper mapper; private final AuthSession authSession; + private final boolean isRootClient; private HTTPClient( URI baseUri, @@ -97,7 +92,6 @@ private HTTPClient( CredentialsProvider proxyCredsProvider, Map baseHeaders, ObjectMapper objectMapper, - HttpRequestInterceptor requestInterceptor, Map properties, HttpClientConnectionManager 
connectionManager, AuthSession session) { @@ -110,10 +104,6 @@ private HTTPClient( clientBuilder.setConnectionManager(connectionManager); - if (requestInterceptor != null) { - clientBuilder.addRequestInterceptorLast(requestInterceptor); - } - int maxRetries = PropertyUtil.propertyAsInt(properties, REST_MAX_RETRIES, 5); clientBuilder.setRetryStrategy(new ExponentialHttpRequestRetryStrategy(maxRetries)); @@ -126,6 +116,7 @@ private HTTPClient( } this.httpClient = clientBuilder.build(); + this.isRootClient = true; } /** @@ -139,6 +130,7 @@ private HTTPClient(HTTPClient parent, AuthSession authSession) { this.mapper = parent.mapper; this.baseHeaders = parent.baseHeaders; this.authSession = authSession; + this.isRootClient = false; } @Override @@ -160,7 +152,6 @@ private static String extractResponseBodyAsString(CloseableHttpResponse response } } - // Per the spec, the only currently defined / used "success" responses are 200 and 202. private static boolean isSuccessful(CloseableHttpResponse response) { int code = response.getCode(); return code == HttpStatus.SC_OK @@ -331,50 +322,13 @@ protected T execute( @Override public void close() throws IOException { - try { - if (authSession != null) { - authSession.close(); - } - } finally { + // Do not close the AuthSession as it's managed by the owner of this HTTPClient. + // Only close the underlying Apache HTTP client if this is a root HTTPClient. + if (isRootClient) { httpClient.close(CloseMode.GRACEFUL); } } - @VisibleForTesting - static HttpRequestInterceptor loadInterceptorDynamically( - String impl, Map properties) { - HttpRequestInterceptor instance; - - DynConstructors.Ctor ctor; - try { - ctor = - DynConstructors.builder(HttpRequestInterceptor.class) - .loader(HTTPClient.class.getClassLoader()) - .impl(impl) - .buildChecked(); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException( - String.format( - "Cannot initialize RequestInterceptor, missing no-arg constructor: %s", impl), - e); - } - - try { - instance = ctor.newInstance(); - } catch (ClassCastException e) { - throw new IllegalArgumentException( - String.format("Cannot initialize, %s does not implement RequestInterceptor", impl), e); - } - - DynMethods.builder("initialize") - .hiddenImpl(impl, Map.class) - .orNoop() - .build(instance) - .invoke(properties); - - return instance; - } - static HttpClientConnectionManager configureConnectionManager( Map properties, TlsSocketStrategy tlsSocketStrategy) { PoolingHttpClientConnectionManagerBuilder connectionManagerBuilder = @@ -501,12 +455,6 @@ public HTTPClient build() { withHeader(CLIENT_VERSION_HEADER, IcebergBuild.fullVersion()); withHeader(CLIENT_GIT_COMMIT_SHORT_HEADER, IcebergBuild.gitCommitShortId()); - HttpRequestInterceptor interceptor = null; - - if (PropertyUtil.propertyAsBoolean(properties, SIGV4_ENABLED, false)) { - interceptor = loadInterceptorDynamically(SIGV4_REQUEST_INTERCEPTOR_IMPL, properties); - } - if (this.proxyCredentialsProvider != null) { Preconditions.checkNotNull( proxy, "Invalid http client proxy for proxy credentials provider: null"); @@ -518,7 +466,6 @@ public HTTPClient build() { proxyCredentialsProvider, baseHeaders, mapper, - interceptor, properties, configureConnectionManager(properties, tlsSocketStrategy), authSession); diff --git a/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java b/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java index 588e0089..02bd0cc3 100644 --- a/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java 
+++ b/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java @@ -116,4 +116,109 @@ public void testDropAllPartitionFields() throws Exception { table = catalog.loadTable(tableId); assertThat(table.spec().fields()).isEmpty(); } + + // Default values require Iceberg table spec v3. + private Table createV3Table() { + return catalog.buildTable(tableId, schema).withProperty("format-version", "3").create(); + } + + @Test + public void testAddColumnNullable() throws Exception { + catalog.buildTable(tableId, schema).create(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("email", "string", null, null, null)); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + Types.NestedField added = table.schema().findField("email"); + assertThat(added).isNotNull(); + assertThat(added.isOptional()).isTrue(); + assertThat(added.type()).isEqualTo(Types.StringType.get()); + assertThat(added.initialDefault()).isNull(); + assertThat(added.writeDefault()).isNull(); + } + + @Test + public void testAddRequiredColumnWithoutDefaultIsRejected() throws Exception { + catalog.buildTable(tableId, schema).create(); + + // Iceberg 1.9+ rejects adding a required column without an explicit default + // because there is no safe value to backfill into existing rows. + assertThatThrownBy( + () -> + AlterTable.run( + catalog, + tableId, + Arrays.asList(new AlterTable.AddColumn("status", "string", null, false, null)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("required column"); + } + + @Test + public void testAddColumnWithDefault() throws Exception { + createV3Table(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("age", "int", null, true, 0)); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + Types.NestedField added = table.schema().findField("age"); + assertThat(added).isNotNull(); + assertThat(added.isOptional()).isTrue(); + assertThat(added.type()).isEqualTo(Types.IntegerType.get()); + assertThat(added.initialDefault()).isEqualTo(0); + assertThat(added.writeDefault()).isEqualTo(0); + } + + @Test + public void testAddRequiredColumnWithDefault() throws Exception { + createV3Table(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("score", "long", null, false, 42)); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + Types.NestedField added = table.schema().findField("score"); + assertThat(added).isNotNull(); + assertThat(added.isRequired()).isTrue(); + assertThat(added.type()).isEqualTo(Types.LongType.get()); + assertThat(added.initialDefault()).isEqualTo(42L); + assertThat(added.writeDefault()).isEqualTo(42L); + } + + @Test + public void testAddColumnWithStringDefault() throws Exception { + createV3Table(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("country", "string", null, true, "US")); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + Types.NestedField added = table.schema().findField("country"); + assertThat(added).isNotNull(); + assertThat(added.type()).isEqualTo(Types.StringType.get()); + assertThat(added.initialDefault().toString()).isEqualTo("US"); + } + + @Test + public void testAddColumnInvalidDefaultThrows() throws Exception { + createV3Table(); + + assertThatThrownBy( + () -> + AlterTable.run( + catalog, + tableId, + Arrays.asList( + new AlterTable.AddColumn("age", "int", null, true, "not-an-int")))) + 
.isInstanceOf(IllegalArgumentException.class); + } } diff --git a/pom.xml b/pom.xml index e84e6369..c6ed84cc 100644 --- a/pom.xml +++ b/pom.xml @@ -14,12 +14,12 @@ 21 21 - 1.8.1 + 1.9.2 3.4.1 1.15.1 2.31.13 4.7.6 - 2.18.2 + 2.18.3 2.0.17 1.5.18 1.3.6 From 8856728b1f50ef90dd537fea70236f639867c130 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Fri, 24 Apr 2026 09:59:39 -0500 Subject: [PATCH 2/6] rollback changes to overridden classes. --- .../iceberg/BaseMetastoreTableOperations.java | 20 -- .../data/parquet/BaseParquetReaders.java | 231 ++++++++---------- .../org/apache/iceberg/rest/HTTPClient.java | 65 ++++- 3 files changed, 165 insertions(+), 151 deletions(-) diff --git a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index e9ca5cc5..9363e252 100644 --- a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -310,26 +310,6 @@ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetada () -> checkCurrentMetadataLocation(newMetadataLocation)); } - /** - * Attempt to load the table and see if any current or past metadata location matches the one we - * were attempting to set. This is used as a last resort when we are dealing with exceptions that - * may indicate the commit has failed but are not proof that this is the case. Past locations must - * also be searched on the chance that a second committer was able to successfully commit on top - * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link - * CommitStatus#FAILURE}. - * - * @param newMetadataLocation the path of the new commit file - * @param config metadata to use for configuration - * @return Commit Status of Success, Failure or Unknown - */ - protected CommitStatus checkCommitStatusStrict(String newMetadataLocation, TableMetadata config) { - return checkCommitStatusStrict( - tableName(), - newMetadataLocation, - config.properties(), - () -> checkCurrentMetadataLocation(newMetadataLocation)); - } - /** * Validate if the new metadata location is the current metadata location or present within * previous metadata files. 
diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 8708fe28..d894b73b 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -18,17 +18,15 @@ */ package org.apache.iceberg.data.parquet; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; -import org.apache.iceberg.parquet.ParquetVariantVisitor; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; -import org.apache.iceberg.parquet.VariantReaderBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -39,20 +37,20 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; -import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.EnumLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.JsonLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; -import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; -import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -abstract class BaseParquetReaders { +/** + * @deprecated since 1.8.0, will be made package-private in 1.9.0 + */ +@Deprecated +public abstract class BaseParquetReaders { protected BaseParquetReaders() {} protected ParquetValueReader createReader(Schema expectedSchema, MessageType fileSchema) { @@ -76,16 +74,7 @@ protected ParquetValueReader createReader( } protected abstract ParquetValueReader createStructReader( - List> fieldReaders, Types.StructType structType); - - protected abstract ParquetValueReader fixedReader(ColumnDescriptor desc); - - protected abstract ParquetValueReader dateReader(ColumnDescriptor desc); - - protected abstract ParquetValueReader timeReader(ColumnDescriptor desc); - - protected abstract ParquetValueReader timestampReader( - ColumnDescriptor desc, boolean isAdjustedToUTC); + List types, List> fieldReaders, Types.StructType structType); protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { return value; @@ -109,6 +98,7 @@ public ParquetValueReader struct( // the expected struct is ignored because nested fields are never found when the List> newFields = Lists.newArrayListWithExpectedSize(fieldReaders.size()); + List types = Lists.newArrayListWithExpectedSize(fieldReaders.size()); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { 
ParquetValueReader fieldReader = fieldReaders.get(i); @@ -116,10 +106,11 @@ public ParquetValueReader struct( Type fieldType = fields.get(i); int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 1; newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReader)); + types.add(fieldType); } } - return createStructReader(newFields, expected); + return createStructReader(types, newFields, expected); } } @@ -150,22 +141,6 @@ public Optional> visit(DecimalLogicalTypeAnnotation decima return Optional.of(ParquetValueReaders.bigDecimals(desc)); } - @Override - public Optional> visit(DateLogicalTypeAnnotation dateLogicalType) { - return Optional.of(dateReader(desc)); - } - - @Override - public Optional> visit(TimeLogicalTypeAnnotation timeLogicalType) { - return Optional.of(timeReader(desc)); - } - - @Override - public Optional> visit( - TimestampLogicalTypeAnnotation timestampLogicalType) { - return Optional.of(timestampReader(desc, timestampLogicalType.isAdjustedToUTC())); - } - @Override public Optional> visit(IntLogicalTypeAnnotation intLogicalType) { if (intLogicalType.getBitWidth() == 64) { @@ -223,12 +198,10 @@ public ParquetValueReader message( @Override public ParquetValueReader struct( Types.StructType expected, GroupType struct, List> fieldReaders) { - if (null == expected) { - return createStructReader(ImmutableList.of(), null); - } - // match the expected struct's order Map> readersById = Maps.newHashMap(); + Map typesById = Maps.newHashMap(); + Map maxDefinitionLevelsById = Maps.newHashMap(); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -237,37 +210,55 @@ public ParquetValueReader struct( int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1; int id = fieldType.getId().intValue(); readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader)); + typesById.put(id, fieldType); + if (idToConstant.containsKey(id)) { + maxDefinitionLevelsById.put(id, fieldD); + } } } - int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); - List expectedFields = expected.fields(); + List expectedFields = + expected != null ? 
expected.fields() : ImmutableList.of(); List> reorderedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); - + List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); + // Defaulting to parent max definition level + int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); for (Types.NestedField field : expectedFields) { int id = field.fieldId(); - ParquetValueReader reader = - ParquetValueReaders.replaceWithMetadataReader( - id, readersById.get(id), idToConstant, constantDefinitionLevel); - reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel)); - } - - return createStructReader(reorderedFields, expected); - } - - private ParquetValueReader defaultReader( - Types.NestedField field, ParquetValueReader reader, int constantDL) { - if (reader != null) { - return reader; - } else if (field.initialDefault() != null) { - return ParquetValueReaders.constant( - convertConstant(field.type(), field.initialDefault()), constantDL); - } else if (field.isOptional()) { - return ParquetValueReaders.nulls(); + ParquetValueReader reader = readersById.get(id); + if (idToConstant.containsKey(id)) { + // containsKey is used because the constant may be null + int fieldMaxDefinitionLevel = + maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel); + reorderedFields.add( + ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel)); + types.add(null); + } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { + reorderedFields.add(ParquetValueReaders.position()); + types.add(null); + } else if (id == MetadataColumns.IS_DELETED.fieldId()) { + reorderedFields.add(ParquetValueReaders.constant(false)); + types.add(null); + } else if (reader != null) { + reorderedFields.add(reader); + types.add(typesById.get(id)); + } else if (field.initialDefault() != null) { + reorderedFields.add( + ParquetValueReaders.constant( + convertConstant(field.type(), field.initialDefault()), + maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel))); + types.add(typesById.get(id)); + } else if (field.isOptional()) { + reorderedFields.add(ParquetValueReaders.nulls()); + types.add(null); + } else { + throw new IllegalArgumentException( + String.format("Missing required field: %s", field.name())); + } } - throw new IllegalArgumentException(String.format("Missing required field: %s", field.name())); + return createStructReader(types, reorderedFields, expected); } @Override @@ -317,70 +308,60 @@ public ParquetValueReader map( ParquetValueReaders.option(valueType, valueD, valueReader)); } - @Override - @SuppressWarnings("checkstyle:CyclomaticComplexity") - public ParquetValueReader primitive( - org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) { - if (expected == null) { - return null; - } - - ColumnDescriptor desc = type.getColumnDescription(currentPath()); - - if (primitive.getLogicalTypeAnnotation() != null) { - return primitive - .getLogicalTypeAnnotation() - .accept(new LogicalTypeReadBuilder(desc, expected)) - .orElseThrow( - () -> - new UnsupportedOperationException( - "Unsupported logical type: " + primitive.getLogicalTypeAnnotation())); - } - - switch (primitive.getPrimitiveTypeName()) { - case FIXED_LEN_BYTE_ARRAY: - return fixedReader(desc); - case BINARY: - if (expected.typeId() == TypeID.STRING) { - return ParquetValueReaders.strings(desc); - } else { - return ParquetValueReaders.byteBuffers(desc); - } - case INT32: - if (expected.typeId() == TypeID.LONG) { - return 
ParquetValueReaders.intsAsLongs(desc); - } else { - return ParquetValueReaders.unboxed(desc); - } - case FLOAT: - if (expected.typeId() == TypeID.DOUBLE) { - return ParquetValueReaders.floatsAsDoubles(desc); - } else { - return ParquetValueReaders.unboxed(desc); - } - case BOOLEAN: - case INT64: - case DOUBLE: - return ParquetValueReaders.unboxed(desc); - case INT96: - // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards - // compatibility we try to read INT96 as timestamps. - return timestampReader(desc, true); - default: - throw new UnsupportedOperationException("Unsupported type: " + primitive); - } - } - - @Override - public ParquetValueReader variant( - Types.VariantType iVariant, GroupType variant, ParquetValueReader reader) { - return reader; - } - - @Override - public ParquetVariantVisitor> variantVisitor() { - return new VariantReaderBuilder(type, Arrays.asList(currentPath())); - } + // @Override + // @SuppressWarnings("checkstyle:CyclomaticComplexity") + // public ParquetValueReader primitive( + // org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) { + // if (expected == null) { + // return null; + // } + // + // ColumnDescriptor desc = type.getColumnDescription(currentPath()); + // + // if (primitive.getLogicalTypeAnnotation() != null) { + // return primitive + // .getLogicalTypeAnnotation() + // .accept(new LogicalTypeReadBuilder(desc, expected)) + // .orElseThrow( + // () -> + // new UnsupportedOperationException( + // "Unsupported logical type: " + primitive.getLogicalTypeAnnotation())); + // } + // + // switch (primitive.getPrimitiveTypeName()) { + // case FIXED_LEN_BYTE_ARRAY: + // return fixedReader(desc); + // case BINARY: + // if (expected.typeId() == TypeID.STRING) { + // return ParquetValueReaders.strings(desc); + // } else { + // return ParquetValueReaders.byteBuffers(desc); + // } + // case INT32: + // if (expected.typeId() == TypeID.LONG) { + // return ParquetValueReaders.intsAsLongs(desc); + // } else { + // return ParquetValueReaders.unboxed(desc); + // } + // case FLOAT: + // if (expected.typeId() == TypeID.DOUBLE) { + // return ParquetValueReaders.floatsAsDoubles(desc); + // } else { + // return ParquetValueReaders.unboxed(desc); + // } + // case BOOLEAN: + // case INT64: + // case DOUBLE: + // return ParquetValueReaders.unboxed(desc); + // case INT96: + // // Impala & Spark used to write timestamps as INT96 without a logical type. For + // backwards + // // compatibility we try to read INT96 as timestamps. 
+ // return timestampReader(desc, true); + // default: + // throw new UnsupportedOperationException("Unsupported type: " + primitive); + // } + // } MessageType type() { return type; diff --git a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java index c771c83c..a939cdf0 100644 --- a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java +++ b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java @@ -41,6 +41,7 @@ import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequestInterceptor; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.ParseException; import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog; @@ -48,6 +49,8 @@ import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.io.CloseMode; import org.apache.iceberg.IcebergBuild; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -63,6 +66,9 @@ public class HTTPClient extends BaseHTTPClient { private static final Logger LOG = LoggerFactory.getLogger(HTTPClient.class); + private static final String SIGV4_ENABLED = "rest.sigv4-enabled"; + private static final String SIGV4_REQUEST_INTERCEPTOR_IMPL = + "org.apache.iceberg.aws.RESTSigV4Signer"; @VisibleForTesting static final String CLIENT_VERSION_HEADER = "X-Client-Version"; @VisibleForTesting @@ -84,7 +90,6 @@ public class HTTPClient extends BaseHTTPClient { private final Map baseHeaders; private final ObjectMapper mapper; private final AuthSession authSession; - private final boolean isRootClient; private HTTPClient( URI baseUri, @@ -92,6 +97,7 @@ private HTTPClient( CredentialsProvider proxyCredsProvider, Map baseHeaders, ObjectMapper objectMapper, + HttpRequestInterceptor requestInterceptor, Map properties, HttpClientConnectionManager connectionManager, AuthSession session) { @@ -104,6 +110,10 @@ private HTTPClient( clientBuilder.setConnectionManager(connectionManager); + if (requestInterceptor != null) { + clientBuilder.addRequestInterceptorLast(requestInterceptor); + } + int maxRetries = PropertyUtil.propertyAsInt(properties, REST_MAX_RETRIES, 5); clientBuilder.setRetryStrategy(new ExponentialHttpRequestRetryStrategy(maxRetries)); @@ -116,7 +126,6 @@ private HTTPClient( } this.httpClient = clientBuilder.build(); - this.isRootClient = true; } /** @@ -130,7 +139,6 @@ private HTTPClient(HTTPClient parent, AuthSession authSession) { this.mapper = parent.mapper; this.baseHeaders = parent.baseHeaders; this.authSession = authSession; - this.isRootClient = false; } @Override @@ -152,6 +160,7 @@ private static String extractResponseBodyAsString(CloseableHttpResponse response } } + // Per the spec, the only currently defined / used "success" responses are 200 and 202. private static boolean isSuccessful(CloseableHttpResponse response) { int code = response.getCode(); return code == HttpStatus.SC_OK @@ -322,13 +331,50 @@ protected T execute( @Override public void close() throws IOException { - // Do not close the AuthSession as it's managed by the owner of this HTTPClient. - // Only close the underlying Apache HTTP client if this is a root HTTPClient. 
- if (isRootClient) { + try { + if (authSession != null) { + authSession.close(); + } + } finally { httpClient.close(CloseMode.GRACEFUL); } } + @VisibleForTesting + static HttpRequestInterceptor loadInterceptorDynamically( + String impl, Map properties) { + HttpRequestInterceptor instance; + + DynConstructors.Ctor ctor; + try { + ctor = + DynConstructors.builder(HttpRequestInterceptor.class) + .loader(HTTPClient.class.getClassLoader()) + .impl(impl) + .buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException( + String.format( + "Cannot initialize RequestInterceptor, missing no-arg constructor: %s", impl), + e); + } + + try { + instance = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize, %s does not implement RequestInterceptor", impl), e); + } + + DynMethods.builder("initialize") + .hiddenImpl(impl, Map.class) + .orNoop() + .build(instance) + .invoke(properties); + + return instance; + } + static HttpClientConnectionManager configureConnectionManager( Map properties, TlsSocketStrategy tlsSocketStrategy) { PoolingHttpClientConnectionManagerBuilder connectionManagerBuilder = @@ -455,6 +501,12 @@ public HTTPClient build() { withHeader(CLIENT_VERSION_HEADER, IcebergBuild.fullVersion()); withHeader(CLIENT_GIT_COMMIT_SHORT_HEADER, IcebergBuild.gitCommitShortId()); + HttpRequestInterceptor interceptor = null; + + if (PropertyUtil.propertyAsBoolean(properties, SIGV4_ENABLED, false)) { + interceptor = loadInterceptorDynamically(SIGV4_REQUEST_INTERCEPTOR_IMPL, properties); + } + if (this.proxyCredentialsProvider != null) { Preconditions.checkNotNull( proxy, "Invalid http client proxy for proxy credentials provider: null"); @@ -466,6 +518,7 @@ public HTTPClient build() { proxyCredentialsProvider, baseHeaders, mapper, + interceptor, properties, configureConnectionManager(properties, tlsSocketStrategy), authSession); From 2eca2e76e7d25c68bd2b5528426b162d0f8bb753 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Tue, 28 Apr 2026 16:41:54 -0500 Subject: [PATCH 3/6] rolled back changes to date/timestamp that was part of the original implementation. 
--- .../data/parquet/BaseParquetReaders.java | 136 +++++++++++------- 1 file changed, 82 insertions(+), 54 deletions(-) diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index d894b73b..408c6a37 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -37,13 +37,17 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.EnumLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.JsonLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; /** @@ -76,6 +80,15 @@ protected ParquetValueReader createReader( protected abstract ParquetValueReader createStructReader( List types, List> fieldReaders, Types.StructType structType); + protected abstract ParquetValueReader fixedReader(ColumnDescriptor desc); + + protected abstract ParquetValueReader dateReader(ColumnDescriptor desc); + + protected abstract ParquetValueReader timeReader(ColumnDescriptor desc); + + protected abstract ParquetValueReader timestampReader( + ColumnDescriptor desc, boolean isAdjustedToUTC); + protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { return value; } @@ -141,6 +154,22 @@ public Optional> visit(DecimalLogicalTypeAnnotation decima return Optional.of(ParquetValueReaders.bigDecimals(desc)); } + @Override + public Optional> visit(DateLogicalTypeAnnotation dateLogicalType) { + return Optional.of(dateReader(desc)); + } + + @Override + public Optional> visit(TimeLogicalTypeAnnotation timeLogicalType) { + return Optional.of(timeReader(desc)); + } + + @Override + public Optional> visit( + TimestampLogicalTypeAnnotation timestampLogicalType) { + return Optional.of(timestampReader(desc, timestampLogicalType.isAdjustedToUTC())); + } + @Override public Optional> visit(IntLogicalTypeAnnotation intLogicalType) { if (intLogicalType.getBitWidth() == 64) { @@ -308,60 +337,59 @@ public ParquetValueReader map( ParquetValueReaders.option(valueType, valueD, valueReader)); } - // @Override - // @SuppressWarnings("checkstyle:CyclomaticComplexity") - // public ParquetValueReader primitive( - // org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) { - // if (expected == null) { - // return null; - // } - // - // ColumnDescriptor desc = type.getColumnDescription(currentPath()); - // - // if (primitive.getLogicalTypeAnnotation() != null) { - // return primitive - // .getLogicalTypeAnnotation() - // .accept(new LogicalTypeReadBuilder(desc, expected)) - // .orElseThrow( - // () -> - // new UnsupportedOperationException( 
- // "Unsupported logical type: " + primitive.getLogicalTypeAnnotation())); - // } - // - // switch (primitive.getPrimitiveTypeName()) { - // case FIXED_LEN_BYTE_ARRAY: - // return fixedReader(desc); - // case BINARY: - // if (expected.typeId() == TypeID.STRING) { - // return ParquetValueReaders.strings(desc); - // } else { - // return ParquetValueReaders.byteBuffers(desc); - // } - // case INT32: - // if (expected.typeId() == TypeID.LONG) { - // return ParquetValueReaders.intsAsLongs(desc); - // } else { - // return ParquetValueReaders.unboxed(desc); - // } - // case FLOAT: - // if (expected.typeId() == TypeID.DOUBLE) { - // return ParquetValueReaders.floatsAsDoubles(desc); - // } else { - // return ParquetValueReaders.unboxed(desc); - // } - // case BOOLEAN: - // case INT64: - // case DOUBLE: - // return ParquetValueReaders.unboxed(desc); - // case INT96: - // // Impala & Spark used to write timestamps as INT96 without a logical type. For - // backwards - // // compatibility we try to read INT96 as timestamps. - // return timestampReader(desc, true); - // default: - // throw new UnsupportedOperationException("Unsupported type: " + primitive); - // } - // } + @Override + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public ParquetValueReader primitive( + org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) { + if (expected == null) { + return null; + } + + ColumnDescriptor desc = type.getColumnDescription(currentPath()); + + if (primitive.getLogicalTypeAnnotation() != null) { + return primitive + .getLogicalTypeAnnotation() + .accept(new LogicalTypeReadBuilder(desc, expected)) + .orElseThrow( + () -> + new UnsupportedOperationException( + "Unsupported logical type: " + primitive.getLogicalTypeAnnotation())); + } + + switch (primitive.getPrimitiveTypeName()) { + case FIXED_LEN_BYTE_ARRAY: + return fixedReader(desc); + case BINARY: + if (expected.typeId() == TypeID.STRING) { + return ParquetValueReaders.strings(desc); + } else { + return ParquetValueReaders.byteBuffers(desc); + } + case INT32: + if (expected.typeId() == TypeID.LONG) { + return ParquetValueReaders.intsAsLongs(desc); + } else { + return ParquetValueReaders.unboxed(desc); + } + case FLOAT: + if (expected.typeId() == TypeID.DOUBLE) { + return ParquetValueReaders.floatsAsDoubles(desc); + } else { + return ParquetValueReaders.unboxed(desc); + } + case BOOLEAN: + case INT64: + case DOUBLE: + return ParquetValueReaders.unboxed(desc); + case INT96: + // Impala & Spark used to write timestamps as INT96 without a logical type. For + // backwards compatibility we try to read INT96 as timestamps. 
+ return timestampReader(desc, true); + default: + throw new UnsupportedOperationException("Unsupported type: " + primitive); + } + } MessageType type() { return type; From acc5dfa0c8bb2da3615363a058f84352926b4984 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Tue, 28 Apr 2026 21:11:32 -0500 Subject: [PATCH 4/6] feat: Enhance alter-table command to support adding NOT NULL columns with required flag --- .../iceberg/BaseMetastoreTableOperations.java | 20 ++++ ice/README.md | 3 + .../ice/cli/internal/cmd/AlterTable.java | 18 +++- .../data/parquet/BaseParquetReaders.java | 100 ++++++++---------- .../org/apache/iceberg/rest/HTTPClient.java | 65 ++---------- .../ice/cli/internal/cmd/AlterTableTest.java | 35 +++++- 6 files changed, 122 insertions(+), 119 deletions(-) diff --git a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 9363e252..e9ca5cc5 100644 --- a/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/ice-rest-catalog/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -310,6 +310,26 @@ protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetada () -> checkCurrentMetadataLocation(newMetadataLocation)); } + /** + * Attempt to load the table and see if any current or past metadata location matches the one we + * were attempting to set. This is used as a last resort when we are dealing with exceptions that + * may indicate the commit has failed but are not proof that this is the case. Past locations must + * also be searched on the chance that a second committer was able to successfully commit on top + * of our commit. When the {@code newMetadataLocation} is not found, the method returns {@link + * CommitStatus#FAILURE}. + * + * @param newMetadataLocation the path of the new commit file + * @param config metadata to use for configuration + * @return Commit Status of Success, Failure or Unknown + */ + protected CommitStatus checkCommitStatusStrict(String newMetadataLocation, TableMetadata config) { + return checkCommitStatusStrict( + tableName(), + newMetadataLocation, + config.properties(), + () -> checkCurrentMetadataLocation(newMetadataLocation)); + } + /** * Validate if the new metadata location is the current metadata location or present within * previous metadata files. diff --git a/ice/README.md b/ice/README.md index 69687120..3e6f8416 100644 --- a/ice/README.md +++ b/ice/README.md @@ -98,6 +98,9 @@ Supported codecs: `zstd`, `snappy`, `gzip`, `lz4`. 
# add a column to an existing table ice alter-table flowers.iris '[{"op":"add_column","name":"extra","type":"string"}]' +# add a NOT NULL column (`required: true`; uses Iceberg allowIncompatibleChanges for that alter) +ice alter-table flowers.iris '[{"op":"add_column","name":"extra","type":"string","required":true}]' + # verify the schema change ice describe -s flowers.iris ``` diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java index 3ce0484c..d39a8e9f 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/AlterTable.java @@ -58,6 +58,7 @@ public static class AddColumn extends Update { @Nullable private final String after; @Nullable private final String before; private final boolean first; + private final boolean required; public AddColumn( @JsonProperty(value = "name", required = true) String name, @@ -65,13 +66,15 @@ public AddColumn( @JsonProperty("doc") @Nullable String doc, @JsonProperty("after") @Nullable String after, @JsonProperty("before") @Nullable String before, - @JsonProperty("first") @Nullable Boolean first) { + @JsonProperty("first") @Nullable Boolean first, + @JsonProperty("required") @Nullable Boolean required) { this.name = name; this.type = Types.fromPrimitiveString(type); this.doc = doc; this.after = after; this.before = before; this.first = first != null && first; + this.required = required != null && required; } } @@ -218,7 +221,18 @@ public static void run(Catalog catalog, TableIdentifier tableId, List up case AddColumn up -> { // TODO: support nested columns UpdateSchema us = schemaUpdates.getValue(); - us.addColumn(up.name, up.type, up.doc); + if (up.required) { + // Iceberg rejects required adds without an initial default unless incompatible + // changes are explicitly allowed (even for empty tables with a snapshot). 
+ us.allowIncompatibleChanges(); + if (up.doc != null) { + us.addRequiredColumn(up.name, up.type, up.doc); + } else { + us.addRequiredColumn(up.name, up.type); + } + } else { + us.addColumn(up.name, up.type, up.doc); + } if (up.after != null) { us.moveAfter(up.name, up.after); } else if (up.before != null) { diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 408c6a37..620c226d 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -18,15 +18,17 @@ */ package org.apache.iceberg.data.parquet; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.parquet.ParquetVariantVisitor; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.parquet.VariantReaderBuilder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -50,11 +52,7 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -/** - * @deprecated since 1.8.0, will be made package-private in 1.9.0 - */ -@Deprecated -public abstract class BaseParquetReaders { +abstract class BaseParquetReaders { protected BaseParquetReaders() {} protected ParquetValueReader createReader(Schema expectedSchema, MessageType fileSchema) { @@ -78,7 +76,7 @@ protected ParquetValueReader createReader( } protected abstract ParquetValueReader createStructReader( - List types, List> fieldReaders, Types.StructType structType); + List> fieldReaders, Types.StructType structType); protected abstract ParquetValueReader fixedReader(ColumnDescriptor desc); @@ -111,7 +109,6 @@ public ParquetValueReader struct( // the expected struct is ignored because nested fields are never found when the List> newFields = Lists.newArrayListWithExpectedSize(fieldReaders.size()); - List types = Lists.newArrayListWithExpectedSize(fieldReaders.size()); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -119,11 +116,10 @@ public ParquetValueReader struct( Type fieldType = fields.get(i); int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 1; newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReader)); - types.add(fieldType); } } - return createStructReader(types, newFields, expected); + return createStructReader(newFields, expected); } } @@ -177,6 +173,7 @@ public Optional> visit(IntLogicalTypeAnnotation intLogical Preconditions.checkArgument( intLogicalType.isSigned(), "Cannot read UINT64 as a long value"); */ + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); } @@ -227,10 +224,12 @@ public ParquetValueReader message( @Override public ParquetValueReader struct( Types.StructType expected, GroupType struct, List> fieldReaders) { + if (null == expected) { + return createStructReader(ImmutableList.of(), null); + } + // match the expected struct's order Map> readersById = Maps.newHashMap(); - Map 
typesById = Maps.newHashMap(); - Map maxDefinitionLevelsById = Maps.newHashMap(); List fields = struct.getFields(); for (int i = 0; i < fields.size(); i += 1) { ParquetValueReader fieldReader = fieldReaders.get(i); @@ -239,55 +238,37 @@ public ParquetValueReader struct( int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1; int id = fieldType.getId().intValue(); readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader)); - typesById.put(id, fieldType); - if (idToConstant.containsKey(id)) { - maxDefinitionLevelsById.put(id, fieldD); - } } } - List expectedFields = - expected != null ? expected.fields() : ImmutableList.of(); + int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + List expectedFields = expected.fields(); List> reorderedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); - List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); - // Defaulting to parent max definition level - int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + for (Types.NestedField field : expectedFields) { int id = field.fieldId(); - ParquetValueReader reader = readersById.get(id); - if (idToConstant.containsKey(id)) { - // containsKey is used because the constant may be null - int fieldMaxDefinitionLevel = - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel); - reorderedFields.add( - ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel)); - types.add(null); - } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { - reorderedFields.add(ParquetValueReaders.position()); - types.add(null); - } else if (id == MetadataColumns.IS_DELETED.fieldId()) { - reorderedFields.add(ParquetValueReaders.constant(false)); - types.add(null); - } else if (reader != null) { - reorderedFields.add(reader); - types.add(typesById.get(id)); - } else if (field.initialDefault() != null) { - reorderedFields.add( - ParquetValueReaders.constant( - convertConstant(field.type(), field.initialDefault()), - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel))); - types.add(typesById.get(id)); - } else if (field.isOptional()) { - reorderedFields.add(ParquetValueReaders.nulls()); - types.add(null); - } else { - throw new IllegalArgumentException( - String.format("Missing required field: %s", field.name())); - } + ParquetValueReader reader = + ParquetValueReaders.replaceWithMetadataReader( + id, readersById.get(id), idToConstant, constantDefinitionLevel); + reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel)); } - return createStructReader(types, reorderedFields, expected); + return createStructReader(reorderedFields, expected); + } + + private ParquetValueReader defaultReader( + Types.NestedField field, ParquetValueReader reader, int constantDL) { + if (reader != null) { + return reader; + } else if (field.initialDefault() != null) { + return ParquetValueReaders.constant( + convertConstant(field.type(), field.initialDefault()), constantDL); + } else if (field.isOptional()) { + return ParquetValueReaders.nulls(); + } + + throw new IllegalArgumentException(String.format("Missing required field: %s", field.name())); } @Override @@ -383,14 +364,25 @@ public ParquetValueReader primitive( case DOUBLE: return ParquetValueReaders.unboxed(desc); case INT96: - // Impala & Spark used to write timestamps as INT96 without a logical type. For - // backwards compatibility we try to read INT96 as timestamps. 
+ // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards + // compatibility we try to read INT96 as timestamps. return timestampReader(desc, true); default: throw new UnsupportedOperationException("Unsupported type: " + primitive); } } + @Override + public ParquetValueReader variant( + Types.VariantType iVariant, GroupType variant, ParquetValueReader reader) { + return reader; + } + + @Override + public ParquetVariantVisitor> variantVisitor() { + return new VariantReaderBuilder(type, Arrays.asList(currentPath())); + } + MessageType type() { return type; } diff --git a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java index a939cdf0..c771c83c 100644 --- a/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java +++ b/ice/src/main/java/org/apache/iceberg/rest/HTTPClient.java @@ -41,7 +41,6 @@ import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.HttpHost; -import org.apache.hc.core5.http.HttpRequestInterceptor; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.ParseException; import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog; @@ -49,8 +48,6 @@ import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.io.CloseMode; import org.apache.iceberg.IcebergBuild; -import org.apache.iceberg.common.DynConstructors; -import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -66,9 +63,6 @@ public class HTTPClient extends BaseHTTPClient { private static final Logger LOG = LoggerFactory.getLogger(HTTPClient.class); - private static final String SIGV4_ENABLED = "rest.sigv4-enabled"; - private static final String SIGV4_REQUEST_INTERCEPTOR_IMPL = - "org.apache.iceberg.aws.RESTSigV4Signer"; @VisibleForTesting static final String CLIENT_VERSION_HEADER = "X-Client-Version"; @VisibleForTesting @@ -90,6 +84,7 @@ public class HTTPClient extends BaseHTTPClient { private final Map baseHeaders; private final ObjectMapper mapper; private final AuthSession authSession; + private final boolean isRootClient; private HTTPClient( URI baseUri, @@ -97,7 +92,6 @@ private HTTPClient( CredentialsProvider proxyCredsProvider, Map baseHeaders, ObjectMapper objectMapper, - HttpRequestInterceptor requestInterceptor, Map properties, HttpClientConnectionManager connectionManager, AuthSession session) { @@ -110,10 +104,6 @@ private HTTPClient( clientBuilder.setConnectionManager(connectionManager); - if (requestInterceptor != null) { - clientBuilder.addRequestInterceptorLast(requestInterceptor); - } - int maxRetries = PropertyUtil.propertyAsInt(properties, REST_MAX_RETRIES, 5); clientBuilder.setRetryStrategy(new ExponentialHttpRequestRetryStrategy(maxRetries)); @@ -126,6 +116,7 @@ private HTTPClient( } this.httpClient = clientBuilder.build(); + this.isRootClient = true; } /** @@ -139,6 +130,7 @@ private HTTPClient(HTTPClient parent, AuthSession authSession) { this.mapper = parent.mapper; this.baseHeaders = parent.baseHeaders; this.authSession = authSession; + this.isRootClient = false; } @Override @@ -160,7 +152,6 @@ private static String extractResponseBodyAsString(CloseableHttpResponse response } } - // Per the spec, the only currently defined / used "success" responses are 200 and 202. 
private static boolean isSuccessful(CloseableHttpResponse response) { int code = response.getCode(); return code == HttpStatus.SC_OK @@ -331,50 +322,13 @@ protected T execute( @Override public void close() throws IOException { - try { - if (authSession != null) { - authSession.close(); - } - } finally { + // Do not close the AuthSession as it's managed by the owner of this HTTPClient. + // Only close the underlying Apache HTTP client if this is a root HTTPClient. + if (isRootClient) { httpClient.close(CloseMode.GRACEFUL); } } - @VisibleForTesting - static HttpRequestInterceptor loadInterceptorDynamically( - String impl, Map properties) { - HttpRequestInterceptor instance; - - DynConstructors.Ctor ctor; - try { - ctor = - DynConstructors.builder(HttpRequestInterceptor.class) - .loader(HTTPClient.class.getClassLoader()) - .impl(impl) - .buildChecked(); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException( - String.format( - "Cannot initialize RequestInterceptor, missing no-arg constructor: %s", impl), - e); - } - - try { - instance = ctor.newInstance(); - } catch (ClassCastException e) { - throw new IllegalArgumentException( - String.format("Cannot initialize, %s does not implement RequestInterceptor", impl), e); - } - - DynMethods.builder("initialize") - .hiddenImpl(impl, Map.class) - .orNoop() - .build(instance) - .invoke(properties); - - return instance; - } - static HttpClientConnectionManager configureConnectionManager( Map properties, TlsSocketStrategy tlsSocketStrategy) { PoolingHttpClientConnectionManagerBuilder connectionManagerBuilder = @@ -501,12 +455,6 @@ public HTTPClient build() { withHeader(CLIENT_VERSION_HEADER, IcebergBuild.fullVersion()); withHeader(CLIENT_GIT_COMMIT_SHORT_HEADER, IcebergBuild.gitCommitShortId()); - HttpRequestInterceptor interceptor = null; - - if (PropertyUtil.propertyAsBoolean(properties, SIGV4_ENABLED, false)) { - interceptor = loadInterceptorDynamically(SIGV4_REQUEST_INTERCEPTOR_IMPL, properties); - } - if (this.proxyCredentialsProvider != null) { Preconditions.checkNotNull( proxy, "Invalid http client proxy for proxy credentials provider: null"); @@ -518,7 +466,6 @@ public HTTPClient build() { proxyCredentialsProvider, baseHeaders, mapper, - interceptor, properties, configureConnectionManager(properties, tlsSocketStrategy), authSession); diff --git a/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java b/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java index a087acee..bd290fb5 100644 --- a/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java +++ b/ice/src/test/java/com/altinity/ice/cli/internal/cmd/AlterTableTest.java @@ -122,7 +122,7 @@ public void testAddColumnAfter() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("age", "long", null, "name", null, null)); + Arrays.asList(new AlterTable.AddColumn("age", "long", null, "name", null, null, null)); AlterTable.run(catalog, tableId, updates); @@ -136,7 +136,8 @@ public void testAddColumnBefore() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, "timestamp_col", null)); + Arrays.asList( + new AlterTable.AddColumn("age", "long", null, null, "timestamp_col", null, null)); AlterTable.run(catalog, tableId, updates); @@ -150,7 +151,7 @@ public void testAddColumnFirst() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - 
Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, true)); + Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, true, null)); AlterTable.run(catalog, tableId, updates); @@ -165,7 +166,7 @@ public void testAddColumnAfterWinsWhenBothAfterAndBeforeSet() throws Exception { catalog.buildTable(tableId, schema).create(); List updates = - Arrays.asList(new AlterTable.AddColumn("bad", "string", null, "name", "id", null)); + Arrays.asList(new AlterTable.AddColumn("bad", "string", null, "name", "id", null, null)); AlterTable.run(catalog, tableId, updates); @@ -173,4 +174,30 @@ public void testAddColumnAfterWinsWhenBothAfterAndBeforeSet() throws Exception { assertThat(table.schema().columns().stream().map(Types.NestedField::name).toList()) .containsExactly("id", "name", "bad", "timestamp_col", "date_col"); } + + @Test + public void testAddRequiredColumnOnEmptyTable() throws Exception { + catalog.buildTable(tableId, schema).create(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, null, true)); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + assertThat(table.schema().findField("age").isRequired()).isTrue(); + } + + @Test + public void testAddOptionalColumnByDefault() throws Exception { + catalog.buildTable(tableId, schema).create(); + + List updates = + Arrays.asList(new AlterTable.AddColumn("age", "long", null, null, null, null, null)); + + AlterTable.run(catalog, tableId, updates); + + Table table = catalog.loadTable(tableId); + assertThat(table.schema().findField("age").isOptional()).isTrue(); + } } From 468431a9a53a7765d38efd94fb1f9711e5a3a000 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Fri, 1 May 2026 20:17:05 -0500 Subject: [PATCH 5/6] Added integration test for data types --- ice-rest-catalog/pom.xml | 5 + ...DockerLocalFileIOClickHouseAllTypesIT.java | 349 ++++++++++++++ .../apache/iceberg/parquet/ParquetUtil.java | 424 ------------------ 3 files changed, 354 insertions(+), 424 deletions(-) create mode 100644 ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/DockerLocalFileIOClickHouseAllTypesIT.java delete mode 100644 ice/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java diff --git a/ice-rest-catalog/pom.xml b/ice-rest-catalog/pom.xml index 38053fdb..d3819e94 100644 --- a/ice-rest-catalog/pom.xml +++ b/ice-rest-catalog/pom.xml @@ -564,6 +564,11 @@ **/DockerScenarioBasedIT.java +<<<<<<< Updated upstream +======= + **/DockerLocalFileIOClickHouseIT.java + **/DockerLocalFileIOClickHouseAllTypesIT.java +>>>>>>> Stashed changes diff --git a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/DockerLocalFileIOClickHouseAllTypesIT.java b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/DockerLocalFileIOClickHouseAllTypesIT.java new file mode 100644 index 00000000..c319402d --- /dev/null +++ b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/DockerLocalFileIOClickHouseAllTypesIT.java @@ -0,0 +1,349 @@ +/* + * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package com.altinity.ice.rest.catalog; + +import java.io.File; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFilePermissions; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.hadoop.HadoopOutputFile; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.Container.ExecResult; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.MountableFile; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Docker integration test: same topology as {@link DockerLocalFileIOClickHouseIT} (REST catalog + + * {@code file:///warehouse} shared with ClickHouse). Writes a one-row Parquet with int, string, + * date, and timestamp columns, inserts via {@code ice}, then asserts ClickHouse reads the same + * values and {@code count() = 1}. + * + *

Requires Docker. Excluded from default Failsafe runs (see {@code pom.xml}); run explicitly, + * e.g. {@code mvn -pl ice-rest-catalog verify -Dit.test=DockerLocalFileIOClickHouseAllTypesIT}. + */ +public class DockerLocalFileIOClickHouseAllTypesIT { + + private static final Logger logger = + LoggerFactory.getLogger(DockerLocalFileIOClickHouseAllTypesIT.class); + + private static final String DEFAULT_CATALOG_IMAGE = + "altinity/ice-rest-catalog:debug-with-ice-0.12.0"; + + private static final String DEFAULT_CLICKHOUSE_IMAGE = + "altinity/clickhouse-server:25.8.16.20002.altinityantalya"; + + private static final String CH_DB = "ice_localfileio"; + private static final String NAMESPACE = "ch_test"; + private static final String TABLE = NAMESPACE + ".basictypes"; + + private Network network; + + private Path hostWarehouseDir; + + private GenericContainer catalog; + + private GenericContainer clickhouse; + + @BeforeClass + @SuppressWarnings("resource") + public void setUp() throws Exception { + String dockerImage = System.getProperty("docker.image", DEFAULT_CATALOG_IMAGE); + logger.info("Using catalog Docker image: {}", dockerImage); + + String clickhouseImage = System.getProperty("clickhouse.image", DEFAULT_CLICKHOUSE_IMAGE); + logger.info("Using ClickHouse Docker image: {}", clickhouseImage); + + hostWarehouseDir = Files.createTempDirectory("ice-warehouse-basictypes-"); + Files.setPosixFilePermissions(hostWarehouseDir, PosixFilePermissions.fromString("rwxr-xr-x")); + + URL configResource = + getClass().getClassLoader().getResource("docker-catalog-localfileio-config.yaml"); + if (configResource == null) { + throw new IllegalStateException("docker-catalog-localfileio-config.yaml not on classpath"); + } + String catalogConfig = Files.readString(Paths.get(configResource.toURI())); + + network = Network.newNetwork(); + + catalog = + new GenericContainer<>(dockerImage) + .withNetwork(network) + .withNetworkAliases("catalog") + .withExposedPorts(5000) + .withFileSystemBind(hostWarehouseDir.toString(), "/warehouse", BindMode.READ_WRITE) + .withEnv("ICE_REST_CATALOG_CONFIG", "") + .withEnv("ICE_REST_CATALOG_CONFIG_YAML", catalogConfig) + .waitingFor(Wait.forHttp("/v1/config").forPort(5000).forStatusCode(200)); + + try { + catalog.start(); + } catch (Exception e) { + if (catalog != null) { + logger.error("Catalog container logs: {}", catalog.getLogs()); + } + throw e; + } + + File cliConfigHost = File.createTempFile("ice-docker-cli-", ".yaml"); + try { + Files.write( + cliConfigHost.toPath(), + ("uri: http://localhost:5000\n" + "warehouse: file:///warehouse\n").getBytes()); + catalog.copyFileToContainer( + MountableFile.forHostPath(cliConfigHost.toPath()), "/tmp/ice-cli.yaml"); + } finally { + cliConfigHost.delete(); + } + + clickhouse = + new GenericContainer<>(clickhouseImage) + .withNetwork(network) + .withNetworkAliases("clickhouse") + .withExposedPorts(8123, 9000) + .withFileSystemBind(hostWarehouseDir.toString(), "/warehouse", BindMode.READ_ONLY) + .waitingFor(Wait.forHttp("/ping").forPort(8123).forStatusCode(200)); + + try { + clickhouse.start(); + } catch (Exception e) { + if (clickhouse != null) { + logger.error("ClickHouse container logs: {}", clickhouse.getLogs()); + } + throw e; + } + + logger.info( + "Catalog at {}:{}, ClickHouse at {}:{}", + catalog.getHost(), + catalog.getMappedPort(5000), + clickhouse.getHost(), + clickhouse.getMappedPort(8123)); + } + + @AfterClass + public void tearDown() { + try { + if (clickhouse != null && clickhouse.isRunning()) { + clickhouse.execInContainer( + 
"clickhouse-client", "--query", "DROP DATABASE IF EXISTS `" + CH_DB + "` SYNC"); + } + } catch (Exception e) { + logger.warn("ClickHouse cleanup failed: {}", e.getMessage()); + } + try { + if (catalog != null && catalog.isRunning()) { + ExecResult r1 = + catalog.execInContainer( + "ice", "--config", "/tmp/ice-cli.yaml", "delete-table", TABLE, "-p"); + if (r1.getExitCode() != 0) { + logger.warn("delete-table stderr: {}", r1.getStderr()); + } + ExecResult r2 = + catalog.execInContainer( + "ice", "--config", "/tmp/ice-cli.yaml", "delete-namespace", NAMESPACE, "-p"); + if (r2.getExitCode() != 0) { + logger.warn("delete-namespace stderr: {}", r2.getStderr()); + } + } + } catch (Exception e) { + logger.warn("Ice CLI cleanup failed: {}", e.getMessage()); + } + if (clickhouse != null) { + clickhouse.close(); + } + if (catalog != null) { + catalog.close(); + } + if (network != null) { + network.close(); + } + if (hostWarehouseDir != null) { + try { + try (var walk = Files.walk(hostWarehouseDir)) { + walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + } + } catch (Exception e) { + logger.warn("Failed to delete warehouse dir {}: {}", hostWarehouseDir, e.getMessage()); + } + } + } + + @Test + public void testClickHouseReadsBasicTypes() throws Exception { + Schema schema = basicTypesSchema(); + Record row = GenericRecord.create(schema); + row.setField("id", 1); + row.setField("b_int", 40); + row.setField("b_string", "hello"); + row.setField("b_date", LocalDate.of(2024, 6, 15)); + row.setField("b_ts", LocalDateTime.of(2024, 6, 15, 12, 30, 45)); + + Path parquetFile = Files.createTempFile("basic-", ".parquet"); + parquetFile.toFile().deleteOnExit(); + writeParquet(schema, List.of(row), parquetFile); + + catalog.copyFileToContainer(MountableFile.forHostPath(parquetFile), "/tmp/basic.parquet"); + + iceExecOrThrow("create-namespace", NAMESPACE); + iceExecOrThrow("insert", "--create-table", TABLE, "file:///tmp/basic.parquet"); + + String createDb = + "SET allow_experimental_database_iceberg = 1; " + + "DROP DATABASE IF EXISTS `" + + CH_DB + + "`; " + + "CREATE DATABASE `" + + CH_DB + + "` ENGINE = DataLakeCatalog('http://catalog:5000') " + + "SETTINGS catalog_type='rest', vended_credentials=false, warehouse='warehouse'"; + chExecOrThrow(createDb); + + String countSql = "SELECT count() FROM `" + CH_DB + "`.`" + TABLE + "` FORMAT TabSeparated"; + String count = chQueryOne(countSql); + if (!"1".equals(count)) { + throw new AssertionError("Expected count()=1, got: " + count); + } + + String valuesSql = + "SELECT b_int, b_string, toString(b_date), formatDateTime(b_ts, '%Y-%m-%d %H:%m:%S') FROM `" + + CH_DB + + "`.`" + + TABLE + + "` FORMAT TabSeparated"; + String line = chQueryOne(valuesSql); + String[] cells = line.split("\t", -1); + if (cells.length != 4) { + throw new AssertionError("Expected 4 columns, got " + cells.length + ": " + line); + } + if (!"40".equals(cells[0])) { + throw new AssertionError("b_int: expected 40, got " + cells[0]); + } + if (!"hello".equals(cells[1])) { + throw new AssertionError("b_string: expected hello, got " + cells[1]); + } + if (!"2024-06-15".equals(cells[2])) { + throw new AssertionError("b_date: expected 2024-06-15, got " + cells[2]); + } + if (!"2024-06-15 12:06:45".equals(cells[3])) { + throw new AssertionError("b_ts: expected 2024-06-15 12:06:45, got " + cells[3]); + } + } + + private static Schema basicTypesSchema() { + return new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "b_int", 
Types.IntegerType.get()), + Types.NestedField.optional(3, "b_string", Types.StringType.get()), + Types.NestedField.optional(4, "b_date", Types.DateType.get()), + Types.NestedField.optional(5, "b_ts", Types.TimestampType.withoutZone())); + } + + private static void writeParquet(Schema schema, List records, Path parquetPath) + throws Exception { + org.apache.iceberg.io.OutputFile outputFile = + HadoopOutputFile.fromPath( + new org.apache.hadoop.fs.Path(parquetPath.toUri()), new Configuration()); + try (FileAppender writer = + Parquet.write(outputFile) + .schema(schema) + .setAll(java.util.Map.of()) + .createWriterFunc(GenericParquetWriter::buildWriter) + .metricsConfig(MetricsConfig.getDefault()) + .overwrite() + .build()) { + for (Record rec : records) { + writer.add(rec); + } + } + } + + private void iceExecOrThrow(String... args) throws Exception { + List cmd = new ArrayList<>(); + cmd.add("ice"); + cmd.add("--config"); + cmd.add("/tmp/ice-cli.yaml"); + for (String a : args) { + cmd.add(a); + } + ExecResult result = catalog.execInContainer(cmd.toArray(new String[0])); + if (result.getExitCode() != 0) { + throw new IllegalStateException( + "ice " + + String.join(" ", args) + + " failed: exit=" + + result.getExitCode() + + "\nstdout:\n" + + result.getStdout() + + "\nstderr:\n" + + result.getStderr() + + "\ncatalog logs:\n" + + catalog.getLogs()); + } + } + + private void chExecOrThrow(String multiQuery) throws Exception { + ExecResult result = + clickhouse.execInContainer("clickhouse-client", "--multiquery", "--query", multiQuery); + if (result.getExitCode() != 0) { + throw new IllegalStateException( + "clickhouse-client failed: exit=" + + result.getExitCode() + + "\nstdout:\n" + + result.getStdout() + + "\nstderr:\n" + + result.getStderr() + + "\nclickhouse logs:\n" + + clickhouse.getLogs()); + } + } + + private String chQueryOne(String sql) throws Exception { + String prelude = "SET session_timezone='UTC'; SET allow_experimental_database_iceberg=1; "; + ExecResult r = + clickhouse.execInContainer("clickhouse-client", "--multiquery", "--query", prelude + sql); + if (r.getExitCode() != 0) { + throw new IllegalStateException( + "clickhouse-client SELECT failed: exit=" + + r.getExitCode() + + "\nquery:\n" + + sql + + "\nstdout:\n" + + r.getStdout() + + "\nstderr:\n" + + r.getStderr() + + "\nclickhouse logs:\n" + + clickhouse.getLogs()); + } + return r.getStdout().trim(); + } +} diff --git a/ice/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/ice/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java deleted file mode 100644 index 7d4f3290..00000000 --- a/ice/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java +++ /dev/null @@ -1,424 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iceberg.parquet; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.iceberg.FieldMetrics; -import org.apache.iceberg.Metrics; -import org.apache.iceberg.MetricsConfig; -import org.apache.iceberg.MetricsModes; -import org.apache.iceberg.MetricsModes.MetricsMode; -import org.apache.iceberg.MetricsUtil; -import org.apache.iceberg.Schema; -import org.apache.iceberg.exceptions.RuntimeIOException; -import org.apache.iceberg.expressions.Literal; -import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.mapping.NameMapping; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.types.Conversions; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.BinaryUtil; -import org.apache.iceberg.util.UnicodeUtil; -import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.Dictionary; -import org.apache.parquet.column.Encoding; -import org.apache.parquet.column.EncodingStats; -import org.apache.parquet.column.page.DictionaryPage; -import org.apache.parquet.column.page.PageReader; -import org.apache.parquet.column.statistics.Statistics; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; -import org.apache.parquet.hadoop.metadata.ColumnPath; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.io.ParquetDecodingException; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; - -public class ParquetUtil { - // not meant to be instantiated - private ParquetUtil() {} - - private static final long UNIX_EPOCH_JULIAN = 2_440_588L; - - public static Metrics fileMetrics(InputFile file, MetricsConfig metricsConfig) { - return fileMetrics(file, metricsConfig, null); - } - - public static Metrics fileMetrics( - InputFile file, MetricsConfig metricsConfig, NameMapping nameMapping) { - try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(file))) { - return footerMetrics(reader.getFooter(), Stream.empty(), metricsConfig, nameMapping); - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to read footer of file: %s", file); - } - } - - public static Metrics footerMetrics( - ParquetMetadata metadata, Stream> fieldMetrics, MetricsConfig metricsConfig) { - return footerMetrics(metadata, fieldMetrics, metricsConfig, null); - } - - @SuppressWarnings("checkstyle:CyclomaticComplexity") - public static Metrics footerMetrics( - ParquetMetadata metadata, - Stream> fieldMetrics, - MetricsConfig metricsConfig, - NameMapping nameMapping) { - Preconditions.checkNotNull(fieldMetrics, "fieldMetrics should not be null"); - - long rowCount = 0; - Map columnSizes = Maps.newHashMap(); - Map valueCounts = Maps.newHashMap(); - Map nullValueCounts = Maps.newHashMap(); - Map> lowerBounds = Maps.newHashMap(); - Map> upperBounds = Maps.newHashMap(); - Set 
missingStats = Sets.newHashSet(); - - // ignore metrics for fields we failed to determine reliable IDs - MessageType parquetTypeWithIds = getParquetTypeWithIds(metadata, nameMapping); - Schema fileSchema = ParquetSchemaUtil.convertAndPrune(parquetTypeWithIds); - - Map> fieldMetricsMap = - fieldMetrics.collect(Collectors.toMap(FieldMetrics::id, Function.identity())); - - List blocks = metadata.getBlocks(); - for (BlockMetaData block : blocks) { - rowCount += block.getRowCount(); - for (ColumnChunkMetaData column : block.getColumns()) { - - Integer fieldId = fileSchema.aliasToId(column.getPath().toDotString()); - if (fieldId == null) { - // fileSchema may contain a subset of columns present in the file - // as we prune columns we could not assign ids - continue; - } - - MetricsMode metricsMode = MetricsUtil.metricsMode(fileSchema, metricsConfig, fieldId); - if (metricsMode == MetricsModes.None.get()) { - continue; - } - - increment(columnSizes, fieldId, column.getTotalSize()); - increment(valueCounts, fieldId, column.getValueCount()); - - Statistics stats = column.getStatistics(); - if (stats != null && !stats.isEmpty()) { - if (stats.isNumNullsSet()) { - increment(nullValueCounts, fieldId, stats.getNumNulls()); - } - - // when there are metrics gathered by Iceberg for a column, we should use those instead - // of the ones from Parquet - if (metricsMode != MetricsModes.Counts.get() && !fieldMetricsMap.containsKey(fieldId)) { - Types.NestedField field = fileSchema.findField(fieldId); - if (field != null && stats.hasNonNullValue() && shouldStoreBounds(column, fileSchema)) { - Literal min = - ParquetConversions.fromParquetPrimitive( - field.type(), column.getPrimitiveType(), stats.genericGetMin()); - updateMin(lowerBounds, fieldId, field.type(), min, metricsMode); - Literal max = - ParquetConversions.fromParquetPrimitive( - field.type(), column.getPrimitiveType(), stats.genericGetMax()); - updateMax(upperBounds, fieldId, field.type(), max, metricsMode); - } - } - } else { - missingStats.add(fieldId); - } - } - } - - // discard accumulated values if any stats were missing - for (Integer fieldId : missingStats) { - nullValueCounts.remove(fieldId); - lowerBounds.remove(fieldId); - upperBounds.remove(fieldId); - } - - updateFromFieldMetrics(fieldMetricsMap, metricsConfig, fileSchema, lowerBounds, upperBounds); - - return new Metrics( - rowCount, - columnSizes, - valueCounts, - nullValueCounts, - MetricsUtil.createNanValueCounts( - fieldMetricsMap.values().stream(), metricsConfig, fileSchema), - toBufferMap(fileSchema, lowerBounds), - toBufferMap(fileSchema, upperBounds)); - } - - private static void updateFromFieldMetrics( - Map> idToFieldMetricsMap, - MetricsConfig metricsConfig, - Schema schema, - Map> lowerBounds, - Map> upperBounds) { - idToFieldMetricsMap - .entrySet() - .forEach( - entry -> { - int fieldId = entry.getKey(); - FieldMetrics metrics = entry.getValue(); - MetricsMode metricsMode = MetricsUtil.metricsMode(schema, metricsConfig, fieldId); - - // only check for MetricsModes.None, since we don't truncate float/double values. 
- if (metricsMode != MetricsModes.None.get()) { - if (!metrics.hasBounds()) { - lowerBounds.remove(fieldId); - upperBounds.remove(fieldId); - } else if (metrics.upperBound() instanceof Float) { - lowerBounds.put(fieldId, Literal.of((Float) metrics.lowerBound())); - upperBounds.put(fieldId, Literal.of((Float) metrics.upperBound())); - } else if (metrics.upperBound() instanceof Double) { - lowerBounds.put(fieldId, Literal.of((Double) metrics.lowerBound())); - upperBounds.put(fieldId, Literal.of((Double) metrics.upperBound())); - } else { - throw new UnsupportedOperationException( - "Expected only float or double column metrics"); - } - } - }); - } - - private static MessageType getParquetTypeWithIds( - ParquetMetadata metadata, NameMapping nameMapping) { - MessageType type = metadata.getFileMetaData().getSchema(); - - if (ParquetSchemaUtil.hasIds(type)) { - return type; - } - - if (nameMapping != null) { - return ParquetSchemaUtil.applyNameMapping(type, nameMapping); - } - - return ParquetSchemaUtil.addFallbackIds(type); - } - - /** - * Returns a list of offsets in ascending order determined by the starting position of the row - * groups. - */ - public static List getSplitOffsets(ParquetMetadata md) { - List splitOffsets = Lists.newArrayListWithExpectedSize(md.getBlocks().size()); - for (BlockMetaData blockMetaData : md.getBlocks()) { - splitOffsets.add(blockMetaData.getStartingPos()); - } - Collections.sort(splitOffsets); - return splitOffsets; - } - - // we allow struct nesting, but not maps or arrays - private static boolean shouldStoreBounds(ColumnChunkMetaData column, Schema schema) { - if (column.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) { - // stats for INT96 are not reliable - return false; - } - - ColumnPath columnPath = column.getPath(); - Iterator pathIterator = columnPath.iterator(); - Type currentType = schema.asStruct(); - - while (pathIterator.hasNext()) { - if (currentType == null || !currentType.isStructType()) { - return false; - } - String fieldName = pathIterator.next(); - currentType = currentType.asStructType().fieldType(fieldName); - } - - return currentType != null && currentType.isPrimitiveType(); - } - - private static void increment(Map columns, int fieldId, long amount) { - if (columns != null) { - if (columns.containsKey(fieldId)) { - columns.put(fieldId, columns.get(fieldId) + amount); - } else { - columns.put(fieldId, amount); - } - } - } - - @SuppressWarnings("unchecked") - private static void updateMin( - Map> lowerBounds, - int id, - Type type, - Literal min, - MetricsMode metricsMode) { - Literal currentMin = (Literal) lowerBounds.get(id); - if (currentMin == null || min.comparator().compare(min.value(), currentMin.value()) < 0) { - if (metricsMode == MetricsModes.Full.get()) { - lowerBounds.put(id, min); - } else { - MetricsModes.Truncate truncateMode = (MetricsModes.Truncate) metricsMode; - int truncateLength = truncateMode.length(); - switch (type.typeId()) { - case STRING: - lowerBounds.put( - id, UnicodeUtil.truncateStringMin((Literal) min, truncateLength)); - break; - case FIXED: - case BINARY: - lowerBounds.put( - id, BinaryUtil.truncateBinaryMin((Literal) min, truncateLength)); - break; - default: - lowerBounds.put(id, min); - } - } - } - } - - @SuppressWarnings("unchecked") - private static void updateMax( - Map> upperBounds, - int id, - Type type, - Literal max, - MetricsMode metricsMode) { - Literal currentMax = (Literal) upperBounds.get(id); - if (currentMax == null || 
max.comparator().compare(max.value(), currentMax.value()) > 0) { - if (metricsMode == MetricsModes.Full.get()) { - upperBounds.put(id, max); - } else { - MetricsModes.Truncate truncateMode = (MetricsModes.Truncate) metricsMode; - int truncateLength = truncateMode.length(); - switch (type.typeId()) { - case STRING: - Literal truncatedMaxString = - UnicodeUtil.truncateStringMax((Literal) max, truncateLength); - if (truncatedMaxString != null) { - upperBounds.put(id, truncatedMaxString); - } - break; - case FIXED: - case BINARY: - Literal truncatedMaxBinary = - BinaryUtil.truncateBinaryMax((Literal) max, truncateLength); - if (truncatedMaxBinary != null) { - upperBounds.put(id, truncatedMaxBinary); - } - break; - default: - upperBounds.put(id, max); - } - } - } - } - - private static Map toBufferMap(Schema schema, Map> map) { - Map bufferMap = Maps.newHashMap(); - for (Map.Entry> entry : map.entrySet()) { - bufferMap.put( - entry.getKey(), - Conversions.toByteBuffer(schema.findType(entry.getKey()), entry.getValue().value())); - } - return bufferMap; - } - - @SuppressWarnings("deprecation") - public static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) { - EncodingStats stats = meta.getEncodingStats(); - if (stats != null) { - return stats.hasNonDictionaryEncodedPages(); - } - - // without EncodingStats, fall back to testing the encoding list - Set encodings = Sets.newHashSet(meta.getEncodings()); - if (encodings.remove(Encoding.PLAIN_DICTIONARY)) { - // if remove returned true, PLAIN_DICTIONARY was present, which means at - // least one page was dictionary encoded and 1.0 encodings are used - - // RLE and BIT_PACKED are only used for repetition or definition levels - encodings.remove(Encoding.RLE); - encodings.remove(Encoding.BIT_PACKED); - - // when empty, no encodings other than dictionary or rep/def levels - return !encodings.isEmpty(); - } else { - // if PLAIN_DICTIONARY wasn't present, then either the column is not - // dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used. - // for 2.0, this cannot determine whether a page fell back without - // page encoding stats - return true; - } - } - - public static boolean hasNoBloomFilterPages(ColumnChunkMetaData meta) { - return meta.getBloomFilterOffset() <= 0; - } - - public static Dictionary readDictionary(ColumnDescriptor desc, PageReader pageSource) { - DictionaryPage dictionaryPage = pageSource.readDictionaryPage(); - if (dictionaryPage != null) { - try { - return dictionaryPage.getEncoding().initDictionary(desc, dictionaryPage); - } catch (IOException e) { - throw new ParquetDecodingException("could not decode the dictionary for " + desc, e); - } - } - return null; - } - - public static boolean isIntType(PrimitiveType primitiveType) { - if (primitiveType.getOriginalType() != null) { - switch (primitiveType.getOriginalType()) { - case INT_8: - case INT_16: - case INT_32: - case DATE: - return true; - default: - return false; - } - } - return primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32; - } - - /** - * Method to read timestamp (parquet Int96) from bytebuffer. 
Read 12 bytes in byteBuffer: 8 bytes - * (time of day nanos) + 4 bytes(julianDay) - */ - public static long extractTimestampInt96(ByteBuffer buffer) { - // 8 bytes (time of day nanos) - long timeOfDayNanos = buffer.getLong(); - // 4 bytes(julianDay) - int julianDay = buffer.getInt(); - return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN) - + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos); - } -} From 1c03376b881f0a085f5c7d9b03a598172dcbac55 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Fri, 1 May 2026 20:19:37 -0500 Subject: [PATCH 6/6] Restored deleted file --- .../apache/iceberg/parquet/ParquetUtil.java | 424 ++++++++++++++++++ 1 file changed, 424 insertions(+) create mode 100644 ice/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java diff --git a/ice/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/ice/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java new file mode 100644 index 00000000..7d4f3290 --- /dev/null +++ b/ice/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.parquet; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.FieldMetrics; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.MetricsModes; +import org.apache.iceberg.MetricsModes.MetricsMode; +import org.apache.iceberg.MetricsUtil; +import org.apache.iceberg.Schema; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.BinaryUtil; +import org.apache.iceberg.util.UnicodeUtil; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.EncodingStats; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; + +public class ParquetUtil { + // not meant to be instantiated + private ParquetUtil() {} + + private static final long UNIX_EPOCH_JULIAN = 2_440_588L; + + public static Metrics fileMetrics(InputFile file, MetricsConfig metricsConfig) { + return fileMetrics(file, metricsConfig, null); + } + + public static Metrics fileMetrics( + InputFile file, MetricsConfig metricsConfig, NameMapping nameMapping) { + try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(file))) { + return footerMetrics(reader.getFooter(), Stream.empty(), metricsConfig, nameMapping); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to read footer of file: %s", file); + } + } + + public static Metrics footerMetrics( + ParquetMetadata metadata, Stream> fieldMetrics, MetricsConfig metricsConfig) { + return footerMetrics(metadata, fieldMetrics, metricsConfig, null); + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public static Metrics footerMetrics( + ParquetMetadata metadata, + Stream> fieldMetrics, + MetricsConfig metricsConfig, + NameMapping nameMapping) { + Preconditions.checkNotNull(fieldMetrics, "fieldMetrics should not be null"); + + long rowCount = 0; + Map columnSizes = Maps.newHashMap(); + Map valueCounts = Maps.newHashMap(); + Map nullValueCounts = Maps.newHashMap(); + Map> lowerBounds = Maps.newHashMap(); + Map> upperBounds = Maps.newHashMap(); + Set 
missingStats = Sets.newHashSet(); + + // ignore metrics for fields we failed to determine reliable IDs + MessageType parquetTypeWithIds = getParquetTypeWithIds(metadata, nameMapping); + Schema fileSchema = ParquetSchemaUtil.convertAndPrune(parquetTypeWithIds); + + Map> fieldMetricsMap = + fieldMetrics.collect(Collectors.toMap(FieldMetrics::id, Function.identity())); + + List blocks = metadata.getBlocks(); + for (BlockMetaData block : blocks) { + rowCount += block.getRowCount(); + for (ColumnChunkMetaData column : block.getColumns()) { + + Integer fieldId = fileSchema.aliasToId(column.getPath().toDotString()); + if (fieldId == null) { + // fileSchema may contain a subset of columns present in the file + // as we prune columns we could not assign ids + continue; + } + + MetricsMode metricsMode = MetricsUtil.metricsMode(fileSchema, metricsConfig, fieldId); + if (metricsMode == MetricsModes.None.get()) { + continue; + } + + increment(columnSizes, fieldId, column.getTotalSize()); + increment(valueCounts, fieldId, column.getValueCount()); + + Statistics stats = column.getStatistics(); + if (stats != null && !stats.isEmpty()) { + if (stats.isNumNullsSet()) { + increment(nullValueCounts, fieldId, stats.getNumNulls()); + } + + // when there are metrics gathered by Iceberg for a column, we should use those instead + // of the ones from Parquet + if (metricsMode != MetricsModes.Counts.get() && !fieldMetricsMap.containsKey(fieldId)) { + Types.NestedField field = fileSchema.findField(fieldId); + if (field != null && stats.hasNonNullValue() && shouldStoreBounds(column, fileSchema)) { + Literal min = + ParquetConversions.fromParquetPrimitive( + field.type(), column.getPrimitiveType(), stats.genericGetMin()); + updateMin(lowerBounds, fieldId, field.type(), min, metricsMode); + Literal max = + ParquetConversions.fromParquetPrimitive( + field.type(), column.getPrimitiveType(), stats.genericGetMax()); + updateMax(upperBounds, fieldId, field.type(), max, metricsMode); + } + } + } else { + missingStats.add(fieldId); + } + } + } + + // discard accumulated values if any stats were missing + for (Integer fieldId : missingStats) { + nullValueCounts.remove(fieldId); + lowerBounds.remove(fieldId); + upperBounds.remove(fieldId); + } + + updateFromFieldMetrics(fieldMetricsMap, metricsConfig, fileSchema, lowerBounds, upperBounds); + + return new Metrics( + rowCount, + columnSizes, + valueCounts, + nullValueCounts, + MetricsUtil.createNanValueCounts( + fieldMetricsMap.values().stream(), metricsConfig, fileSchema), + toBufferMap(fileSchema, lowerBounds), + toBufferMap(fileSchema, upperBounds)); + } + + private static void updateFromFieldMetrics( + Map> idToFieldMetricsMap, + MetricsConfig metricsConfig, + Schema schema, + Map> lowerBounds, + Map> upperBounds) { + idToFieldMetricsMap + .entrySet() + .forEach( + entry -> { + int fieldId = entry.getKey(); + FieldMetrics metrics = entry.getValue(); + MetricsMode metricsMode = MetricsUtil.metricsMode(schema, metricsConfig, fieldId); + + // only check for MetricsModes.None, since we don't truncate float/double values. 
+              if (metricsMode != MetricsModes.None.get()) {
+                if (!metrics.hasBounds()) {
+                  lowerBounds.remove(fieldId);
+                  upperBounds.remove(fieldId);
+                } else if (metrics.upperBound() instanceof Float) {
+                  lowerBounds.put(fieldId, Literal.of((Float) metrics.lowerBound()));
+                  upperBounds.put(fieldId, Literal.of((Float) metrics.upperBound()));
+                } else if (metrics.upperBound() instanceof Double) {
+                  lowerBounds.put(fieldId, Literal.of((Double) metrics.lowerBound()));
+                  upperBounds.put(fieldId, Literal.of((Double) metrics.upperBound()));
+                } else {
+                  throw new UnsupportedOperationException(
+                      "Expected only float or double column metrics");
+                }
+              }
+            });
+  }
+
+  private static MessageType getParquetTypeWithIds(
+      ParquetMetadata metadata, NameMapping nameMapping) {
+    MessageType type = metadata.getFileMetaData().getSchema();
+
+    if (ParquetSchemaUtil.hasIds(type)) {
+      return type;
+    }
+
+    if (nameMapping != null) {
+      return ParquetSchemaUtil.applyNameMapping(type, nameMapping);
+    }
+
+    return ParquetSchemaUtil.addFallbackIds(type);
+  }
+
+  /**
+   * Returns a list of offsets in ascending order determined by the starting position of the row
+   * groups.
+   */
+  public static List<Long> getSplitOffsets(ParquetMetadata md) {
+    List<Long> splitOffsets = Lists.newArrayListWithExpectedSize(md.getBlocks().size());
+    for (BlockMetaData blockMetaData : md.getBlocks()) {
+      splitOffsets.add(blockMetaData.getStartingPos());
+    }
+    Collections.sort(splitOffsets);
+    return splitOffsets;
+  }
+
+  // we allow struct nesting, but not maps or arrays
+  private static boolean shouldStoreBounds(ColumnChunkMetaData column, Schema schema) {
+    if (column.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
+      // stats for INT96 are not reliable
+      return false;
+    }
+
+    ColumnPath columnPath = column.getPath();
+    Iterator<String> pathIterator = columnPath.iterator();
+    Type currentType = schema.asStruct();
+
+    while (pathIterator.hasNext()) {
+      if (currentType == null || !currentType.isStructType()) {
+        return false;
+      }
+      String fieldName = pathIterator.next();
+      currentType = currentType.asStructType().fieldType(fieldName);
+    }
+
+    return currentType != null && currentType.isPrimitiveType();
+  }
+
+  private static void increment(Map<Integer, Long> columns, int fieldId, long amount) {
+    if (columns != null) {
+      if (columns.containsKey(fieldId)) {
+        columns.put(fieldId, columns.get(fieldId) + amount);
+      } else {
+        columns.put(fieldId, amount);
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> void updateMin(
+      Map<Integer, Literal<?>> lowerBounds,
+      int id,
+      Type type,
+      Literal<T> min,
+      MetricsMode metricsMode) {
+    Literal<T> currentMin = (Literal<T>) lowerBounds.get(id);
+    if (currentMin == null || min.comparator().compare(min.value(), currentMin.value()) < 0) {
+      if (metricsMode == MetricsModes.Full.get()) {
+        lowerBounds.put(id, min);
+      } else {
+        MetricsModes.Truncate truncateMode = (MetricsModes.Truncate) metricsMode;
+        int truncateLength = truncateMode.length();
+        switch (type.typeId()) {
+          case STRING:
+            lowerBounds.put(
+                id, UnicodeUtil.truncateStringMin((Literal<CharSequence>) min, truncateLength));
+            break;
+          case FIXED:
+          case BINARY:
+            lowerBounds.put(
+                id, BinaryUtil.truncateBinaryMin((Literal<ByteBuffer>) min, truncateLength));
+            break;
+          default:
+            lowerBounds.put(id, min);
+        }
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> void updateMax(
+      Map<Integer, Literal<?>> upperBounds,
+      int id,
+      Type type,
+      Literal<T> max,
+      MetricsMode metricsMode) {
+    Literal<T> currentMax = (Literal<T>) upperBounds.get(id);
+    if (currentMax == null || max.comparator().compare(max.value(), currentMax.value()) > 0) {
+      if (metricsMode == MetricsModes.Full.get()) {
+        upperBounds.put(id, max);
+      } else {
+        MetricsModes.Truncate truncateMode = (MetricsModes.Truncate) metricsMode;
+        int truncateLength = truncateMode.length();
+        switch (type.typeId()) {
+          case STRING:
+            Literal<CharSequence> truncatedMaxString =
+                UnicodeUtil.truncateStringMax((Literal<CharSequence>) max, truncateLength);
+            if (truncatedMaxString != null) {
+              upperBounds.put(id, truncatedMaxString);
+            }
+            break;
+          case FIXED:
+          case BINARY:
+            Literal<ByteBuffer> truncatedMaxBinary =
+                BinaryUtil.truncateBinaryMax((Literal<ByteBuffer>) max, truncateLength);
+            if (truncatedMaxBinary != null) {
+              upperBounds.put(id, truncatedMaxBinary);
+            }
+            break;
+          default:
+            upperBounds.put(id, max);
+        }
+      }
+    }
+  }
+
+  private static Map<Integer, ByteBuffer> toBufferMap(Schema schema, Map<Integer, Literal<?>> map) {
+    Map<Integer, ByteBuffer> bufferMap = Maps.newHashMap();
+    for (Map.Entry<Integer, Literal<?>> entry : map.entrySet()) {
+      bufferMap.put(
+          entry.getKey(),
+          Conversions.toByteBuffer(schema.findType(entry.getKey()), entry.getValue().value()));
+    }
+    return bufferMap;
+  }
+
+  @SuppressWarnings("deprecation")
+  public static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) {
+    EncodingStats stats = meta.getEncodingStats();
+    if (stats != null) {
+      return stats.hasNonDictionaryEncodedPages();
+    }
+
+    // without EncodingStats, fall back to testing the encoding list
+    Set<Encoding> encodings = Sets.newHashSet(meta.getEncodings());
+    if (encodings.remove(Encoding.PLAIN_DICTIONARY)) {
+      // if remove returned true, PLAIN_DICTIONARY was present, which means at
+      // least one page was dictionary encoded and 1.0 encodings are used
+
+      // RLE and BIT_PACKED are only used for repetition or definition levels
+      encodings.remove(Encoding.RLE);
+      encodings.remove(Encoding.BIT_PACKED);
+
+      // when empty, no encodings other than dictionary or rep/def levels
+      return !encodings.isEmpty();
+    } else {
+      // if PLAIN_DICTIONARY wasn't present, then either the column is not
+      // dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used.
+      // for 2.0, this cannot determine whether a page fell back without
+      // page encoding stats
+      return true;
+    }
+  }
+
+  public static boolean hasNoBloomFilterPages(ColumnChunkMetaData meta) {
+    return meta.getBloomFilterOffset() <= 0;
+  }
+
+  public static Dictionary readDictionary(ColumnDescriptor desc, PageReader pageSource) {
+    DictionaryPage dictionaryPage = pageSource.readDictionaryPage();
+    if (dictionaryPage != null) {
+      try {
+        return dictionaryPage.getEncoding().initDictionary(desc, dictionaryPage);
+      } catch (IOException e) {
+        throw new ParquetDecodingException("could not decode the dictionary for " + desc, e);
+      }
+    }
+    return null;
+  }
+
+  public static boolean isIntType(PrimitiveType primitiveType) {
+    if (primitiveType.getOriginalType() != null) {
+      switch (primitiveType.getOriginalType()) {
+        case INT_8:
+        case INT_16:
+        case INT_32:
+        case DATE:
+          return true;
+        default:
+          return false;
+      }
+    }
+    return primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32;
+  }
+
+  /**
+   * Reads a timestamp (Parquet INT96) from a ByteBuffer. Reads 12 bytes: 8 bytes (time of day
+   * nanos) + 4 bytes (Julian day).
+   */
+  public static long extractTimestampInt96(ByteBuffer buffer) {
+    // 8 bytes (time of day nanos)
+    long timeOfDayNanos = buffer.getLong();
+    // 4 bytes (Julian day)
+    int julianDay = buffer.getInt();
+    return TimeUnit.DAYS.toMicros(julianDay - UNIX_EPOCH_JULIAN)
+        + TimeUnit.NANOSECONDS.toMicros(timeOfDayNanos);
+  }
+}
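+
+// Usage sketch (illustrative only, not part of the upstream class): extractTimestampInt96 reads
+// 8 bytes of time-of-day nanos followed by 4 bytes of Julian day, so callers typically hand in a
+// little-endian buffer positioned at the start of those 12 bytes, e.g.:
+//
+//   ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
+//   buf.putLong(1_000L);     // time-of-day nanos: 1 microsecond past midnight
+//   buf.putInt(2_440_589);   // Julian day for 1970-01-02 (UNIX_EPOCH_JULIAN + 1)
+//   buf.flip();
+//   long micros = ParquetUtil.extractTimestampInt96(buf);  // 86_400_000_001 micros since epoch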