HDDS-14020. Use ForkJoinPool instead of a ScheduledThreadPoolExecutor in BackgroundService #9686
smengcl wants to merge 6 commits into apache:master
Conversation
…rvice Change-Id: I4c1a051e8574d32375cbebeb10546563f4a4f817
Change-Id: Ie2118356f902443a93fe666d890ad4d59e9dd467
…omment: apache#9390 (comment)
Refactor BackgroundTask to use wrapper pattern for ForkJoinPool integration. This commit introduces BackgroundTaskForkJoin as a wrapper class to integrate BackgroundTask with ForkJoinPool, avoiding the need to change all service implementations from 'implements' to 'extends'. Key changes:
- Reverted BackgroundTask from abstract class back to interface
- Created BackgroundTaskForkJoin wrapper extending RecursiveTask
- Updated BackgroundService to wrap tasks before forking
- Reverted all service task classes to 'implements BackgroundTask'
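For readers unfamiliar with the wrapper pattern described in the commit above, a minimal sketch follows. It assumes BackgroundTask is the existing Ozone interface whose call() returns a BackgroundTaskResult (both in org.apache.hadoop.hdds.utils); the class name and exception handling here are illustrative and simplified relative to the actual BackgroundTaskForkJoin in this PR.

```java
import java.util.concurrent.RecursiveTask;

import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;

// Illustrative sketch only: adapts an existing BackgroundTask to ForkJoinPool by
// wrapping it in a RecursiveTask, so task classes keep "implements BackgroundTask".
public class BackgroundTaskForkJoinSketch
    extends RecursiveTask<BackgroundTaskResult> {
  private static final long serialVersionUID = 1L;

  // transient: BackgroundTask implementations are not expected to be Serializable
  private final transient BackgroundTask backgroundTask;

  public BackgroundTaskForkJoinSketch(BackgroundTask backgroundTask) {
    this.backgroundTask = backgroundTask;
  }

  @Override
  protected BackgroundTaskResult compute() {
    try {
      // Delegate to the wrapped task; callers obtain the result via join().
      return backgroundTask.call();
    } catch (Exception e) {
      // RecursiveTask surfaces this to join() as a completion exception.
      throw new RuntimeException("Background task failed", e);
    }
  }
}
```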
There are some issues after rebasing. Let me fix it.
Pull request overview
Refactors Ozone background services to run tasks via a ForkJoinPool (instead of a ScheduledThreadPoolExecutor) and updates directory deletion to support fork/join-style parallelism, targeting shutdown deadlock avoidance (HDDS-14020).
Changes:
- Reworked BackgroundService scheduling/execution to use a ForkJoinPool plus a shared ScheduledExecutorService (see the sketch after this list).
- Updated DirectoryDeletingService to optionally fork internal deletion work and adjusted related tests.
- Introduced fork/join wrappers and minor task-wrapping/refactoring in deleting services and SST filtering.
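In rough terms, the new scheduling shape is: each run of the service is submitted to a ForkJoinPool, and when it finishes, a shared single-threaded ScheduledExecutorService schedules the next submission after the service interval. The class and method names below (PeriodicForkJoinRunner, runOnce, scheduleNextRun) are illustrative, not the PR's actual code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch of the scheduling shape described above; not the PR's exact code.
class PeriodicForkJoinRunner {
  private final ForkJoinPool exec = new ForkJoinPool(4);
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
  private final long intervalMillis;

  PeriodicForkJoinRunner(long intervalMillis) {
    this.intervalMillis = intervalMillis;
  }

  void start() {
    exec.submit(this::runOnce);
  }

  private void runOnce() {
    try {
      // ... fork and join the service's background tasks here ...
    } finally {
      scheduleNextRun();
    }
  }

  private void scheduleNextRun() {
    // Re-check shutdown state before submitting to the pool again.
    scheduler.schedule(() -> {
      if (!isShutdown.get()) {
        exec.submit(this::runOnce);
      }
    }, intervalMillis, TimeUnit.MILLISECONDS);
  }

  void shutdown() {
    isShutdown.set(true);
    exec.shutdown();
    scheduler.shutdown();
  }
}
```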
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 16 comments.
| File | Description |
|---|---|
| hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java | Core switch to ForkJoin-based execution and custom periodic scheduling. |
| hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java | New ForkJoin wrapper for BackgroundTask execution. |
| hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundServiceScheduler.java | New shared scheduler supplier for periodic rescheduling. |
| hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java | Uses fork/join-style parallelism instead of internal executors; adds allowTasksToFork plumbing. |
| hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java | Refactors task wrapping into a reusable BackgroundDeleteTask. |
| hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java | Adjusts call() signature and adds interrupt handling. |
| hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java | Updates task construction for the new DirDeletingTask signature. |
| hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java | Updates PeriodicalTask construction to match new signature. |
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        LOG.error("SST filtering task interrupted for snapshot: {}", snapShotTableKey, e);
After catching InterruptedException, the code re-interrupts the thread but continues processing subsequent snapshots. Typically interruption should stop the task (e.g., break the loop / return) to allow the service to shut down promptly and avoid doing more work on an interrupted thread.
| LOG.error("SST filtering task interrupted for snapshot: {}", snapShotTableKey, e); | |
| LOG.error("SST filtering task interrupted for snapshot: {}", snapShotTableKey, e); | |
| break; |
    scheduledExecuterService.schedule(() -> exec.submit(new PeriodicalTask(this)),
        intervalInMillis, TimeUnit.MILLISECONDS);
The scheduled runnable in scheduleNextTask() calls exec.submit(...) without re-checking shutdown state or whether exec is still non-null. Since shutdown() sets exec = null and the shared scheduler uses shutdown() (not shutdownNow()), already-scheduled tasks can still execute after shutdown and hit NPE / RejectedExecutionException. Guard inside the scheduled runnable (check shutdown flag + exec != null) and/or cancel scheduled tasks on shutdown.
Suggested change:

    -   scheduledExecuterService.schedule(() -> exec.submit(new PeriodicalTask(this)),
    -       intervalInMillis, TimeUnit.MILLISECONDS);
    +   scheduledExecuterService.schedule(() -> {
    +     if (!isShutdown.get() && exec != null) {
    +       exec.submit(new PeriodicalTask(this));
    +     }
    +   }, intervalInMillis, TimeUnit.MILLISECONDS);
    exec = new ForkJoinPool(threadPoolSize, factory, null, false);
    isShutdown = new AtomicReference<>(false);
Background tasks frequently do blocking I/O (e.g. OM Ratis submitRequest, RocksDB calls). Using a ForkJoinPool for blocking work can reduce parallelism and hurt throughput unless blocking sections use ForkJoinPool.managedBlock(...) (or a dedicated blocking pool is used). Consider addressing blocking sections or documenting why ForkJoinPool is safe here.
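For reference, one way to keep a ForkJoinPool responsive around blocking calls is ForkJoinPool.managedBlock, which lets the pool add a compensating worker while the current one blocks. A hedged sketch follows; the Callable passed in is a stand-in for whatever blocking call (a Ratis submit, a RocksDB read) the task performs, not an actual Ozone API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;

// Illustrative only: runs an arbitrary blocking call under ForkJoinPool.managedBlock so
// the pool may create a compensating worker while this thread is blocked.
final class BlockingCallAdapter<T> implements ForkJoinPool.ManagedBlocker {
  private final Callable<T> blockingCall; // stand-in for e.g. a Ratis submit or RocksDB read
  private T result;
  private boolean done;

  BlockingCallAdapter(Callable<T> blockingCall) {
    this.blockingCall = blockingCall;
  }

  @Override
  public boolean block() throws InterruptedException {
    try {
      result = blockingCall.call(); // the actual blocking work happens here
    } catch (InterruptedException e) {
      throw e;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    done = true;
    return true; // blocking finished
  }

  @Override
  public boolean isReleasable() {
    return done; // not releasable until block() has produced the result
  }

  T call() throws InterruptedException {
    ForkJoinPool.managedBlock(this); // pool may add a spare worker while we block
    return result;
  }
}
```

A task would then call `new BlockingCallAdapter<>(() -> blockingRead()).call()` instead of invoking the blocking read directly; whether that complexity is warranted here, versus simply documenting the blocking behavior, is up to the authors.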
    public static synchronized UncheckedAutoCloseableSupplier<ScheduledExecutorService> get() {
      if (executor == null) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        executor = ReferenceCountedObject.wrap(scheduler, () -> { }, (shutdown) -> {
BackgroundServiceScheduler uses the default thread factory for ScheduledThreadPoolExecutor, which creates non-daemon threads. Since this is a shared background scheduler, a non-daemon thread can keep the JVM alive if something forgets to close/release it. Consider using a daemon thread factory (and naming the thread) for the scheduler.
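A named daemon thread for the shared scheduler could be set up roughly as below; the class and thread names are illustrative, and ScheduledThreadPoolExecutor's ThreadFactory constructor is standard JDK.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

final class DaemonSchedulerFactory {
  private DaemonSchedulerFactory() { }

  // Illustrative: builds the shared single-threaded scheduler with a named daemon thread,
  // so an unreleased scheduler cannot keep the JVM alive on its own.
  static ScheduledThreadPoolExecutor newScheduler() {
    ThreadFactory daemonFactory = runnable -> {
      Thread t = new Thread(runnable, "BackgroundServiceScheduler");
      t.setDaemon(true);
      return t;
    };
    return new ScheduledThreadPoolExecutor(1, daemonFactory);
  }
}
```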
    public class BackgroundTaskForkJoin extends RecursiveTask<BackgroundTaskForkJoin.BackgroundTaskForkResult> {
      private static final long serialVersionUID = 1L;
      private final transient BackgroundTask backgroundTask;

      public BackgroundTaskForkJoin(BackgroundTask backgroundTask) {
This class is added under the hdds-common module but depends on BackgroundTask/BackgroundTaskResult, which are defined in hdds-server-framework (and hdds-common does not depend on it). Since hdds-server-framework already depends on hdds-common, adding the reverse dependency would create a cycle; as-is, this likely won’t compile. Consider moving this class into hdds-server-framework, or into a new shared module that both can depend on.
    }, exec).exceptionally(e -> null), (Void1, Void) -> null);
    Consumer<BackgroundTaskForkJoin> taskForkHandler = task -> {
      task.fork();
      tasksInFlight.offer(task);
Method accept ignores the return value of Queue.offer, which signals whether the task was actually enqueued.
Suggested change:

    -   tasksInFlight.offer(task);
    +   if (!tasksInFlight.offer(task)) {
    +     LOG.error("Failed to enqueue background task for service {}. Task will not be tracked.", serviceName);
    +   }
      }
    };
    task.fork();
    recursiveTasks.offer(task);
Method processDeletedDirsForStore ignores the return value of Queue&lt;RecursiveTask&gt;.offer, which signals whether the task was actually enqueued.
Suggested change:

    -   recursiveTasks.offer(task);
    +   if (!recursiveTasks.offer(task)) {
    +     // If the task cannot be enqueued, ensure it is joined and
    +     // mark that not all deleted directories were processed.
    +     task.join();
    +     processedAllDeletedDirs = false;
    +     break;
    +   }
        pendingDeletedDirInfo.getValue(),
        pendingDeletedDirInfo.getKey(), isDirReclaimable, allSubDirList,
    -   getOzoneManager().getKeyManager(), reclaimableFileFilter, remainNum);
    +   dds.getOzoneManager().getKeyManager(), reclaimableFileFilter, remainNum);

        dds.optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
            subFileNum, allSubDirList, purgePathRequestList, snapshotTableKey,
    -       startTime, getOzoneManager().getKeyManager(),
    +       startTime, dds.getOzoneManager().getKeyManager(),

        omSnapshotManager.getActiveSnapshot(snapInfo.getVolumeName(), snapInfo.getBucketName(),
            snapInfo.getName())) {
    -   KeyManager keyManager = snapInfo == null ? getOzoneManager().getKeyManager()
    +   KeyManager keyManager = snapInfo == null ? dds.getOzoneManager().getKeyManager()
What changes were proposed in this pull request?
This is the continuation of #9390.
This addresses a comment from @sumitagrawl (#9390 (comment)), which significantly reduces the number of files touched.
What is the link to the Apache JIRA?
https://issues.apache.org/jira/browse/HDDS-14020
How was this patch tested?