[AMORO-3988][ams] Automatic restart the optimizers if it is down unexpected#4180
Open
lintingbin wants to merge 11 commits intoapache:masterfrom
Open
[AMORO-3988][ams] Automatic restart the optimizers if it is down unexpected#4180lintingbin wants to merge 11 commits intoapache:masterfrom
lintingbin wants to merge 11 commits intoapache:masterfrom
Conversation
…ectedly Add auto-restart mechanism for optimizers that die unexpectedly. When an optimizer process crashes, the resource record remains in the DB but the optimizer record is removed by heartbeat timeout. This leaves 'orphaned' resources with no active optimizer. Changes: - Add orphaned resource detection in OptimizerGroupKeeper: periodically cross-check resource table against optimizer table to find resources without active optimizers - Restart orphaned optimizer via container.requestResource() - Track retry count per orphaned resource; clean up resource from DB after exceeding configurable max retries - Add configuration options: - optimizer.auto-restart-enabled (default: false) - optimizer.auto-restart-max-retries (default: 5) - Add 3 test cases for auto-restart scenarios
Fix several issues in the auto-restart mechanism: 1. Prevent double provisioning: restartOrphanedOptimizers now returns the total thread count of orphaned resources being restarted, which is subtracted from requiredCores in tryKeeping to avoid duplicate resource allocation. 2. Add grace period for orphaned resource detection: new config optimizer.auto-restart-grace-period (default 5min) prevents misidentifying resources whose optimizer is still starting up (e.g. Flink/Kubernetes) as orphaned. 3. Persist updated properties after restart: add updateResource method to ResourceMapper/ResourceManager/DefaultOptimizerManager so that new job-id and other properties from doScaleOut are persisted to DB after restarting an orphaned resource. 4. Use timestamp-based tracking: replace simple retry counter with OrphanedResourceState that tracks both firstDetectedTime and restartAttempts. After a successful restart, the grace period timer is reset to allow the optimizer time to register. 5. Use InternalResourceContainer interface instead of casting to AbstractOptimizerContainer, with instanceof check for safety. 6. Improve test stability: replace Thread.sleep with polling-based waitUntil helper to reduce flakiness in CI environments.
- Fix grace period to use lastRestartTime instead of firstDetectedTime so retry attempts are rate-limited from the most recent restart, not first detection - Fix rawRequiredCores vs requiredCores: orphaned cores must not be counted as satisfied capacity when resetting minParallelism after max keeping attempts - Fix non-InternalResourceContainer: log warn only once, suppress further cycles via grace period, and do not consume restartAttempts (requires manual cleanup) - Fix TOCTOU: add defensive deleteOptimizer before deleteResource on max retries - Add start_time = CURRENT_TIMESTAMP to updateResource so restarts are visible - Fix test race in testNoRestartWhenOptimizerIsActive: authenticate before createResource to close the orphan detection window - Replace ConcurrentHashMap with HashMap in orphanedResourceStates (single-threaded) - Add docs/configuration/ams-config.md entries for three new config keys - Add comments explaining TOCTOU, grace period semantics, and test rationale
441b8ff to
c177a13
Compare
Follow-up to the previous review round: - Add InternalResourceContainer#supportsAutoRestart() (default true) and override it in KubernetesOptimizerContainer to return false. AMS-driven auto-restart would otherwise race against the K8s Deployment's own Pod-level self-healing: doScaleOut uses .create() and throws AlreadyExists when a Deployment re-created by the ReplicaSet is still present, and after max retries the resource record would be deleted, dragging the live Deployment down with it. Opted-out containers are now logged once and silenced via the grace period, without consuming restartAttempts and without ever reaching the cleanup branch. - Remove "start_time = CURRENT_TIMESTAMP" from ResourceMapper.updateResource. start_time represents the original optimizer start timestamp and is relied on for dashboard display and audit; restarts should not overwrite it. updateResource now persists only thread_count, memory and properties (the fields that genuinely change across a restart). - Clarify in the restartOrphanedOptimizers javadoc that this method is already leader-gated by AbstractKeeper#run (via haContainer.hasLeadership), so master-slave mode cannot race two AMS nodes on the same orphan. - Refactor restartOrphanedOptimizers to compute the InternalResourceContainer / supportsAutoRestart check once and branch from there, collapsing two prior instanceof checks. - Add TestOptimizerGroupKeeper#testNoRestartWhenContainerOptsOut covering the new opt-out path: asserts that neither requestResource is called nor the DB row is deleted when the container returns supportsAutoRestart=false.
- Expand InternalResourceContainer#supportsAutoRestart javadoc to (a) mention Flink/Spark HA as another valid opt-out case and (b) spell out that orphaned rows will accumulate in the DB when external self-healing fails permanently, so implementers understand the trade-off. - Move KubernetesOptimizerContainer#supportsAutoRestart out of the constants block and next to the other overrides so the file structure matches the rest of the container (override methods grouped together). - Document in OrphanedResourceState#lastRestartTime that it also serves as a log-once sentinel for opt-out containers, so its dual role is visible to future readers instead of implicit in the call site. No behavior change.
The previous revision set OPTIMIZER_AUTO_RESTART_ENABLED=true and OPTIMIZER_AUTO_RESTART_GRACE_PERIOD=0ms as static defaults in AMSServiceTestBase, which meant every AMS test class inherited them. Combined with the parent-base HB_TIMEOUT=800ms and check-interval=10ms, unrelated tests that let an optimizer expire would trip orphan detection in the keeper, which then hit listOptimizers + listResourcesByGroup in a tight loop. On the hadoop3 CI profile this SELECT loop deadlocks with the TRUNCATE OPTIMIZER issued during DerbyPersistence cleanup, leaving the DB in a half-cleaned state and cascading AlreadyExists / Metric already registered failures into the next test class (TestInternalMixedCatalogService in the failing CI run). Fix: - Revert AMSServiceTestBase to the production default (auto-restart disabled, grace-period unset). Sibling test classes no longer pay for this test class's needs. - In TestOptimizerGroupKeeper itself, flip autoRestartEnabled=true and autoRestartGracePeriodMs=0 on the shared OPTIMIZING_SERVICE instance via DynFields in @BeforeClass, and restore the previous values in @afterclass. Only this class's orphan-detection tests run with the aggressive settings; no other test sees them. Locally TestInternalMixedCatalogService (the canary in the failing CI), TestDefaultOptimizingService, and TestOptimizerGroupKeeper all pass after the change.
Follow-up to the CI fix (c35203b). The previous revision mutated autoRestartEnabled / autoRestartGracePeriodMs via DynFields reflection on private final fields. That works on JDK 8/11 but has two problems: - JLS 17.5.3 does not guarantee happens-before for reflective writes to final fields. The keeper thread happened to see updates due to the BlockingQueue lock in suspendingQueue.take(), but we shouldn't rely on that accident. - JDK 12+ is tightening access to final fields; reflective writes may become IllegalAccessException under future JVMs. Changes: - DefaultOptimizingService: change autoRestartEnabled and autoRestartGracePeriodMs from final to volatile, and add three @VisibleForTesting methods (setAutoRestartForTest, isAutoRestartEnabled, getAutoRestartGracePeriodMs). autoRestartMaxRetries remains final since tests never mutate it. volatile guarantees cross-thread visibility without speculative JMM concerns, and the production call sites are unchanged (constructor-only writes). - AMSServiceTestBase: add protected static optimizingServiceStatic() so @BeforeClass / @afterclass hooks in subclasses can reach the shared instance without reflecting against the private static field. Also avoids brittle string-literal field names. - TestOptimizerGroupKeeper: drop the DynFields reflection and the optimizingServiceRef() helper; call setAutoRestartForTest directly through the new accessor. Also upgrade the @afterclass restoration from silent catch-Throwable to a LOG.warn, so a failure to restore cannot silently pollute the next test class (the exact regression the previous commit fixed).
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #4180 +/- ##
============================================
+ Coverage 29.88% 30.01% +0.12%
- Complexity 4288 4296 +8
============================================
Files 679 680 +1
Lines 55060 55188 +128
Branches 7032 7046 +14
============================================
+ Hits 16455 16565 +110
- Misses 37375 37389 +14
- Partials 1230 1234 +4
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Four small hardenings on top of the auto-restart feature. No behavior
change for successful restart paths; only error-handling robustness,
API ergonomics, and documentation.
1. Config doc clarifications
- max-retries description now notes "per leader tenure" (master-slave
failover resets the counter) and the worst-case cleanup time is
approximately max-retries * grace-period.
- enabled description notes that failed restart attempts may cause
short-lived duplicate resources (orphan retried in background while
a fresh optimizer is provisioned).
- grace-period description now notes it is also used between
consecutive restart attempts, not just after first detection.
2. Zombie-row resilience in restartOrphanedOptimizers
If a resource row references a container that has been deregistered
from config, Containers.get throws; previously the outer catch
swallowed the error and aborted the whole cycle, permanently blocking
restart of every other orphan in the group. Resolve the container in
a per-orphan try/catch and route resolution failures through the
opt-out path (log once, skip, do not consume retry attempts).
3. rawRequiredCores comment clarified
The previous comment suggested orphaned-cores exclusion happened in
rawRequiredCores itself. It actually happens inside tryKeeping via
listOptimizers (which returns only live, heartbeating optimizers).
Rewrite to describe the actual data flow.
4. Collapse @VisibleForTesting surface (3 methods -> 1)
overrideAutoRestartForTest now returns an AutoCloseable that restores
the previous values on close, replacing the setter + two getters that
TestOptimizerGroupKeeper used to snapshot-and-restore manually. The
test's @BeforeClass / @afterclass become a textbook open/close pair.
Null out the static handle in a finally block so a forked JVM that reuses this test class doesn't observe a stale already-closed AutoCloseable on the next @BeforeClass invocation. No behavior change in the normal single-run path.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Search before asking
What type of PR is this?
What does this PR do?
Add an automatic restart mechanism for optimizers that die unexpectedly.
Problem
When an optimizer process crashes, the
OptimizerKeeperdetects the heartbeat timeout and removes the optimizer record from theoptimizertable. However, the corresponding resource record in theresourcetable is not cleaned up, leaving an "orphaned" resource with no active optimizer. Currently there is no mechanism to detect this situation and restart the optimizer.Solution
Extend the existing
OptimizerGroupKeeperto periodically detect orphaned resources and automatically restart the optimizer:OptimizerGroupKeeper.processTask(), cross-check theresourcetable against theoptimizertable to find resources without any active optimizer instances.container.requestResource(resource)to restart the optimizer process.Configuration
Two new configuration options are added (disabled by default):
optimizer.auto-restart-enabledfalseoptimizer.auto-restart-max-retries5Bug fix
Also fixes
ResourceMapper.selectResourcesByGroupresult mapping where property names did not matchResourceclass fields (group→groupName,container→containerName,totalMemory→memoryMb).Checklist
Closes #3988