diff --git a/.asf.yaml b/.asf.yaml
index bcacbb58d4..3eaeedaed6 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -26,7 +26,6 @@ github:
- lake-house
- iceberg
- paimon
- - hudi
- management-system
- self-optimizing
- flink
diff --git a/.github/workflows/core-hadoop2-ci.yml b/.github/workflows/core-hadoop2-ci.yml
index ad891366c5..4d75245506 100644
--- a/.github/workflows/core-hadoop2-ci.yml
+++ b/.github/workflows/core-hadoop2-ci.yml
@@ -25,7 +25,6 @@ on:
- "amoro-optimizer/**"
- "amoro-format-iceberg/**"
- "amoro-format-paimon/**"
- - "amoro-format-hudi/**"
- "amoro-format-mixed/amoro-mixed-flink/**"
- "amoro-format-mixed/amoro-mixed-hive/**"
- "amoro-format-mixed/amoro-mixed-spark/**"
diff --git a/.github/workflows/core-hadoop3-ci.yml b/.github/workflows/core-hadoop3-ci.yml
index e93a32aa4f..50c2b068ac 100644
--- a/.github/workflows/core-hadoop3-ci.yml
+++ b/.github/workflows/core-hadoop3-ci.yml
@@ -25,7 +25,6 @@ on:
- "amoro-optimizer/**"
- "amoro-format-iceberg/**"
- "amoro-format-paimon/**"
- - "amoro-format-hudi/**"
- "amoro-format-mixed/amoro-mixed-flink/**"
- "amoro-format-mixed/amoro-mixed-hive/**"
- "amoro-format-mixed/amoro-mixed-spark/**"
diff --git a/README.md b/README.md
index e07afe7b0c..ba491057e1 100644
--- a/README.md
+++ b/README.md
@@ -105,7 +105,6 @@ Amoro contains modules as below:
- `amoro-web` is the dashboard frontend for ams
- `amoro-optimizer` provides default optimizer implementation
- `amoro-format-iceberg` contains integration of Apache Iceberg format
-- `amoro-format-hudi` contains integration of Apache Hudi format
- `amoro-format-paimon` contains integration of Apache Paimon format
- `amoro-format-mixed` provides Mixed format implementation
- `amoro-mixed-hive` integrates with Apache Hive and implements Mixed Hive format
@@ -131,7 +130,6 @@ Amoro is built using Maven with JDK 8, 11 and 17(required for `amoro-format-mixe
* Build all modules: `./mvnw clean package -DskipTests -Ptoolchain,build-mixed-format-trino`, besides you need config `toolchains.xml` in `${user.home}/.m2/` dir with content below.
* Build a distribution package with all formats integrated: `./mvnw clean package -Psupport-all-formats`
* Build a distribution package with Apache Paimon format: `./mvnw clean package -Psupport-paimon-format`
- * Build a distribution package with Apache Hudi format: `./mvnw clean package -Psupport-hudi-format`
```
diff --git a/amoro-ams/pom.xml b/amoro-ams/pom.xml
index 34f5591c73..f05b4f0557 100644
--- a/amoro-ams/pom.xml
+++ b/amoro-ams/pom.xml
@@ -243,7 +243,6 @@
mysqlmysql-connector-java
- provided
@@ -359,14 +358,6 @@
-
-
- org.apache.amoro
- amoro-format-mixed-spark-${spark.major.version}_${scala.binary.version}
- ${project.version}
- runtime
-
-
org.apache.amoroamoro-optimizer-standalone
@@ -410,7 +401,6 @@
org.apache.amoroamoro-format-paimon${project.version}
- test
@@ -462,12 +452,14 @@
org.testcontainerstestcontainers
+ 1.20.4testorg.testcontainersjunit-jupiter
+ 1.20.4test
@@ -481,12 +473,14 @@
org.testcontainersk3s
+ 1.20.4testorg.testcontainersmysql
+ 1.20.4test
@@ -517,7 +511,7 @@
com.google.guavaguava
- ${guava-hive.version}
+ ${guava.version}
@@ -616,15 +610,6 @@
-
- support-hudi-format
-
-
- org.apache.amoro
- amoro-format-hudi
-
-
- support-all-formats
@@ -632,10 +617,6 @@
org.apache.amoroamoro-format-paimon
-
- org.apache.amoro
- amoro-format-hudi
-
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 9ac6fb08e2..ccdd864474 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -518,6 +518,13 @@ public class AmoroManagementConf {
.withDescription(
"Allow the table to break the quota limit when the resource is sufficient.");
+ public static final ConfigOption OPTIMIZING_QUEUE_WARMUP_THREAD_COUNT =
+ ConfigOptions.key("self-optimizing.queue-warmup.thread-count")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ "The number of threads used to warm up optimizing queues during AMS startup.");
+
/** @deprecated Use {@link #PROCESS_HISTORY_DATA_KEEP_TIME} instead. */
@Deprecated
public static final ConfigOption PROCESS_HISTORY_DATA_KEEP_DAYS =
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
index 1e78520f07..d4e94e2234 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConfValidator.java
@@ -81,6 +81,7 @@ public static void validateConfig(Configurations configurations) {
validateThreadCount(configurations, AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT);
validateThreadCount(configurations, AmoroManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT);
+ validateThreadCount(configurations, AmoroManagementConf.OPTIMIZING_QUEUE_WARMUP_THREAD_COUNT);
if (configurations.getBoolean(AmoroManagementConf.EXPIRE_SNAPSHOTS_ENABLED)) {
validateThreadCount(configurations, AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT);
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 0be364523a..ee1958c614 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -283,7 +283,8 @@ public void startOptimizingService() throws Exception {
optimizerManager,
tableService,
bucketAssignStore,
- haContainer);
+ haContainer,
+ processFactories);
LOG.info("Setting up AMS table executors...");
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index efc1c375b7..a44bb654dd 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -33,6 +33,7 @@
import org.apache.amoro.exception.IllegalTaskStateException;
import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.exception.PluginRetryAuthException;
+import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceContainer;
import org.apache.amoro.resource.ResourceGroup;
@@ -41,14 +42,18 @@
import org.apache.amoro.server.dashboard.model.OptimizerResourceInfo;
import org.apache.amoro.server.ha.HighAvailabilityContainer;
import org.apache.amoro.server.manager.AbstractOptimizerContainer;
+import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingQueue;
+import org.apache.amoro.server.optimizing.OptimizingQueueWarmupMetrics;
+import org.apache.amoro.server.optimizing.OptimizingQueueWarmupService;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.StatedPersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
import org.apache.amoro.server.persistence.mapper.ResourceMapper;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
+import org.apache.amoro.server.process.ProcessFactoryRouter;
import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.resource.Containers;
import org.apache.amoro.server.resource.OptimizerInstance;
@@ -58,6 +63,7 @@
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableService;
+import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -83,6 +89,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -121,9 +128,11 @@ public class DefaultOptimizingService extends StatedPersistentBase
private final TableService tableService;
private final RuntimeHandlerChain tableHandlerChain;
private final ExecutorService planExecutor;
+ private final OptimizingQueueWarmupService queueWarmupService;
private final BucketAssignStore bucketAssignStore;
private final HighAvailabilityContainer haContainer;
private final boolean isMasterSlaveMode;
+ private final ProcessFactoryRouter router;
public DefaultOptimizingService(
Configurations serviceConfig,
@@ -131,7 +140,29 @@ public DefaultOptimizingService(
OptimizerManager optimizerManager,
TableService tableService,
BucketAssignStore bucketAssignStore,
- HighAvailabilityContainer haContainer) {
+ HighAvailabilityContainer haContainer,
+ List processFactories) {
+ this(
+ serviceConfig,
+ catalogManager,
+ optimizerManager,
+ tableService,
+ bucketAssignStore,
+ haContainer,
+ processFactories,
+ defaultWarmupServiceFactory(serviceConfig));
+ }
+
+ @VisibleForTesting
+ DefaultOptimizingService(
+ Configurations serviceConfig,
+ CatalogManager catalogManager,
+ OptimizerManager optimizerManager,
+ TableService tableService,
+ BucketAssignStore bucketAssignStore,
+ HighAvailabilityContainer haContainer,
+ List processFactories,
+ WarmupServiceFactory warmupServiceFactory) {
this.optimizerTouchTimeout =
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
this.taskAckTimeout =
@@ -159,6 +190,9 @@ public DefaultOptimizingService(
this.isMasterSlaveMode =
haContainer != null
&& serviceConfig.getBoolean(AmoroManagementConf.HA_USE_MASTER_SLAVE_MODE);
+ this.router =
+ new ProcessFactoryRouter(
+ Optional.ofNullable(processFactories).orElseGet(Collections::emptyList));
this.tableHandlerChain = new TableRuntimeHandlerImpl();
this.planExecutor =
Executors.newCachedThreadPool(
@@ -166,31 +200,75 @@ public DefaultOptimizingService(
.setNameFormat("plan-executor-thread-%d")
.setDaemon(true)
.build());
+ OptimizingQueueWarmupMetrics warmupMetrics =
+ new OptimizingQueueWarmupMetrics(MetricManager.getInstance().getGlobalRegistry());
+ warmupMetrics.register();
+ this.queueWarmupService =
+ warmupServiceFactory.create(
+ this::getOptionalQueueByGroup, this::runtimeStillActiveForWarmup, warmupMetrics);
+ LOG.info(
+ "Optimizing router initialised: delegates={} formats={}",
+ router.delegates().stream().map(ProcessFactory::name).collect(Collectors.toList()),
+ router.supportedFormats());
+ }
+
+ @VisibleForTesting
+ interface WarmupServiceFactory {
+ OptimizingQueueWarmupService create(
+ Function> queueSupplier,
+ Predicate runtimeStillActive,
+ OptimizingQueueWarmupMetrics metrics);
+ }
+
+ private static WarmupServiceFactory defaultWarmupServiceFactory(Configurations serviceConfig) {
+ return (queueSupplier, runtimeStillActive, metrics) ->
+ new OptimizingQueueWarmupService(
+ serviceConfig.getInteger(AmoroManagementConf.OPTIMIZING_QUEUE_WARMUP_THREAD_COUNT),
+ queueSupplier,
+ runtimeStillActive,
+ metrics);
}
public RuntimeHandlerChain getTableRuntimeHandler() {
return tableHandlerChain;
}
+ private boolean runtimeStillActiveForWarmup(DefaultTableRuntime runtime) {
+ if (!tableService.isStarted()) {
+ return getOptionalQueueByGroup(runtime.getGroupName()).isPresent();
+ }
+ TableRuntime current = tableService.getRuntime(runtime.getTableIdentifier().getId());
+ return current == runtime && getOptionalQueueByGroup(runtime.getGroupName()).isPresent();
+ }
+
+ @VisibleForTesting
+ boolean awaitQueueWarmupForTest(long timeout, TimeUnit unit) throws InterruptedException {
+ return queueWarmupService.awaitCompletionForTest(timeout, unit);
+ }
+
private void loadOptimizingQueues(List tableRuntimeList) {
List optimizerGroups =
getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
List optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
Map> groupToTableRuntimes =
tableRuntimeList.stream().collect(Collectors.groupingBy(TableRuntime::getGroupName));
+ List warmupRuntimes = new ArrayList<>();
optimizerGroups.forEach(
group -> {
String groupName = group.getName();
- List tableRuntimes = groupToTableRuntimes.remove(groupName);
+ List tableRuntimes =
+ Optional.ofNullable(groupToTableRuntimes.remove(groupName)).orElseGet(ArrayList::new);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
catalogManager,
group,
this,
planExecutor,
- Optional.ofNullable(tableRuntimes).orElseGet(ArrayList::new),
- maxPlanningParallelism);
+ Collections.emptyList(),
+ maxPlanningParallelism,
+ router);
optimizingQueueByGroup.put(groupName, optimizingQueue);
+ warmupRuntimes.addAll(tableRuntimes);
optimizerGroupKeeper.keepInTouch(groupName, 1);
});
optimizers.forEach(optimizer -> registerOptimizer(optimizer, false));
@@ -216,6 +294,7 @@ private void loadOptimizingQueues(List tableRuntimeList) {
tr.completeEmptyProcess();
});
});
+ queueWarmupService.warmupTables(warmupRuntimes);
}
private void registerOptimizer(OptimizerInstance optimizer, boolean needPersistent) {
@@ -393,6 +472,12 @@ public void deleteOptimizer(String group, String resourceId) {
public void createResourceGroup(ResourceGroup resourceGroup) {
doAsTransaction(
() -> {
+ String groupName = resourceGroup.getName();
+ OptimizingQueue existingQueue = optimizingQueueByGroup.get(groupName);
+ if (existingQueue != null) {
+ existingQueue.updateOptimizerGroup(resourceGroup);
+ return;
+ }
OptimizingQueue optimizingQueue =
new OptimizingQueue(
catalogManager,
@@ -400,8 +485,8 @@ public void createResourceGroup(ResourceGroup resourceGroup) {
this,
planExecutor,
new ArrayList<>(),
- maxPlanningParallelism);
- String groupName = resourceGroup.getName();
+ maxPlanningParallelism,
+ router);
optimizingQueueByGroup.put(groupName, optimizingQueue);
optimizerGroupKeeper.keepInTouch(groupName, 1);
});
@@ -409,7 +494,9 @@ public void createResourceGroup(ResourceGroup resourceGroup) {
public void deleteResourceGroup(String groupName) {
OptimizingQueue optimizingQueue = optimizingQueueByGroup.remove(groupName);
- optimizingQueue.dispose();
+ if (optimizingQueue != null) {
+ optimizingQueue.dispose();
+ }
}
public void updateResourceGroup(ResourceGroup resourceGroup) {
@@ -418,11 +505,22 @@ public void updateResourceGroup(ResourceGroup resourceGroup) {
}
public void dispose() {
+ queueWarmupService.close();
planExecutor.shutdown();
// shutdown sync group first, stop syncing group
optimizingConfigWatcher.dispose();
// dispose all queues
optimizingQueueByGroup.values().forEach(OptimizingQueue::dispose);
+ router
+ .delegates()
+ .forEach(
+ factory -> {
+ try {
+ factory.close();
+ } catch (Exception e) {
+ LOG.warn("Error closing ProcessFactory '{}': {}", factory.name(), e.getMessage());
+ }
+ });
optimizerKeeper.dispose();
optimizerGroupKeeper.dispose();
tableHandlerChain.dispose();
@@ -444,7 +542,8 @@ private class TableRuntimeHandlerImpl extends RuntimeHandlerChain {
@Override
public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus) {
DefaultTableRuntime defaultTableRuntime = (DefaultTableRuntime) tableRuntime;
- if (!defaultTableRuntime.getOptimizingStatus().isProcessing()) {
+ OptimizingStatus status = defaultTableRuntime.getOptimizingStatus();
+ if (status == OptimizingStatus.PENDING || status == OptimizingStatus.IDLE) {
getOptionalQueueByGroup(defaultTableRuntime.getGroupName())
.ifPresent(q -> q.refreshTable(defaultTableRuntime));
}
@@ -459,7 +558,7 @@ public void handleConfigChanged(TableRuntime runtime, TableConfiguration origina
getOptionalQueueByGroup(originalGroup).ifPresent(q -> q.releaseTable(tableRuntime));
// If the new group doesn't exist, close the process to avoid the table in limbo(PENDING)
// status.
- if (newQueue.isEmpty()) {
+ if (!newQueue.isPresent()) {
LOG.warn(
"Cannot find the resource group: {}, try to release optimizing process of table {} directly",
tableRuntime.getGroupName(),
@@ -502,7 +601,9 @@ protected void initHandler(List tableRuntimeList) {
}
@Override
- protected void doDispose() {}
+ protected void doDispose() {
+ queueWarmupService.close();
+ }
}
private class OptimizerKeepingTask implements Delayed {
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/authorization/CasbinAuthorizationManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/authorization/CasbinAuthorizationManager.java
index 70f01ee02b..4d47e1c9d5 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/authorization/CasbinAuthorizationManager.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/authorization/CasbinAuthorizationManager.java
@@ -28,6 +28,7 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
@@ -109,13 +110,12 @@ private static Enforcer createEnforcer() {
}
private static List> readPolicies(String resource) {
- return readResource(resource)
- .lines()
+ return Arrays.stream(readResource(resource).split("\\R"))
.map(String::trim)
.filter(line -> !line.isEmpty() && !line.startsWith("#"))
.map(
line ->
- List.of(line.split("\\s*,\\s*")).stream()
+ Arrays.stream(line.split("\\s*,\\s*"))
.map(String::trim)
.collect(Collectors.toList()))
.collect(Collectors.toList());
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java
index 31c8240cae..ebb5db7a55 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java
@@ -43,11 +43,9 @@ public class CatalogBuilder {
private static final Map> formatSupportedMatrix =
ImmutableMap.of(
CATALOG_TYPE_HADOOP,
- Sets.newHashSet(
- TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.PAIMON, TableFormat.HUDI),
+ Sets.newHashSet(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.PAIMON),
CATALOG_TYPE_FILESYSTEM,
- Sets.newHashSet(
- TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.PAIMON, TableFormat.HUDI),
+ Sets.newHashSet(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.PAIMON),
CATALOG_TYPE_GLUE,
Sets.newHashSet(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG),
CATALOG_TYPE_REST,
@@ -59,8 +57,7 @@ public class CatalogBuilder {
TableFormat.ICEBERG,
TableFormat.MIXED_ICEBERG,
TableFormat.MIXED_HIVE,
- TableFormat.PAIMON,
- TableFormat.HUDI),
+ TableFormat.PAIMON),
CATALOG_TYPE_AMS,
Sets.newHashSet(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG));
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
index 2246cf68b7..80921221d6 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java
@@ -52,6 +52,7 @@
import org.apache.amoro.server.dashboard.controller.OptimizerGroupController;
import org.apache.amoro.server.dashboard.controller.OverviewController;
import org.apache.amoro.server.dashboard.controller.PlatformFileInfoController;
+import org.apache.amoro.server.dashboard.controller.ProcessController;
import org.apache.amoro.server.dashboard.controller.SettingController;
import org.apache.amoro.server.dashboard.controller.TableController;
import org.apache.amoro.server.dashboard.controller.TerminalController;
@@ -91,6 +92,7 @@ public class DashboardServer {
private final OptimizerController optimizerController;
private final PlatformFileInfoController platformFileInfoController;
private final SettingController settingController;
+ private final ProcessController processController;
private final TableController tableController;
private final TerminalController terminalController;
private final VersionController versionController;
@@ -122,6 +124,7 @@ public DashboardServer(
this.settingController = new SettingController(serviceConfig, optimizerManager);
ServerTableDescriptor tableDescriptor =
new ServerTableDescriptor(catalogManager, tableManager, serviceConfig);
+ this.processController = new ProcessController(tableManager);
this.tableController =
new TableController(catalogManager, tableManager, tableDescriptor, serviceConfig);
this.terminalController = new TerminalController(terminalManager);
@@ -298,6 +301,9 @@ private EndpointGroup apiGroup() {
post(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-processes/{processId}/cancel",
tableController::cancelOptimizingProcess);
+ get(
+ "/catalogs/{catalog}/dbs/{db}/tables/{table}/processes",
+ processController::getTableProcesses);
});
get("/upgrade/properties", tableController::getUpgradeHiveTableProperties);
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
index 7f3a49d0c5..d7e2bcd3ac 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
@@ -241,8 +241,13 @@ private Long snapshotIdOfTableRef(Table table, String ref) {
}
@Override
- public List getSnapshots(
- AmoroTable> amoroTable, String ref, OperationType operationType) {
+ public Pair, Long> getSnapshots(
+ AmoroTable> amoroTable,
+ String ref,
+ OperationType operationType,
+ int limit,
+ int offset,
+ String lastSnapshot) {
MixedTable mixedTable = getTable(amoroTable);
List snapshotsOfTables = new ArrayList<>();
List> tableAndSnapshotIdList = new ArrayList<>();
@@ -262,10 +267,50 @@ public List getSnapshots(
}
tableAndSnapshotIdList.forEach(
tableAndSnapshotId -> collectSnapshots(snapshotsOfTables, tableAndSnapshotId));
- return snapshotsOfTables.stream()
- .filter(s -> validOperationType(s, operationType))
- .sorted((o1, o2) -> Long.compare(o2.getCommitTime(), o1.getCommitTime()))
- .collect(Collectors.toList());
+ List amoroSnapshotsOfTables =
+ snapshotsOfTables.stream()
+ .filter(s -> validOperationType(s, operationType))
+ .sorted((o1, o2) -> Long.compare(o2.getCommitTime(), o1.getCommitTime()))
+ .collect(Collectors.toList());
+ return paginate(amoroSnapshotsOfTables, limit, offset, lastSnapshot);
+ }
+
+ /**
+ * Apply pagination against a pre-filtered, pre-sorted snapshot list.
+ *
+ *
Semantics mirror {@code PaimonTableDescriptor.getSnapshots}: if {@code lastSnapshot} is a
+ * non-empty cursor, skip forward until the matching snapshotId is found and return the next
+ * {@code limit} items; if the cursor is missing (snapshot rolled off), fall back to offset-based
+ * behaviour. Otherwise apply {@code offset} + {@code limit} directly. The total count always
+ * reflects the full filtered list.
+ */
+ static Pair, Long> paginate(
+ List amoroSnapshotsOfTables,
+ int limit,
+ int offset,
+ String lastSnapshot) {
+ int total = amoroSnapshotsOfTables.size();
+ int safeLimit = Math.max(limit, 0);
+ int safeOffset = Math.max(offset, 0);
+ int startIdx = resolveStartIndex(amoroSnapshotsOfTables, lastSnapshot, safeOffset);
+ if (startIdx >= total) {
+ return Pair.of(Collections.emptyList(), (long) total);
+ }
+ int endIdx = Math.min(startIdx + safeLimit, total);
+ return Pair.of(new ArrayList<>(amoroSnapshotsOfTables.subList(startIdx, endIdx)), (long) total);
+ }
+
+ private static int resolveStartIndex(
+ List snapshots, String lastSnapshot, int offset) {
+ if (lastSnapshot != null && !lastSnapshot.isEmpty()) {
+ for (int i = 0; i < snapshots.size(); i++) {
+ if (lastSnapshot.equals(snapshots.get(i).getSnapshotId())) {
+ return i + 1;
+ }
+ }
+ // Cursor not found (snapshot rolled off) — fall back to offset.
+ }
+ return offset;
}
private boolean validOperationType(AmoroSnapshotsOfTable snapshot, OperationType operationType) {
@@ -655,7 +700,12 @@ public List getTableConsumerInfos(AmoroTable> amoroTable) {
@Override
public Pair, Integer> getOptimizingProcessesInfo(
- AmoroTable> amoroTable, String type, ProcessStatus status, int limit, int offset) {
+ AmoroTable> amoroTable,
+ String type,
+ ProcessStatus status,
+ int limit,
+ int offset,
+ String lastSnapshot) {
TableIdentifier tableIdentifier = amoroTable.id();
ServerTableIdentifier identifier =
getAs(
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
index 4e2b1aef8c..6347d3fa13 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
@@ -77,11 +77,17 @@ public ServerTableMeta getTableDetail(TableIdentifier tableIdentifier) {
return formatTableDescriptor.getTableDetail(amoroTable);
}
- public List getSnapshots(
- TableIdentifier tableIdentifier, String ref, OperationType operationType) {
+ public Pair, Long> getSnapshots(
+ TableIdentifier tableIdentifier,
+ String ref,
+ OperationType operationType,
+ int limit,
+ int offset,
+ String lastSnapshot) {
AmoroTable> amoroTable = loadTable(tableIdentifier);
FormatTableDescriptor formatTableDescriptor = formatDescriptorMap.get(amoroTable.format());
- return formatTableDescriptor.getSnapshots(amoroTable, ref, operationType);
+ return formatTableDescriptor.getSnapshots(
+ amoroTable, ref, operationType, limit, offset, lastSnapshot);
}
public List getSnapshotDetail(
@@ -129,11 +135,16 @@ public List getTableConsumersInfos(TableIdentifier tableIdentifier
}
public Pair, Integer> getOptimizingProcessesInfo(
- TableIdentifier tableIdentifier, String type, ProcessStatus status, int limit, int offset) {
+ TableIdentifier tableIdentifier,
+ String type,
+ ProcessStatus status,
+ int limit,
+ int offset,
+ String lastSnapshot) {
AmoroTable> amoroTable = loadTable(tableIdentifier);
FormatTableDescriptor formatTableDescriptor = formatDescriptorMap.get(amoroTable.format());
return formatTableDescriptor.getOptimizingProcessesInfo(
- amoroTable, type, status, limit, offset);
+ amoroTable, type, status, limit, offset, lastSnapshot);
}
public List getOptimizingProcessTaskInfos(
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java
index d688f9e29e..5380d50813 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java
@@ -18,7 +18,6 @@
package org.apache.amoro.server.dashboard.controller;
-import static org.apache.amoro.TableFormat.HUDI;
import static org.apache.amoro.TableFormat.ICEBERG;
import static org.apache.amoro.TableFormat.MIXED_HIVE;
import static org.apache.amoro.TableFormat.MIXED_ICEBERG;
@@ -139,8 +138,6 @@ public class CatalogController {
CatalogDescriptor.of(CATALOG_TYPE_HIVE, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, MIXED_HIVE));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(CATALOG_TYPE_HIVE, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, PAIMON));
- VALIDATE_CATALOGS.add(
- CatalogDescriptor.of(CATALOG_TYPE_HIVE, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, HUDI));
VALIDATE_CATALOGS.add(
CatalogDescriptor.of(
CATALOG_TYPE_HADOOP, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, MIXED_ICEBERG));
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/ProcessController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/ProcessController.java
new file mode 100644
index 0000000000..1dbcc06f61
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/ProcessController.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.dashboard.controller;
+
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
+import io.javalin.http.Context;
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.process.ProcessStatus;
+import org.apache.amoro.server.dashboard.response.OkResponse;
+import org.apache.amoro.server.dashboard.response.PageResult;
+import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
+import org.apache.amoro.server.process.TableProcessMeta;
+import org.apache.amoro.server.table.TableManager;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.table.TableIdentifier;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ProcessController extends PersistentBase {
+
+ private final TableManager tableManager;
+
+ public ProcessController(TableManager tableManager) {
+ this.tableManager = tableManager;
+ }
+
+ public void getTableProcesses(Context ctx) {
+ String catalog = ctx.pathParam("catalog");
+ String db = ctx.pathParam("db");
+ String table = ctx.pathParam("table");
+ String processType = ctx.queryParam("type");
+ String status = ctx.queryParam("status");
+ Integer page = ctx.queryParamAsClass("page", Integer.class).getOrDefault(1);
+ Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);
+
+ if (StringUtils.isBlank(processType)) {
+ processType = null;
+ }
+
+ Preconditions.checkArgument(page >= 1, "page[%s] must >= 1", page);
+ Preconditions.checkArgument(pageSize > 0, "pageSize[%s] must > 0", pageSize);
+
+ TableIdentifier tableIdentifier = TableIdentifier.of(catalog, db, table);
+ ServerTableIdentifier serverTableIdentifier =
+ tableManager.getServerTableIdentifier(tableIdentifier.buildTableIdentifier());
+ if (serverTableIdentifier == null) {
+ ctx.json(OkResponse.of(PageResult.of(Collections.emptyList(), 0)));
+ return;
+ }
+
+ long tableId = serverTableIdentifier.getId();
+ ProcessStatus processStatus =
+ StringUtils.isBlank(status) ? null : ProcessStatus.valueOf(status);
+
+ int total = 0;
+ List processMetaList = Collections.emptyList();
+ final String finalProcessType = processType;
+ try (Page> ignored = PageHelper.startPage(page, pageSize, true)) {
+ processMetaList =
+ getAs(
+ TableProcessMapper.class,
+ mapper -> mapper.listProcessMeta(tableId, finalProcessType, processStatus));
+ PageInfo pageInfo = new PageInfo<>(processMetaList);
+ total = (int) pageInfo.getTotal();
+ }
+
+ ctx.json(OkResponse.of(PageResult.of(processMetaList, total)));
+ }
+}
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
index 8b4acba44a..82e594af92 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
@@ -335,6 +335,7 @@ public void getOptimizingProcesses(Context ctx) {
String status = ctx.queryParam("status");
Integer page = ctx.queryParamAsClass("page", Integer.class).getOrDefault(1);
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);
+ String lastSnapshot = ctx.queryParam("lastSnapshot");
int offset = (page - 1) * pageSize;
int limit = pageSize;
@@ -346,7 +347,12 @@ public void getOptimizingProcesses(Context ctx) {
StringUtils.isBlank(status) ? null : ProcessStatus.valueOf(status);
Pair, Integer> optimizingProcessesInfo =
tableDescriptor.getOptimizingProcessesInfo(
- tableIdentifier.buildTableIdentifier(), type, processStatus, limit, offset);
+ tableIdentifier.buildTableIdentifier(),
+ type,
+ processStatus,
+ limit,
+ offset,
+ lastSnapshot);
List result = optimizingProcessesInfo.getLeft();
int total = optimizingProcessesInfo.getRight();
@@ -407,16 +413,20 @@ public void getTableSnapshots(Context ctx) {
String operation =
ctx.queryParamAsClass("operation", String.class)
.getOrDefault(OperationType.ALL.displayName());
+ String lastSnapshot = ctx.queryParam("lastSnapshot");
OperationType operationType = OperationType.of(operation);
-
- List snapshotsOfTables =
+ int offset = (page - 1) * pageSize;
+ int limit = pageSize;
+ Pair, Long> snapshotsOfTables =
tableDescriptor.getSnapshots(
TableIdentifier.of(catalog, database, tableName).buildTableIdentifier(),
ref,
- operationType);
- int offset = (page - 1) * pageSize;
+ operationType,
+ limit,
+ offset,
+ lastSnapshot);
PageResult pageResult =
- PageResult.of(snapshotsOfTables, offset, pageSize);
+ PageResult.of(snapshotsOfTables.getLeft(), snapshotsOfTables.getRight().intValue());
ctx.json(OkResponse.of(pageResult));
}
@@ -540,8 +550,6 @@ public void getTableList(Context ctx) {
return TableMeta.TableType.PAIMON.toString();
} else if (format.equals(TableFormat.ICEBERG)) {
return TableMeta.TableType.ICEBERG.toString();
- } else if (format.equals(TableFormat.HUDI)) {
- return TableMeta.TableType.HUDI.toString();
} else {
return format.toString();
}
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableMeta.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableMeta.java
index dead0be96b..b70fe01bd8 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableMeta.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/model/TableMeta.java
@@ -66,9 +66,7 @@ public enum TableType {
ARCTIC("arctic"),
HIVE("hive"),
ICEBERG("iceberg"),
- PAIMON("paimon"),
-
- HUDI("hudi");
+ PAIMON("paimon");
private final String name;
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
index 48282f2a9e..bb34a6c81b 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
@@ -22,6 +22,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -54,7 +55,7 @@ public void registerAndElect() throws Exception {}
@Override
public List getAliveNodes() {
- return List.of();
+ return Collections.emptyList();
}
@Override
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java
index c05a853cce..d23decf3ee 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java
@@ -32,13 +32,18 @@
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.RewriteStageTask;
+import org.apache.amoro.optimizing.TableOptimizingCommitter;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.utils.ContentFiles;
+import org.apache.amoro.utils.MixedDataFiles;
import org.apache.amoro.utils.MixedTableUtil;
+import org.apache.amoro.utils.TablePropertyUtil;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.exceptions.ValidationException;
@@ -51,15 +56,17 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
-public class KeyedTableCommit extends UnKeyedTableCommit {
+public class KeyedTableCommit extends UnKeyedTableCommit implements TableOptimizingCommitter {
private static final Logger LOG = LoggerFactory.getLogger(KeyedTableCommit.class);
protected MixedTable table;
- protected Collection> tasks;
+ protected Collection tasks;
protected Long fromSnapshotId;
@@ -69,16 +76,36 @@ public class KeyedTableCommit extends UnKeyedTableCommit {
public KeyedTableCommit(
MixedTable table,
- Collection> tasks,
+ Collection tasks,
Long fromSnapshotId,
- StructLikeMap fromSequenceOfPartitions,
- StructLikeMap toSequenceOfPartitions) {
+ Map fromSequence,
+ Map toSequence) {
super(fromSnapshotId, table, tasks);
this.table = table;
this.tasks = tasks;
this.fromSnapshotId = fromSnapshotId == null ? Constants.INVALID_SNAPSHOT_ID : fromSnapshotId;
- this.fromSequenceOfPartitions = fromSequenceOfPartitions;
- this.toSequenceOfPartitions = toSequenceOfPartitions;
+ this.fromSequenceOfPartitions = convertPartitionSequence(table, fromSequence);
+ this.toSequenceOfPartitions = convertPartitionSequence(table, toSequence);
+ }
+
+ private static StructLikeMap convertPartitionSequence(
+ MixedTable table, Map partitionSequence) {
+ PartitionSpec spec = table == null ? null : table.spec();
+ PartitionSpec safeSpec = spec == null ? PartitionSpec.unpartitioned() : spec;
+ StructLikeMap results = StructLikeMap.create(safeSpec.partitionType());
+ if (partitionSequence == null || partitionSequence.isEmpty()) {
+ return results;
+ }
+ partitionSequence.forEach(
+ (partition, sequence) -> {
+ if (safeSpec.isUnpartitioned()) {
+ results.put(TablePropertyUtil.EMPTY_STRUCT, sequence);
+ } else {
+ StructLike partitionData = MixedDataFiles.data(safeSpec, partition);
+ results.put(partitionData, sequence);
+ }
+ });
+ return results;
}
@Override
@@ -94,6 +121,19 @@ public void commit() throws OptimizingCommitException {
tasks.size(),
fromSnapshotId);
+ // Filter tasks with null output to avoid deleting input files of failed tasks
+ List successTasks =
+ tasks.stream().filter(task -> task.getOutput() != null).collect(Collectors.toList());
+
+ if (successTasks.isEmpty()) {
+ LOG.info("No tasks with output to commit for table {}", table.id());
+ return;
+ }
+
+ // Rebuild toSequenceOfPartitions from only success tasks to avoid consuming
+ // change files that belong to failed tasks
+ rebuildToSequenceOfPartitions(successTasks);
+
// In the scene of moving files to hive, the files will be renamed
List hiveNewDataFiles = moveFile2HiveIfNeed();
@@ -105,8 +145,11 @@ public void commit() throws OptimizingCommitException {
StructLikeMap partitionOptimizedSequence =
MixedTableUtil.readOptimizedSequence(table.asKeyedTable());
- for (TaskRuntime taskRuntime : tasks) {
- RewriteFilesInput input = taskRuntime.getTaskDescriptor().getInput();
+ for (RewriteStageTask task : successTasks) {
+ RewriteFilesInput input = task.getInput();
+ if (input == null) {
+ continue;
+ }
StructLike partition = partition(input);
// Check if the partition version has expired
@@ -115,6 +158,7 @@ public void commit() throws OptimizingCommitException {
toSequenceOfPartitions.remove(partition);
continue;
}
+
// Only base data file need to remove
if (input.rewrittenDataFiles() != null) {
Arrays.stream(input.rewrittenDataFiles())
@@ -131,7 +175,7 @@ public void commit() throws OptimizingCommitException {
.forEach(removedDeleteFiles::add);
}
- RewriteFilesOutput output = taskRuntime.getTaskDescriptor().getOutput();
+ RewriteFilesOutput output = task.getOutput();
if (CollectionUtils.isNotEmpty(hiveNewDataFiles)) {
addedDataFiles.addAll(hiveNewDataFiles);
} else if (output.getDataFiles() != null) {
@@ -219,4 +263,25 @@ private boolean fileInPartitionNeedSkip(
private StructLike partition(RewriteFilesInput input) {
return input.allFiles()[0].partition();
}
+
+ private void rebuildToSequenceOfPartitions(List successTasks) {
+ toSequenceOfPartitions.clear();
+ for (RewriteStageTask task : successTasks) {
+ RewriteFilesInput input = task.getInput();
+ if (input == null) {
+ continue;
+ }
+ StructLike partition = partition(input);
+ for (ContentFile> file : input.allFiles()) {
+ if (ContentFiles.isDeleteFile(file)) {
+ continue;
+ }
+ DataFileType type = ((PrimaryKeyedFile) ContentFiles.asDataFile(file)).type();
+ if (type == DataFileType.INSERT_FILE || type == DataFileType.EQ_DELETE_FILE) {
+ long seq = file.dataSequenceNumber();
+ toSequenceOfPartitions.merge(partition, seq, Math::max);
+ }
+ }
+ }
+ }
}
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/LegacyExecutorFactoryDefaults.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/LegacyExecutorFactoryDefaults.java
new file mode 100644
index 0000000000..652c737992
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/LegacyExecutorFactoryDefaults.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing;
+
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.hive.optimizing.MixedHiveRewriteExecutorFactory;
+import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory;
+import org.apache.amoro.optimizing.MixedIcebergRewriteExecutorFactory;
+import org.apache.amoro.optimizing.TaskProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resolves the default {@link TaskProperties#TASK_EXECUTOR_FACTORY_IMPL} class name for task rows
+ * persisted before the multi-format refactor (C3), where the property did not yet exist.
+ *
+ *
Pre-0.9 Iceberg rows restored at AMS startup pass through {@link
+ * org.apache.amoro.server.persistence.converter.TaskDescriptorTypeConverter}, which correctly falls
+ * back to {@link org.apache.amoro.optimizing.RewriteStageTask} but does not populate the
+ * missing executor-factory key on the descriptor's {@code properties} map. Once the task reaches
+ * {@code OptimizerExecutor.executeTask(...)} it reads that key to instantiate the executor factory
+ * via reflection and NPEs on the missing entry.
+ *
+ *
This util is the compensation hook: {@link OptimizingQueue}'s DB-restore path calls it per
+ * task runtime and, when non-null, injects the returned class name into the descriptor properties.
+ * For formats whose tasks always carry their own factory impl (e.g. Paimon) and for unknown
+ * formats, we return {@code null} and emit a WARN so operators see the skipped row without taking
+ * the process down.
+ *
+ *
Kept as a pure static util (no state) so it is trivially testable and can never diverge from
+ * {@link TaskDescriptorTypeConverter}'s routing table — both must stay consistent with the default
+ * factory impl each format emits at plan time.
+ */
+public final class LegacyExecutorFactoryDefaults {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LegacyExecutorFactoryDefaults.class);
+
+ public static final String ICEBERG_FACTORY_IMPL = IcebergRewriteExecutorFactory.class.getName();
+ public static final String MIXED_ICEBERG_FACTORY_IMPL =
+ MixedIcebergRewriteExecutorFactory.class.getName();
+ public static final String MIXED_HIVE_FACTORY_IMPL =
+ MixedHiveRewriteExecutorFactory.class.getName();
+
+ private LegacyExecutorFactoryDefaults() {}
+
+ /**
+ * Return the default {@code task-executor-factory-impl} class name for a legacy task row of the
+ * given table format, or {@code null} if the format has no meaningful default (Paimon or
+ * unknown).
+ *
+ *
Callers should treat {@code null} as "skip — the task descriptor must already carry the key,
+ * or the row is unrecoverable". A WARN is logged for null cases so operators can spot stale rows
+ * during restore.
+ */
+ public static String resolveDefaultExecutorFactoryImpl(TableFormat format) {
+ if (format == null) {
+ LOG.warn(
+ "Cannot resolve default {} for legacy task row: table format is null",
+ TaskProperties.TASK_EXECUTOR_FACTORY_IMPL);
+ return null;
+ }
+ if (TableFormat.ICEBERG.equals(format)) {
+ return ICEBERG_FACTORY_IMPL;
+ }
+ if (TableFormat.MIXED_ICEBERG.equals(format)) {
+ return MIXED_ICEBERG_FACTORY_IMPL;
+ }
+ if (TableFormat.MIXED_HIVE.equals(format)) {
+ return MIXED_HIVE_FACTORY_IMPL;
+ }
+ if (TableFormat.PAIMON.equals(format)) {
+ // Paimon tasks always carry their own factory impl in properties (set by the Paimon planner),
+ // so a missing key here indicates a corrupted row rather than a pre-refactor legacy row.
+ LOG.warn(
+ "Skipping default {} injection for PAIMON: Paimon tasks must carry the key at plan time",
+ TaskProperties.TASK_EXECUTOR_FACTORY_IMPL);
+ return null;
+ }
+ LOG.warn(
+ "Skipping default {} injection: unknown table format {}",
+ TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
+ format);
+ return null;
+ }
+}
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index 52eb46b4cc..5713bdc913 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -21,18 +21,23 @@
import org.apache.amoro.AmoroTable;
import org.apache.amoro.OptimizerProperties;
import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.TableFormat;
import org.apache.amoro.api.BlockableOperation;
import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.api.OptimizingTaskResult;
import org.apache.amoro.exception.OptimizingClosedException;
import org.apache.amoro.exception.PersistenceException;
import org.apache.amoro.exception.TaskNotFoundException;
+import org.apache.amoro.optimizing.BaseOptimizingInput;
import org.apache.amoro.optimizing.MetricsSummary;
+import org.apache.amoro.optimizing.OptimizingPlanResult;
import org.apache.amoro.optimizing.OptimizingType;
-import org.apache.amoro.optimizing.RewriteFilesInput;
-import org.apache.amoro.optimizing.RewriteStageTask;
-import org.apache.amoro.optimizing.plan.AbstractOptimizingPlanner;
+import org.apache.amoro.optimizing.TableOptimizingCommitter;
+import org.apache.amoro.optimizing.TableOptimizingPlanner;
+import org.apache.amoro.optimizing.TaskMetricsSummary;
+import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.process.ProcessStatus;
+import org.apache.amoro.process.StagedTaskDescriptor;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.catalog.CatalogManager;
@@ -41,45 +46,42 @@
import org.apache.amoro.server.persistence.OptimizingProcessState;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.TaskFilesPersistence;
+import org.apache.amoro.server.persistence.converter.TaskDescriptorRecoveryTypes;
import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
+import org.apache.amoro.server.process.ProcessFactoryRouter;
import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.OptimizerThread;
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.server.table.OptimizingOwnerConflictException;
import org.apache.amoro.server.table.blocker.TableBlocker;
-import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
-import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.TableIdentifier;
import org.apache.amoro.utils.CompatiblePropertyUtil;
import org.apache.amoro.utils.ExceptionUtil;
-import org.apache.amoro.utils.MixedDataFiles;
-import org.apache.amoro.utils.TablePropertyUtil;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.util.StructLikeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
@@ -91,6 +93,13 @@
public class OptimizingQueue extends PersistentBase {
private static final Logger LOG = LoggerFactory.getLogger(OptimizingQueue.class);
+ private static final long STALE_OWNER_RECYCLE_MILLIS = TimeUnit.MINUTES.toMillis(15);
+ private static final Set ACTIVE_PROCESS_STATUSES =
+ EnumSet.of(
+ ProcessStatus.PENDING,
+ ProcessStatus.SUBMITTED,
+ ProcessStatus.RUNNING,
+ ProcessStatus.CANCELING);
private final QuotaProvider quotaProvider;
private final Queue tableQueue = new LinkedTransferQueue<>();
@@ -98,14 +107,25 @@ public class OptimizingQueue extends PersistentBase {
private final CatalogManager catalogManager;
private final Executor planExecutor;
// Keep all planning table identifiers
- private final Set planningTables = new HashSet<>();
+ private final Set planningTables = ConcurrentHashMap.newKeySet();
private final Lock scheduleLock = new ReentrantLock();
private final Condition planningCompleted = scheduleLock.newCondition();
private final int maxPlanningParallelism;
+ private final Semaphore planningSlots;
private final OptimizerGroupMetrics metrics;
private ResourceGroup optimizerGroup;
+ private final ProcessFactoryRouter router;
private final Map optimizingTasksMap =
new ConcurrentHashMap<>();
+ private final Set warmingTables = ConcurrentHashMap.newKeySet();
+ private final Set warmedTables = ConcurrentHashMap.newKeySet();
+
+ public enum WarmupResult {
+ WARMED,
+ SKIPPED,
+ DUPLICATE,
+ FAILED
+ }
public OptimizingQueue(
CatalogManager catalogManager,
@@ -113,54 +133,85 @@ public OptimizingQueue(
QuotaProvider quotaProvider,
Executor planExecutor,
List tableRuntimeList,
- int maxPlanningParallelism) {
+ int maxPlanningParallelism,
+ ProcessFactoryRouter router) {
Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null");
this.planExecutor = planExecutor;
this.optimizerGroup = optimizerGroup;
this.quotaProvider = quotaProvider;
this.scheduler = new SchedulingPolicy(optimizerGroup);
this.catalogManager = catalogManager;
+ Preconditions.checkArgument(
+ maxPlanningParallelism >= 0, "Max planning parallelism can not be negative");
this.maxPlanningParallelism = maxPlanningParallelism;
+ this.planningSlots = new Semaphore(this.maxPlanningParallelism);
+ this.router = router;
this.metrics =
new OptimizerGroupMetrics(
optimizerGroup.getName(), MetricManager.getInstance().getGlobalRegistry(), this);
this.metrics.register();
- tableRuntimeList.forEach(this::initTableRuntime);
}
- private void initTableRuntime(DefaultTableRuntime tableRuntime) {
+ public WarmupResult warmupTable(DefaultTableRuntime tableRuntime) {
+ ServerTableIdentifier identifier = tableRuntime.getTableIdentifier();
+ if (warmedTables.contains(identifier)) {
+ return WarmupResult.DUPLICATE;
+ }
+ if (!warmingTables.add(identifier)) {
+ return WarmupResult.DUPLICATE;
+ }
try {
- TableOptimizingProcess process = loadProcess(tableRuntime);
-
- if (!tableRuntime.getOptimizingConfig().isEnabled()) {
- closeProcessIfRunning(process);
- return;
+ WarmupResult result = doWarmupTable(tableRuntime);
+ if (result == WarmupResult.WARMED || result == WarmupResult.SKIPPED) {
+ warmedTables.add(identifier);
}
+ return result;
+ } catch (Exception e) {
+ LOG.error("Failed to warm up table runtime for table {}, skipping", identifier, e);
+ return WarmupResult.FAILED;
+ } finally {
+ warmingTables.remove(identifier);
+ }
+ }
- tableRuntime.resetTaskQuotas(
- System.currentTimeMillis() - AmoroServiceConstants.QUOTA_LOOK_BACK_TIME);
+ private WarmupResult doWarmupTable(DefaultTableRuntime tableRuntime) {
+ TableOptimizingProcess process = loadProcess(tableRuntime);
- if (canResumeProcess(process, tableRuntime)) {
- if (process.allTasksPrepared()) {
- LOG.info(
- "All tasks already completed for process {} on table {} during recovery,"
- + " triggering commit",
- process.getProcessId(),
- tableRuntime.getTableIdentifier());
- tableRuntime.beginCommitting();
- } else {
- tableQueue.offer(process);
- }
+ if (!tableRuntime.getOptimizingConfig().isEnabled()) {
+ closeProcessIfRunning(process);
+ return WarmupResult.SKIPPED;
+ }
+
+ if (!isFormatSupported(tableRuntime)) {
+ closeProcessIfRunning(process);
+ return WarmupResult.SKIPPED;
+ }
+
+ tableRuntime.resetTaskQuotas(
+ System.currentTimeMillis() - AmoroServiceConstants.QUOTA_LOOK_BACK_TIME);
+
+ if (canReplayPaimonCommittingProcess(process, tableRuntime)) {
+ LOG.info(
+ "Paimon process {} on table {} is already COMMITTING with completed tasks during"
+ + " recovery, keeping it for commit replay",
+ process.getProcessId(),
+ tableRuntime.getTableIdentifier());
+ } else if (canResumeProcess(process, tableRuntime)) {
+ if (process.allTasksPrepared()) {
+ LOG.info(
+ "All tasks already completed for process {} on table {} during recovery,"
+ + " triggering commit",
+ process.getProcessId(),
+ tableRuntime.getTableIdentifier());
+ tableRuntime.beginCommitting();
} else {
- resetTableForRecovery(process, tableRuntime);
- scheduler.addTable(tableRuntime);
+ tableQueue.offer(process);
}
- } catch (Exception e) {
- LOG.error(
- "Failed to initialize table runtime for table {}, skipping",
- tableRuntime.getTableIdentifier(),
- e);
+ } else {
+ resetTableForRecovery(process, tableRuntime);
+ scheduler.addTable(tableRuntime);
}
+ return WarmupResult.WARMED;
}
private TableOptimizingProcess loadProcess(DefaultTableRuntime tableRuntime) {
@@ -195,6 +246,15 @@ private boolean canResumeProcess(
&& tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING;
}
+ private boolean canReplayPaimonCommittingProcess(
+ TableOptimizingProcess process, DefaultTableRuntime tableRuntime) {
+ return tableRuntime.getFormat() == TableFormat.PAIMON
+ && process != null
+ && process.getStatus() == ProcessStatus.RUNNING
+ && tableRuntime.getOptimizingStatus() == OptimizingStatus.COMMITTING
+ && process.allTasksPrepared();
+ }
+
private void resetTableForRecovery(
TableOptimizingProcess process, DefaultTableRuntime tableRuntime) {
closeProcessIfRunning(process);
@@ -220,9 +280,12 @@ public String getContainerName() {
}
public void refreshTable(DefaultTableRuntime tableRuntime) {
+ if (!isFormatSupported(tableRuntime)) {
+ return;
+ }
if (tableRuntime.getOptimizingConfig().isEnabled()
&& !tableRuntime.getOptimizingStatus().isProcessing()) {
- LOG.info(
+ LOG.debug(
"Bind queue {} success with table {}",
optimizerGroup.getName(),
tableRuntime.getTableIdentifier());
@@ -232,7 +295,21 @@ public void refreshTable(DefaultTableRuntime tableRuntime) {
}
}
+ private boolean isFormatSupported(DefaultTableRuntime tableRuntime) {
+ TableFormat format = tableRuntime.getFormat();
+ if (!router.supportedFormats().contains(format)) {
+ LOG.debug(
+ "Skip table {} with unsupported format {} for queue {}",
+ tableRuntime.getTableIdentifier(),
+ format,
+ optimizerGroup.getName());
+ return false;
+ }
+ return true;
+ }
+
public void releaseTable(DefaultTableRuntime tableRuntime) {
+ clearWarmupState(tableRuntime);
scheduler.removeTable(tableRuntime);
List processList =
tableQueue.stream()
@@ -257,7 +334,12 @@ public TaskRuntime> pollTask(
resetStaleTasksForThread(thread);
long deadline = calculateDeadline(maxWaitTime);
TaskRuntime> task = fetchScheduledTask(thread, true);
- while (task == null && waitTask(deadline)) {
+ while (task == null) {
+ fillPlanningSlots();
+ task = fetchScheduledTask(thread, true);
+ if (task != null || !awaitPlanningCompleted(deadline)) {
+ break;
+ }
task = fetchScheduledTask(thread, true);
}
if (task == null && breakQuotaLimit && planningTables.isEmpty()) {
@@ -275,15 +357,15 @@ private long calculateDeadline(long maxWaitTime) {
return deadline <= 0 ? Long.MAX_VALUE : deadline;
}
- private boolean waitTask(long waitDeadline) {
+ private boolean awaitPlanningCompleted(long waitDeadline) {
scheduleLock.lock();
try {
long currentTime = System.currentTimeMillis();
- scheduleTableIfNecessary(currentTime);
return waitDeadline > currentTime
&& planningCompleted.await(waitDeadline - currentTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.error("Schedule table interrupted", e);
+ Thread.currentThread().interrupt();
return false;
} finally {
scheduleLock.unlock();
@@ -298,12 +380,31 @@ private TaskRuntime> fetchScheduledTask(OptimizerThread thread, boolean needQu
.orElse(null);
}
- private void scheduleTableIfNecessary(long startTime) {
- if (planningTables.size() < maxPlanningParallelism) {
- Set skipTables = new HashSet<>(planningTables);
- skipBlockedTables(skipTables);
- Optional.ofNullable(scheduler.scheduleTable(skipTables))
- .ifPresent(tableRuntime -> triggerAsyncPlanning(tableRuntime, skipTables, startTime));
+ private void fillPlanningSlots() {
+ int availableSlots = planningSlots.availablePermits();
+ if (availableSlots <= 0) {
+ return;
+ }
+
+ Set skipTables = new HashSet<>(planningTables);
+ skipBlockedTables(skipTables);
+
+ List candidates = scheduler.scheduleTables(skipTables, availableSlots);
+ for (DefaultTableRuntime tableRuntime : candidates) {
+ ServerTableIdentifier identifier = tableRuntime.getTableIdentifier();
+ if (!planningSlots.tryAcquire()) {
+ return;
+ }
+ if (!planningTables.add(identifier)) {
+ planningSlots.release();
+ continue;
+ }
+ skipTables.add(identifier);
+ if (!triggerAsyncPlanning(tableRuntime, new HashSet<>(skipTables))) {
+ planningTables.remove(identifier);
+ planningSlots.release();
+ signalPlanningCompleted();
+ }
}
}
@@ -313,7 +414,7 @@ private void skipBlockedTables(Set skipTables) {
TableBlockerMapper.class,
mapper -> mapper.selectAllBlockers(System.currentTimeMillis()));
Map identifierMap = Maps.newHashMap();
- for (ServerTableIdentifier identifier : scheduler.getTableRuntimeMap().keySet()) {
+ for (ServerTableIdentifier identifier : scheduler.tableIdentifiersSnapshot()) {
identifierMap.put(identifier.getIdentifier(), identifier);
}
tableBlockerList.stream()
@@ -327,76 +428,250 @@ private void skipBlockedTables(Set skipTables) {
.forEach(skipTables::add);
}
- private void triggerAsyncPlanning(
- DefaultTableRuntime tableRuntime, Set skipTables, long startTime) {
+ private boolean triggerAsyncPlanning(
+ DefaultTableRuntime tableRuntime, Set skipTables) {
+ long startTime = System.currentTimeMillis();
LOG.info(
"Trigger planning table {} by policy {}",
tableRuntime.getTableIdentifier(),
scheduler.name());
- planningTables.add(tableRuntime.getTableIdentifier());
- CompletableFuture.supplyAsync(() -> planInternal(tableRuntime), planExecutor)
- .whenComplete(
- (process, throwable) -> {
- if (throwable != null) {
- LOG.error("Failed to plan table {}", tableRuntime.getTableIdentifier(), throwable);
- }
- long currentTime = System.currentTimeMillis();
- scheduleLock.lock();
- try {
- tableRuntime.setLastPlanTime(currentTime);
- planningTables.remove(tableRuntime.getTableIdentifier());
- if (process != null) {
- tableQueue.offer(process);
- String skipIds =
- skipTables.stream()
- .map(ServerTableIdentifier::getId)
- .sorted()
- .map(item -> item + "")
- .collect(Collectors.joining(","));
- LOG.info(
- "Completed planning on table {} with {} tasks with a total cost of {} ms, skipping {} tables.",
- tableRuntime.getTableIdentifier(),
- process.getTaskMap().size(),
- currentTime - startTime,
- skipTables.size());
- LOG.debug("Skipped planning table IDs:{}", skipIds);
- } else if (throwable == null) {
- LOG.info(
- "Skipping planning table {} with a total cost of {} ms.",
- tableRuntime.getTableIdentifier(),
- currentTime - startTime);
- }
- planningCompleted.signalAll();
- } finally {
- scheduleLock.unlock();
- }
- });
+ try {
+ CompletableFuture.supplyAsync(() -> planInternal(tableRuntime), planExecutor)
+ .whenComplete(
+ (process, throwable) ->
+ completePlanning(tableRuntime, process, throwable, skipTables, startTime));
+ return true;
+ } catch (RuntimeException e) {
+ LOG.error("Failed to submit planning table {}", tableRuntime.getTableIdentifier(), e);
+ return false;
+ }
+ }
+
+ private void completePlanning(
+ DefaultTableRuntime tableRuntime,
+ TableOptimizingProcess process,
+ Throwable throwable,
+ Set skipTables,
+ long startTime) {
+ long currentTime = System.currentTimeMillis();
+ try {
+ if (throwable != null) {
+ LOG.error("Failed to plan table {}", tableRuntime.getTableIdentifier(), throwable);
+ }
+ if (process != null) {
+ tableQueue.offer(process);
+ String skipIds =
+ skipTables.stream()
+ .map(ServerTableIdentifier::getId)
+ .sorted()
+ .map(String::valueOf)
+ .collect(Collectors.joining(","));
+ LOG.info(
+ "Completed planning on table {} with {} tasks with a total cost of {} ms, skipping {} tables.",
+ tableRuntime.getTableIdentifier(),
+ process.getTaskMap().size(),
+ currentTime - startTime,
+ skipTables.size());
+ LOG.debug("Skipped planning table IDs:{}", skipIds);
+ } else if (throwable == null) {
+ LOG.info(
+ "Skipping planning table {} with a total cost of {} ms.",
+ tableRuntime.getTableIdentifier(),
+ currentTime - startTime);
+ }
+ tableRuntime.setLastPlanTime(currentTime);
+ } finally {
+ planningTables.remove(tableRuntime.getTableIdentifier());
+ planningSlots.release();
+ signalPlanningCompleted();
+ }
+ }
+
+ private void signalPlanningCompleted() {
+ scheduleLock.lock();
+ try {
+ planningCompleted.signalAll();
+ } finally {
+ scheduleLock.unlock();
+ }
+ }
+
+ private boolean prepareOwnerForPlanning(DefaultTableRuntime tableRuntime) {
+ for (int attempts = 0; attempts < 3; attempts++) {
+ long ownerProcessId = tableRuntime.getProcessId();
+ if (ownerProcessId == 0L) {
+ return true;
+ }
+
+ TableProcessMeta processMeta =
+ getAs(TableProcessMapper.class, mapper -> mapper.getProcessMeta(ownerProcessId));
+ if (processMeta == null || !isActiveProcess(processMeta.getStatus())) {
+ if (tableRuntime.normalizeProcessOwner(ownerProcessId)) {
+ LOG.info(
+ "Normalized owner process {} for table {} before planning",
+ ownerProcessId,
+ tableRuntime.getTableIdentifier());
+ return true;
+ }
+ continue;
+ }
+
+ if (shouldRecycleStaleOwner(processMeta)
+ && recycleStaleOwner(tableRuntime, processMeta, ownerProcessId)) {
+ continue;
+ }
+
+ LOG.info(
+ "Skip planning table {} because owner process {} is still active with status {}",
+ tableRuntime.getTableIdentifier(),
+ ownerProcessId,
+ processMeta.getStatus());
+ return false;
+ }
+
+ if (tableRuntime.getProcessId() == 0L) {
+ return true;
+ }
+ LOG.warn(
+ "Skip planning table {} because owner normalization exceeded retry budget",
+ tableRuntime.getTableIdentifier());
+ return false;
+ }
+
+ private boolean shouldRecycleStaleOwner(TableProcessMeta processMeta) {
+ return System.currentTimeMillis() - processMeta.getCreateTime() > STALE_OWNER_RECYCLE_MILLIS;
+ }
+
+ private boolean recycleStaleOwner(
+ DefaultTableRuntime tableRuntime, TableProcessMeta processMeta, long ownerProcessId) {
+ int unfinishedTasks =
+ getAs(
+ OptimizingProcessMapper.class,
+ mapper ->
+ mapper.countUnfinishedTasks(
+ tableRuntime.getTableIdentifier().getId(), ownerProcessId));
+ if (unfinishedTasks > 0) {
+ return false;
+ }
+
+ String staleReason =
+ String.format(
+ "recycled stale optimizing owner process %d after %d ms without unfinished tasks",
+ ownerProcessId, System.currentTimeMillis() - processMeta.getCreateTime());
+ doAsTransaction(
+ () ->
+ doAs(
+ TableProcessMapper.class,
+ mapper ->
+ mapper.updateProcess(
+ tableRuntime.getTableIdentifier().getId(),
+ ownerProcessId,
+ processMeta.getExternalProcessIdentifier(),
+ ProcessStatus.FAILED,
+ tableRuntime.getOptimizingStatus().name().toLowerCase(),
+ processMeta.getRetryNumber(),
+ System.currentTimeMillis(),
+ staleReason,
+ processMeta.getProcessParameters() == null
+ ? new HashMap<>()
+ : processMeta.getProcessParameters(),
+ processMeta.getSummary() == null
+ ? new HashMap<>()
+ : processMeta.getSummary())),
+ () -> {
+ if (!tableRuntime.normalizeProcessOwner(ownerProcessId)) {
+ throw new IllegalStateException(
+ String.format(
+ "failed to clear stale owner process %d for table %s",
+ ownerProcessId, tableRuntime.getTableIdentifier()));
+ }
+ },
+ () ->
+ tableRuntime
+ .store()
+ .begin()
+ .updateStatusCode(code -> OptimizingStatus.PENDING.getCode())
+ .commit());
+ LOG.warn(
+ "Recycled stale owner process {} for table {}",
+ ownerProcessId,
+ tableRuntime.getTableIdentifier());
+ return true;
+ }
+
+ private boolean isActiveProcess(ProcessStatus status) {
+ return ACTIVE_PROCESS_STATUSES.contains(status);
}
private TableOptimizingProcess planInternal(DefaultTableRuntime tableRuntime) {
+ if (!prepareOwnerForPlanning(tableRuntime)) {
+ return null;
+ }
tableRuntime.beginPlanning();
try {
ServerTableIdentifier identifier = tableRuntime.getTableIdentifier();
AmoroTable> table = catalogManager.loadTable(identifier.getIdentifier());
- AbstractOptimizingPlanner planner =
- IcebergTableUtil.createOptimizingPlanner(
- tableRuntime.refresh(table),
- (MixedTable) table.originalTable(),
- getAvailableCore(),
- maxInputSizePerThread());
+ tableRuntime.refresh(table);
+
+ if (!isFormatSupported(tableRuntime)) {
+ tableRuntime.completeEmptyProcess();
+ return null;
+ }
+
+ TableOptimizingPlanner planner =
+ router
+ .forFormat(tableRuntime.getFormat())
+ .createPlanner(tableRuntime, table, getAvailableCore(), maxInputSizePerThread());
if (planner.isNecessary()) {
- return new TableOptimizingProcess(planner, tableRuntime);
+ OptimizingPlanResult planResult = planner.plan();
+ if (planResult.getTasks() == null || planResult.getTasks().isEmpty()) {
+ LOG.info(
+ "Planner {} returned empty tasks despite isNecessary=true; skip creating empty process for {}",
+ planner.getClass().getSimpleName(),
+ tableRuntime.getTableIdentifier());
+ tableRuntime.completeEmptyProcess();
+ return null;
+ }
+ return new TableOptimizingProcess(planResult, tableRuntime);
} else {
tableRuntime.completeEmptyProcess();
return null;
}
} catch (Throwable throwable) {
+ if (containsCause(throwable, OptimizingOwnerConflictException.class)) {
+ long currentOwner = tableRuntime.getProcessId();
+ if (currentOwner == 0L) {
+ // Defensive fallback for rare races where owner changed after beginPlanning but before
+ // exception handling. Keep the table schedulable instead of leaving it in PLANNING.
+ tableRuntime.planFailed();
+ LOG.warn(
+ "Owner conflict observed for table {} but owner is already cleared, reset status to PENDING",
+ tableRuntime.getTableIdentifier());
+ } else {
+ LOG.info(
+ "Skip planning table {} because optimizing owner {} is already held by another process",
+ tableRuntime.getTableIdentifier(),
+ currentOwner);
+ }
+ return null;
+ }
tableRuntime.planFailed();
LOG.error("Planning table {} failed", tableRuntime.getTableIdentifier(), throwable);
throw throwable;
}
}
+ private boolean containsCause(Throwable throwable, Class extends Throwable> causeClass) {
+ Throwable current = throwable;
+ while (current != null) {
+ if (causeClass.isInstance(current)) {
+ return true;
+ }
+ current = current.getCause();
+ }
+ return false;
+ }
+
public void ackTask(OptimizingTaskId taskId, OptimizerThread thread) {
findProcess(taskId).ackTask(taskId, thread);
}
@@ -419,7 +694,7 @@ public List> collectTasks(Predicate> predicate) {
}
public void retryTask(TaskRuntime> taskRuntime) {
- findProcess(taskRuntime.getTaskId()).resetTask((TaskRuntime) taskRuntime);
+ findProcess(taskRuntime.getTaskId()).resetTask(taskRuntime);
}
private void resetStaleTasksForThread(OptimizerThread thread) {
@@ -463,6 +738,8 @@ public void removeOptimizer(OptimizerInstance optimizerInstance) {
}
public void dispose() {
+ warmingTables.clear();
+ warmedTables.clear();
this.metrics.unregister();
}
@@ -490,17 +767,33 @@ SchedulingPolicy getSchedulingPolicy() {
return scheduler;
}
+ private void clearWarmupState(DefaultTableRuntime tableRuntime) {
+ ServerTableIdentifier identifier = tableRuntime.getTableIdentifier();
+ warmingTables.remove(identifier);
+ warmedTables.remove(identifier);
+ }
+
+ @VisibleForTesting
+ boolean containsWarmupStateForTest(ServerTableIdentifier identifier) {
+ return warmingTables.contains(identifier) || warmedTables.contains(identifier);
+ }
+
private class TableOptimizingProcess implements OptimizingProcess {
private final Lock lock = new ReentrantLock();
private final long processId;
+ private final TableFormat format;
private final OptimizingType optimizingType;
private final DefaultTableRuntime tableRuntime;
private final long planTime;
private final long targetSnapshotId;
private final long targetChangeSnapshotId;
- private final Map> taskMap = Maps.newHashMap();
- private final Queue> taskQueue = new LinkedList<>();
+ // Widened to StagedTaskDescriptor,?,?> so non-Iceberg formats (Paimon today, Mixed-Hive
+ // tomorrow) share the same process/queue machinery without unchecked downcasts.
+ private final Map>>
+ taskMap = Maps.newHashMap();
+ private final Queue>> taskQueue =
+ new LinkedList<>();
private volatile ProcessStatus status = ProcessStatus.RUNNING;
private volatile String failedReason;
private long endTime = AmoroServiceConstants.INVALID_TIME;
@@ -539,16 +832,17 @@ public TaskRuntime> poll(OptimizerThread thread, boolean needQuotaChecking) {
}
public TableOptimizingProcess(
- AbstractOptimizingPlanner planner, DefaultTableRuntime tableRuntime) {
- processId = planner.getProcessId();
+ OptimizingPlanResult planResult, DefaultTableRuntime tableRuntime) {
+ processId = planResult.getProcessId();
this.tableRuntime = tableRuntime;
- optimizingType = planner.getOptimizingType();
- planTime = planner.getPlanTime();
- targetSnapshotId = planner.getTargetSnapshotId();
- targetChangeSnapshotId = planner.getTargetChangeSnapshotId();
- loadTaskRuntimes(planner.planTasks());
- fromSequence = planner.getFromSequence();
- toSequence = planner.getToSequence();
+ this.format = tableRuntime.getFormat();
+ optimizingType = planResult.getOptimizingType();
+ planTime = planResult.getPlanTime();
+ targetSnapshotId = planResult.getTargetSnapshotId();
+ targetChangeSnapshotId = planResult.getTargetChangeSnapshotId();
+ loadTaskRuntimes(planResult.getTasks());
+ fromSequence = planResult.getFromSequence();
+ toSequence = planResult.getToSequence();
beginAndPersistProcess();
}
@@ -557,6 +851,7 @@ public TableOptimizingProcess(
TableProcessMeta processMeta,
OptimizingProcessState processState) {
this.tableRuntime = tableRuntime;
+ this.format = tableRuntime.getFormat();
processId = tableRuntime.getProcessId();
optimizingType = OptimizingType.valueOf(processMeta.getProcessType());
targetSnapshotId = processState.getTargetSnapshotId();
@@ -590,6 +885,10 @@ public long getProcessId() {
return processId;
}
+ TableFormat getFormat() {
+ return format;
+ }
+
@Override
public OptimizingType getOptimizingType() {
return optimizingType;
@@ -658,6 +957,10 @@ private void acceptResult(TaskRuntime> taskRuntime) {
}
return v;
});
+ // task cancel means persistAndSetCompleted has been called and start/end may be absent.
+ if (taskRuntime.getStatus() == TaskRuntime.Status.CANCELED) {
+ return;
+ }
try {
tableRuntime.addTaskQuota(taskRuntime.getCurrentQuota());
} catch (Throwable throwable) {
@@ -667,10 +970,6 @@ private void acceptResult(TaskRuntime> taskRuntime) {
taskRuntime.getTaskId(),
throwable);
}
- // task cancel means persistAndSetCompleted has been called
- if (taskRuntime.getStatus() == TaskRuntime.Status.CANCELED) {
- return;
- }
if (isClosed()) {
throw new OptimizingClosedException(processId);
}
@@ -716,11 +1015,17 @@ private void acceptResult(TaskRuntime> taskRuntime) {
}
}
- private void resetTask(TaskRuntime taskRuntime) {
+ private void resetTask(TaskRuntime> taskRuntime) {
lock.lock();
try {
taskRuntime.reset();
- taskQueue.add(taskRuntime);
+ // Cast is safe: only descriptors that already entered the widened taskMap end up here; the
+ // public retryTask(TaskRuntime>) entry point forwards whatever pollTask previously
+ // returned, and pollTask only hands out entries sourced from taskMap itself.
+ @SuppressWarnings("unchecked")
+ TaskRuntime extends StagedTaskDescriptor, ?, ?>> widened =
+ (TaskRuntime extends StagedTaskDescriptor, ?, ?>>) taskRuntime;
+ taskQueue.add(widened);
} finally {
lock.unlock();
}
@@ -759,7 +1064,8 @@ public String getFailedReason() {
return failedReason;
}
- private Map> getTaskMap() {
+ private Map>>
+ getTaskMap() {
return taskMap;
}
@@ -796,18 +1102,12 @@ public int getActualQuota() {
@Override
public void commit() {
- List> successTasks =
+ List>> successTasks =
taskMap.values().stream()
.filter(task -> task.getStatus() == Status.SUCCESS)
.collect(Collectors.toList());
LOG.debug(
- "{} get {} tasks of {} partitions to commit",
- tableRuntime.getTableIdentifier(),
- successTasks.size(),
- successTasks.stream()
- .map(task -> task.getTaskDescriptor().getPartition())
- .distinct()
- .count());
+ "{} get {} tasks to commit", tableRuntime.getTableIdentifier(), successTasks.size());
lock.lock();
try {
@@ -821,6 +1121,15 @@ public void commit() {
}
try {
hasCommitted = true;
+ if (successTasks.isEmpty()) {
+ status = ProcessStatus.FAILED;
+ failedReason =
+ "No successful task is available for commit, stop building committer to avoid"
+ + " invalid commit identity.";
+ endTime = System.currentTimeMillis();
+ persistAndSetCompleted(false);
+ return;
+ }
buildCommit().commit();
if (allTasksPrepared()) {
status = ProcessStatus.SUCCESS;
@@ -851,47 +1160,35 @@ public void commit() {
@Override
public MetricsSummary getSummary() {
- List taskSummaries =
+ // Route through StagedTaskDescriptor.toMetricsSummary() so non-Iceberg descriptors (e.g.
+ // PaimonCompactionTask) contribute to the aggregate without OptimizingQueue needing to know
+ // their concrete type. MetricsSummary#aggregate handles the mixed list.
+ List taskSummaries =
taskMap.values().stream()
.map(TaskRuntime::getTaskDescriptor)
- .map(RewriteStageTask::getSummary)
+ .map(StagedTaskDescriptor::toMetricsSummary)
.collect(Collectors.toList());
- return new MetricsSummary(taskSummaries);
+ return MetricsSummary.aggregate(taskSummaries);
}
- private UnKeyedTableCommit buildCommit() {
- MixedTable table =
- (MixedTable)
- catalogManager
- .loadTable(tableRuntime.getTableIdentifier().getIdentifier())
- .originalTable();
- if (table.isUnkeyedTable()) {
- return new UnKeyedTableCommit(targetSnapshotId, table, taskMap.values());
- } else {
- return new KeyedTableCommit(
- table,
- taskMap.values(),
- targetSnapshotId,
- convertPartitionSequence(table, fromSequence),
- convertPartitionSequence(table, toSequence));
- }
- }
-
- private StructLikeMap convertPartitionSequence(
- MixedTable table, Map partitionSequence) {
- PartitionSpec spec = table.spec();
- StructLikeMap results = StructLikeMap.create(spec.partitionType());
- partitionSequence.forEach(
- (partition, sequence) -> {
- if (spec.isUnpartitioned()) {
- results.put(TablePropertyUtil.EMPTY_STRUCT, sequence);
- } else {
- StructLike partitionData = MixedDataFiles.data(spec, partition);
- results.put(partitionData, sequence);
- }
- });
- return results;
+ private TableOptimizingCommitter buildCommit() {
+ AmoroTable> table =
+ catalogManager.loadTable(tableRuntime.getTableIdentifier().getIdentifier());
+ List extends StagedTaskDescriptor, ?, ?>> taskDescriptors =
+ taskMap.values().stream()
+ .filter(task -> task.getStatus() == Status.SUCCESS)
+ .map(TaskRuntime::getTaskDescriptor)
+ .collect(Collectors.toList());
+ return router
+ .forFormat(format)
+ .createCommitter(
+ table,
+ targetSnapshotId,
+ targetChangeSnapshotId,
+ taskDescriptors,
+ fromSequence,
+ toSequence);
}
private void beginAndPersistProcess() {
@@ -926,8 +1223,8 @@ private void beginAndPersistProcess() {
() ->
doAs(
OptimizingProcessMapper.class,
- mapper -> mapper.insertTaskRuntimes(Lists.newArrayList(taskMap.values()))),
- () -> TaskFilesPersistence.persistTaskInputs(processId, taskMap.values()),
+ mapper -> mapper.insertTaskRuntimes(typedTaskRuntimes())),
+ () -> TaskFilesPersistence.persistTaskInputs(processId, typedTaskRuntimes()),
() -> tableRuntime.beginProcess(this));
}
@@ -953,7 +1250,7 @@ private void persistAndSetCompleted(boolean success) {
getFailedReason(),
new HashMap<>(),
getSummary().summaryAsMap(false))),
- () -> tableRuntime.completeProcess(success),
+ () -> tableRuntime.completeProcess(this, success),
() -> clearProcess(this));
}
@@ -961,21 +1258,108 @@ private void cancelTasks() {
taskMap.values().forEach(TaskRuntime::tryCanceling);
}
+ /**
+ * Narrow the task-map values to the bound expected by {@code insertTaskRuntimes} and {@code
+ * TaskFilesPersistence}. Every descriptor that enters {@code taskMap} extends {@code
+ * StagedTaskDescriptor} where {@code I extends BaseOptimizingInput}, so the unchecked
+ * cast is safe.
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private List>>
+ typedTaskRuntimes() {
+ return (List) Lists.newArrayList(taskMap.values());
+ }
+
+ /**
+ * Bind a loaded {@link BaseOptimizingInput} back into its descriptor. The helper exists solely
+ * to "capture" the descriptor's {@code I} type parameter so we can pass the polymorphic input
+ * through {@link StagedTaskDescriptor#setInput} without the caller knowing the concrete
+ * subclass. Type safety is guaranteed by {@code TaskDescriptorTypeConverter} pairing the
+ * descriptor class with the input written by {@code TaskFilesPersistence#persistTaskInputs}.
+ *
+ *
A null {@code input} is passed through unchanged (preserving the pre-C4 behaviour where a
+ * missing entry in the loaded map would null out the descriptor's input — each {@code
+ * calculateSummary} override tolerates this).
+ */
+ private void bindLoadedInput(
+ StagedTaskDescriptor descriptor, BaseOptimizingInput input) {
+ @SuppressWarnings("unchecked")
+ I typedInput = (I) input;
+ descriptor.setInput(typedInput);
+ }
+
+ /**
+ * Back-fill {@link TaskProperties#TASK_EXECUTOR_FACTORY_IMPL} on descriptors restored from
+ * pre-0.9 Iceberg rows whose {@code properties} column predates the multi-format refactor.
+ *
+ *
The injection is format-driven via {@link LegacyExecutorFactoryDefaults}: Iceberg /
+ * Mixed-Iceberg / Mixed-Hive get their canonical executor factory class name; Paimon and
+ * unknown formats are logged and skipped (Paimon planners always emit the key, so a missing
+ * entry for Paimon indicates a corrupted row the optimizer cannot safely resume).
+ *
+ *
Idempotent: rows that already carry the key are untouched, so new-format rows co-located
+ * with legacy Iceberg rows in the same process still work.
+ */
+ private void injectLegacyExecutorFactoryDefaults(
+ List>>
+ taskRuntimes) {
+ for (TaskRuntime extends StagedTaskDescriptor extends BaseOptimizingInput, ?, ?>>
+ taskRuntime : taskRuntimes) {
+ StagedTaskDescriptor extends BaseOptimizingInput, ?, ?> descriptor =
+ taskRuntime.getTaskDescriptor();
+ if (descriptor == null) {
+ continue;
+ }
+ Map props = descriptor.getProperties();
+ if (props != null && props.containsKey(TaskProperties.TASK_EXECUTOR_FACTORY_IMPL)) {
+ continue;
+ }
+ String defaultImpl =
+ LegacyExecutorFactoryDefaults.resolveDefaultExecutorFactoryImpl(
+ tableRuntime.getFormat());
+ if (defaultImpl == null) {
+ continue;
+ }
+ if (descriptor.ensureExecutorFactoryImpl(defaultImpl)) {
+ LOG.info(
+ "Restored legacy task {} with default {}={} (format={})",
+ taskRuntime.getTaskId(),
+ TaskProperties.TASK_EXECUTOR_FACTORY_IMPL,
+ defaultImpl,
+ tableRuntime.getFormat());
+ }
+ }
+ }
+
private void loadTaskRuntimes(OptimizingProcess optimizingProcess) {
try {
- List> taskRuntimes =
- getAs(
- OptimizingProcessMapper.class,
- mapper ->
- mapper.selectTaskRuntimes(
- tableRuntime.getTableIdentifier().getId(), processId));
- Map inputs = TaskFilesPersistence.loadTaskInputs(processId);
+ // Recovery is format-agnostic after C3/C4: TaskDescriptorTypeConverter resolves the
+ // descriptor subclass from TASK_EXECUTOR_FACTORY_IMPL, and TaskFilesPersistence returns
+ // a Map whose concrete values carry their own runtime
+ // class. We widen the compile-time view to StagedTaskDescriptor extends
+ // BaseOptimizingInput, ?, ?> so the same call recovers both Iceberg and Paimon rows.
+ List>>
+ taskRuntimes =
+ getAs(
+ OptimizingProcessMapper.class,
+ mapper ->
+ mapper.selectTaskRuntimes(
+ tableRuntime.getTableIdentifier().getId(), processId));
+ Map inputs = TaskFilesPersistence.loadTaskInputs(processId);
+ // Compensation for pre-0.9 Iceberg rows: TaskDescriptorTypeConverter correctly routes rows
+ // with a missing TASK_EXECUTOR_FACTORY_IMPL to RewriteStageTask, but the key itself is
+ // still absent from the descriptor's properties map on restore. OptimizerExecutor later
+ // reads that key via reflection to build the executor factory — without this injection it
+ // would NPE. Idempotent: we never overwrite an existing key (e.g. for Paimon rows where
+ // the Paimon planner already populated it).
+ injectLegacyExecutorFactoryDefaults(taskRuntimes);
taskRuntimes.forEach(
taskRuntime -> {
taskRuntime.getCompletedFuture().whenCompleted(() -> acceptResult(taskRuntime));
- taskRuntime
- .getTaskDescriptor()
- .setInput(inputs.get(taskRuntime.getTaskId().getTaskId()));
+ BaseOptimizingInput input = inputs.get(taskRuntime.getTaskId().getTaskId());
+ TaskDescriptorRecoveryTypes.validateRecoveredTask(
+ taskRuntime.getTaskDescriptor(), input, tableRuntime.getFormat());
+ bindLoadedInput(taskRuntime.getTaskDescriptor(), input);
taskMap.put(taskRuntime.getTaskId(), taskRuntime);
if (taskRuntime.getStatus() == TaskRuntime.Status.PLANNED) {
taskQueue.offer(taskRuntime);
@@ -992,7 +1376,7 @@ private void loadTaskRuntimes(OptimizingProcess optimizingProcess) {
retryTask(taskRuntime);
}
});
- } catch (IllegalArgumentException e) {
+ } catch (IllegalArgumentException | ClassCastException e) {
LOG.warn(
"Load task inputs failed, close the optimizing process : {}",
optimizingProcess.getProcessId(),
@@ -1001,10 +1385,11 @@ private void loadTaskRuntimes(OptimizingProcess optimizingProcess) {
}
}
- private void loadTaskRuntimes(List taskDescriptors) {
+ private > void loadTaskRuntimes(
+ List taskDescriptors) {
int taskId = 1;
- for (RewriteStageTask taskDescriptor : taskDescriptors) {
- TaskRuntime taskRuntime =
+ for (T taskDescriptor : taskDescriptors) {
+ TaskRuntime taskRuntime =
new TaskRuntime<>(new OptimizingTaskId(processId, taskId++), taskDescriptor);
LOG.info(
"{} plan new task {}, summary {}",
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueueWarmupMetrics.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueueWarmupMetrics.java
new file mode 100644
index 0000000000..c2128603e7
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueueWarmupMetrics.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing;
+
+import static org.apache.amoro.metrics.MetricDefine.defineGauge;
+
+import org.apache.amoro.metrics.Gauge;
+import org.apache.amoro.metrics.Metric;
+import org.apache.amoro.metrics.MetricDefine;
+import org.apache.amoro.metrics.MetricKey;
+import org.apache.amoro.metrics.MetricRegistry;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Global metrics for startup optimizing queue warmup. */
+public class OptimizingQueueWarmupMetrics {
+
+ public static final MetricDefine OPTIMIZING_QUEUE_WARMUP_TOTAL =
+ defineGauge("optimizing_queue_warmup_total")
+ .withDescription("Total number of table runtimes submitted to optimizing queue warmup")
+ .build();
+ public static final MetricDefine OPTIMIZING_QUEUE_WARMUP_LOADED =
+ defineGauge("optimizing_queue_warmup_loaded")
+ .withDescription("Number of table runtimes successfully warmed into optimizing queues")
+ .build();
+ public static final MetricDefine OPTIMIZING_QUEUE_WARMUP_SKIPPED =
+ defineGauge("optimizing_queue_warmup_skipped")
+ .withDescription("Number of table runtimes skipped during optimizing queue warmup")
+ .build();
+ public static final MetricDefine OPTIMIZING_QUEUE_WARMUP_FAILED =
+ defineGauge("optimizing_queue_warmup_failed")
+ .withDescription("Number of table runtimes failed during optimizing queue warmup")
+ .build();
+ public static final MetricDefine OPTIMIZING_QUEUE_WARMUP_RUNNING =
+ defineGauge("optimizing_queue_warmup_running")
+ .withDescription("Number of table runtimes currently running optimizing queue warmup")
+ .build();
+ public static final MetricDefine OPTIMIZING_QUEUE_WARMUP_RETRYING =
+ defineGauge("optimizing_queue_warmup_retrying")
+ .withDescription("Number of table runtimes retried during optimizing queue warmup")
+ .build();
+ public static final MetricDefine OPTIMIZING_QUEUE_WARMUP_INITIALIZING =
+ defineGauge("optimizing_queue_warmup_initializing")
+ .withDescription("Number of table runtimes waiting for optimizing queue warmup")
+ .build();
+
+ private final MetricRegistry registry;
+ private final List registeredMetricKeys = Lists.newArrayList();
+
+ private final AtomicLong total = new AtomicLong();
+ private final AtomicLong loaded = new AtomicLong();
+ private final AtomicLong skipped = new AtomicLong();
+ private final AtomicLong failed = new AtomicLong();
+ private final AtomicLong running = new AtomicLong();
+ private final AtomicLong retrying = new AtomicLong();
+ private final AtomicLong initializing = new AtomicLong();
+
+ public OptimizingQueueWarmupMetrics(MetricRegistry registry) {
+ this.registry = registry;
+ }
+
+ public void register() {
+ registerMetric(registry, OPTIMIZING_QUEUE_WARMUP_TOTAL, (Gauge) total::get);
+ registerMetric(registry, OPTIMIZING_QUEUE_WARMUP_LOADED, (Gauge) loaded::get);
+ registerMetric(registry, OPTIMIZING_QUEUE_WARMUP_SKIPPED, (Gauge) skipped::get);
+ registerMetric(registry, OPTIMIZING_QUEUE_WARMUP_FAILED, (Gauge) failed::get);
+ registerMetric(registry, OPTIMIZING_QUEUE_WARMUP_RUNNING, (Gauge) running::get);
+ registerMetric(registry, OPTIMIZING_QUEUE_WARMUP_RETRYING, (Gauge) retrying::get);
+ registerMetric(registry, OPTIMIZING_QUEUE_WARMUP_INITIALIZING, (Gauge) initializing::get);
+ }
+
+ public void unregister() {
+ registeredMetricKeys.forEach(registry::unregister);
+ registeredMetricKeys.clear();
+ }
+
+ void recordSubmitted(long count) {
+ if (count <= 0) {
+ return;
+ }
+ total.addAndGet(count);
+ initializing.addAndGet(count);
+ }
+
+ void recordStart() {
+ initializing.decrementAndGet();
+ running.incrementAndGet();
+ }
+
+ void recordRetry() {
+ retrying.incrementAndGet();
+ }
+
+ void recordResult(OptimizingQueue.WarmupResult result) {
+ running.decrementAndGet();
+ if (result == OptimizingQueue.WarmupResult.WARMED) {
+ loaded.incrementAndGet();
+ } else if (result == OptimizingQueue.WarmupResult.SKIPPED
+ || result == OptimizingQueue.WarmupResult.DUPLICATE) {
+ skipped.incrementAndGet();
+ } else {
+ failed.incrementAndGet();
+ }
+ }
+
+ private void registerMetric(MetricRegistry registry, MetricDefine define, Metric metric) {
+ MetricKey key = registry.register(define, Collections.emptyMap(), metric);
+ registeredMetricKeys.add(key);
+ }
+}
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueueWarmupService.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueueWarmupService.java
new file mode 100644
index 0000000000..6ef19781c9
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueueWarmupService.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.optimizing;
+
+import org.apache.amoro.server.table.DefaultTableRuntime;
+import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/** Asynchronously warms table runtimes into optimizing queues during AMS startup. */
+public class OptimizingQueueWarmupService implements AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OptimizingQueueWarmupService.class);
+ private static final int MAX_ATTEMPTS = 3;
+
+ private final Function> queueSupplier;
+ private final Predicate runtimeStillActive;
+ private final OptimizingQueueWarmupMetrics metrics;
+ private final ExecutorService executor;
+ private final AtomicInteger pendingTasks = new AtomicInteger();
+ private final Object completionMonitor = new Object();
+ private volatile boolean closed;
+
+ public OptimizingQueueWarmupService(
+ int threadCount,
+ Function> queueSupplier,
+ Predicate runtimeStillActive,
+ OptimizingQueueWarmupMetrics metrics) {
+ this.queueSupplier = queueSupplier;
+ this.runtimeStillActive = runtimeStillActive;
+ this.metrics = metrics;
+ this.executor =
+ Executors.newFixedThreadPool(
+ Math.max(1, threadCount),
+ new ThreadFactoryBuilder()
+ .setNameFormat("optimizing-queue-warmup-%d")
+ .setDaemon(true)
+ .build());
+ }
+
+ public void warmupTables(List tableRuntimes) {
+ List sortedRuntimes =
+ Optional.ofNullable(tableRuntimes).orElseGet(Collections::emptyList).stream()
+ .sorted(Comparator.comparingInt(this::priority))
+ .collect(Collectors.toList());
+ if (sortedRuntimes.isEmpty() || closed) {
+ return;
+ }
+ metrics.recordSubmitted(sortedRuntimes.size());
+ pendingTasks.addAndGet(sortedRuntimes.size());
+ LOG.info("Start optimizing queue warmup: tables={}", sortedRuntimes.size());
+ sortedRuntimes.forEach(runtime -> executor.submit(() -> warmupWithRetry(runtime)));
+ }
+
+ public boolean awaitCompletionForTest(long timeout, TimeUnit unit) throws InterruptedException {
+ long deadline = System.nanoTime() + unit.toNanos(timeout);
+ synchronized (completionMonitor) {
+ while (pendingTasks.get() > 0) {
+ long remaining = deadline - System.nanoTime();
+ if (remaining <= 0) {
+ return false;
+ }
+ TimeUnit.NANOSECONDS.timedWait(completionMonitor, remaining);
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ executor.shutdownNow();
+ metrics.unregister();
+ synchronized (completionMonitor) {
+ completionMonitor.notifyAll();
+ }
+ }
+
+ private void warmupWithRetry(DefaultTableRuntime runtime) {
+ OptimizingQueue.WarmupResult result = OptimizingQueue.WarmupResult.FAILED;
+ metrics.recordStart();
+ try {
+ for (int attempt = 1; attempt <= MAX_ATTEMPTS && !closed; attempt++) {
+ result = warmup(runtime);
+ if (result != OptimizingQueue.WarmupResult.FAILED) {
+ return;
+ }
+ if (attempt < MAX_ATTEMPTS) {
+ metrics.recordRetry();
+ LOG.warn(
+ "Optimizing queue warmup failed for table {}, retrying attempt {}/{}",
+ runtime.getTableIdentifier(),
+ attempt + 1,
+ MAX_ATTEMPTS);
+ }
+ }
+ } finally {
+ metrics.recordResult(result);
+ if (pendingTasks.decrementAndGet() == 0) {
+ synchronized (completionMonitor) {
+ completionMonitor.notifyAll();
+ }
+ }
+ }
+ }
+
+ private OptimizingQueue.WarmupResult warmup(DefaultTableRuntime runtime) {
+ if (!runtimeStillActive.test(runtime)) {
+ LOG.info("Skip optimizing queue warmup for inactive table {}", runtime.getTableIdentifier());
+ return OptimizingQueue.WarmupResult.SKIPPED;
+ }
+ Optional queue = queueSupplier.apply(runtime.getGroupName());
+ if (!queue.isPresent()) {
+ LOG.info(
+ "Skip optimizing queue warmup for table {}, optimizer group {} does not exist",
+ runtime.getTableIdentifier(),
+ runtime.getGroupName());
+ return OptimizingQueue.WarmupResult.SKIPPED;
+ }
+ return queue.get().warmupTable(runtime);
+ }
+
+ private int priority(DefaultTableRuntime runtime) {
+ if (runtime.getProcessId() > 0) {
+ return 0;
+ }
+ if (runtime.getOptimizingStatus().isProcessing()) {
+ return 1;
+ }
+ return 2;
+ }
+}
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
index 0759f1e367..a2536de310 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/SchedulingPolicy.java
@@ -28,9 +28,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
@@ -38,6 +42,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
public class SchedulingPolicy {
@@ -84,14 +89,32 @@ public String name() {
return policyName;
}
- public DefaultTableRuntime scheduleTable(Set skipSet) {
+ public List scheduleTables(Set skipSet, int limit) {
+ if (limit <= 0) {
+ return Collections.emptyList();
+ }
+
+ List tableRuntimeSnapshot;
+ tableLock.lock();
+ try {
+ tableRuntimeSnapshot = new ArrayList<>(tableRuntimeMap.values());
+ } finally {
+ tableLock.unlock();
+ }
+
+ Set effectiveSkipSet = new HashSet<>(skipSet);
+ fillSkipSet(tableRuntimeSnapshot, effectiveSkipSet);
+ return tableRuntimeSnapshot.stream()
+ .filter(tableRuntime -> !effectiveSkipSet.contains(tableRuntime.getTableIdentifier()))
+ .sorted(createSorterByPolicy())
+ .limit(limit)
+ .collect(Collectors.toList());
+ }
+
+ public Set tableIdentifiersSnapshot() {
tableLock.lock();
try {
- fillSkipSet(skipSet);
- return tableRuntimeMap.values().stream()
- .filter(tableRuntime -> !skipSet.contains(tableRuntime.getTableIdentifier()))
- .min(createSorterByPolicy())
- .orElse(null);
+ return new HashSet<>(tableRuntimeMap.keySet());
} finally {
tableLock.unlock();
}
@@ -106,9 +129,10 @@ private Comparator createSorterByPolicy() {
}
}
- private void fillSkipSet(Set originalSet) {
+ private void fillSkipSet(
+ List tableRuntimes, Set originalSet) {
long currentTime = System.currentTimeMillis();
- tableRuntimeMap.values().stream()
+ tableRuntimes.stream()
.filter(
tableRuntime ->
!isTablePending(tableRuntime)
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
index d71ae484b6..e8205be2e1 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java
@@ -33,10 +33,9 @@
import org.apache.amoro.op.SnapshotSummary;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.RewriteStageTask;
+import org.apache.amoro.optimizing.TableOptimizingCommitter;
import org.apache.amoro.optimizing.TaskProperties;
import org.apache.amoro.properties.HiveTableProperties;
-import org.apache.amoro.server.optimizing.TaskRuntime.Status;
-import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.UnkeyedTable;
@@ -74,26 +73,24 @@
import java.util.Set;
import java.util.stream.Collectors;
-public class UnKeyedTableCommit {
+public class UnKeyedTableCommit implements TableOptimizingCommitter {
private static final Logger LOG = LoggerFactory.getLogger(UnKeyedTableCommit.class);
private final Long targetSnapshotId;
private final MixedTable table;
- private final Collection> tasks;
+ private final Collection tasks;
public UnKeyedTableCommit(
- Long targetSnapshotId, MixedTable table, Collection> tasks) {
+ Long targetSnapshotId, MixedTable table, Collection tasks) {
this.targetSnapshotId = targetSnapshotId;
this.table = table;
this.tasks = tasks;
}
- private Set> getExcludedDeleteFiles(
- List> successTasks) {
+ private Set> getExcludedDeleteFiles(List successTasks) {
Set> excludedDeleteFiles = new HashSet<>();
tasks.stream()
.filter(task -> !successTasks.contains(task))
- .map(TaskRuntime::getTaskDescriptor)
.filter(task -> task.getInput().rewrittenDeleteFiles() != null)
.forEach(
task ->
@@ -120,8 +117,11 @@ protected List moveFile2HiveIfNeed() {
: table.asKeyedTable().baseTable().spec().partitionType();
List newTargetFiles = new ArrayList<>();
- for (TaskRuntime taskRuntime : tasks) {
- RewriteFilesOutput output = taskRuntime.getTaskDescriptor().getOutput();
+ for (RewriteStageTask task : tasks) {
+ if (task.getOutput() == null) {
+ continue;
+ }
+ RewriteFilesOutput output = task.getOutput();
DataFile[] dataFiles = output.getDataFiles();
if (dataFiles == null) {
continue;
@@ -139,7 +139,7 @@ protected List moveFile2HiveIfNeed() {
for (DataFile targetFile : targetFiles) {
String partitionPath =
partitionPathMap.computeIfAbsent(
- taskRuntime.getTaskDescriptor().getPartition(),
+ task.getPartition(),
key -> getPartitionPath(hiveClient, maxTransactionId, targetFile, partitionSchema));
DataFile finalDataFile = moveTargetFiles(targetFile, partitionPath);
@@ -207,10 +207,8 @@ private String getIcebergPartitionLocation(StructLike partitionData) {
}
public void commit() throws OptimizingCommitException {
- List> successTasks =
- tasks.stream()
- .filter(task -> task.getStatus() == Status.SUCCESS)
- .collect(Collectors.toList());
+ List successTasks =
+ tasks.stream().filter(task -> task.getOutput() != null).collect(Collectors.toList());
if (successTasks.isEmpty()) {
LOG.info("No tasks to commit for table {}", table.id());
return;
@@ -226,29 +224,27 @@ public void commit() throws OptimizingCommitException {
Set removedDataFiles = Sets.newHashSet();
Set addedDeleteFiles = Sets.newHashSet();
Set removedDeleteFiles = Sets.newHashSet();
- successTasks.stream()
- .map(TaskRuntime::getTaskDescriptor)
- .forEach(
- task -> {
- if (CollectionUtils.isNotEmpty(hiveNewDataFiles)) {
- addedDataFiles.addAll(hiveNewDataFiles);
- } else if (task.getOutput().getDataFiles() != null) {
- addedDataFiles.addAll(Arrays.asList(task.getOutput().getDataFiles()));
- }
- if (task.getOutput().getDeleteFiles() != null) {
- addedDeleteFiles.addAll(Arrays.asList(task.getOutput().getDeleteFiles()));
- }
- if (task.getInput().rewrittenDataFiles() != null) {
- removedDataFiles.addAll(Arrays.asList(task.getInput().rewrittenDataFiles()));
- }
- if (task.getInput().rewrittenDeleteFiles() != null) {
- removedDeleteFiles.addAll(
- Arrays.stream(task.getInput().rewrittenDeleteFiles())
- .filter(deleteFile -> needRemove(excludedDeleteFiles, deleteFile))
- .map(ContentFiles::asDeleteFile)
- .collect(Collectors.toSet()));
- }
- });
+ successTasks.forEach(
+ task -> {
+ if (CollectionUtils.isNotEmpty(hiveNewDataFiles)) {
+ addedDataFiles.addAll(hiveNewDataFiles);
+ } else if (task.getOutput().getDataFiles() != null) {
+ addedDataFiles.addAll(Arrays.asList(task.getOutput().getDataFiles()));
+ }
+ if (task.getOutput().getDeleteFiles() != null) {
+ addedDeleteFiles.addAll(Arrays.asList(task.getOutput().getDeleteFiles()));
+ }
+ if (task.getInput().rewrittenDataFiles() != null) {
+ removedDataFiles.addAll(Arrays.asList(task.getInput().rewrittenDataFiles()));
+ }
+ if (task.getInput().rewrittenDeleteFiles() != null) {
+ removedDeleteFiles.addAll(
+ Arrays.stream(task.getInput().rewrittenDeleteFiles())
+ .filter(deleteFile -> needRemove(excludedDeleteFiles, deleteFile))
+ .map(ContentFiles::asDeleteFile)
+ .collect(Collectors.toSet()));
+ }
+ });
try {
Transaction transaction = table.asUnkeyedTable().newTransaction();
if (removedDeleteFiles.isEmpty() && !addedDeleteFiles.isEmpty()) {
@@ -394,7 +390,8 @@ private DataFile moveTargetFiles(DataFile targetFile, String hiveLocation) {
private static Set getCommittedDataFilesFromSnapshotId(
UnkeyedTable table, Long snapshotId) {
- long currentSnapshotId = IcebergTableUtil.getSnapshotId(table, true);
+ Snapshot current = table.currentSnapshot();
+ long currentSnapshotId = current != null ? current.snapshotId() : Constants.INVALID_SNAPSHOT_ID;
if (currentSnapshotId == Constants.INVALID_SNAPSHOT_ID) {
return Collections.emptySet();
}
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/HttpSessionHandlerFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/HttpSessionHandlerFactory.java
index 2ee650c522..cb99097437 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/HttpSessionHandlerFactory.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/HttpSessionHandlerFactory.java
@@ -67,7 +67,7 @@ public AmoroSessionTableSchema(DataSource dataSource) {
setLastNodeColumn("last_node");
setAccessTimeColumn("access_time");
setLastAccessTimeColumn("last_access_time");
- setCreateTimeColumn("create_time");
+ setCreateTimeColumn("session_create_time");
setCookieTimeColumn("cookie_time");
setLastSavedTimeColumn("last_save_time");
setExpiryTimeColumn("expiry_time");
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java
index 1163960f54..e59a40a545 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/TaskFilesPersistence.java
@@ -18,9 +18,9 @@
package org.apache.amoro.server.persistence;
-import org.apache.amoro.optimizing.RewriteFilesInput;
+import org.apache.amoro.optimizing.BaseOptimizingInput;
import org.apache.amoro.optimizing.RewriteFilesOutput;
-import org.apache.amoro.optimizing.RewriteStageTask;
+import org.apache.amoro.process.StagedTaskDescriptor;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.mapper.OptimizingProcessMapper;
import org.apache.amoro.server.utils.CompressUtil;
@@ -32,12 +32,40 @@
import java.util.Map;
import java.util.stream.Collectors;
+/**
+ * Static facade for persisting and reloading the per-task {@code OptimizingInput} blob bound to an
+ * {@code OptimizingProcess}.
+ *
+ *
Storage shape is intentionally format-agnostic: we serialize a {@code Map} via {@link SerializationUtil#simpleSerialize(Object)} (Java serialization)
+ * and gzip it through {@link CompressUtil}. Any {@link BaseOptimizingInput} subclass that is {@code
+ * Serializable} round-trips without this layer needing to know the concrete type — concretely, both
+ * the Iceberg {@code RewriteFilesInput} and Paimon {@code PaimonCompactionInput} are covered.
+ *
+ *
The reason the load-side return type is {@link BaseOptimizingInput} (rather than a format
+ * specific subclass) is recovery: {@code OptimizingQueue.TableOptimizingProcess#loadTaskRuntimes}
+ * pushes each input into {@link StagedTaskDescriptor#setInput(Object)} whose signature is erased to
+ * {@code I}. The caller and the descriptor instance (resolved by {@code
+ * TaskDescriptorTypeConverter}) agree on the concrete subclass at runtime; this layer only
+ * guarantees we do not downcast prematurely.
+ */
public class TaskFilesPersistence {
private static final DatabasePersistence persistence = new DatabasePersistence();
+ /**
+ * Persist the per-task {@code OptimizingInput} map for one process, keyed by {@code taskId}.
+ *
+ *
Accepts any {@link TaskRuntime} whose descriptor declares a {@link BaseOptimizingInput}
+ * subclass — so Iceberg {@code RewriteStageTask} / {@code RewriteFilesInput} and Paimon {@code
+ * PaimonCompactionTask} / {@code PaimonCompactionInput} both flow through unchanged.
+ */
public static void persistTaskInputs(
- long processId, Collection> tasks) {
+ long processId,
+ Collection<
+ ? extends
+ TaskRuntime extends StagedTaskDescriptor extends BaseOptimizingInput, ?, ?>>>
+ tasks) {
persistence.persistTaskInputs(
processId,
tasks.stream()
@@ -46,7 +74,16 @@ public static void persistTaskInputs(
e -> e.getTaskId().getTaskId(), task -> task.getTaskDescriptor().getInput())));
}
- public static Map loadTaskInputs(long processId) {
+ /**
+ * Load the per-task inputs previously persisted for {@code processId}. Returns an empty map if
+ * the row is missing or carries no blob.
+ *
+ *
Each value is a {@link BaseOptimizingInput} subclass; the concrete type is whatever {@code
+ * persistTaskInputs} wrote (Java serialization preserves the runtime class). Callers that need
+ * the format-specific fields are expected to obtain the matching descriptor via {@code
+ * TaskDescriptorTypeConverter} and route through {@link StagedTaskDescriptor#setInput}.
+ */
+ public static Map loadTaskInputs(long processId) {
List bytes =
persistence.getAs(
OptimizingProcessMapper.class, mapper -> mapper.selectProcessInputFiles(processId));
@@ -63,7 +100,7 @@ public static RewriteFilesOutput loadTaskOutput(byte[] content) {
private static class DatabasePersistence extends PersistentBase {
- public void persistTaskInputs(long processId, Map tasks) {
+ public void persistTaskInputs(long processId, Map tasks) {
doAs(
OptimizingProcessMapper.class,
mapper -> mapper.updateProcessInputFiles(processId, tasks));
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/TaskDescriptorRecoveryTypes.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/TaskDescriptorRecoveryTypes.java
new file mode 100644
index 0000000000..ca2f6a9142
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/TaskDescriptorRecoveryTypes.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence.converter;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.formats.paimon.optimizing.PaimonCompactionExecutorFactory;
+import org.apache.amoro.formats.paimon.optimizing.PaimonCompactionInput;
+import org.apache.amoro.formats.paimon.optimizing.PaimonCompactionOutput;
+import org.apache.amoro.formats.paimon.optimizing.PaimonCompactionTask;
+import org.apache.amoro.formats.paimon.optimizing.PaimonMetricsSummary;
+import org.apache.amoro.formats.paimon.optimizing.primary.PaimonPrimaryKeyCompactionExecutorFactory;
+import org.apache.amoro.formats.paimon.optimizing.primary.PaimonPrimaryKeyCompactionInput;
+import org.apache.amoro.formats.paimon.optimizing.primary.PaimonPrimaryKeyCompactionOutput;
+import org.apache.amoro.formats.paimon.optimizing.primary.PaimonPrimaryKeyCompactionTask;
+import org.apache.amoro.hive.optimizing.MixedHiveRewriteExecutorFactory;
+import org.apache.amoro.optimizing.BaseOptimizingInput;
+import org.apache.amoro.optimizing.IcebergRewriteExecutorFactory;
+import org.apache.amoro.optimizing.MetricsSummary;
+import org.apache.amoro.optimizing.MixedIcebergRewriteExecutorFactory;
+import org.apache.amoro.optimizing.RewriteFilesInput;
+import org.apache.amoro.optimizing.RewriteFilesOutput;
+import org.apache.amoro.optimizing.RewriteStageTask;
+import org.apache.amoro.optimizing.TaskProperties;
+import org.apache.amoro.process.StagedTaskDescriptor;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Registry for restoring persisted optimizing task descriptors and their typed payloads.
+ *
+ *
Rows without {@link TaskProperties#TASK_EXECUTOR_FACTORY_IMPL} are legacy Iceberg rows and
+ * intentionally fall back to {@link RewriteStageTask}. Rows with an unknown factory fail fast so a
+ * new lake format cannot be silently restored with the wrong descriptor, input, output, or summary
+ * class.
+ */
+public final class TaskDescriptorRecoveryTypes {
+
+ private static final Gson GSON = new Gson();
+ private static final RecoveryTypes LEGACY_ICEBERG_TYPES =
+ new RecoveryTypes(
+ RewriteStageTask.class,
+ RewriteFilesInput.class,
+ RewriteFilesOutput.class,
+ MetricsSummary.class);
+ private static final Map FACTORY_IMPL_TO_RECOVERY_TYPES;
+
+ static {
+ Map mappings = new HashMap<>();
+ mappings.put(IcebergRewriteExecutorFactory.class.getName(), LEGACY_ICEBERG_TYPES);
+ mappings.put(MixedIcebergRewriteExecutorFactory.class.getName(), LEGACY_ICEBERG_TYPES);
+ mappings.put(MixedHiveRewriteExecutorFactory.class.getName(), LEGACY_ICEBERG_TYPES);
+ mappings.put(
+ PaimonCompactionExecutorFactory.class.getName(),
+ new RecoveryTypes(
+ PaimonCompactionTask.class,
+ PaimonCompactionInput.class,
+ PaimonCompactionOutput.class,
+ PaimonMetricsSummary.class));
+ mappings.put(
+ PaimonPrimaryKeyCompactionExecutorFactory.class.getName(),
+ new RecoveryTypes(
+ PaimonPrimaryKeyCompactionTask.class,
+ PaimonPrimaryKeyCompactionInput.class,
+ PaimonPrimaryKeyCompactionOutput.class,
+ PaimonMetricsSummary.class));
+ FACTORY_IMPL_TO_RECOVERY_TYPES = Collections.unmodifiableMap(mappings);
+ }
+
+ private TaskDescriptorRecoveryTypes() {}
+
+ public static RecoveryTypes resolve(String factoryImpl) {
+ if (factoryImpl == null || factoryImpl.isEmpty()) {
+ return LEGACY_ICEBERG_TYPES;
+ }
+ RecoveryTypes types = FACTORY_IMPL_TO_RECOVERY_TYPES.get(factoryImpl);
+ if (types == null) {
+ throw new IllegalArgumentException(
+ "Unknown "
+ + TaskProperties.TASK_EXECUTOR_FACTORY_IMPL
+ + " = "
+ + factoryImpl
+ + "; register it in TaskDescriptorRecoveryTypes");
+ }
+ return types;
+ }
+
+ public static void validateRecoveredTask(
+ StagedTaskDescriptor, ?, ?> descriptor, BaseOptimizingInput input) {
+ validateRecoveredTask(descriptor, input, null);
+ }
+
+ public static void validateRecoveredTask(
+ StagedTaskDescriptor, ?, ?> descriptor,
+ BaseOptimizingInput input,
+ TableFormat tableFormat) {
+ if (descriptor == null) {
+ throw new IllegalArgumentException("Recovered task descriptor must not be null");
+ }
+ String factoryImpl = factoryImpl(descriptor.getProperties());
+ if ((factoryImpl == null || factoryImpl.isEmpty())
+ && !allowsLegacyMissingFactory(tableFormat)) {
+ throw new IllegalArgumentException(
+ "Recovered task is missing "
+ + TaskProperties.TASK_EXECUTOR_FACTORY_IMPL
+ + " for table format "
+ + tableFormat);
+ }
+ RecoveryTypes types = resolve(factoryImpl);
+ validateType("descriptor", descriptor, types.descriptorClass());
+ validateType("input", input, types.inputClass());
+ validateType("output", descriptor.getOutput(), types.outputClass());
+ validateType("summary", descriptor.getSummary(), types.summaryClass());
+ }
+
+ static Map>> descriptorMappings() {
+ Map>> mappings = new HashMap<>();
+ FACTORY_IMPL_TO_RECOVERY_TYPES.forEach(
+ (factoryImpl, types) -> mappings.put(factoryImpl, types.descriptorClass()));
+ return Collections.unmodifiableMap(mappings);
+ }
+
+ static String readFactoryImpl(ResultSet rs) throws SQLException {
+ String raw;
+ try {
+ raw = rs.getString("properties");
+ } catch (SQLException e) {
+ return null;
+ }
+ if (raw == null || raw.isEmpty()) {
+ return null;
+ }
+ Map props;
+ try {
+ props = GSON.fromJson(raw, new TypeToken