diff --git a/.asf.yaml b/.asf.yaml index bcacbb58d4..3eaeedaed6 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -26,7 +26,6 @@ github: - lake-house - iceberg - paimon - - hudi - management-system - self-optimizing - flink diff --git a/.github/workflows/core-hadoop2-ci.yml b/.github/workflows/core-hadoop2-ci.yml index ad891366c5..4d75245506 100644 --- a/.github/workflows/core-hadoop2-ci.yml +++ b/.github/workflows/core-hadoop2-ci.yml @@ -25,7 +25,6 @@ on: - "amoro-optimizer/**" - "amoro-format-iceberg/**" - "amoro-format-paimon/**" - - "amoro-format-hudi/**" - "amoro-format-mixed/amoro-mixed-flink/**" - "amoro-format-mixed/amoro-mixed-hive/**" - "amoro-format-mixed/amoro-mixed-spark/**" diff --git a/.github/workflows/core-hadoop3-ci.yml b/.github/workflows/core-hadoop3-ci.yml index e93a32aa4f..50c2b068ac 100644 --- a/.github/workflows/core-hadoop3-ci.yml +++ b/.github/workflows/core-hadoop3-ci.yml @@ -25,7 +25,6 @@ on: - "amoro-optimizer/**" - "amoro-format-iceberg/**" - "amoro-format-paimon/**" - - "amoro-format-hudi/**" - "amoro-format-mixed/amoro-mixed-flink/**" - "amoro-format-mixed/amoro-mixed-hive/**" - "amoro-format-mixed/amoro-mixed-spark/**" diff --git a/README.md b/README.md index e07afe7b0c..ba491057e1 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,6 @@ Amoro contains modules as below: - `amoro-web` is the dashboard frontend for ams - `amoro-optimizer` provides default optimizer implementation - `amoro-format-iceberg` contains integration of Apache Iceberg format -- `amoro-format-hudi` contains integration of Apache Hudi format - `amoro-format-paimon` contains integration of Apache Paimon format - `amoro-format-mixed` provides Mixed format implementation - `amoro-mixed-hive` integrates with Apache Hive and implements Mixed Hive format @@ -131,7 +130,6 @@ Amoro is built using Maven with JDK 8, 11 and 17(required for `amoro-format-mixe * Build all modules: `./mvnw clean package -DskipTests -Ptoolchain,build-mixed-format-trino`, besides you need config 
`toolchains.xml` in `${user.home}/.m2/` dir with content below. * Build a distribution package with all formats integrated: `./mvnw clean package -Psupport-all-formats` * Build a distribution package with Apache Paimon format: `./mvnw clean package -Psupport-paimon-format` - * Build a distribution package with Apache Hudi format: `./mvnw clean package -Psupport-hudi-format` ``` diff --git a/amoro-ams/pom.xml b/amoro-ams/pom.xml index 34f5591c73..f05b4f0557 100644 --- a/amoro-ams/pom.xml +++ b/amoro-ams/pom.xml @@ -243,7 +243,6 @@ mysql mysql-connector-java - provided @@ -359,14 +358,6 @@ - - - org.apache.amoro - amoro-format-mixed-spark-${spark.major.version}_${scala.binary.version} - ${project.version} - runtime - - org.apache.amoro amoro-optimizer-standalone @@ -410,7 +401,6 @@ org.apache.amoro amoro-format-paimon ${project.version} - test @@ -462,12 +452,14 @@ org.testcontainers testcontainers + 1.20.4 test org.testcontainers junit-jupiter + 1.20.4 test @@ -481,12 +473,14 @@ org.testcontainers k3s + 1.20.4 test org.testcontainers mysql + 1.20.4 test @@ -517,7 +511,7 @@ com.google.guava guava - ${guava-hive.version} + ${guava.version} @@ -616,15 +610,6 @@ - - support-hudi-format - - - org.apache.amoro - amoro-format-hudi - - - support-all-formats @@ -632,10 +617,6 @@ org.apache.amoro amoro-format-paimon - - org.apache.amoro - amoro-format-hudi - diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index 9ac6fb08e2..ccdd864474 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -518,6 +518,13 @@ public class AmoroManagementConf { .withDescription( "Allow the table to break the quota limit when the resource is sufficient."); + public static final ConfigOption OPTIMIZING_QUEUE_WARMUP_THREAD_COUNT = + 
ConfigOptions.key("self-optimizing.queue-warmup.thread-count") + .intType() + .defaultValue(10) + .withDescription( + "The number of threads used to warm up optimizing queues during AMS startup."); + /** @deprecated Use {@link #PROCESS_HISTORY_DATA_KEEP_TIME} instead. */ @Deprecated public static final ConfigOption PROCESS_HISTORY_DATA_KEEP_DAYS = diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java index 1e78520f07..d4e94e2234 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java @@ -81,6 +81,7 @@ public static void validateConfig(Configurations configurations) { validateThreadCount(configurations, AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT); validateThreadCount(configurations, AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT); + validateThreadCount(configurations, AmoroManagementConf.OPTIMIZING_QUEUE_WARMUP_THREAD_COUNT); if (configurations.getBoolean(AmoroManagementConf.EXPIRE_SNAPSHOTS_ENABLED)) { validateThreadCount(configurations, AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 0be364523a..ee1958c614 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -283,7 +283,8 @@ public void startOptimizingService() throws Exception { optimizerManager, tableService, bucketAssignStore, - haContainer); + haContainer, + processFactories); LOG.info("Setting up AMS table executors..."); InlineTableExecutors.getInstance().setup(tableService, serviceConfig); diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index efc1c375b7..a44bb654dd 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -33,6 +33,7 @@ import org.apache.amoro.exception.IllegalTaskStateException; import org.apache.amoro.exception.ObjectNotExistsException; import org.apache.amoro.exception.PluginRetryAuthException; +import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.resource.Resource; import org.apache.amoro.resource.ResourceContainer; import org.apache.amoro.resource.ResourceGroup; @@ -41,14 +42,18 @@ import org.apache.amoro.server.dashboard.model.OptimizerResourceInfo; import org.apache.amoro.server.ha.HighAvailabilityContainer; import org.apache.amoro.server.manager.AbstractOptimizerContainer; +import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.optimizing.OptimizingProcess; import org.apache.amoro.server.optimizing.OptimizingQueue; +import org.apache.amoro.server.optimizing.OptimizingQueueWarmupMetrics; +import org.apache.amoro.server.optimizing.OptimizingQueueWarmupService; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.optimizing.TaskRuntime; import org.apache.amoro.server.persistence.StatedPersistentBase; import org.apache.amoro.server.persistence.mapper.OptimizerMapper; import org.apache.amoro.server.persistence.mapper.ResourceMapper; import org.apache.amoro.server.persistence.mapper.TableProcessMapper; +import org.apache.amoro.server.process.ProcessFactoryRouter; import org.apache.amoro.server.process.TableProcessMeta; import org.apache.amoro.server.resource.Containers; import org.apache.amoro.server.resource.OptimizerInstance; @@ -58,6 +63,7 @@ import org.apache.amoro.server.table.DefaultTableRuntime; import 
org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -83,6 +89,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -121,9 +128,11 @@ public class DefaultOptimizingService extends StatedPersistentBase private final TableService tableService; private final RuntimeHandlerChain tableHandlerChain; private final ExecutorService planExecutor; + private final OptimizingQueueWarmupService queueWarmupService; private final BucketAssignStore bucketAssignStore; private final HighAvailabilityContainer haContainer; private final boolean isMasterSlaveMode; + private final ProcessFactoryRouter router; public DefaultOptimizingService( Configurations serviceConfig, @@ -131,7 +140,29 @@ public DefaultOptimizingService( OptimizerManager optimizerManager, TableService tableService, BucketAssignStore bucketAssignStore, - HighAvailabilityContainer haContainer) { + HighAvailabilityContainer haContainer, + List processFactories) { + this( + serviceConfig, + catalogManager, + optimizerManager, + tableService, + bucketAssignStore, + haContainer, + processFactories, + defaultWarmupServiceFactory(serviceConfig)); + } + + @VisibleForTesting + DefaultOptimizingService( + Configurations serviceConfig, + CatalogManager catalogManager, + OptimizerManager optimizerManager, + TableService tableService, + BucketAssignStore bucketAssignStore, + HighAvailabilityContainer haContainer, + List processFactories, + WarmupServiceFactory 
warmupServiceFactory) { this.optimizerTouchTimeout = serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT); this.taskAckTimeout = @@ -159,6 +190,9 @@ public DefaultOptimizingService( this.isMasterSlaveMode = haContainer != null && serviceConfig.getBoolean(AmoroManagementConf.HA_USE_MASTER_SLAVE_MODE); + this.router = + new ProcessFactoryRouter( + Optional.ofNullable(processFactories).orElseGet(Collections::emptyList)); this.tableHandlerChain = new TableRuntimeHandlerImpl(); this.planExecutor = Executors.newCachedThreadPool( @@ -166,31 +200,75 @@ public DefaultOptimizingService( .setNameFormat("plan-executor-thread-%d") .setDaemon(true) .build()); + OptimizingQueueWarmupMetrics warmupMetrics = + new OptimizingQueueWarmupMetrics(MetricManager.getInstance().getGlobalRegistry()); + warmupMetrics.register(); + this.queueWarmupService = + warmupServiceFactory.create( + this::getOptionalQueueByGroup, this::runtimeStillActiveForWarmup, warmupMetrics); + LOG.info( + "Optimizing router initialised: delegates={} formats={}", + router.delegates().stream().map(ProcessFactory::name).collect(Collectors.toList()), + router.supportedFormats()); + } + + @VisibleForTesting + interface WarmupServiceFactory { + OptimizingQueueWarmupService create( + Function> queueSupplier, + Predicate runtimeStillActive, + OptimizingQueueWarmupMetrics metrics); + } + + private static WarmupServiceFactory defaultWarmupServiceFactory(Configurations serviceConfig) { + return (queueSupplier, runtimeStillActive, metrics) -> + new OptimizingQueueWarmupService( + serviceConfig.getInteger(AmoroManagementConf.OPTIMIZING_QUEUE_WARMUP_THREAD_COUNT), + queueSupplier, + runtimeStillActive, + metrics); } public RuntimeHandlerChain getTableRuntimeHandler() { return tableHandlerChain; } + private boolean runtimeStillActiveForWarmup(DefaultTableRuntime runtime) { + if (!tableService.isStarted()) { + return getOptionalQueueByGroup(runtime.getGroupName()).isPresent(); + } + TableRuntime current = 
tableService.getRuntime(runtime.getTableIdentifier().getId()); + return current == runtime && getOptionalQueueByGroup(runtime.getGroupName()).isPresent(); + } + + @VisibleForTesting + boolean awaitQueueWarmupForTest(long timeout, TimeUnit unit) throws InterruptedException { + return queueWarmupService.awaitCompletionForTest(timeout, unit); + } + private void loadOptimizingQueues(List tableRuntimeList) { List optimizerGroups = getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups); List optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll); Map> groupToTableRuntimes = tableRuntimeList.stream().collect(Collectors.groupingBy(TableRuntime::getGroupName)); + List warmupRuntimes = new ArrayList<>(); optimizerGroups.forEach( group -> { String groupName = group.getName(); - List tableRuntimes = groupToTableRuntimes.remove(groupName); + List tableRuntimes = + Optional.ofNullable(groupToTableRuntimes.remove(groupName)).orElseGet(ArrayList::new); OptimizingQueue optimizingQueue = new OptimizingQueue( catalogManager, group, this, planExecutor, - Optional.ofNullable(tableRuntimes).orElseGet(ArrayList::new), - maxPlanningParallelism); + Collections.emptyList(), + maxPlanningParallelism, + router); optimizingQueueByGroup.put(groupName, optimizingQueue); + warmupRuntimes.addAll(tableRuntimes); optimizerGroupKeeper.keepInTouch(groupName, 1); }); optimizers.forEach(optimizer -> registerOptimizer(optimizer, false)); @@ -216,6 +294,7 @@ private void loadOptimizingQueues(List tableRuntimeList) { tr.completeEmptyProcess(); }); }); + queueWarmupService.warmupTables(warmupRuntimes); } private void registerOptimizer(OptimizerInstance optimizer, boolean needPersistent) { @@ -393,6 +472,12 @@ public void deleteOptimizer(String group, String resourceId) { public void createResourceGroup(ResourceGroup resourceGroup) { doAsTransaction( () -> { + String groupName = resourceGroup.getName(); + OptimizingQueue existingQueue = optimizingQueueByGroup.get(groupName); + if 
(existingQueue != null) { + existingQueue.updateOptimizerGroup(resourceGroup); + return; + } OptimizingQueue optimizingQueue = new OptimizingQueue( catalogManager, @@ -400,8 +485,8 @@ public void createResourceGroup(ResourceGroup resourceGroup) { this, planExecutor, new ArrayList<>(), - maxPlanningParallelism); - String groupName = resourceGroup.getName(); + maxPlanningParallelism, + router); optimizingQueueByGroup.put(groupName, optimizingQueue); optimizerGroupKeeper.keepInTouch(groupName, 1); }); @@ -409,7 +494,9 @@ public void createResourceGroup(ResourceGroup resourceGroup) { public void deleteResourceGroup(String groupName) { OptimizingQueue optimizingQueue = optimizingQueueByGroup.remove(groupName); - optimizingQueue.dispose(); + if (optimizingQueue != null) { + optimizingQueue.dispose(); + } } public void updateResourceGroup(ResourceGroup resourceGroup) { @@ -418,11 +505,22 @@ public void updateResourceGroup(ResourceGroup resourceGroup) { } public void dispose() { + queueWarmupService.close(); planExecutor.shutdown(); // shutdown sync group first, stop syncing group optimizingConfigWatcher.dispose(); // dispose all queues optimizingQueueByGroup.values().forEach(OptimizingQueue::dispose); + router + .delegates() + .forEach( + factory -> { + try { + factory.close(); + } catch (Exception e) { + LOG.warn("Error closing ProcessFactory '{}': {}", factory.name(), e.getMessage()); + } + }); optimizerKeeper.dispose(); optimizerGroupKeeper.dispose(); tableHandlerChain.dispose(); @@ -444,7 +542,8 @@ private class TableRuntimeHandlerImpl extends RuntimeHandlerChain { @Override public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus) { DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime; - if (!defaultTableRuntime.getOptimizingStatus().isProcessing()) { + OptimizingStatus status = defaultTableRuntime.getOptimizingStatus(); + if (status == OptimizingStatus.PENDING || status == OptimizingStatus.IDLE) { 
getOptionalQueueByGroup(defaultTableRuntime.getGroupName()) .ifPresent(q -> q.refreshTable(defaultTableRuntime)); } @@ -459,7 +558,7 @@ public void handleConfigChanged(TableRuntime runtime, TableConfiguration origina getOptionalQueueByGroup(originalGroup).ifPresent(q -> q.releaseTable(tableRuntime)); // If the new group doesn't exist, close the process to avoid the table in limbo(PENDING) // status. - if (newQueue.isEmpty()) { + if (!newQueue.isPresent()) { LOG.warn( "Cannot find the resource group: {}, try to release optimizing process of table {} directly", tableRuntime.getGroupName(), @@ -502,7 +601,9 @@ protected void initHandler(List tableRuntimeList) { } @Override - protected void doDispose() {} + protected void doDispose() { + queueWarmupService.close(); + } } private class OptimizerKeepingTask implements Delayed { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/authorization/CasbinAuthorizationManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/authorization/CasbinAuthorizationManager.java index 70f01ee02b..4d47e1c9d5 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/authorization/CasbinAuthorizationManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/authorization/CasbinAuthorizationManager.java @@ -28,6 +28,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; @@ -109,13 +110,12 @@ private static Enforcer createEnforcer() { } private static List> readPolicies(String resource) { - return readResource(resource) - .lines() + return Arrays.stream(readResource(resource).split("\\R")) .map(String::trim) .filter(line -> !line.isEmpty() && !line.startsWith("#")) .map( line -> - List.of(line.split("\\s*,\\s*")).stream() + Arrays.stream(line.split("\\s*,\\s*")) .map(String::trim) .collect(Collectors.toList())) .collect(Collectors.toList()); diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java index 31c8240cae..ebb5db7a55 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java @@ -43,11 +43,9 @@ public class CatalogBuilder { private static final Map> formatSupportedMatrix = ImmutableMap.of( CATALOG_TYPE_HADOOP, - Sets.newHashSet( - TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.PAIMON, TableFormat.HUDI), + Sets.newHashSet(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.PAIMON), CATALOG_TYPE_FILESYSTEM, - Sets.newHashSet( - TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.PAIMON, TableFormat.HUDI), + Sets.newHashSet(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.PAIMON), CATALOG_TYPE_GLUE, Sets.newHashSet(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG), CATALOG_TYPE_REST, @@ -59,8 +57,7 @@ public class CatalogBuilder { TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, - TableFormat.PAIMON, - TableFormat.HUDI), + TableFormat.PAIMON), CATALOG_TYPE_AMS, Sets.newHashSet(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG)); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java index 2246cf68b7..80921221d6 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java @@ -52,6 +52,7 @@ import org.apache.amoro.server.dashboard.controller.OptimizerGroupController; import org.apache.amoro.server.dashboard.controller.OverviewController; import org.apache.amoro.server.dashboard.controller.PlatformFileInfoController; +import org.apache.amoro.server.dashboard.controller.ProcessController; import 
org.apache.amoro.server.dashboard.controller.SettingController; import org.apache.amoro.server.dashboard.controller.TableController; import org.apache.amoro.server.dashboard.controller.TerminalController; @@ -91,6 +92,7 @@ public class DashboardServer { private final OptimizerController optimizerController; private final PlatformFileInfoController platformFileInfoController; private final SettingController settingController; + private final ProcessController processController; private final TableController tableController; private final TerminalController terminalController; private final VersionController versionController; @@ -122,6 +124,7 @@ public DashboardServer( this.settingController = new SettingController(serviceConfig, optimizerManager); ServerTableDescriptor tableDescriptor = new ServerTableDescriptor(catalogManager, tableManager, serviceConfig); + this.processController = new ProcessController(tableManager); this.tableController = new TableController(catalogManager, tableManager, tableDescriptor, serviceConfig); this.terminalController = new TerminalController(terminalManager); @@ -298,6 +301,9 @@ private EndpointGroup apiGroup() { post( "/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-processes/{processId}/cancel", tableController::cancelOptimizingProcess); + get( + "/catalogs/{catalog}/dbs/{db}/tables/{table}/processes", + processController::getTableProcesses); }); get("/upgrade/properties", tableController::getUpgradeHiveTableProperties); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java index 7f3a49d0c5..d7e2bcd3ac 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java @@ -241,8 +241,13 @@ private Long snapshotIdOfTableRef(Table table, String 
ref) { } @Override - public List getSnapshots( - AmoroTable amoroTable, String ref, OperationType operationType) { + public Pair, Long> getSnapshots( + AmoroTable amoroTable, + String ref, + OperationType operationType, + int limit, + int offset, + String lastSnapshot) { MixedTable mixedTable = getTable(amoroTable); List snapshotsOfTables = new ArrayList<>(); List> tableAndSnapshotIdList = new ArrayList<>(); @@ -262,10 +267,50 @@ public List getSnapshots( } tableAndSnapshotIdList.forEach( tableAndSnapshotId -> collectSnapshots(snapshotsOfTables, tableAndSnapshotId)); - return snapshotsOfTables.stream() - .filter(s -> validOperationType(s, operationType)) - .sorted((o1, o2) -> Long.compare(o2.getCommitTime(), o1.getCommitTime())) - .collect(Collectors.toList()); + List amoroSnapshotsOfTables = + snapshotsOfTables.stream() + .filter(s -> validOperationType(s, operationType)) + .sorted((o1, o2) -> Long.compare(o2.getCommitTime(), o1.getCommitTime())) + .collect(Collectors.toList()); + return paginate(amoroSnapshotsOfTables, limit, offset, lastSnapshot); + } + + /** + * Apply pagination against a pre-filtered, pre-sorted snapshot list. + * + *

Semantics mirror {@code PaimonTableDescriptor.getSnapshots}: if {@code lastSnapshot} is a + * non-empty cursor, skip forward until the matching snapshotId is found and return the next + * {@code limit} items; if the cursor is missing (snapshot rolled off), fall back to offset-based + * behaviour. Otherwise apply {@code offset} + {@code limit} directly. The total count always + * reflects the full filtered list. + */ + static Pair, Long> paginate( + List amoroSnapshotsOfTables, + int limit, + int offset, + String lastSnapshot) { + int total = amoroSnapshotsOfTables.size(); + int safeLimit = Math.max(limit, 0); + int safeOffset = Math.max(offset, 0); + int startIdx = resolveStartIndex(amoroSnapshotsOfTables, lastSnapshot, safeOffset); + if (startIdx >= total) { + return Pair.of(Collections.emptyList(), (long) total); + } + int endIdx = Math.min(startIdx + safeLimit, total); + return Pair.of(new ArrayList<>(amoroSnapshotsOfTables.subList(startIdx, endIdx)), (long) total); + } + + private static int resolveStartIndex( + List snapshots, String lastSnapshot, int offset) { + if (lastSnapshot != null && !lastSnapshot.isEmpty()) { + for (int i = 0; i < snapshots.size(); i++) { + if (lastSnapshot.equals(snapshots.get(i).getSnapshotId())) { + return i + 1; + } + } + // Cursor not found (snapshot rolled off) — fall back to offset. 
+ } + return offset; } private boolean validOperationType(AmoroSnapshotsOfTable snapshot, OperationType operationType) { @@ -655,7 +700,12 @@ public List getTableConsumerInfos(AmoroTable amoroTable) { @Override public Pair, Integer> getOptimizingProcessesInfo( - AmoroTable amoroTable, String type, ProcessStatus status, int limit, int offset) { + AmoroTable amoroTable, + String type, + ProcessStatus status, + int limit, + int offset, + String lastSnapshot) { TableIdentifier tableIdentifier = amoroTable.id(); ServerTableIdentifier identifier = getAs( diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java index 4e2b1aef8c..6347d3fa13 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java @@ -77,11 +77,17 @@ public ServerTableMeta getTableDetail(TableIdentifier tableIdentifier) { return formatTableDescriptor.getTableDetail(amoroTable); } - public List getSnapshots( - TableIdentifier tableIdentifier, String ref, OperationType operationType) { + public Pair, Long> getSnapshots( + TableIdentifier tableIdentifier, + String ref, + OperationType operationType, + int limit, + int offset, + String lastSnapshot) { AmoroTable amoroTable = loadTable(tableIdentifier); FormatTableDescriptor formatTableDescriptor = formatDescriptorMap.get(amoroTable.format()); - return formatTableDescriptor.getSnapshots(amoroTable, ref, operationType); + return formatTableDescriptor.getSnapshots( + amoroTable, ref, operationType, limit, offset, lastSnapshot); } public List getSnapshotDetail( @@ -129,11 +135,16 @@ public List getTableConsumersInfos(TableIdentifier tableIdentifier } public Pair, Integer> getOptimizingProcessesInfo( - TableIdentifier tableIdentifier, String type, ProcessStatus status, int limit, int offset) { + TableIdentifier 
tableIdentifier, + String type, + ProcessStatus status, + int limit, + int offset, + String lastSnapshot) { AmoroTable amoroTable = loadTable(tableIdentifier); FormatTableDescriptor formatTableDescriptor = formatDescriptorMap.get(amoroTable.format()); return formatTableDescriptor.getOptimizingProcessesInfo( - amoroTable, type, status, limit, offset); + amoroTable, type, status, limit, offset, lastSnapshot); } public List getOptimizingProcessTaskInfos( diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java index d688f9e29e..5380d50813 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java @@ -18,7 +18,6 @@ package org.apache.amoro.server.dashboard.controller; -import static org.apache.amoro.TableFormat.HUDI; import static org.apache.amoro.TableFormat.ICEBERG; import static org.apache.amoro.TableFormat.MIXED_HIVE; import static org.apache.amoro.TableFormat.MIXED_ICEBERG; @@ -139,8 +138,6 @@ public class CatalogController { CatalogDescriptor.of(CATALOG_TYPE_HIVE, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, MIXED_HIVE)); VALIDATE_CATALOGS.add( CatalogDescriptor.of(CATALOG_TYPE_HIVE, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, PAIMON)); - VALIDATE_CATALOGS.add( - CatalogDescriptor.of(CATALOG_TYPE_HIVE, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, HUDI)); VALIDATE_CATALOGS.add( CatalogDescriptor.of( CATALOG_TYPE_HADOOP, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, MIXED_ICEBERG)); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/ProcessController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/ProcessController.java new file mode 100644 index 0000000000..1dbcc06f61 --- /dev/null +++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/ProcessController.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.dashboard.controller; + +import com.github.pagehelper.Page; +import com.github.pagehelper.PageHelper; +import com.github.pagehelper.PageInfo; +import io.javalin.http.Context; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.server.dashboard.response.OkResponse; +import org.apache.amoro.server.dashboard.response.PageResult; +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.persistence.mapper.TableProcessMapper; +import org.apache.amoro.server.process.TableProcessMeta; +import org.apache.amoro.server.table.TableManager; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; +import org.apache.amoro.table.TableIdentifier; +import org.apache.commons.lang3.StringUtils; + +import java.util.Collections; +import java.util.List; + +public class ProcessController extends PersistentBase { + + private final TableManager tableManager; + + public ProcessController(TableManager tableManager) { + 
this.tableManager = tableManager; + } + + public void getTableProcesses(Context ctx) { + String catalog = ctx.pathParam("catalog"); + String db = ctx.pathParam("db"); + String table = ctx.pathParam("table"); + String processType = ctx.queryParam("type"); + String status = ctx.queryParam("status"); + Integer page = ctx.queryParamAsClass("page", Integer.class).getOrDefault(1); + Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20); + + if (StringUtils.isBlank(processType)) { + processType = null; + } + + Preconditions.checkArgument(page >= 1, "page[%s] must >= 1", page); + Preconditions.checkArgument(pageSize > 0, "pageSize[%s] must > 0", pageSize); + + TableIdentifier tableIdentifier = TableIdentifier.of(catalog, db, table); + ServerTableIdentifier serverTableIdentifier = + tableManager.getServerTableIdentifier(tableIdentifier.buildTableIdentifier()); + if (serverTableIdentifier == null) { + ctx.json(OkResponse.of(PageResult.of(Collections.emptyList(), 0))); + return; + } + + long tableId = serverTableIdentifier.getId(); + ProcessStatus processStatus = + StringUtils.isBlank(status) ? 
null : ProcessStatus.valueOf(status); + + int total = 0; + List processMetaList = Collections.emptyList(); + final String finalProcessType = processType; + try (Page ignored = PageHelper.startPage(page, pageSize, true)) { + processMetaList = + getAs( + TableProcessMapper.class, + mapper -> mapper.listProcessMeta(tableId, finalProcessType, processStatus)); + PageInfo pageInfo = new PageInfo<>(processMetaList); + total = (int) pageInfo.getTotal(); + } + + ctx.json(OkResponse.of(PageResult.of(processMetaList, total))); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java index 8b4acba44a..82e594af92 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java @@ -335,6 +335,7 @@ public void getOptimizingProcesses(Context ctx) { String status = ctx.queryParam("status"); Integer page = ctx.queryParamAsClass("page", Integer.class).getOrDefault(1); Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20); + String lastSnapshot = ctx.queryParam("lastSnapshot"); int offset = (page - 1) * pageSize; int limit = pageSize; @@ -346,7 +347,12 @@ public void getOptimizingProcesses(Context ctx) { StringUtils.isBlank(status) ? 
null : ProcessStatus.valueOf(status); Pair, Integer> optimizingProcessesInfo = tableDescriptor.getOptimizingProcessesInfo( - tableIdentifier.buildTableIdentifier(), type, processStatus, limit, offset); + tableIdentifier.buildTableIdentifier(), + type, + processStatus, + limit, + offset, + lastSnapshot); List result = optimizingProcessesInfo.getLeft(); int total = optimizingProcessesInfo.getRight(); @@ -407,16 +413,20 @@ public void getTableSnapshots(Context ctx) { String operation = ctx.queryParamAsClass("operation", String.class) .getOrDefault(OperationType.ALL.displayName()); + String lastSnapshot = ctx.queryParam("lastSnapshot"); OperationType operationType = OperationType.of(operation); - - List snapshotsOfTables = + int offset = (page - 1) * pageSize; + int limit = pageSize; + Pair, Long> snapshotsOfTables = tableDescriptor.getSnapshots( TableIdentifier.of(catalog, database, tableName).buildTableIdentifier(), ref, - operationType); - int offset = (page - 1) * pageSize; + operationType, + limit, + offset, + lastSnapshot); PageResult pageResult = - PageResult.of(snapshotsOfTables, offset, pageSize); + PageResult.of(snapshotsOfTables.getLeft(), snapshotsOfTables.getRight().intValue()); ctx.json(OkResponse.of(pageResult)); } @@ -540,8 +550,6 @@ public void getTableList(Context ctx) { return TableMeta.TableType.PAIMON.toString(); } else if (format.equals(TableFormat.ICEBERG)) { return TableMeta.TableType.ICEBERG.toString(); - } else if (format.equals(TableFormat.HUDI)) { - return TableMeta.TableType.HUDI.toString(); } else { return format.toString(); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableMeta.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableMeta.java index dead0be96b..b70fe01bd8 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableMeta.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableMeta.java @@ -66,9 +66,7 @@ public enum TableType { 
ARCTIC("arctic"), HIVE("hive"), ICEBERG("iceberg"), - PAIMON("paimon"), - - HUDI("hudi"); + PAIMON("paimon"); private final String name; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java index 48282f2a9e..bb34a6c81b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java @@ -22,6 +22,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -54,7 +55,7 @@ public void registerAndElect() throws Exception {} @Override public List getAliveNodes() { - return List.of(); + return Collections.emptyList(); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java index c05a853cce..d23decf3ee 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java @@ -32,13 +32,18 @@ import org.apache.amoro.optimizing.RewriteFilesInput; import org.apache.amoro.optimizing.RewriteFilesOutput; import org.apache.amoro.optimizing.RewriteStageTask; +import org.apache.amoro.optimizing.TableOptimizingCommitter; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; import org.apache.amoro.table.MixedTable; import org.apache.amoro.utils.ContentFiles; +import org.apache.amoro.utils.MixedDataFiles; import org.apache.amoro.utils.MixedTableUtil; +import org.apache.amoro.utils.TablePropertyUtil; import org.apache.commons.collections.CollectionUtils; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; +import 
org.apache.iceberg.PartitionSpec; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.StructLike; import org.apache.iceberg.exceptions.ValidationException; @@ -51,15 +56,17 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; -public class KeyedTableCommit extends UnKeyedTableCommit { +public class KeyedTableCommit extends UnKeyedTableCommit implements TableOptimizingCommitter { private static final Logger LOG = LoggerFactory.getLogger(KeyedTableCommit.class); protected MixedTable table; - protected Collection> tasks; + protected Collection tasks; protected Long fromSnapshotId; @@ -69,16 +76,36 @@ public class KeyedTableCommit extends UnKeyedTableCommit { public KeyedTableCommit( MixedTable table, - Collection> tasks, + Collection tasks, Long fromSnapshotId, - StructLikeMap fromSequenceOfPartitions, - StructLikeMap toSequenceOfPartitions) { + Map fromSequence, + Map toSequence) { super(fromSnapshotId, table, tasks); this.table = table; this.tasks = tasks; this.fromSnapshotId = fromSnapshotId == null ? Constants.INVALID_SNAPSHOT_ID : fromSnapshotId; - this.fromSequenceOfPartitions = fromSequenceOfPartitions; - this.toSequenceOfPartitions = toSequenceOfPartitions; + this.fromSequenceOfPartitions = convertPartitionSequence(table, fromSequence); + this.toSequenceOfPartitions = convertPartitionSequence(table, toSequence); + } + + private static StructLikeMap convertPartitionSequence( + MixedTable table, Map partitionSequence) { + PartitionSpec spec = table == null ? null : table.spec(); + PartitionSpec safeSpec = spec == null ? 
PartitionSpec.unpartitioned() : spec; + StructLikeMap results = StructLikeMap.create(safeSpec.partitionType()); + if (partitionSequence == null || partitionSequence.isEmpty()) { + return results; + } + partitionSequence.forEach( + (partition, sequence) -> { + if (safeSpec.isUnpartitioned()) { + results.put(TablePropertyUtil.EMPTY_STRUCT, sequence); + } else { + StructLike partitionData = MixedDataFiles.data(safeSpec, partition); + results.put(partitionData, sequence); + } + }); + return results; } @Override @@ -94,6 +121,19 @@ public void commit() throws OptimizingCommitException { tasks.size(), fromSnapshotId); + // Filter tasks with null output to avoid deleting input files of failed tasks + List successTasks = + tasks.stream().filter(task -> task.getOutput() != null).collect(Collectors.toList()); + + if (successTasks.isEmpty()) { + LOG.info("No tasks with output to commit for table {}", table.id()); + return; + } + + // Rebuild toSequenceOfPartitions from only success tasks to avoid consuming + // change files that belong to failed tasks + rebuildToSequenceOfPartitions(successTasks); + // In the scene of moving files to hive, the files will be renamed List hiveNewDataFiles = moveFile2HiveIfNeed(); @@ -105,8 +145,11 @@ public void commit() throws OptimizingCommitException { StructLikeMap partitionOptimizedSequence = MixedTableUtil.readOptimizedSequence(table.asKeyedTable()); - for (TaskRuntime taskRuntime : tasks) { - RewriteFilesInput input = taskRuntime.getTaskDescriptor().getInput(); + for (RewriteStageTask task : successTasks) { + RewriteFilesInput input = task.getInput(); + if (input == null) { + continue; + } StructLike partition = partition(input); // Check if the partition version has expired @@ -115,6 +158,7 @@ public void commit() throws OptimizingCommitException { toSequenceOfPartitions.remove(partition); continue; } + // Only base data file need to remove if (input.rewrittenDataFiles() != null) { Arrays.stream(input.rewrittenDataFiles()) @@ -131,7 
+175,7 @@ public void commit() throws OptimizingCommitException { .forEach(removedDeleteFiles::add); } - RewriteFilesOutput output = taskRuntime.getTaskDescriptor().getOutput(); + RewriteFilesOutput output = task.getOutput(); if (CollectionUtils.isNotEmpty(hiveNewDataFiles)) { addedDataFiles.addAll(hiveNewDataFiles); } else if (output.getDataFiles() != null) { @@ -219,4 +263,25 @@ private boolean fileInPartitionNeedSkip( private StructLike partition(RewriteFilesInput input) { return input.allFiles()[0].partition(); } + + private void rebuildToSequenceOfPartitions(List successTasks) { + toSequenceOfPartitions.clear(); + for (RewriteStageTask task : successTasks) { + RewriteFilesInput input = task.getInput(); + if (input == null) { + continue; + } + StructLike partition = partition(input); + for (ContentFile file : input.allFiles()) { + if (ContentFiles.isDeleteFile(file)) { + continue; + } + DataFileType type = ((PrimaryKeyedFile) ContentFiles.asDataFile(file)).type(); + if (type == DataFileType.INSERT_FILE || type == DataFileType.EQ_DELETE_FILE) { + long seq = file.dataSequenceNumber(); + toSequenceOfPartitions.merge(partition, seq, Math::max); + } + } + } + } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/LegacyExecutorFactoryDefaults.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/LegacyExecutorFactoryDefaults.java new file mode 100644 index 0000000000..652c737992 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/LegacyExecutorFactoryDefaults.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.optimizing; + +import org.apache.amoro.TableFormat; +import org.apache.amoro.hive.optimizing.MixedHiveRewriteExecutorFactory; +import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory; +import org.apache.amoro.optimizing.MixedIcebergRewriteExecutorFactory; +import org.apache.amoro.optimizing.TaskProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Resolves the default {@link TaskProperties#TASK_EXECUTOR_FACTORY_IMPL} class name for task rows + * persisted before the multi-format refactor (C3), where the property did not yet exist. + * + *

<p>Pre-0.9 Iceberg rows restored at AMS startup pass through {@link + * org.apache.amoro.server.persistence.converter.TaskDescriptorTypeConverter}, which correctly falls + * back to {@link org.apache.amoro.optimizing.RewriteStageTask} but does not populate the + * missing executor-factory key on the descriptor's {@code properties} map. Once the task reaches + * {@code OptimizerExecutor.executeTask(...)} it reads that key to instantiate the executor factory + * via reflection and throws a NullPointerException on the missing entry. + * + *

<p>This util is the compensation hook: {@link OptimizingQueue}'s DB-restore path calls it per + * task runtime and, when the result is non-null, injects the returned class name into the + * descriptor properties. For formats whose tasks always carry their own factory impl (e.g. + * Paimon) and for unknown formats, it returns {@code null} and emits a WARN so operators see the + * skipped row without taking the process down. + * + *

<p>Kept as a pure static util (no state) so it is trivially testable. Keep it in sync with + * {@link org.apache.amoro.server.persistence.converter.TaskDescriptorTypeConverter}'s routing + * table — both must stay consistent with the default factory impl each format emits at plan time. + */ +public final class LegacyExecutorFactoryDefaults { + + private static final Logger LOG = LoggerFactory.getLogger(LegacyExecutorFactoryDefaults.class); + + public static final String ICEBERG_FACTORY_IMPL = IcebergRewriteExecutorFactory.class.getName(); + public static final String MIXED_ICEBERG_FACTORY_IMPL = + MixedIcebergRewriteExecutorFactory.class.getName(); + public static final String MIXED_HIVE_FACTORY_IMPL = + MixedHiveRewriteExecutorFactory.class.getName(); + + private LegacyExecutorFactoryDefaults() {} + + /** + * Return the default {@code task-executor-factory-impl} class name for a legacy task row of the + * given table format, or {@code null} if the format has no meaningful default (Paimon or + * unknown). + * + *

Callers should treat {@code null} as "skip — the task descriptor must already carry the key, + * or the row is unrecoverable". A WARN is logged for null cases so operators can spot stale rows + * during restore. + */ + public static String resolveDefaultExecutorFactoryImpl(TableFormat format) { + if (format == null) { + LOG.warn( + "Cannot resolve default {} for legacy task row: table format is null", + TaskProperties.TASK_EXECUTOR_FACTORY_IMPL); + return null; + } + if (TableFormat.ICEBERG.equals(format)) { + return ICEBERG_FACTORY_IMPL; + } + if (TableFormat.MIXED_ICEBERG.equals(format)) { + return MIXED_ICEBERG_FACTORY_IMPL; + } + if (TableFormat.MIXED_HIVE.equals(format)) { + return MIXED_HIVE_FACTORY_IMPL; + } + if (TableFormat.PAIMON.equals(format)) { + // Paimon tasks always carry their own factory impl in properties (set by the Paimon planner), + // so a missing key here indicates a corrupted row rather than a pre-refactor legacy row. + LOG.warn( + "Skipping default {} injection for PAIMON: Paimon tasks must carry the key at plan time", + TaskProperties.TASK_EXECUTOR_FACTORY_IMPL); + return null; + } + LOG.warn( + "Skipping default {} injection: unknown table format {}", + TaskProperties.TASK_EXECUTOR_FACTORY_IMPL, + format); + return null; + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index 52eb46b4cc..5713bdc913 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -21,18 +21,23 @@ import org.apache.amoro.AmoroTable; import org.apache.amoro.OptimizerProperties; import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.TableFormat; import org.apache.amoro.api.BlockableOperation; import org.apache.amoro.api.OptimizingTaskId; import org.apache.amoro.api.OptimizingTaskResult; import 
org.apache.amoro.exception.OptimizingClosedException; import org.apache.amoro.exception.PersistenceException; import org.apache.amoro.exception.TaskNotFoundException; +import org.apache.amoro.optimizing.BaseOptimizingInput; import org.apache.amoro.optimizing.MetricsSummary; +import org.apache.amoro.optimizing.OptimizingPlanResult; import org.apache.amoro.optimizing.OptimizingType; -import org.apache.amoro.optimizing.RewriteFilesInput; -import org.apache.amoro.optimizing.RewriteStageTask; -import org.apache.amoro.optimizing.plan.AbstractOptimizingPlanner; +import org.apache.amoro.optimizing.TableOptimizingCommitter; +import org.apache.amoro.optimizing.TableOptimizingPlanner; +import org.apache.amoro.optimizing.TaskMetricsSummary; +import org.apache.amoro.optimizing.TaskProperties; import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.process.StagedTaskDescriptor; import org.apache.amoro.resource.ResourceGroup; import org.apache.amoro.server.AmoroServiceConstants; import org.apache.amoro.server.catalog.CatalogManager; @@ -41,45 +46,42 @@ import org.apache.amoro.server.persistence.OptimizingProcessState; import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.server.persistence.TaskFilesPersistence; +import org.apache.amoro.server.persistence.converter.TaskDescriptorRecoveryTypes; import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper; import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; import org.apache.amoro.server.persistence.mapper.TableProcessMapper; +import org.apache.amoro.server.process.ProcessFactoryRouter; import org.apache.amoro.server.process.TableProcessMeta; import org.apache.amoro.server.resource.OptimizerInstance; import org.apache.amoro.server.resource.OptimizerThread; import org.apache.amoro.server.resource.QuotaProvider; import org.apache.amoro.server.table.DefaultTableRuntime; +import org.apache.amoro.server.table.OptimizingOwnerConflictException; import 
org.apache.amoro.server.table.blocker.TableBlocker; -import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; -import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.TableIdentifier; import org.apache.amoro.utils.CompatiblePropertyUtil; import org.apache.amoro.utils.ExceptionUtil; -import org.apache.amoro.utils.MixedDataFiles; -import org.apache.amoro.utils.TablePropertyUtil; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.util.StructLikeMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; @@ -91,6 +93,13 @@ public class OptimizingQueue extends PersistentBase { private static final Logger LOG = LoggerFactory.getLogger(OptimizingQueue.class); + private static final long STALE_OWNER_RECYCLE_MILLIS = TimeUnit.MINUTES.toMillis(15); + private static final Set ACTIVE_PROCESS_STATUSES = + EnumSet.of( + ProcessStatus.PENDING, + ProcessStatus.SUBMITTED, + ProcessStatus.RUNNING, + ProcessStatus.CANCELING); private final QuotaProvider quotaProvider; private final Queue tableQueue = new 
LinkedTransferQueue<>(); @@ -98,14 +107,25 @@ public class OptimizingQueue extends PersistentBase { private final CatalogManager catalogManager; private final Executor planExecutor; // Keep all planning table identifiers - private final Set planningTables = new HashSet<>(); + private final Set planningTables = ConcurrentHashMap.newKeySet(); private final Lock scheduleLock = new ReentrantLock(); private final Condition planningCompleted = scheduleLock.newCondition(); private final int maxPlanningParallelism; + private final Semaphore planningSlots; private final OptimizerGroupMetrics metrics; private ResourceGroup optimizerGroup; + private final ProcessFactoryRouter router; private final Map optimizingTasksMap = new ConcurrentHashMap<>(); + private final Set warmingTables = ConcurrentHashMap.newKeySet(); + private final Set warmedTables = ConcurrentHashMap.newKeySet(); + + public enum WarmupResult { + WARMED, + SKIPPED, + DUPLICATE, + FAILED + } public OptimizingQueue( CatalogManager catalogManager, @@ -113,54 +133,85 @@ public OptimizingQueue( QuotaProvider quotaProvider, Executor planExecutor, List tableRuntimeList, - int maxPlanningParallelism) { + int maxPlanningParallelism, + ProcessFactoryRouter router) { Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null"); this.planExecutor = planExecutor; this.optimizerGroup = optimizerGroup; this.quotaProvider = quotaProvider; this.scheduler = new SchedulingPolicy(optimizerGroup); this.catalogManager = catalogManager; + Preconditions.checkArgument( + maxPlanningParallelism >= 0, "Max planning parallelism can not be negative"); this.maxPlanningParallelism = maxPlanningParallelism; + this.planningSlots = new Semaphore(this.maxPlanningParallelism); + this.router = router; this.metrics = new OptimizerGroupMetrics( optimizerGroup.getName(), MetricManager.getInstance().getGlobalRegistry(), this); this.metrics.register(); - tableRuntimeList.forEach(this::initTableRuntime); } - private void 
initTableRuntime(DefaultTableRuntime tableRuntime) { + public WarmupResult warmupTable(DefaultTableRuntime tableRuntime) { + ServerTableIdentifier identifier = tableRuntime.getTableIdentifier(); + if (warmedTables.contains(identifier)) { + return WarmupResult.DUPLICATE; + } + if (!warmingTables.add(identifier)) { + return WarmupResult.DUPLICATE; + } try { - TableOptimizingProcess process = loadProcess(tableRuntime); - - if (!tableRuntime.getOptimizingConfig().isEnabled()) { - closeProcessIfRunning(process); - return; + WarmupResult result = doWarmupTable(tableRuntime); + if (result == WarmupResult.WARMED || result == WarmupResult.SKIPPED) { + warmedTables.add(identifier); } + return result; + } catch (Exception e) { + LOG.error("Failed to warm up table runtime for table {}, skipping", identifier, e); + return WarmupResult.FAILED; + } finally { + warmingTables.remove(identifier); + } + } - tableRuntime.resetTaskQuotas( - System.currentTimeMillis() - AmoroServiceConstants.QUOTA_LOOK_BACK_TIME); + private WarmupResult doWarmupTable(DefaultTableRuntime tableRuntime) { + TableOptimizingProcess process = loadProcess(tableRuntime); - if (canResumeProcess(process, tableRuntime)) { - if (process.allTasksPrepared()) { - LOG.info( - "All tasks already completed for process {} on table {} during recovery," - + " triggering commit", - process.getProcessId(), - tableRuntime.getTableIdentifier()); - tableRuntime.beginCommitting(); - } else { - tableQueue.offer(process); - } + if (!tableRuntime.getOptimizingConfig().isEnabled()) { + closeProcessIfRunning(process); + return WarmupResult.SKIPPED; + } + + if (!isFormatSupported(tableRuntime)) { + closeProcessIfRunning(process); + return WarmupResult.SKIPPED; + } + + tableRuntime.resetTaskQuotas( + System.currentTimeMillis() - AmoroServiceConstants.QUOTA_LOOK_BACK_TIME); + + if (canReplayPaimonCommittingProcess(process, tableRuntime)) { + LOG.info( + "Paimon process {} on table {} is already COMMITTING with completed tasks during" + + 
" recovery, keeping it for commit replay", + process.getProcessId(), + tableRuntime.getTableIdentifier()); + } else if (canResumeProcess(process, tableRuntime)) { + if (process.allTasksPrepared()) { + LOG.info( + "All tasks already completed for process {} on table {} during recovery," + + " triggering commit", + process.getProcessId(), + tableRuntime.getTableIdentifier()); + tableRuntime.beginCommitting(); } else { - resetTableForRecovery(process, tableRuntime); - scheduler.addTable(tableRuntime); + tableQueue.offer(process); } - } catch (Exception e) { - LOG.error( - "Failed to initialize table runtime for table {}, skipping", - tableRuntime.getTableIdentifier(), - e); + } else { + resetTableForRecovery(process, tableRuntime); + scheduler.addTable(tableRuntime); } + return WarmupResult.WARMED; } private TableOptimizingProcess loadProcess(DefaultTableRuntime tableRuntime) { @@ -195,6 +246,15 @@ private boolean canResumeProcess( && tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING; } + private boolean canReplayPaimonCommittingProcess( + TableOptimizingProcess process, DefaultTableRuntime tableRuntime) { + return tableRuntime.getFormat() == TableFormat.PAIMON + && process != null + && process.getStatus() == ProcessStatus.RUNNING + && tableRuntime.getOptimizingStatus() == OptimizingStatus.COMMITTING + && process.allTasksPrepared(); + } + private void resetTableForRecovery( TableOptimizingProcess process, DefaultTableRuntime tableRuntime) { closeProcessIfRunning(process); @@ -220,9 +280,12 @@ public String getContainerName() { } public void refreshTable(DefaultTableRuntime tableRuntime) { + if (!isFormatSupported(tableRuntime)) { + return; + } if (tableRuntime.getOptimizingConfig().isEnabled() && !tableRuntime.getOptimizingStatus().isProcessing()) { - LOG.info( + LOG.debug( "Bind queue {} success with table {}", optimizerGroup.getName(), tableRuntime.getTableIdentifier()); @@ -232,7 +295,21 @@ public void refreshTable(DefaultTableRuntime tableRuntime) 
{ } } + private boolean isFormatSupported(DefaultTableRuntime tableRuntime) { + TableFormat format = tableRuntime.getFormat(); + if (!router.supportedFormats().contains(format)) { + LOG.debug( + "Skip table {} with unsupported format {} for queue {}", + tableRuntime.getTableIdentifier(), + format, + optimizerGroup.getName()); + return false; + } + return true; + } + public void releaseTable(DefaultTableRuntime tableRuntime) { + clearWarmupState(tableRuntime); scheduler.removeTable(tableRuntime); List processList = tableQueue.stream() @@ -257,7 +334,12 @@ public TaskRuntime pollTask( resetStaleTasksForThread(thread); long deadline = calculateDeadline(maxWaitTime); TaskRuntime task = fetchScheduledTask(thread, true); - while (task == null && waitTask(deadline)) { + while (task == null) { + fillPlanningSlots(); + task = fetchScheduledTask(thread, true); + if (task != null || !awaitPlanningCompleted(deadline)) { + break; + } task = fetchScheduledTask(thread, true); } if (task == null && breakQuotaLimit && planningTables.isEmpty()) { @@ -275,15 +357,15 @@ private long calculateDeadline(long maxWaitTime) { return deadline <= 0 ? 
Long.MAX_VALUE : deadline; } - private boolean waitTask(long waitDeadline) { + private boolean awaitPlanningCompleted(long waitDeadline) { scheduleLock.lock(); try { long currentTime = System.currentTimeMillis(); - scheduleTableIfNecessary(currentTime); return waitDeadline > currentTime && planningCompleted.await(waitDeadline - currentTime, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOG.error("Schedule table interrupted", e); + Thread.currentThread().interrupt(); return false; } finally { scheduleLock.unlock(); @@ -298,12 +380,31 @@ private TaskRuntime fetchScheduledTask(OptimizerThread thread, boolean needQu .orElse(null); } - private void scheduleTableIfNecessary(long startTime) { - if (planningTables.size() < maxPlanningParallelism) { - Set skipTables = new HashSet<>(planningTables); - skipBlockedTables(skipTables); - Optional.ofNullable(scheduler.scheduleTable(skipTables)) - .ifPresent(tableRuntime -> triggerAsyncPlanning(tableRuntime, skipTables, startTime)); + private void fillPlanningSlots() { + int availableSlots = planningSlots.availablePermits(); + if (availableSlots <= 0) { + return; + } + + Set skipTables = new HashSet<>(planningTables); + skipBlockedTables(skipTables); + + List candidates = scheduler.scheduleTables(skipTables, availableSlots); + for (DefaultTableRuntime tableRuntime : candidates) { + ServerTableIdentifier identifier = tableRuntime.getTableIdentifier(); + if (!planningSlots.tryAcquire()) { + return; + } + if (!planningTables.add(identifier)) { + planningSlots.release(); + continue; + } + skipTables.add(identifier); + if (!triggerAsyncPlanning(tableRuntime, new HashSet<>(skipTables))) { + planningTables.remove(identifier); + planningSlots.release(); + signalPlanningCompleted(); + } } } @@ -313,7 +414,7 @@ private void skipBlockedTables(Set skipTables) { TableBlockerMapper.class, mapper -> mapper.selectAllBlockers(System.currentTimeMillis())); Map identifierMap = Maps.newHashMap(); - for (ServerTableIdentifier identifier : 
scheduler.getTableRuntimeMap().keySet()) { + for (ServerTableIdentifier identifier : scheduler.tableIdentifiersSnapshot()) { identifierMap.put(identifier.getIdentifier(), identifier); } tableBlockerList.stream() @@ -327,76 +428,250 @@ private void skipBlockedTables(Set skipTables) { .forEach(skipTables::add); } - private void triggerAsyncPlanning( - DefaultTableRuntime tableRuntime, Set skipTables, long startTime) { + private boolean triggerAsyncPlanning( + DefaultTableRuntime tableRuntime, Set skipTables) { + long startTime = System.currentTimeMillis(); LOG.info( "Trigger planning table {} by policy {}", tableRuntime.getTableIdentifier(), scheduler.name()); - planningTables.add(tableRuntime.getTableIdentifier()); - CompletableFuture.supplyAsync(() -> planInternal(tableRuntime), planExecutor) - .whenComplete( - (process, throwable) -> { - if (throwable != null) { - LOG.error("Failed to plan table {}", tableRuntime.getTableIdentifier(), throwable); - } - long currentTime = System.currentTimeMillis(); - scheduleLock.lock(); - try { - tableRuntime.setLastPlanTime(currentTime); - planningTables.remove(tableRuntime.getTableIdentifier()); - if (process != null) { - tableQueue.offer(process); - String skipIds = - skipTables.stream() - .map(ServerTableIdentifier::getId) - .sorted() - .map(item -> item + "") - .collect(Collectors.joining(",")); - LOG.info( - "Completed planning on table {} with {} tasks with a total cost of {} ms, skipping {} tables.", - tableRuntime.getTableIdentifier(), - process.getTaskMap().size(), - currentTime - startTime, - skipTables.size()); - LOG.debug("Skipped planning table IDs:{}", skipIds); - } else if (throwable == null) { - LOG.info( - "Skipping planning table {} with a total cost of {} ms.", - tableRuntime.getTableIdentifier(), - currentTime - startTime); - } - planningCompleted.signalAll(); - } finally { - scheduleLock.unlock(); - } - }); + try { + CompletableFuture.supplyAsync(() -> planInternal(tableRuntime), planExecutor) + 
.whenComplete( + (process, throwable) -> + completePlanning(tableRuntime, process, throwable, skipTables, startTime)); + return true; + } catch (RuntimeException e) { + LOG.error("Failed to submit planning table {}", tableRuntime.getTableIdentifier(), e); + return false; + } + } + + private void completePlanning( + DefaultTableRuntime tableRuntime, + TableOptimizingProcess process, + Throwable throwable, + Set skipTables, + long startTime) { + long currentTime = System.currentTimeMillis(); + try { + if (throwable != null) { + LOG.error("Failed to plan table {}", tableRuntime.getTableIdentifier(), throwable); + } + if (process != null) { + tableQueue.offer(process); + String skipIds = + skipTables.stream() + .map(ServerTableIdentifier::getId) + .sorted() + .map(String::valueOf) + .collect(Collectors.joining(",")); + LOG.info( + "Completed planning on table {} with {} tasks with a total cost of {} ms, skipping {} tables.", + tableRuntime.getTableIdentifier(), + process.getTaskMap().size(), + currentTime - startTime, + skipTables.size()); + LOG.debug("Skipped planning table IDs:{}", skipIds); + } else if (throwable == null) { + LOG.info( + "Skipping planning table {} with a total cost of {} ms.", + tableRuntime.getTableIdentifier(), + currentTime - startTime); + } + tableRuntime.setLastPlanTime(currentTime); + } finally { + planningTables.remove(tableRuntime.getTableIdentifier()); + planningSlots.release(); + signalPlanningCompleted(); + } + } + + private void signalPlanningCompleted() { + scheduleLock.lock(); + try { + planningCompleted.signalAll(); + } finally { + scheduleLock.unlock(); + } + } + + private boolean prepareOwnerForPlanning(DefaultTableRuntime tableRuntime) { + for (int attempts = 0; attempts < 3; attempts++) { + long ownerProcessId = tableRuntime.getProcessId(); + if (ownerProcessId == 0L) { + return true; + } + + TableProcessMeta processMeta = + getAs(TableProcessMapper.class, mapper -> mapper.getProcessMeta(ownerProcessId)); + if (processMeta == 
null || !isActiveProcess(processMeta.getStatus())) { + if (tableRuntime.normalizeProcessOwner(ownerProcessId)) { + LOG.info( + "Normalized owner process {} for table {} before planning", + ownerProcessId, + tableRuntime.getTableIdentifier()); + return true; + } + continue; + } + + if (shouldRecycleStaleOwner(processMeta) + && recycleStaleOwner(tableRuntime, processMeta, ownerProcessId)) { + continue; + } + + LOG.info( + "Skip planning table {} because owner process {} is still active with status {}", + tableRuntime.getTableIdentifier(), + ownerProcessId, + processMeta.getStatus()); + return false; + } + + if (tableRuntime.getProcessId() == 0L) { + return true; + } + LOG.warn( + "Skip planning table {} because owner normalization exceeded retry budget", + tableRuntime.getTableIdentifier()); + return false; + } + + private boolean shouldRecycleStaleOwner(TableProcessMeta processMeta) { + return System.currentTimeMillis() - processMeta.getCreateTime() > STALE_OWNER_RECYCLE_MILLIS; + } + + private boolean recycleStaleOwner( + DefaultTableRuntime tableRuntime, TableProcessMeta processMeta, long ownerProcessId) { + int unfinishedTasks = + getAs( + OptimizingProcessMapper.class, + mapper -> + mapper.countUnfinishedTasks( + tableRuntime.getTableIdentifier().getId(), ownerProcessId)); + if (unfinishedTasks > 0) { + return false; + } + + String staleReason = + String.format( + "recycled stale optimizing owner process %d after %d ms without unfinished tasks", + ownerProcessId, System.currentTimeMillis() - processMeta.getCreateTime()); + doAsTransaction( + () -> + doAs( + TableProcessMapper.class, + mapper -> + mapper.updateProcess( + tableRuntime.getTableIdentifier().getId(), + ownerProcessId, + processMeta.getExternalProcessIdentifier(), + ProcessStatus.FAILED, + tableRuntime.getOptimizingStatus().name().toLowerCase(), + processMeta.getRetryNumber(), + System.currentTimeMillis(), + staleReason, + processMeta.getProcessParameters() == null + ? 
new HashMap<>() + : processMeta.getProcessParameters(), + processMeta.getSummary() == null + ? new HashMap<>() + : processMeta.getSummary())), + () -> { + if (!tableRuntime.normalizeProcessOwner(ownerProcessId)) { + throw new IllegalStateException( + String.format( + "failed to clear stale owner process %d for table %s", + ownerProcessId, tableRuntime.getTableIdentifier())); + } + }, + () -> + tableRuntime + .store() + .begin() + .updateStatusCode(code -> OptimizingStatus.PENDING.getCode()) + .commit()); + LOG.warn( + "Recycled stale owner process {} for table {}", + ownerProcessId, + tableRuntime.getTableIdentifier()); + return true; + } + + private boolean isActiveProcess(ProcessStatus status) { + return ACTIVE_PROCESS_STATUSES.contains(status); } private TableOptimizingProcess planInternal(DefaultTableRuntime tableRuntime) { + if (!prepareOwnerForPlanning(tableRuntime)) { + return null; + } tableRuntime.beginPlanning(); try { ServerTableIdentifier identifier = tableRuntime.getTableIdentifier(); AmoroTable table = catalogManager.loadTable(identifier.getIdentifier()); - AbstractOptimizingPlanner planner = - IcebergTableUtil.createOptimizingPlanner( - tableRuntime.refresh(table), - (MixedTable) table.originalTable(), - getAvailableCore(), - maxInputSizePerThread()); + tableRuntime.refresh(table); + + if (!isFormatSupported(tableRuntime)) { + tableRuntime.completeEmptyProcess(); + return null; + } + + TableOptimizingPlanner planner = + router + .forFormat(tableRuntime.getFormat()) + .createPlanner(tableRuntime, table, getAvailableCore(), maxInputSizePerThread()); if (planner.isNecessary()) { - return new TableOptimizingProcess(planner, tableRuntime); + OptimizingPlanResult planResult = planner.plan(); + if (planResult.getTasks() == null || planResult.getTasks().isEmpty()) { + LOG.info( + "Planner {} returned empty tasks despite isNecessary=true; skip creating empty process for {}", + planner.getClass().getSimpleName(), + tableRuntime.getTableIdentifier()); + 
tableRuntime.completeEmptyProcess(); + return null; + } + return new TableOptimizingProcess(planResult, tableRuntime); } else { tableRuntime.completeEmptyProcess(); return null; } } catch (Throwable throwable) { + if (containsCause(throwable, OptimizingOwnerConflictException.class)) { + long currentOwner = tableRuntime.getProcessId(); + if (currentOwner == 0L) { + // Defensive fallback for rare races where owner changed after beginPlanning but before + // exception handling. Keep the table schedulable instead of leaving it in PLANNING. + tableRuntime.planFailed(); + LOG.warn( + "Owner conflict observed for table {} but owner is already cleared, reset status to PENDING", + tableRuntime.getTableIdentifier()); + } else { + LOG.info( + "Skip planning table {} because optimizing owner {} is already held by another process", + tableRuntime.getTableIdentifier(), + currentOwner); + } + return null; + } tableRuntime.planFailed(); LOG.error("Planning table {} failed", tableRuntime.getTableIdentifier(), throwable); throw throwable; } } + private boolean containsCause(Throwable throwable, Class causeClass) { + Throwable current = throwable; + while (current != null) { + if (causeClass.isInstance(current)) { + return true; + } + current = current.getCause(); + } + return false; + } + public void ackTask(OptimizingTaskId taskId, OptimizerThread thread) { findProcess(taskId).ackTask(taskId, thread); } @@ -419,7 +694,7 @@ public List> collectTasks(Predicate> predicate) { } public void retryTask(TaskRuntime taskRuntime) { - findProcess(taskRuntime.getTaskId()).resetTask((TaskRuntime) taskRuntime); + findProcess(taskRuntime.getTaskId()).resetTask(taskRuntime); } private void resetStaleTasksForThread(OptimizerThread thread) { @@ -463,6 +738,8 @@ public void removeOptimizer(OptimizerInstance optimizerInstance) { } public void dispose() { + warmingTables.clear(); + warmedTables.clear(); this.metrics.unregister(); } @@ -490,17 +767,33 @@ SchedulingPolicy getSchedulingPolicy() { return 
scheduler; } + private void clearWarmupState(DefaultTableRuntime tableRuntime) { + ServerTableIdentifier identifier = tableRuntime.getTableIdentifier(); + warmingTables.remove(identifier); + warmedTables.remove(identifier); + } + + @VisibleForTesting + boolean containsWarmupStateForTest(ServerTableIdentifier identifier) { + return warmingTables.contains(identifier) || warmedTables.contains(identifier); + } + private class TableOptimizingProcess implements OptimizingProcess { private final Lock lock = new ReentrantLock(); private final long processId; + private final TableFormat format; private final OptimizingType optimizingType; private final DefaultTableRuntime tableRuntime; private final long planTime; private final long targetSnapshotId; private final long targetChangeSnapshotId; - private final Map> taskMap = Maps.newHashMap(); - private final Queue> taskQueue = new LinkedList<>(); + // Widened to StagedTaskDescriptor so non-Iceberg formats (Paimon today, Mixed-Hive + // tomorrow) share the same process/queue machinery without unchecked downcasts. 
+ private final Map>> + taskMap = Maps.newHashMap(); + private final Queue>> taskQueue = + new LinkedList<>(); private volatile ProcessStatus status = ProcessStatus.RUNNING; private volatile String failedReason; private long endTime = AmoroServiceConstants.INVALID_TIME; @@ -539,16 +832,17 @@ public TaskRuntime poll(OptimizerThread thread, boolean needQuotaChecking) { } public TableOptimizingProcess( - AbstractOptimizingPlanner planner, DefaultTableRuntime tableRuntime) { - processId = planner.getProcessId(); + OptimizingPlanResult planResult, DefaultTableRuntime tableRuntime) { + processId = planResult.getProcessId(); this.tableRuntime = tableRuntime; - optimizingType = planner.getOptimizingType(); - planTime = planner.getPlanTime(); - targetSnapshotId = planner.getTargetSnapshotId(); - targetChangeSnapshotId = planner.getTargetChangeSnapshotId(); - loadTaskRuntimes(planner.planTasks()); - fromSequence = planner.getFromSequence(); - toSequence = planner.getToSequence(); + this.format = tableRuntime.getFormat(); + optimizingType = planResult.getOptimizingType(); + planTime = planResult.getPlanTime(); + targetSnapshotId = planResult.getTargetSnapshotId(); + targetChangeSnapshotId = planResult.getTargetChangeSnapshotId(); + loadTaskRuntimes(planResult.getTasks()); + fromSequence = planResult.getFromSequence(); + toSequence = planResult.getToSequence(); beginAndPersistProcess(); } @@ -557,6 +851,7 @@ public TableOptimizingProcess( TableProcessMeta processMeta, OptimizingProcessState processState) { this.tableRuntime = tableRuntime; + this.format = tableRuntime.getFormat(); processId = tableRuntime.getProcessId(); optimizingType = OptimizingType.valueOf(processMeta.getProcessType()); targetSnapshotId = processState.getTargetSnapshotId(); @@ -590,6 +885,10 @@ public long getProcessId() { return processId; } + TableFormat getFormat() { + return format; + } + @Override public OptimizingType getOptimizingType() { return optimizingType; @@ -658,6 +957,10 @@ private void 
acceptResult(TaskRuntime taskRuntime) { } return v; }); + // task cancel means persistAndSetCompleted has been called and start/end may be absent. + if (taskRuntime.getStatus() == TaskRuntime.Status.CANCELED) { + return; + } try { tableRuntime.addTaskQuota(taskRuntime.getCurrentQuota()); } catch (Throwable throwable) { @@ -667,10 +970,6 @@ private void acceptResult(TaskRuntime taskRuntime) { taskRuntime.getTaskId(), throwable); } - // task cancel means persistAndSetCompleted has been called - if (taskRuntime.getStatus() == TaskRuntime.Status.CANCELED) { - return; - } if (isClosed()) { throw new OptimizingClosedException(processId); } @@ -716,11 +1015,17 @@ private void acceptResult(TaskRuntime taskRuntime) { } } - private void resetTask(TaskRuntime taskRuntime) { + private void resetTask(TaskRuntime taskRuntime) { lock.lock(); try { taskRuntime.reset(); - taskQueue.add(taskRuntime); + // Cast is safe: only descriptors that already entered the widened taskMap end up here; the + // public retryTask(TaskRuntime) entry point forwards whatever pollTask previously + // returned, and pollTask only hands out entries sourced from taskMap itself. 
+ @SuppressWarnings("unchecked") + TaskRuntime> widened = + (TaskRuntime>) taskRuntime; + taskQueue.add(widened); } finally { lock.unlock(); } @@ -759,7 +1064,8 @@ public String getFailedReason() { return failedReason; } - private Map> getTaskMap() { + private Map>> + getTaskMap() { return taskMap; } @@ -796,18 +1102,12 @@ public int getActualQuota() { @Override public void commit() { - List> successTasks = + List>> successTasks = taskMap.values().stream() .filter(task -> task.getStatus() == Status.SUCCESS) .collect(Collectors.toList()); LOG.debug( - "{} get {} tasks of {} partitions to commit", - tableRuntime.getTableIdentifier(), - successTasks.size(), - successTasks.stream() - .map(task -> task.getTaskDescriptor().getPartition()) - .distinct() - .count()); + "{} get {} tasks to commit", tableRuntime.getTableIdentifier(), successTasks.size()); lock.lock(); try { @@ -821,6 +1121,15 @@ public void commit() { } try { hasCommitted = true; + if (successTasks.isEmpty()) { + status = ProcessStatus.FAILED; + failedReason = + "No successful task is available for commit, stop building committer to avoid" + + " invalid commit identity."; + endTime = System.currentTimeMillis(); + persistAndSetCompleted(false); + return; + } buildCommit().commit(); if (allTasksPrepared()) { status = ProcessStatus.SUCCESS; @@ -851,47 +1160,35 @@ public void commit() { @Override public MetricsSummary getSummary() { - List taskSummaries = + // Route through StagedTaskDescriptor.toMetricsSummary() so non-Iceberg descriptors (e.g. + // PaimonCompactionTask) contribute to the aggregate without OptimizingQueue needing to know + // their concrete type. MetricsSummary#aggregate handles the mixed list. 
+ List taskSummaries = taskMap.values().stream() .map(TaskRuntime::getTaskDescriptor) - .map(RewriteStageTask::getSummary) + .map(StagedTaskDescriptor::toMetricsSummary) .collect(Collectors.toList()); - return new MetricsSummary(taskSummaries); + return MetricsSummary.aggregate(taskSummaries); } - private UnKeyedTableCommit buildCommit() { - MixedTable table = - (MixedTable) - catalogManager - .loadTable(tableRuntime.getTableIdentifier().getIdentifier()) - .originalTable(); - if (table.isUnkeyedTable()) { - return new UnKeyedTableCommit(targetSnapshotId, table, taskMap.values()); - } else { - return new KeyedTableCommit( - table, - taskMap.values(), - targetSnapshotId, - convertPartitionSequence(table, fromSequence), - convertPartitionSequence(table, toSequence)); - } - } - - private StructLikeMap convertPartitionSequence( - MixedTable table, Map partitionSequence) { - PartitionSpec spec = table.spec(); - StructLikeMap results = StructLikeMap.create(spec.partitionType()); - partitionSequence.forEach( - (partition, sequence) -> { - if (spec.isUnpartitioned()) { - results.put(TablePropertyUtil.EMPTY_STRUCT, sequence); - } else { - StructLike partitionData = MixedDataFiles.data(spec, partition); - results.put(partitionData, sequence); - } - }); - return results; + private TableOptimizingCommitter buildCommit() { + AmoroTable table = + catalogManager.loadTable(tableRuntime.getTableIdentifier().getIdentifier()); + List> taskDescriptors = + taskMap.values().stream() + .filter(task -> task.getStatus() == Status.SUCCESS) + .map(TaskRuntime::getTaskDescriptor) + .collect(Collectors.toList()); + return router + .forFormat(format) + .createCommitter( + table, + targetSnapshotId, + targetChangeSnapshotId, + taskDescriptors, + fromSequence, + toSequence); } private void beginAndPersistProcess() { @@ -926,8 +1223,8 @@ private void beginAndPersistProcess() { () -> doAs( OptimizingProcessMapper.class, - mapper -> mapper.insertTaskRuntimes(Lists.newArrayList(taskMap.values()))), - 
() -> TaskFilesPersistence.persistTaskInputs(processId, taskMap.values()), + mapper -> mapper.insertTaskRuntimes(typedTaskRuntimes())), + () -> TaskFilesPersistence.persistTaskInputs(processId, typedTaskRuntimes()), () -> tableRuntime.beginProcess(this)); } @@ -953,7 +1250,7 @@ private void persistAndSetCompleted(boolean success) { getFailedReason(), new HashMap<>(), getSummary().summaryAsMap(false))), - () -> tableRuntime.completeProcess(success), + () -> tableRuntime.completeProcess(this, success), () -> clearProcess(this)); } @@ -961,21 +1258,108 @@ private void cancelTasks() { taskMap.values().forEach(TaskRuntime::tryCanceling); } + /** + * Narrow the task-map values to the bound expected by {@code insertTaskRuntimes} and {@code + * TaskFilesPersistence}. Every descriptor that enters {@code taskMap} extends {@code + * StagedTaskDescriptor} where {@code I extends BaseOptimizingInput}, so the unchecked + * cast is safe. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private List>> + typedTaskRuntimes() { + return (List) Lists.newArrayList(taskMap.values()); + } + + /** + * Bind a loaded {@link BaseOptimizingInput} back into its descriptor. The helper exists solely + * to "capture" the descriptor's {@code I} type parameter so we can pass the polymorphic input + * through {@link StagedTaskDescriptor#setInput} without the caller knowing the concrete + * subclass. Type safety is guaranteed by {@code TaskDescriptorTypeConverter} pairing the + * descriptor class with the input written by {@code TaskFilesPersistence#persistTaskInputs}. + * + *

A null {@code input} is passed through unchanged (preserving the pre-C4 behaviour where a + * missing entry in the loaded map would null out the descriptor's input — each {@code + * calculateSummary} override tolerates this). + */ + private void bindLoadedInput( + StagedTaskDescriptor descriptor, BaseOptimizingInput input) { + @SuppressWarnings("unchecked") + I typedInput = (I) input; + descriptor.setInput(typedInput); + } + + /** + * Back-fill {@link TaskProperties#TASK_EXECUTOR_FACTORY_IMPL} on descriptors restored from + * pre-0.9 Iceberg rows whose {@code properties} column predates the multi-format refactor. + * + *

The injection is format-driven via {@link LegacyExecutorFactoryDefaults}: Iceberg / + * Mixed-Iceberg / Mixed-Hive get their canonical executor factory class name; Paimon and + * unknown formats are logged and skipped (Paimon planners always emit the key, so a missing + * entry for Paimon indicates a corrupted row the optimizer cannot safely resume). + * + *

Idempotent: rows that already carry the key are untouched, so new-format rows co-located + * with legacy Iceberg rows in the same process still work. + */ + private void injectLegacyExecutorFactoryDefaults( + List>> + taskRuntimes) { + for (TaskRuntime> + taskRuntime : taskRuntimes) { + StagedTaskDescriptor descriptor = + taskRuntime.getTaskDescriptor(); + if (descriptor == null) { + continue; + } + Map props = descriptor.getProperties(); + if (props != null && props.containsKey(TaskProperties.TASK_EXECUTOR_FACTORY_IMPL)) { + continue; + } + String defaultImpl = + LegacyExecutorFactoryDefaults.resolveDefaultExecutorFactoryImpl( + tableRuntime.getFormat()); + if (defaultImpl == null) { + continue; + } + if (descriptor.ensureExecutorFactoryImpl(defaultImpl)) { + LOG.info( + "Restored legacy task {} with default {}={} (format={})", + taskRuntime.getTaskId(), + TaskProperties.TASK_EXECUTOR_FACTORY_IMPL, + defaultImpl, + tableRuntime.getFormat()); + } + } + } + private void loadTaskRuntimes(OptimizingProcess optimizingProcess) { try { - List> taskRuntimes = - getAs( - OptimizingProcessMapper.class, - mapper -> - mapper.selectTaskRuntimes( - tableRuntime.getTableIdentifier().getId(), processId)); - Map inputs = TaskFilesPersistence.loadTaskInputs(processId); + // Recovery is format-agnostic after C3/C4: TaskDescriptorTypeConverter resolves the + // descriptor subclass from TASK_EXECUTOR_FACTORY_IMPL, and TaskFilesPersistence returns + // a Map whose concrete values carry their own runtime + // class. We widen the compile-time view to StagedTaskDescriptor so the same call recovers both Iceberg and Paimon rows. 
+ List>> + taskRuntimes = + getAs( + OptimizingProcessMapper.class, + mapper -> + mapper.selectTaskRuntimes( + tableRuntime.getTableIdentifier().getId(), processId)); + Map inputs = TaskFilesPersistence.loadTaskInputs(processId); + // Compensation for pre-0.9 Iceberg rows: TaskDescriptorTypeConverter correctly routes rows + // with a missing TASK_EXECUTOR_FACTORY_IMPL to RewriteStageTask, but the key itself is + // still absent from the descriptor's properties map on restore. OptimizerExecutor later + // reads that key via reflection to build the executor factory — without this injection it + // would NPE. Idempotent: we never overwrite an existing key (e.g. for Paimon rows where + // the Paimon planner already populated it). + injectLegacyExecutorFactoryDefaults(taskRuntimes); taskRuntimes.forEach( taskRuntime -> { taskRuntime.getCompletedFuture().whenCompleted(() -> acceptResult(taskRuntime)); - taskRuntime - .getTaskDescriptor() - .setInput(inputs.get(taskRuntime.getTaskId().getTaskId())); + BaseOptimizingInput input = inputs.get(taskRuntime.getTaskId().getTaskId()); + TaskDescriptorRecoveryTypes.validateRecoveredTask( + taskRuntime.getTaskDescriptor(), input, tableRuntime.getFormat()); + bindLoadedInput(taskRuntime.getTaskDescriptor(), input); taskMap.put(taskRuntime.getTaskId(), taskRuntime); if (taskRuntime.getStatus() == TaskRuntime.Status.PLANNED) { taskQueue.offer(taskRuntime); @@ -992,7 +1376,7 @@ private void loadTaskRuntimes(OptimizingProcess optimizingProcess) { retryTask(taskRuntime); } }); - } catch (IllegalArgumentException e) { + } catch (IllegalArgumentException | ClassCastException e) { LOG.warn( "Load task inputs failed, close the optimizing process : {}", optimizingProcess.getProcessId(), @@ -1001,10 +1385,11 @@ private void loadTaskRuntimes(OptimizingProcess optimizingProcess) { } } - private void loadTaskRuntimes(List taskDescriptors) { + private > void loadTaskRuntimes( + List taskDescriptors) { int taskId = 1; - for (RewriteStageTask 
taskDescriptor : taskDescriptors) { - TaskRuntime taskRuntime = + for (T taskDescriptor : taskDescriptors) { + TaskRuntime taskRuntime = new TaskRuntime<>(new OptimizingTaskId(processId, taskId++), taskDescriptor); LOG.info( "{} plan new task {}, summary {}", diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueueWarmupMetrics.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueueWarmupMetrics.java new file mode 100644 index 0000000000..c2128603e7 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueueWarmupMetrics.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.server.optimizing; + +import static org.apache.amoro.metrics.MetricDefine.defineGauge; + +import org.apache.amoro.metrics.Gauge; +import org.apache.amoro.metrics.Metric; +import org.apache.amoro.metrics.MetricDefine; +import org.apache.amoro.metrics.MetricKey; +import org.apache.amoro.metrics.MetricRegistry; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** Global metrics for startup optimizing queue warmup. */ +public class OptimizingQueueWarmupMetrics { + + public static final MetricDefine OPTIMIZING_QUEUE_WARMUP_TOTAL = + defineGauge("optimizing_queue_warmup_total") + .withDescription("Total number of table runtimes submitted to optimizing queue warmup") + .build(); + public static final MetricDefine OPTIMIZING_QUEUE_WARMUP_LOADED = + defineGauge("optimizing_queue_warmup_loaded") + .withDescription("Number of table runtimes successfully warmed into optimizing queues") + .build(); + public static final MetricDefine OPTIMIZING_QUEUE_WARMUP_SKIPPED = + defineGauge("optimizing_queue_warmup_skipped") + .withDescription("Number of table runtimes skipped during optimizing queue warmup") + .build(); + public static final MetricDefine OPTIMIZING_QUEUE_WARMUP_FAILED = + defineGauge("optimizing_queue_warmup_failed") + .withDescription("Number of table runtimes failed during optimizing queue warmup") + .build(); + public static final MetricDefine OPTIMIZING_QUEUE_WARMUP_RUNNING = + defineGauge("optimizing_queue_warmup_running") + .withDescription("Number of table runtimes currently running optimizing queue warmup") + .build(); + public static final MetricDefine OPTIMIZING_QUEUE_WARMUP_RETRYING = + defineGauge("optimizing_queue_warmup_retrying") + .withDescription("Number of table runtimes retried during optimizing queue warmup") + .build(); + public static final MetricDefine 
OPTIMIZING_QUEUE_WARMUP_INITIALIZING = + defineGauge("optimizing_queue_warmup_initializing") + .withDescription("Number of table runtimes waiting for optimizing queue warmup") + .build(); + + private final MetricRegistry registry; + private final List registeredMetricKeys = Lists.newArrayList(); + + private final AtomicLong total = new AtomicLong(); + private final AtomicLong loaded = new AtomicLong(); + private final AtomicLong skipped = new AtomicLong(); + private final AtomicLong failed = new AtomicLong(); + private final AtomicLong running = new AtomicLong(); + private final AtomicLong retrying = new AtomicLong(); + private final AtomicLong initializing = new AtomicLong(); + + public OptimizingQueueWarmupMetrics(MetricRegistry registry) { + this.registry = registry; + } + + public void register() { + registerMetric(registry, OPTIMIZING_QUEUE_WARMUP_TOTAL, (Gauge) total::get); + registerMetric(registry, OPTIMIZING_QUEUE_WARMUP_LOADED, (Gauge) loaded::get); + registerMetric(registry, OPTIMIZING_QUEUE_WARMUP_SKIPPED, (Gauge) skipped::get); + registerMetric(registry, OPTIMIZING_QUEUE_WARMUP_FAILED, (Gauge) failed::get); + registerMetric(registry, OPTIMIZING_QUEUE_WARMUP_RUNNING, (Gauge) running::get); + registerMetric(registry, OPTIMIZING_QUEUE_WARMUP_RETRYING, (Gauge) retrying::get); + registerMetric(registry, OPTIMIZING_QUEUE_WARMUP_INITIALIZING, (Gauge) initializing::get); + } + + public void unregister() { + registeredMetricKeys.forEach(registry::unregister); + registeredMetricKeys.clear(); + } + + void recordSubmitted(long count) { + if (count <= 0) { + return; + } + total.addAndGet(count); + initializing.addAndGet(count); + } + + void recordStart() { + initializing.decrementAndGet(); + running.incrementAndGet(); + } + + void recordRetry() { + retrying.incrementAndGet(); + } + + void recordResult(OptimizingQueue.WarmupResult result) { + running.decrementAndGet(); + if (result == OptimizingQueue.WarmupResult.WARMED) { + loaded.incrementAndGet(); + } else if 
(result == OptimizingQueue.WarmupResult.SKIPPED + || result == OptimizingQueue.WarmupResult.DUPLICATE) { + skipped.incrementAndGet(); + } else { + failed.incrementAndGet(); + } + } + + private void registerMetric(MetricRegistry registry, MetricDefine define, Metric metric) { + MetricKey key = registry.register(define, Collections.emptyMap(), metric); + registeredMetricKeys.add(key); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueueWarmupService.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueueWarmupService.java new file mode 100644 index 0000000000..6ef19781c9 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueueWarmupService.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.amoro.server.optimizing; + +import org.apache.amoro.server.table.DefaultTableRuntime; +import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** Asynchronously warms table runtimes into optimizing queues during AMS startup. */ +public class OptimizingQueueWarmupService implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(OptimizingQueueWarmupService.class); + private static final int MAX_ATTEMPTS = 3; + + private final Function> queueSupplier; + private final Predicate runtimeStillActive; + private final OptimizingQueueWarmupMetrics metrics; + private final ExecutorService executor; + private final AtomicInteger pendingTasks = new AtomicInteger(); + private final Object completionMonitor = new Object(); + private volatile boolean closed; + + public OptimizingQueueWarmupService( + int threadCount, + Function> queueSupplier, + Predicate runtimeStillActive, + OptimizingQueueWarmupMetrics metrics) { + this.queueSupplier = queueSupplier; + this.runtimeStillActive = runtimeStillActive; + this.metrics = metrics; + this.executor = + Executors.newFixedThreadPool( + Math.max(1, threadCount), + new ThreadFactoryBuilder() + .setNameFormat("optimizing-queue-warmup-%d") + .setDaemon(true) + .build()); + } + + public void warmupTables(List tableRuntimes) { + List sortedRuntimes = + Optional.ofNullable(tableRuntimes).orElseGet(Collections::emptyList).stream() + .sorted(Comparator.comparingInt(this::priority)) + 
.collect(Collectors.toList()); + if (sortedRuntimes.isEmpty() || closed) { + return; + } + metrics.recordSubmitted(sortedRuntimes.size()); + pendingTasks.addAndGet(sortedRuntimes.size()); + LOG.info("Start optimizing queue warmup: tables={}", sortedRuntimes.size()); + sortedRuntimes.forEach(runtime -> executor.submit(() -> warmupWithRetry(runtime))); + } + + public boolean awaitCompletionForTest(long timeout, TimeUnit unit) throws InterruptedException { + long deadline = System.nanoTime() + unit.toNanos(timeout); + synchronized (completionMonitor) { + while (pendingTasks.get() > 0) { + long remaining = deadline - System.nanoTime(); + if (remaining <= 0) { + return false; + } + TimeUnit.NANOSECONDS.timedWait(completionMonitor, remaining); + } + return true; + } + } + + @Override + public void close() { + closed = true; + executor.shutdownNow(); + metrics.unregister(); + synchronized (completionMonitor) { + completionMonitor.notifyAll(); + } + } + + private void warmupWithRetry(DefaultTableRuntime runtime) { + OptimizingQueue.WarmupResult result = OptimizingQueue.WarmupResult.FAILED; + metrics.recordStart(); + try { + for (int attempt = 1; attempt <= MAX_ATTEMPTS && !closed; attempt++) { + result = warmup(runtime); + if (result != OptimizingQueue.WarmupResult.FAILED) { + return; + } + if (attempt < MAX_ATTEMPTS) { + metrics.recordRetry(); + LOG.warn( + "Optimizing queue warmup failed for table {}, retrying attempt {}/{}", + runtime.getTableIdentifier(), + attempt + 1, + MAX_ATTEMPTS); + } + } + } finally { + metrics.recordResult(result); + if (pendingTasks.decrementAndGet() == 0) { + synchronized (completionMonitor) { + completionMonitor.notifyAll(); + } + } + } + } + + private OptimizingQueue.WarmupResult warmup(DefaultTableRuntime runtime) { + if (!runtimeStillActive.test(runtime)) { + LOG.info("Skip optimizing queue warmup for inactive table {}", runtime.getTableIdentifier()); + return OptimizingQueue.WarmupResult.SKIPPED; + } + Optional queue = 
queueSupplier.apply(runtime.getGroupName()); + if (!queue.isPresent()) { + LOG.info( + "Skip optimizing queue warmup for table {}, optimizer group {} does not exist", + runtime.getTableIdentifier(), + runtime.getGroupName()); + return OptimizingQueue.WarmupResult.SKIPPED; + } + return queue.get().warmupTable(runtime); + } + + private int priority(DefaultTableRuntime runtime) { + if (runtime.getProcessId() > 0) { + return 0; + } + if (runtime.getOptimizingStatus().isProcessing()) { + return 1; + } + return 2; + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java index 0759f1e367..a2536de310 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java @@ -28,9 +28,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.ServiceLoader; @@ -38,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; public class SchedulingPolicy { @@ -84,14 +89,32 @@ public String name() { return policyName; } - public DefaultTableRuntime scheduleTable(Set skipSet) { + public List scheduleTables(Set skipSet, int limit) { + if (limit <= 0) { + return Collections.emptyList(); + } + + List tableRuntimeSnapshot; + tableLock.lock(); + try { + tableRuntimeSnapshot = new ArrayList<>(tableRuntimeMap.values()); + } finally { + tableLock.unlock(); + } + + Set effectiveSkipSet = new HashSet<>(skipSet); + fillSkipSet(tableRuntimeSnapshot, effectiveSkipSet); + return 
tableRuntimeSnapshot.stream() + .filter(tableRuntime -> !effectiveSkipSet.contains(tableRuntime.getTableIdentifier())) + .sorted(createSorterByPolicy()) + .limit(limit) + .collect(Collectors.toList()); + } + + public Set tableIdentifiersSnapshot() { tableLock.lock(); try { - fillSkipSet(skipSet); - return tableRuntimeMap.values().stream() - .filter(tableRuntime -> !skipSet.contains(tableRuntime.getTableIdentifier())) - .min(createSorterByPolicy()) - .orElse(null); + return new HashSet<>(tableRuntimeMap.keySet()); } finally { tableLock.unlock(); } @@ -106,9 +129,10 @@ private Comparator createSorterByPolicy() { } } - private void fillSkipSet(Set originalSet) { + private void fillSkipSet( + List tableRuntimes, Set originalSet) { long currentTime = System.currentTimeMillis(); - tableRuntimeMap.values().stream() + tableRuntimes.stream() .filter( tableRuntime -> !isTablePending(tableRuntime) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java index d71ae484b6..e8205be2e1 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java @@ -33,10 +33,9 @@ import org.apache.amoro.op.SnapshotSummary; import org.apache.amoro.optimizing.RewriteFilesOutput; import org.apache.amoro.optimizing.RewriteStageTask; +import org.apache.amoro.optimizing.TableOptimizingCommitter; import org.apache.amoro.optimizing.TaskProperties; import org.apache.amoro.properties.HiveTableProperties; -import org.apache.amoro.server.optimizing.TaskRuntime.Status; -import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.UnkeyedTable; @@ -74,26 +73,24 @@ import java.util.Set; import java.util.stream.Collectors; -public 
class UnKeyedTableCommit { +public class UnKeyedTableCommit implements TableOptimizingCommitter { private static final Logger LOG = LoggerFactory.getLogger(UnKeyedTableCommit.class); private final Long targetSnapshotId; private final MixedTable table; - private final Collection> tasks; + private final Collection tasks; public UnKeyedTableCommit( - Long targetSnapshotId, MixedTable table, Collection> tasks) { + Long targetSnapshotId, MixedTable table, Collection tasks) { this.targetSnapshotId = targetSnapshotId; this.table = table; this.tasks = tasks; } - private Set> getExcludedDeleteFiles( - List> successTasks) { + private Set> getExcludedDeleteFiles(List successTasks) { Set> excludedDeleteFiles = new HashSet<>(); tasks.stream() .filter(task -> !successTasks.contains(task)) - .map(TaskRuntime::getTaskDescriptor) .filter(task -> task.getInput().rewrittenDeleteFiles() != null) .forEach( task -> @@ -120,8 +117,11 @@ protected List moveFile2HiveIfNeed() { : table.asKeyedTable().baseTable().spec().partitionType(); List newTargetFiles = new ArrayList<>(); - for (TaskRuntime taskRuntime : tasks) { - RewriteFilesOutput output = taskRuntime.getTaskDescriptor().getOutput(); + for (RewriteStageTask task : tasks) { + if (task.getOutput() == null) { + continue; + } + RewriteFilesOutput output = task.getOutput(); DataFile[] dataFiles = output.getDataFiles(); if (dataFiles == null) { continue; @@ -139,7 +139,7 @@ protected List moveFile2HiveIfNeed() { for (DataFile targetFile : targetFiles) { String partitionPath = partitionPathMap.computeIfAbsent( - taskRuntime.getTaskDescriptor().getPartition(), + task.getPartition(), key -> getPartitionPath(hiveClient, maxTransactionId, targetFile, partitionSchema)); DataFile finalDataFile = moveTargetFiles(targetFile, partitionPath); @@ -207,10 +207,8 @@ private String getIcebergPartitionLocation(StructLike partitionData) { } public void commit() throws OptimizingCommitException { - List> successTasks = - tasks.stream() - .filter(task -> 
task.getStatus() == Status.SUCCESS) - .collect(Collectors.toList()); + List successTasks = + tasks.stream().filter(task -> task.getOutput() != null).collect(Collectors.toList()); if (successTasks.isEmpty()) { LOG.info("No tasks to commit for table {}", table.id()); return; @@ -226,29 +224,27 @@ public void commit() throws OptimizingCommitException { Set removedDataFiles = Sets.newHashSet(); Set addedDeleteFiles = Sets.newHashSet(); Set removedDeleteFiles = Sets.newHashSet(); - successTasks.stream() - .map(TaskRuntime::getTaskDescriptor) - .forEach( - task -> { - if (CollectionUtils.isNotEmpty(hiveNewDataFiles)) { - addedDataFiles.addAll(hiveNewDataFiles); - } else if (task.getOutput().getDataFiles() != null) { - addedDataFiles.addAll(Arrays.asList(task.getOutput().getDataFiles())); - } - if (task.getOutput().getDeleteFiles() != null) { - addedDeleteFiles.addAll(Arrays.asList(task.getOutput().getDeleteFiles())); - } - if (task.getInput().rewrittenDataFiles() != null) { - removedDataFiles.addAll(Arrays.asList(task.getInput().rewrittenDataFiles())); - } - if (task.getInput().rewrittenDeleteFiles() != null) { - removedDeleteFiles.addAll( - Arrays.stream(task.getInput().rewrittenDeleteFiles()) - .filter(deleteFile -> needRemove(excludedDeleteFiles, deleteFile)) - .map(ContentFiles::asDeleteFile) - .collect(Collectors.toSet())); - } - }); + successTasks.forEach( + task -> { + if (CollectionUtils.isNotEmpty(hiveNewDataFiles)) { + addedDataFiles.addAll(hiveNewDataFiles); + } else if (task.getOutput().getDataFiles() != null) { + addedDataFiles.addAll(Arrays.asList(task.getOutput().getDataFiles())); + } + if (task.getOutput().getDeleteFiles() != null) { + addedDeleteFiles.addAll(Arrays.asList(task.getOutput().getDeleteFiles())); + } + if (task.getInput().rewrittenDataFiles() != null) { + removedDataFiles.addAll(Arrays.asList(task.getInput().rewrittenDataFiles())); + } + if (task.getInput().rewrittenDeleteFiles() != null) { + removedDeleteFiles.addAll( + 
Arrays.stream(task.getInput().rewrittenDeleteFiles()) + .filter(deleteFile -> needRemove(excludedDeleteFiles, deleteFile)) + .map(ContentFiles::asDeleteFile) + .collect(Collectors.toSet())); + } + }); try { Transaction transaction = table.asUnkeyedTable().newTransaction(); if (removedDeleteFiles.isEmpty() && !addedDeleteFiles.isEmpty()) { @@ -394,7 +390,8 @@ private DataFile moveTargetFiles(DataFile targetFile, String hiveLocation) { private static Set getCommittedDataFilesFromSnapshotId( UnkeyedTable table, Long snapshotId) { - long currentSnapshotId = IcebergTableUtil.getSnapshotId(table, true); + Snapshot current = table.currentSnapshot(); + long currentSnapshotId = current != null ? current.snapshotId() : Constants.INVALID_SNAPSHOT_ID; if (currentSnapshotId == Constants.INVALID_SNAPSHOT_ID) { return Collections.emptySet(); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/HttpSessionHandlerFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/HttpSessionHandlerFactory.java index 2ee650c522..cb99097437 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/HttpSessionHandlerFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/HttpSessionHandlerFactory.java @@ -67,7 +67,7 @@ public AmoroSessionTableSchema(DataSource dataSource) { setLastNodeColumn("last_node"); setAccessTimeColumn("access_time"); setLastAccessTimeColumn("last_access_time"); - setCreateTimeColumn("create_time"); + setCreateTimeColumn("session_create_time"); setCookieTimeColumn("cookie_time"); setLastSavedTimeColumn("last_save_time"); setExpiryTimeColumn("expiry_time"); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java index 1163960f54..e59a40a545 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java +++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java @@ -18,9 +18,9 @@ package org.apache.amoro.server.persistence; -import org.apache.amoro.optimizing.RewriteFilesInput; +import org.apache.amoro.optimizing.BaseOptimizingInput; import org.apache.amoro.optimizing.RewriteFilesOutput; -import org.apache.amoro.optimizing.RewriteStageTask; +import org.apache.amoro.process.StagedTaskDescriptor; import org.apache.amoro.server.optimizing.TaskRuntime; import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper; import org.apache.amoro.server.utils.CompressUtil; @@ -32,12 +32,40 @@ import java.util.Map; import java.util.stream.Collectors; +/** + * Static facade for persisting and reloading the per-task {@code OptimizingInput} blob bound to an + * {@code OptimizingProcess}. + * + *

Storage shape is intentionally format-agnostic: we serialize a {@code Map} via {@link SerializationUtil#simpleSerialize(Object)} (Java serialization) + * and gzip it through {@link CompressUtil}. Any {@link BaseOptimizingInput} subclass that is {@code + * Serializable} round-trips without this layer needing to know the concrete type — concretely, both + * the Iceberg {@code RewriteFilesInput} and Paimon {@code PaimonCompactionInput} are covered. + * + *

The reason the load-side return type is {@link BaseOptimizingInput} (rather than a format + * specific subclass) is recovery: {@code OptimizingQueue.TableOptimizingProcess#loadTaskRuntimes} + * pushes each input into {@link StagedTaskDescriptor#setInput(Object)} whose signature is erased to + * {@code I}. The caller and the descriptor instance (resolved by {@code + * TaskDescriptorTypeConverter}) agree on the concrete subclass at runtime; this layer only + * guarantees we do not downcast prematurely. + */ public class TaskFilesPersistence { private static final DatabasePersistence persistence = new DatabasePersistence(); + /** + * Persist the per-task {@code OptimizingInput} map for one process, keyed by {@code taskId}. + * + *

Accepts any {@link TaskRuntime} whose descriptor declares a {@link BaseOptimizingInput} + * subclass — so Iceberg {@code RewriteStageTask} / {@code RewriteFilesInput} and Paimon {@code + * PaimonCompactionTask} / {@code PaimonCompactionInput} both flow through unchanged. + */ public static void persistTaskInputs( - long processId, Collection> tasks) { + long processId, + Collection< + ? extends + TaskRuntime>> + tasks) { persistence.persistTaskInputs( processId, tasks.stream() @@ -46,7 +74,16 @@ public static void persistTaskInputs( e -> e.getTaskId().getTaskId(), task -> task.getTaskDescriptor().getInput()))); } - public static Map loadTaskInputs(long processId) { + /** + * Load the per-task inputs previously persisted for {@code processId}. Returns an empty map if + * the row is missing or carries no blob. + * + *

Each value is a {@link BaseOptimizingInput} subclass; the concrete type is whatever {@code + * persistTaskInputs} wrote (Java serialization preserves the runtime class). Callers that need + * the format-specific fields are expected to obtain the matching descriptor via {@code + * TaskDescriptorTypeConverter} and route through {@link StagedTaskDescriptor#setInput}. + */ + public static Map loadTaskInputs(long processId) { List bytes = persistence.getAs( OptimizingProcessMapper.class, mapper -> mapper.selectProcessInputFiles(processId)); @@ -63,7 +100,7 @@ public static RewriteFilesOutput loadTaskOutput(byte[] content) { private static class DatabasePersistence extends PersistentBase { - public void persistTaskInputs(long processId, Map tasks) { + public void persistTaskInputs(long processId, Map tasks) { doAs( OptimizingProcessMapper.class, mapper -> mapper.updateProcessInputFiles(processId, tasks)); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/TaskDescriptorRecoveryTypes.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/TaskDescriptorRecoveryTypes.java new file mode 100644 index 0000000000..ca2f6a9142 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/TaskDescriptorRecoveryTypes.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence.converter; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.amoro.TableFormat; +import org.apache.amoro.formats.paimon.optimizing.PaimonCompactionExecutorFactory; +import org.apache.amoro.formats.paimon.optimizing.PaimonCompactionInput; +import org.apache.amoro.formats.paimon.optimizing.PaimonCompactionOutput; +import org.apache.amoro.formats.paimon.optimizing.PaimonCompactionTask; +import org.apache.amoro.formats.paimon.optimizing.PaimonMetricsSummary; +import org.apache.amoro.formats.paimon.optimizing.primary.PaimonPrimaryKeyCompactionExecutorFactory; +import org.apache.amoro.formats.paimon.optimizing.primary.PaimonPrimaryKeyCompactionInput; +import org.apache.amoro.formats.paimon.optimizing.primary.PaimonPrimaryKeyCompactionOutput; +import org.apache.amoro.formats.paimon.optimizing.primary.PaimonPrimaryKeyCompactionTask; +import org.apache.amoro.hive.optimizing.MixedHiveRewriteExecutorFactory; +import org.apache.amoro.optimizing.BaseOptimizingInput; +import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory; +import org.apache.amoro.optimizing.MetricsSummary; +import org.apache.amoro.optimizing.MixedIcebergRewriteExecutorFactory; +import org.apache.amoro.optimizing.RewriteFilesInput; +import org.apache.amoro.optimizing.RewriteFilesOutput; +import org.apache.amoro.optimizing.RewriteStageTask; +import org.apache.amoro.optimizing.TaskProperties; +import org.apache.amoro.process.StagedTaskDescriptor; + +import java.sql.ResultSet; +import 
java.sql.SQLException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Registry for restoring persisted optimizing task descriptors and their typed payloads. + * + *

Rows without {@link TaskProperties#TASK_EXECUTOR_FACTORY_IMPL} are legacy Iceberg rows and + * intentionally fall back to {@link RewriteStageTask}. Rows with an unknown factory fail fast so a + * new lake format cannot be silently restored with the wrong descriptor, input, output, or summary + * class. + */ +public final class TaskDescriptorRecoveryTypes { + + private static final Gson GSON = new Gson(); + private static final RecoveryTypes LEGACY_ICEBERG_TYPES = + new RecoveryTypes( + RewriteStageTask.class, + RewriteFilesInput.class, + RewriteFilesOutput.class, + MetricsSummary.class); + private static final Map FACTORY_IMPL_TO_RECOVERY_TYPES; + + static { + Map mappings = new HashMap<>(); + mappings.put(IcebergRewriteExecutorFactory.class.getName(), LEGACY_ICEBERG_TYPES); + mappings.put(MixedIcebergRewriteExecutorFactory.class.getName(), LEGACY_ICEBERG_TYPES); + mappings.put(MixedHiveRewriteExecutorFactory.class.getName(), LEGACY_ICEBERG_TYPES); + mappings.put( + PaimonCompactionExecutorFactory.class.getName(), + new RecoveryTypes( + PaimonCompactionTask.class, + PaimonCompactionInput.class, + PaimonCompactionOutput.class, + PaimonMetricsSummary.class)); + mappings.put( + PaimonPrimaryKeyCompactionExecutorFactory.class.getName(), + new RecoveryTypes( + PaimonPrimaryKeyCompactionTask.class, + PaimonPrimaryKeyCompactionInput.class, + PaimonPrimaryKeyCompactionOutput.class, + PaimonMetricsSummary.class)); + FACTORY_IMPL_TO_RECOVERY_TYPES = Collections.unmodifiableMap(mappings); + } + + private TaskDescriptorRecoveryTypes() {} + + public static RecoveryTypes resolve(String factoryImpl) { + if (factoryImpl == null || factoryImpl.isEmpty()) { + return LEGACY_ICEBERG_TYPES; + } + RecoveryTypes types = FACTORY_IMPL_TO_RECOVERY_TYPES.get(factoryImpl); + if (types == null) { + throw new IllegalArgumentException( + "Unknown " + + TaskProperties.TASK_EXECUTOR_FACTORY_IMPL + + " = " + + factoryImpl + + "; register it in TaskDescriptorRecoveryTypes"); + } + return types; + } 
+ + public static void validateRecoveredTask( + StagedTaskDescriptor descriptor, BaseOptimizingInput input) { + validateRecoveredTask(descriptor, input, null); + } + + public static void validateRecoveredTask( + StagedTaskDescriptor descriptor, + BaseOptimizingInput input, + TableFormat tableFormat) { + if (descriptor == null) { + throw new IllegalArgumentException("Recovered task descriptor must not be null"); + } + String factoryImpl = factoryImpl(descriptor.getProperties()); + if ((factoryImpl == null || factoryImpl.isEmpty()) + && !allowsLegacyMissingFactory(tableFormat)) { + throw new IllegalArgumentException( + "Recovered task is missing " + + TaskProperties.TASK_EXECUTOR_FACTORY_IMPL + + " for table format " + + tableFormat); + } + RecoveryTypes types = resolve(factoryImpl); + validateType("descriptor", descriptor, types.descriptorClass()); + validateType("input", input, types.inputClass()); + validateType("output", descriptor.getOutput(), types.outputClass()); + validateType("summary", descriptor.getSummary(), types.summaryClass()); + } + + static Map>> descriptorMappings() { + Map>> mappings = new HashMap<>(); + FACTORY_IMPL_TO_RECOVERY_TYPES.forEach( + (factoryImpl, types) -> mappings.put(factoryImpl, types.descriptorClass())); + return Collections.unmodifiableMap(mappings); + } + + static String readFactoryImpl(ResultSet rs) throws SQLException { + String raw; + try { + raw = rs.getString("properties"); + } catch (SQLException e) { + return null; + } + if (raw == null || raw.isEmpty()) { + return null; + } + Map props; + try { + props = GSON.fromJson(raw, new TypeToken>() {}.getType()); + } catch (RuntimeException e) { + return null; + } + return factoryImpl(props); + } + + private static String factoryImpl(Map properties) { + if (properties == null) { + return null; + } + return properties.get(TaskProperties.TASK_EXECUTOR_FACTORY_IMPL); + } + + private static boolean allowsLegacyMissingFactory(TableFormat tableFormat) { + return tableFormat == null + || 
TableFormat.ICEBERG.equals(tableFormat) + || TableFormat.MIXED_ICEBERG.equals(tableFormat) + || TableFormat.MIXED_HIVE.equals(tableFormat); + } + + private static void validateType(String name, Object value, Class expectedClass) { + if (value == null || expectedClass.isInstance(value)) { + return; + } + throw new IllegalArgumentException( + "Recovered task " + + name + + " type mismatch, expected " + + expectedClass.getName() + + " but got " + + value.getClass().getName()); + } + + public static final class RecoveryTypes { + private final Class> descriptorClass; + private final Class inputClass; + private final Class outputClass; + private final Class summaryClass; + + private RecoveryTypes( + Class> descriptorClass, + Class inputClass, + Class outputClass, + Class summaryClass) { + this.descriptorClass = descriptorClass; + this.inputClass = inputClass; + this.outputClass = outputClass; + this.summaryClass = summaryClass; + } + + public Class> descriptorClass() { + return descriptorClass; + } + + public Class inputClass() { + return inputClass; + } + + public Class outputClass() { + return outputClass; + } + + public Class summaryClass() { + return summaryClass; + } + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/TaskDescriptorSummaryConverter.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/TaskDescriptorSummaryConverter.java new file mode 100644 index 0000000000..62c2d99c4f --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/TaskDescriptorSummaryConverter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence.converter; + +import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.ibatis.type.JdbcType; +import org.apache.ibatis.type.TypeHandler; + +import java.sql.CallableStatement; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** Restores {@code task_runtime.metrics_summary} into the descriptor-specific summary class. 
*/ +public class TaskDescriptorSummaryConverter implements TypeHandler { + + private static final ObjectMapper MAPPER = + new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + + @Override + public void setParameter(PreparedStatement ps, int i, Object parameter, JdbcType jdbcType) + throws SQLException { + if (parameter == null) { + ps.setString(i, null); + return; + } + try { + ps.setString(i, MAPPER.writeValueAsString(parameter)); + } catch (JsonProcessingException e) { + throw new SQLException("Error converting task descriptor summary to JSON string", e); + } + } + + @Override + public Object getResult(ResultSet rs, String columnName) throws SQLException { + return parse(rs.getString(columnName), TaskDescriptorRecoveryTypes.readFactoryImpl(rs)); + } + + @Override + public Object getResult(ResultSet rs, int columnIndex) throws SQLException { + return parse(rs.getString(columnIndex), TaskDescriptorRecoveryTypes.readFactoryImpl(rs)); + } + + @Override + public Object getResult(CallableStatement cs, int columnIndex) { + throw new UnsupportedOperationException( + "TaskDescriptorSummaryConverter.getResult(CallableStatement) is not supported. 
" + + "ResultSet-based mapping should be used instead."); + } + + private Object parse(String jsonString, String factoryImpl) throws SQLException { + if (jsonString == null || jsonString.trim().isEmpty()) { + return null; + } + Class summaryClass = TaskDescriptorRecoveryTypes.resolve(factoryImpl).summaryClass(); + try { + return MAPPER.readValue(jsonString, summaryClass); + } catch (JsonProcessingException e) { + throw new SQLException("Error parsing task descriptor summary JSON string", e); + } + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/TaskDescriptorTypeConverter.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/TaskDescriptorTypeConverter.java index 143ad849ae..6a1a641f74 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/TaskDescriptorTypeConverter.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/TaskDescriptorTypeConverter.java @@ -19,6 +19,7 @@ package org.apache.amoro.server.persistence.converter; import org.apache.amoro.optimizing.RewriteStageTask; +import org.apache.amoro.optimizing.TaskProperties; import org.apache.amoro.process.StagedTaskDescriptor; import org.apache.ibatis.type.JdbcType; import org.apache.ibatis.type.TypeHandler; @@ -27,10 +28,33 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Map; -// TODO : support more types of stage +/** + * Routes {@code task_runtime} rows to the correct {@link StagedTaskDescriptor} subclass based on + * the task's {@link TaskProperties#TASK_EXECUTOR_FACTORY_IMPL} property. + * + *

The routing table is explicit and fail-fast: an unknown factory class name raises {@link + * IllegalArgumentException} rather than silently downgrading to {@link RewriteStageTask}. Rows + * whose {@code properties} column is null or missing the key default to {@link RewriteStageTask} + * for backwards compatibility with legacy Iceberg rows produced before the multi-format refactor. + * + *

New formats register themselves in {@link TaskDescriptorRecoveryTypes}. We deliberately avoid + * {@code ServiceLoader}: the mapping is small, bounded, and benefits from compile-time class + * references so that renames in downstream modules break this file's build rather than surface as + * NullPointer at runtime. + */ public class TaskDescriptorTypeConverter implements TypeHandler> { + /** + * Factory-impl class name → descriptor class. + * + *

Mixed (Iceberg/Hive) tables share {@link RewriteStageTask} with pure Iceberg — only the + * executor differs, the descriptor/input/output types are identical. + */ + static final Map>> + FACTORY_IMPL_TO_TASK_CLASS = TaskDescriptorRecoveryTypes.descriptorMappings(); + @Override public void setParameter( PreparedStatement ps, int i, StagedTaskDescriptor parameter, JdbcType jdbcType) @@ -39,18 +63,43 @@ public void setParameter( @Override public StagedTaskDescriptor getResult(ResultSet rs, String columnName) throws SQLException { - return new RewriteStageTask(); + return instantiate(TaskDescriptorRecoveryTypes.readFactoryImpl(rs)); } @Override public StagedTaskDescriptor getResult(ResultSet rs, int columnIndex) throws SQLException { - return new RewriteStageTask(); + return instantiate(TaskDescriptorRecoveryTypes.readFactoryImpl(rs)); } @Override public StagedTaskDescriptor getResult(CallableStatement cs, int columnIndex) throws SQLException { - return new RewriteStageTask(); + throw new UnsupportedOperationException( + "TaskDescriptorTypeConverter.getResult(CallableStatement) is not supported. " + + "ResultSet-based mapping should be used instead."); + } + + private static StagedTaskDescriptor instantiate(String factoryImpl) { + if (factoryImpl == null || factoryImpl.isEmpty()) { + // Backwards-compat with pre-refactor rows: default to Iceberg's RewriteStageTask. 
+ return new RewriteStageTask(); + } + Class> clazz = + FACTORY_IMPL_TO_TASK_CLASS.get(factoryImpl); + if (clazz == null) { + throw new IllegalArgumentException( + "Unknown " + + TaskProperties.TASK_EXECUTOR_FACTORY_IMPL + + " = " + + factoryImpl + + "; register it in TaskDescriptorRecoveryTypes"); + } + try { + return clazz.getDeclaredConstructor().newInstance(); + } catch (ReflectiveOperationException e) { + throw new IllegalStateException( + "Failed to instantiate task descriptor " + clazz.getName() + " for " + factoryImpl, e); + } } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingProcessMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingProcessMapper.java index db0919b20e..25678a5e6c 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingProcessMapper.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingProcessMapper.java @@ -18,8 +18,7 @@ package org.apache.amoro.server.persistence.mapper; -import org.apache.amoro.optimizing.RewriteFilesInput; -import org.apache.amoro.optimizing.RewriteStageTask; +import org.apache.amoro.optimizing.BaseOptimizingInput; import org.apache.amoro.process.StagedTaskDescriptor; import org.apache.amoro.server.optimizing.OptimizingTaskMeta; import org.apache.amoro.server.optimizing.TaskRuntime; @@ -29,6 +28,7 @@ import org.apache.amoro.server.persistence.converter.Map2StringConverter; import org.apache.amoro.server.persistence.converter.MapLong2StringConverter; import org.apache.amoro.server.persistence.converter.Object2ByteArrayConvert; +import org.apache.amoro.server.persistence.converter.TaskDescriptorSummaryConverter; import org.apache.amoro.server.persistence.converter.TaskDescriptorTypeConverter; import org.apache.amoro.server.persistence.extension.InListExtendedLanguageDriver; import org.apache.ibatis.annotations.Delete; @@ -75,7 +75,7 @@ void insertInternalProcessState( + " 
typeHandler=org.apache.amoro.server.persistence.converter.Object2ByteArrayConvert}" + " WHERE process_id = #{processId}") void updateProcessInputFiles( - @Param("processId") long processId, @Param("input") Map input); + @Param("processId") long processId, @Param("input") Map input); @Select( "SELECT target_snapshot_id, target_change_snapshot_id, from_sequence, to_sequence " @@ -109,13 +109,26 @@ void updateProcessInputFiles( + " #{taskRuntime.token, jdbcType=VARCHAR}, #{taskRuntime.threadId, " + "jdbcType=INTEGER}, #{taskRuntime.taskDescriptor.output, jdbcType=BLOB, " + " typeHandler=org.apache.amoro.server.persistence.converter.Object2ByteArrayConvert}," - + " #{taskRuntime.taskDescriptor.summary, typeHandler=org.apache.amoro.server.persistence.converter.JsonObjectConverter}," + + " #{taskRuntime.taskDescriptor.summary, typeHandler=org.apache.amoro.server.persistence.converter.TaskDescriptorSummaryConverter}," + "#{taskRuntime.taskDescriptor.properties, typeHandler=org.apache.amoro.server.persistence.converter.Map2StringConverter})", "", "" }) - void insertTaskRuntimes(@Param("taskRuntimes") List> taskRuntimes); + void insertTaskRuntimes( + @Param("taskRuntimes") + List>> + taskRuntimes); + /** + * Load task_runtime rows for a given process, hydrating each row's descriptor via {@link + * TaskDescriptorTypeConverter} (which routes by {@code TASK_EXECUTOR_FACTORY_IMPL} — see C3). + * + *

Return type is wildcarded at {@link BaseOptimizingInput} so that both Iceberg {@code + * RewriteStageTask} and Paimon {@code PaimonCompactionTask} rows can be recovered through the + * same call. Callers that need to push a loaded {@link BaseOptimizingInput} back into the + * descriptor via {@link StagedTaskDescriptor#setInput} can do so without a format-specific + * downcast because erasure of {@code setInput(I)} is {@code setInput(Object)}. + */ @Select( "SELECT process_id, task_id, 'executing' as stage, retry_num, table_id, partition_data, create_time, start_time, end_time," + " status, fail_reason, optimizer_token, thread_id, rewrite_output, metrics_summary, properties FROM " @@ -143,14 +156,14 @@ void updateProcessInputFiles( @Result( property = "taskDescriptor.summary", column = "metrics_summary", - typeHandler = JsonObjectConverter.class), + typeHandler = TaskDescriptorSummaryConverter.class), @Result( property = "taskDescriptor.properties", column = "properties", typeHandler = Map2StringConverter.class) }) - List> selectTaskRuntimes( - @Param("table_id") long tableId, @Param("process_id") long processId); + List>> + selectTaskRuntimes(@Param("table_id") long tableId, @Param("process_id") long processId); @Select( " + + + + diff --git a/amoro-web/src/views/tables/components/Optimizing.vue b/amoro-web/src/views/tables/components/Optimizing.vue index c7ccc0b506..ea1bde1782 100644 --- a/amoro-web/src/views/tables/components/Optimizing.vue +++ b/amoro-web/src/views/tables/components/Optimizing.vue @@ -81,6 +81,7 @@ const cancelDisabled = ref(true) const writable = ref(canManageTable()) const pagination = reactive(usePagination()) const breadcrumbPagination = reactive(usePagination()) +const lastSnapshot = ref(null) // Store last snapshot ID for cursor-based pagination const route = useRoute() const query = route.query const sourceData = reactive({ @@ -113,9 +114,18 @@ async function refreshOptimizingProcesses() { status: statusType.value || '', page: 
pagination.current, pageSize: pagination.pageSize, + lastSnapshot: lastSnapshot.value, } as any) const { list, total = 0 } = result pagination.total = total + + // Update lastSnapshot with the last item's processId from current page + if (list && list.length > 0) { + lastSnapshot.value = list[list.length - 1].processId?.toString() || null + } else { + lastSnapshot.value = null + } + dataSource.push(...[...list || []].map((item) => { const { inputFiles = {}, outputFiles = {} } = item return { @@ -171,6 +181,19 @@ function change({ current = 1, pageSize = 25 }) { breadcrumbPagination.pageSize = pageSize } else { + const prevCurrent = pagination.current + // Reset cursor-based pagination when: + // - returning to first page + // - page size changed + // - non-sequential jump (user skipped pages or went backward) + // Cursor (lastSnapshot) is only valid for a sequential prev → prev+1 step. + if ( + current === 1 + || pageSize !== pagination.pageSize + || current !== prevCurrent + 1 + ) { + lastSnapshot.value = null + } pagination.current = current if (pageSize !== pagination.pageSize) { pagination.current = 1 @@ -220,12 +243,23 @@ async function refreshOptimizingTasks() { } } +function handleFilterChange() { + // Reset cursor-based pagination when filters change + lastSnapshot.value = null + pagination.current = 1 + refresh() +} + function toggleBreadcrumb(rowProcessId: number, status: string) { processId.value = rowProcessId cancelDisabled.value = status !== 'RUNNING' hasBreadcrumb.value = !hasBreadcrumb.value if (hasBreadcrumb.value) { breadcrumbPagination.current = 1 + } else { + // Reset cursor-based pagination when returning to main view + lastSnapshot.value = null + pagination.current = 1 } refresh() } diff --git a/amoro-web/src/views/tables/components/Process.vue b/amoro-web/src/views/tables/components/Process.vue new file mode 100644 index 0000000000..1ee9b1d495 --- /dev/null +++ b/amoro-web/src/views/tables/components/Process.vue @@ -0,0 +1,294 @@ + + + + + + 
+ + + diff --git a/amoro-web/src/views/tables/components/Snapshots.vue b/amoro-web/src/views/tables/components/Snapshots.vue index e40c2e890e..cd592bf085 100644 --- a/amoro-web/src/views/tables/components/Snapshots.vue +++ b/amoro-web/src/views/tables/components/Snapshots.vue @@ -54,6 +54,7 @@ const snapshotId = ref('') const loading = ref(false) const pagination = reactive(usePagination()) const breadcrumbPagination = reactive(usePagination()) +const lastSnapshot = ref(null) // Store last snapshot ID for cursor-based pagination const route = useRoute() const query = route.query const sourceData = reactive({ @@ -72,6 +73,9 @@ const operation = ref('') function onRefChange(params: { ref: string, operation: string }) { tblRef.value = params.ref operation.value = params.operation + // Reset cursor-based pagination when ref or operation changes + lastSnapshot.value = null + pagination.current = 1 getTableInfo() } function onConsumerChange(params: { @@ -100,6 +104,7 @@ async function getTableInfo() { operation: operation.value, page: pagination.current, pageSize: pagination.pageSize, + lastSnapshot: lastSnapshot.value, }) const { list = [], total } = result const rcData: ILineChartOriginalData = {} @@ -115,6 +120,14 @@ async function getTableInfo() { p.commitTime = p.commitTime ? 
dateFormat(p.commitTime) : '-' dataSource.push(p) }) + + // Update lastSnapshot with the last item's snapshotId from current page for cursor-based pagination + if (list && list.length > 0) { + lastSnapshot.value = list[list.length - 1].snapshotId || null + } else { + lastSnapshot.value = null + } + recordChartOption.value = generateLineChartOption(t('recordChartTitle'), rcData) fileChartOption.value = generateLineChartOption(t('fileChartTitle'), fcData) pagination.total = total @@ -135,6 +148,19 @@ function change({ current = 1, pageSize = 25 }) { breadcrumbPagination.pageSize = pageSize } else { + const prevCurrent = pagination.current + // Reset cursor-based pagination when: + // - returning to first page + // - page size changed + // - non-sequential jump (user skipped pages or went backward) + // Cursor (lastSnapshot) is only valid for a sequential prev → prev+1 step. + if ( + current === 1 + || pageSize !== pagination.pageSize + || current !== prevCurrent + 1 + ) { + lastSnapshot.value = null + } pagination.current = current if (pageSize !== pagination.pageSize) { pagination.current = 1 @@ -185,6 +211,10 @@ function toggleBreadcrumb(record: SnapshotItem) { if (hasBreadcrumb.value) { breadcrumbPagination.current = 1 getBreadcrumbTable() + } else { + // Reset cursor-based pagination when returning to main view + lastSnapshot.value = null + pagination.current = 1 } } diff --git a/amoro-web/src/views/tables/components/TableExplorer.vue b/amoro-web/src/views/tables/components/TableExplorer.vue index 96740e9de7..c302a4165e 100755 --- a/amoro-web/src/views/tables/components/TableExplorer.vue +++ b/amoro-web/src/views/tables/components/TableExplorer.vue @@ -20,6 +20,7 @@ limitations under the License. 
import { computed, onBeforeMount, reactive, watch } from 'vue' import { useRoute, useRouter } from 'vue-router' import { getCatalogList, getDatabaseList, getTableList } from '@/services/table.service' +import { tableTypeIconMap } from '@/types/common.type' import type { ICatalogItem } from '@/types/common.type' // Node types: Catalog / Database / Table @@ -235,11 +236,11 @@ function handleSelectTable(catalog: string, db: string, tableName: string, table catalog, database: db, tableName, + type, })) - const path = type === 'HIVE' ? '/hive-tables' : '/tables' const pathQuery = { - path, + path: '/tables', query: { catalog, db, @@ -334,7 +335,7 @@ function getNodeIcon(node: TreeNode) { if (node.nodeType === 'database') { return 'database' } - return 'tables' + return tableTypeIconMap[node.tableType as keyof typeof tableTypeIconMap] || 'tables' } function normalizeKeyword(raw: string) { @@ -601,7 +602,6 @@ onBeforeMount(async () => { No results.. - diff --git a/amoro-web/src/views/tables/index.vue b/amoro-web/src/views/tables/index.vue index f3704dbe23..da0de2884c 100644 --- a/amoro-web/src/views/tables/index.vue +++ b/amoro-web/src/views/tables/index.vue @@ -24,8 +24,10 @@ import UFiles from './components/Files.vue' import UOperations from './components/Operations.vue' import USnapshots from './components/Snapshots.vue' import UOptimizing from './components/Optimizing.vue' +import UProcess from './components/Process.vue' import UHealthScore from './components/HealthScoreDetails.vue' import TableExplorer from './components/TableExplorer.vue' +import HiveDetails from './components/HiveDetails.vue' import useStore from '@/store/index' import type { IBaseDetailInfo } from '@/types/common.type' import { usePageScroll } from '@/hooks/usePageScroll' @@ -38,8 +40,10 @@ export default defineComponent({ UOperations, USnapshots, UOptimizing, + UProcess, UHealthScore, TableExplorer, + HiveDetails, }, setup() { const router = useRouter() @@ -107,6 +111,7 @@ export default 
defineComponent({ { key: 'Snapshots', label: 'snapshots' }, { key: 'Optimizing', label: 'optimizing' }, { key: 'Operations', label: 'operations' }, + { key: 'Process', label: 'process' }, ]) const state = reactive({ @@ -134,6 +139,8 @@ export default defineComponent({ }) const hasSelectedTable = computed(() => !!(route.query?.catalog && route.query?.db && route.query?.table)) + const isHiveTable = computed(() => route.query?.type === 'HIVE') + const isHiveUpgrade = computed(() => route.path.includes('hive-upgrade')) const setBaseDetailInfo = (baseInfo: IBaseDetailInfo & { comment?: string }) => { state.detailLoaded = true @@ -180,9 +187,13 @@ export default defineComponent({ watch( () => route.query, (value, oldVal) => { - const { catalog, db, table } = value - const { catalog: oldCatalog, db: oldDb, table: oldTable } = oldVal - if (`${catalog}${db}${table}` !== `${oldCatalog}${oldDb}${oldTable}`) { + const { catalog, db, table, type } = value + const { catalog: oldCatalog, db: oldDb, table: oldTable, type: oldType } = oldVal + if (isHiveTable.value || isHiveUpgrade.value) { + state.activeKey = (value.tab as string) || 'Details' + return + } + if (`${catalog}${db}${table}${type}` !== `${oldCatalog}${oldDb}${oldTable}${oldType}`) { state.activeKey = 'Details' nextTick(() => { if (detailRef.value) { @@ -198,7 +209,7 @@ export default defineComponent({ watch( hasSelectedTable, (value, oldVal) => { - if (value && !oldVal && detailRef.value) { + if (value && !oldVal && !isHiveTable.value && !isHiveUpgrade.value && detailRef.value) { detailRef.value.getTableDetails() } }, @@ -209,7 +220,7 @@ export default defineComponent({ state.activeKey = (route.query?.tab as string) || 'Details' nextTick(() => { - if (detailRef.value && hasSelectedTable.value) { + if (detailRef.value && hasSelectedTable.value && !isHiveTable.value && !isHiveUpgrade.value) { detailRef.value.getTableDetails() } }) @@ -227,6 +238,8 @@ export default defineComponent({ store, isIceberg, hasSelectedTable, 
+ isHiveTable, + isHiveUpgrade, setBaseDetailInfo, handleTableNotFound, goBack, @@ -240,7 +253,7 @@ export default defineComponent({