diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/manager/KubernetesOptimizerContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/manager/KubernetesOptimizerContainer.java index 850fba4eb3..c3c079ede3 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/manager/KubernetesOptimizerContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/manager/KubernetesOptimizerContainer.java @@ -34,6 +34,7 @@ import io.fabric8.kubernetes.client.ConfigBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import org.apache.amoro.OptimizerProperties; import org.apache.amoro.resource.Resource; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap; @@ -68,6 +69,7 @@ public class KubernetesOptimizerContainer extends AbstractOptimizerContainer { public static final String PODTEMPLATE = "podTemplate"; public static final String PULL_SECRETS = "imagePullSecrets"; public static final String KUBE_CONFIG_PATH = "kube-config-path"; + public static final String SERVICE_ACCOUNT = "serviceAccount"; private static final String NAME_PREFIX = "amoro-optimizer-"; @@ -75,6 +77,13 @@ public class KubernetesOptimizerContainer extends AbstractOptimizerContainer { private static final String EXTRA_PROPERTY_PREFIX = "extra."; + /** + * Buffer added on top of optimizer's shutdown-timeout when deriving K8s + * terminationGracePeriodSeconds, to leave room for thread join, best-effort task result + * reporting, and log flush before SIGKILL. + */ + static final long TERMINATION_GRACE_BUFFER_SECONDS = 30L; + private static final Map EXTRA_PROPERTY_DEFAULTS = new HashMap<>(); static { @@ -86,6 +95,30 @@ private String getExtraProperty(Map properties, String key) { EXTRA_PROPERTY_PREFIX + key, EXTRA_PROPERTY_DEFAULTS.getOrDefault(key, null)); } + static long parseShutdownTimeoutMs(String startUpArgs) { + if (startUpArgs == null) { + return OptimizerProperties.OPTIMIZER_SHUTDOWN_TIMEOUT_MS_DEFAULT; + } + String[] tokens = startUpArgs.split("\\s+"); + String longFlag = "--" + OptimizerProperties.OPTIMIZER_SHUTDOWN_TIMEOUT_MS; + for (int i = 0; i < tokens.length - 1; i++) { + if ("-st".equals(tokens[i]) || longFlag.equals(tokens[i])) { + try { + return Long.parseLong(tokens[i + 1]); + } catch (NumberFormatException e) { + return OptimizerProperties.OPTIMIZER_SHUTDOWN_TIMEOUT_MS_DEFAULT; + } + } + } + return OptimizerProperties.OPTIMIZER_SHUTDOWN_TIMEOUT_MS_DEFAULT; + } + + static long resolveTerminationGracePeriodSeconds(String startUpArgs) { + long shutdownTimeoutMs = parseShutdownTimeoutMs(startUpArgs); + long shutdownTimeoutSeconds = (shutdownTimeoutMs + 999L) / 1000L; + return shutdownTimeoutSeconds + TERMINATION_GRACE_BUFFER_SECONDS; + } + private KubernetesClient client; @Override @@ -138,6 +171,7 @@ protected Map doScaleOut(Resource resource) { memory, imagePullSecretsList); } else { + String serviceAccount = groupProperties.get(SERVICE_ACCOUNT); deployment = initPodTemplateWithoutConfig( image, @@ -147,7 +181,8 @@ protected Map doScaleOut(Resource resource) { resourceId, startUpArgs, memory, - imagePullSecretsList); + imagePullSecretsList, + serviceAccount); } client.apps().deployments().inNamespace(namespace).resource(deployment).create(); @@ -218,7 +253,10 @@ public Deployment initPodTemplateWithoutConfig( String resourceId, String startUpArgs, long memory, - List imagePullSecretsList) { + List imagePullSecretsList, + String serviceAccount) { + + long terminationGraceSeconds = resolveTerminationGracePeriodSeconds(startUpArgs); DeploymentBuilder deploymentBuilder = new DeploymentBuilder() @@ -234,11 +272,13 @@ public Deployment initPodTemplateWithoutConfig( .addToLabels("AmoroResourceId", resourceId) .endMetadata() .withNewSpec() + .withTerminationGracePeriodSeconds(terminationGraceSeconds) + .withServiceAccountName(serviceAccount) .addNewContainer() .withName("optimizer") .withImage(image) .withImagePullPolicy(pullPolicy) - .withCommand("sh", "-c", startUpArgs) + .withCommand("sh", "-c", "exec " + startUpArgs) .withResources( new ResourceRequirementsBuilder() .withLimits( @@ -305,7 +345,7 @@ public Deployment initPodTemplateFromFrontEnd( container.setName("optimizer"); container.setImage(image); container.setImagePullPolicy(pullPolicy); - container.setCommand(new ArrayList<>(Arrays.asList("sh", "-c", startUpArgs))); + container.setCommand(new ArrayList<>(Arrays.asList("sh", "-c", "exec " + startUpArgs))); ResourceRequirements resourceRequirements = new ResourceRequirements(); resourceRequirements.setLimits( @@ -324,6 +364,13 @@ public Deployment initPodTemplateFromFrontEnd( podTemplate.getTemplate().getSpec().setImagePullSecrets(imagePullSecretsList); } + if (podTemplate.getTemplate().getSpec().getTerminationGracePeriodSeconds() == null) { + podTemplate + .getTemplate() + .getSpec() + .setTerminationGracePeriodSeconds(resolveTerminationGracePeriodSeconds(startUpArgs)); + } + DeploymentSpec deploymentSpec = new DeploymentSpec(); deploymentSpec.setTemplate(podTemplate.getTemplate()); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestKubernetesOptimizerContainer.java b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestKubernetesOptimizerContainer.java index 4178870526..7ddc7b308a 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestKubernetesOptimizerContainer.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestKubernetesOptimizerContainer.java @@ -278,6 +278,189 @@ public void testBuildPodTemplateConfig() { .toString()); } + @Test + public void testContainerCommandUsesExecForSignalForwarding() { + ResourceType resourceType = ResourceType.OPTIMIZER; + Map properties = Maps.newHashMap(); + properties.put("memory", "1024"); + Resource resource = + new Resource.Builder("KubernetesContainer", "k8s", resourceType) + .setMemoryMb(1024) + .setThreadCount(1) + .setProperties(properties) + .build(); + groupProperties.putAll(resource.getProperties()); + + Map argsList = + kubernetesOptimizerContainer.generatePodStartArgs(resource, groupProperties); + String image = argsList.get(IMAGE).toString(); + String pullPolicy = argsList.get(PULL_POLICY).toString(); + List imagePullSecretsList = + (List) argsList.get(PULL_SECRETS); + int cpuLimit = (int) argsList.get("cpuLimit"); + long memory = (long) argsList.get(MEMORY_PROPERTY); + String groupName = argsList.get("groupName").toString(); + String resourceId = argsList.get("resourceId").toString(); + String startUpArgs = argsList.get("startUpArgs").toString(); + + Deployment deployment = + kubernetesOptimizerContainer.initPodTemplateWithoutConfig( + image, + pullPolicy, + cpuLimit, + groupName, + resourceId, + startUpArgs, + memory, + imagePullSecretsList, + null); + + List command = + deployment.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand(); + Assert.assertEquals(Arrays.asList("sh", "-c", "exec " + startUpArgs), command); + } + + @Test + public void testContainerCommandUsesExecWithPodTemplate() { + PodTemplate podTemplate = + kubernetesOptimizerContainer.initPodTemplateFromLocal(groupProperties); + + ResourceType resourceType = ResourceType.OPTIMIZER; + Map properties = Maps.newHashMap(); + properties.put("memory", "1024"); + Resource resource = + new Resource.Builder("KubernetesContainer", "k8s", resourceType) + .setMemoryMb(1024) + .setThreadCount(1) + .setProperties(properties) + .build(); + groupProperties.putAll(resource.getProperties()); + + Map argsList = + kubernetesOptimizerContainer.generatePodStartArgs(resource, groupProperties); + String image = argsList.get(IMAGE).toString(); + String pullPolicy = argsList.get(PULL_POLICY).toString(); + List imagePullSecretsList = + (List) argsList.get(PULL_SECRETS); + int cpuLimit = (int) argsList.get("cpuLimit"); + long memory = (long) argsList.get(MEMORY_PROPERTY); + String groupName = argsList.get("groupName").toString(); + String resourceId = argsList.get("resourceId").toString(); + String startUpArgs = argsList.get("startUpArgs").toString(); + + Deployment deployment = + kubernetesOptimizerContainer.initPodTemplateFromFrontEnd( + podTemplate, + image, + pullPolicy, + cpuLimit, + groupName, + resourceId, + startUpArgs, + memory, + imagePullSecretsList); + + List command = + deployment.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand(); + Assert.assertEquals(Arrays.asList("sh", "-c", "exec " + startUpArgs), command); + } + + @Test + public void testTerminationGracePeriodFromDefaultShutdownTimeout() { + ResourceType resourceType = ResourceType.OPTIMIZER; + Map properties = Maps.newHashMap(); + properties.put("memory", "1024"); + Resource resource = + new Resource.Builder("KubernetesContainer", "k8s", resourceType) + .setMemoryMb(1024) + .setThreadCount(1) + .setProperties(properties) + .build(); + groupProperties.putAll(resource.getProperties()); + + Map argsList = + kubernetesOptimizerContainer.generatePodStartArgs(resource, groupProperties); + String image = argsList.get(IMAGE).toString(); + String pullPolicy = argsList.get(PULL_POLICY).toString(); + List imagePullSecretsList = + (List) argsList.get(PULL_SECRETS); + int cpuLimit = (int) argsList.get("cpuLimit"); + long memory = (long) argsList.get(MEMORY_PROPERTY); + String groupName = argsList.get("groupName").toString(); + String resourceId = argsList.get("resourceId").toString(); + String startUpArgs = argsList.get("startUpArgs").toString(); + + Deployment deployment = + kubernetesOptimizerContainer.initPodTemplateWithoutConfig( + image, + pullPolicy, + cpuLimit, + groupName, + resourceId, + startUpArgs, + memory, + imagePullSecretsList, + null); + + Long grace = deployment.getSpec().getTemplate().getSpec().getTerminationGracePeriodSeconds(); + Assert.assertNotNull(grace); + // default shutdown-timeout = 600_000ms → 600s + 30s buffer + Assert.assertEquals(630L, grace.longValue()); + } + + @Test + public void testTerminationGracePeriodFromShutdownTimeoutArg() { + String startUpArgs = "/entrypoint.sh optimizer 1024 -a thrift://x:1261 -p 1 -st 120000"; + long grace = KubernetesOptimizerContainer.resolveTerminationGracePeriodSeconds(startUpArgs); + // 120_000ms → 120s + 30s buffer + Assert.assertEquals(150L, grace); + } + + @Test + public void testTerminationGracePeriodFromUserPodTemplateRespected() { + PodTemplate podTemplate = + kubernetesOptimizerContainer.initPodTemplateFromLocal(groupProperties); + podTemplate.getTemplate().getSpec().setTerminationGracePeriodSeconds(900L); + + ResourceType resourceType = ResourceType.OPTIMIZER; + Map properties = Maps.newHashMap(); + properties.put("memory", "1024"); + Resource resource = + new Resource.Builder("KubernetesContainer", "k8s", resourceType) + .setMemoryMb(1024) + .setThreadCount(1) + .setProperties(properties) + .build(); + groupProperties.putAll(resource.getProperties()); + + Map argsList = + kubernetesOptimizerContainer.generatePodStartArgs(resource, groupProperties); + String image = argsList.get(IMAGE).toString(); + String pullPolicy = argsList.get(PULL_POLICY).toString(); + List imagePullSecretsList = + (List) argsList.get(PULL_SECRETS); + int cpuLimit = (int) argsList.get("cpuLimit"); + long memory = (long) argsList.get(MEMORY_PROPERTY); + String groupName = argsList.get("groupName").toString(); + String resourceId = argsList.get("resourceId").toString(); + String startUpArgs = argsList.get("startUpArgs").toString(); + + Deployment deployment = + kubernetesOptimizerContainer.initPodTemplateFromFrontEnd( + podTemplate, + image, + pullPolicy, + cpuLimit, + groupName, + resourceId, + startUpArgs, + memory, + imagePullSecretsList); + + Long grace = deployment.getSpec().getTemplate().getSpec().getTerminationGracePeriodSeconds(); + Assert.assertEquals(Long.valueOf(900L), grace); + } + @Test public void testAMSWithConfigMap() throws Exception { ConfigMap configMap = buildConfigMap(); diff --git a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java index dd4a6f624e..3c5e7f9008 100644 --- a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java +++ b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java @@ -49,4 +49,6 @@ public class OptimizerProperties { public static final String OPTIMIZER_CACHE_TIMEOUT = "cache-timeout"; public static final String OPTIMIZER_CACHE_TIMEOUT_DEFAULT = "10min"; public static final String OPTIMIZER_MASTER_SLAVE_MODE_ENABLED = "master-slave-mode-enabled"; + public static final String OPTIMIZER_SHUTDOWN_TIMEOUT_MS = "shutdown-timeout-ms"; + public static final long OPTIMIZER_SHUTDOWN_TIMEOUT_MS_DEFAULT = 600_000L; // 10 min } diff --git a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java index af002bc0fd..942e2813f5 100644 --- a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java +++ b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java @@ -345,7 +345,7 @@ protected void waitAShortTime(long waitTime) { try { TimeUnit.MILLISECONDS.sleep(waitTime); } catch (InterruptedException e) { - // ignore + Thread.currentThread().interrupt(); } } diff --git a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/Optimizer.java b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/Optimizer.java index fbff34fc46..9c5dfe6004 100644 --- a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/Optimizer.java +++ b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/Optimizer.java @@ -33,6 +33,7 @@ public class Optimizer { private final OptimizerConfig config; private final OptimizerToucher toucher; private final OptimizerExecutor[] executors; + private volatile Thread[] executorThreads; public Optimizer(OptimizerConfig config) { this(config, () -> new OptimizerToucher(config), (i) -> new OptimizerExecutor(config, i)); @@ -54,20 +55,70 @@ protected Optimizer( public void startOptimizing() { LOG.info("Starting optimizer with configuration:{}", config); - Arrays.stream(executors) + executorThreads = new Thread[executors.length]; + IntStream.range(0, executors.length) .forEach( - optimizerExecutor -> { - new Thread( - optimizerExecutor::start, - String.format("Optimizer-executor-%d", optimizerExecutor.getThreadId())) - .start(); + i -> { + executorThreads[i] = + new Thread( + executors[i]::start, + String.format("Optimizer-executor-%d", executors[i].getThreadId())); + executorThreads[i].start(); }); toucher.withTokenChangeListener(new SetTokenToExecutors()).start(); } public void stopOptimizing() { - toucher.stop(); + LOG.info("Stopping optimizer, waiting for in-progress tasks to complete..."); + // Stop executors first so they don't poll new tasks, but keep the toucher alive + // so it keeps sending heartbeats. Otherwise AMS hits its heartbeat-timeout during + // long-running in-flight tasks, unregisters this optimizer, and the subsequent + // best-effort completeTask fails with "Optimizer has not been authenticated". Arrays.stream(executors).forEach(OptimizerExecutor::stop); + + Thread[] threads = executorThreads; + if (threads == null) { + toucher.stop(); + LOG.info("Optimizer stopped (no executor threads to wait for)"); + return; + } + + long shutdownTimeoutMs = config.getShutdownTimeoutMs(); + long deadline = System.currentTimeMillis() + shutdownTimeoutMs; + for (Thread t : threads) { + if (t == null) { + continue; + } + long remaining = deadline - System.currentTimeMillis(); + if (remaining <= 0) { + break; + } + try { + t.join(remaining); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for executor thread {} to finish", t.getName()); + Thread.currentThread().interrupt(); + break; + } + } + + for (Thread t : threads) { + if (t != null && t.isAlive()) { + LOG.warn( + "Executor thread {} did not terminate within {}ms timeout, force-interrupting", + t.getName(), + shutdownTimeoutMs); + t.interrupt(); + try { + t.join(1_000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + toucher.stop(); + LOG.info("Optimizer stopped"); } public OptimizerToucher getToucher() { diff --git a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java index 2c70a2652a..ebe8c63d71 100644 --- a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java +++ b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java @@ -109,6 +109,15 @@ public class OptimizerConfig implements Serializable { usage = "Enable master-slave mode") private boolean masterSlaveMode = false; + @Option( + name = "-st", + aliases = "--" + OptimizerProperties.OPTIMIZER_SHUTDOWN_TIMEOUT_MS, + usage = + "Graceful shutdown timeout(ms) — wait this long for in-progress tasks before " + + "force-interrupting. Default 600000 (10min). Align with K8s " + + "terminationGracePeriodSeconds in containerized deployments.") + private long shutdownTimeoutMs = OptimizerProperties.OPTIMIZER_SHUTDOWN_TIMEOUT_MS_DEFAULT; + public OptimizerConfig() {} public OptimizerConfig(String[] args) throws CmdLineException { @@ -228,6 +237,14 @@ public void setMasterSlaveMode(boolean masterSlaveMode) { this.masterSlaveMode = masterSlaveMode; } + public long getShutdownTimeoutMs() { + return shutdownTimeoutMs; + } + + public void setShutdownTimeoutMs(long shutdownTimeoutMs) { + this.shutdownTimeoutMs = shutdownTimeoutMs; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -245,6 +262,7 @@ public String toString() { .add("cacheMaxEntrySize", cacheMaxEntrySize) .add("cacheTimeout", cacheTimeout) .add("masterSlaveMode", masterSlaveMode) + .add("shutdownTimeoutMs", shutdownTimeoutMs) .toString(); } } diff --git a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java index 26eda902cc..04b177b7df 100644 --- a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java +++ b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java @@ -20,6 +20,7 @@ import org.apache.amoro.api.OptimizingTask; import org.apache.amoro.api.OptimizingTaskResult; +import org.apache.amoro.client.OptimizingClientPools; import org.apache.amoro.io.reader.DeleteCache; import org.apache.amoro.optimizing.OptimizingExecutor; import org.apache.amoro.optimizing.OptimizingExecutorFactory; @@ -250,12 +251,23 @@ protected OptimizingTaskResult executeTask(OptimizingTask task) { protected void completeTask(String amsUrl, OptimizingTaskResult optimizingTaskResult) { try { - callAuthenticatedAms( - amsUrl, - (client, token) -> { - client.completeTask(token, optimizingTaskResult); - return null; - }); + if (isStarted()) { + callAuthenticatedAms( + amsUrl, + (client, token) -> { + client.completeTask(token, optimizingTaskResult); + return null; + }); + } else { + // After shutdown was requested the gated retry loop in callAuthenticatedAms exits + // immediately, which would silently drop the in-flight task result. Make a single + // best-effort direct call so graceful shutdown still reports completed work to AMS. + String token = getToken(); + if (token == null) { + throw new TException("Cannot complete task during shutdown: token unavailable"); + } + OptimizingClientPools.getClient(amsUrl).completeTask(token, optimizingTaskResult); + } LOG.info( "Optimizer executor[{}] completed task[{}](status: {}) to AMS {}", threadId, diff --git a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerToucher.java b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerToucher.java index 9fd24e22ec..a3b2c08a00 100644 --- a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerToucher.java +++ b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerToucher.java @@ -35,6 +35,7 @@ public class OptimizerToucher extends AbstractOptimizerOperator { private transient TokenChangeListener tokenChangeListener; private final Map registerProperties = Maps.newHashMap(); private final long startTime; + private transient volatile Thread runnerThread; public OptimizerToucher(OptimizerConfig config) { super(config); @@ -54,19 +55,36 @@ public OptimizerToucher withRegisterProperty(String name, String value) { public void start() { LOG.info("Starting optimizer toucher with configuration:{}", getConfig()); - while (isStarted()) { - try { - if (checkToken()) { - touch(); + runnerThread = Thread.currentThread(); + try { + while (isStarted()) { + try { + if (checkToken()) { + touch(); + } + waitAShortTime(getConfig().getHeartBeat()); + } catch (Throwable t) { + LOG.error("Optimizer toucher got an unexpected error", t); } - waitAShortTime(getConfig().getHeartBeat()); - } catch (Throwable t) { - LOG.error("Optimizer toucher got an unexpected error", t); } + } finally { + runnerThread = null; } LOG.info("Optimizer toucher stopped"); } + @Override + public void stop() { + super.stop(); + // Wake the runner immediately if it is sleeping in waitAShortTime, so the heartbeat + // loop terminates without waiting up to one full heartbeat interval. waitAShortTime + // preserves the interrupt flag so the loop exits cleanly on its next isStarted() check. + Thread t = runnerThread; + if (t != null) { + t.interrupt(); + } + } + private boolean checkToken() { if (!tokenIsReady()) { LOG.info( diff --git a/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizer.java b/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizer.java index 79c42b78b2..0f055f1add 100644 --- a/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizer.java +++ b/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizer.java @@ -54,4 +54,70 @@ public void testStartOptimizer() throws InterruptedException { Assert.assertEquals(2, taskResults.size()); optimizer.stopOptimizing(); } + + @Test + public void testGracefulShutdown() throws InterruptedException { + OptimizerConfig optimizerConfig = + OptimizerTestHelpers.buildOptimizerConfig(TEST_AMS.getServerUrl()); + Optimizer optimizer = new Optimizer(optimizerConfig); + Thread optimizerThread = new Thread(optimizer::startOptimizing); + optimizerThread.start(); + TimeUnit.SECONDS.sleep(1); + + TEST_AMS + .getOptimizerHandler() + .offerTask(TestOptimizerExecutor.TestOptimizingInput.successInput(1).toTask(0, 0)); + TimeUnit.MILLISECONDS.sleep(OptimizerTestHelpers.CALL_AMS_INTERVAL * 5); + + optimizer.stopOptimizing(); + optimizerThread.join(5000); + + Assert.assertFalse("Optimizer thread should have terminated", optimizerThread.isAlive()); + + String token = optimizer.getToucher().getToken(); + List taskResults = + TEST_AMS.getOptimizerHandler().getCompletedTasks().get(token); + Assert.assertNotNull("Task results should be reported before shutdown", taskResults); + Assert.assertEquals(1, taskResults.size()); + } + + @Test + public void testGracefulShutdownWaitsForInProgressTask() throws InterruptedException { + OptimizerConfig optimizerConfig = + OptimizerTestHelpers.buildOptimizerConfig(TEST_AMS.getServerUrl()); + Optimizer optimizer = new Optimizer(optimizerConfig); + Thread optimizerThread = new Thread(optimizer::startOptimizing); + optimizerThread.start(); + TimeUnit.SECONDS.sleep(1); + + long taskExecutionMs = 3_000; + TEST_AMS + .getOptimizerHandler() + .offerTask( + TestOptimizerExecutor.TestOptimizingInput.slowSuccessInput(1, taskExecutionMs) + .toTask(0, 0)); + + // Wait long enough for the executor to poll, ack and start executing the task, + // but short enough that it is still in the middle of execute() when we call stop. + TimeUnit.MILLISECONDS.sleep(OptimizerTestHelpers.CALL_AMS_INTERVAL * 3); + + long startStop = System.currentTimeMillis(); + optimizer.stopOptimizing(); + long elapsed = System.currentTimeMillis() - startStop; + optimizerThread.join(5_000); + + Assert.assertFalse("Optimizer thread should have terminated", optimizerThread.isAlive()); + Assert.assertTrue( + "stopOptimizing should block until in-progress task completes (elapsed=" + elapsed + "ms)", + elapsed >= 1_000); + + String token = optimizer.getToucher().getToken(); + List taskResults = + TEST_AMS.getOptimizerHandler().getCompletedTasks().get(token); + Assert.assertNotNull("Task results should be reported before shutdown", taskResults); + Assert.assertEquals(1, taskResults.size()); + Assert.assertNull( + "In-progress task must complete successfully, not be interrupted", + taskResults.get(0).getErrorMessage()); + } } diff --git a/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizerExecutor.java b/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizerExecutor.java index 5bacb73645..3fb652a577 100644 --- a/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizerExecutor.java +++ b/amoro-optimizer/amoro-optimizer-common/src/test/java/org/apache/amoro/optimizer/common/TestOptimizerExecutor.java @@ -106,10 +106,16 @@ public void testExecuteTaskFailed() throws InterruptedException, TException { public static class TestOptimizingInput extends BaseOptimizingInput { private final int inputId; private final boolean executeSuccess; + private final long executionTimeMs; private TestOptimizingInput(int inputId, boolean executeSuccess) { + this(inputId, executeSuccess, 0L); + } + + private TestOptimizingInput(int inputId, boolean executeSuccess, long executionTimeMs) { this.inputId = inputId; this.executeSuccess = executeSuccess; + this.executionTimeMs = executionTimeMs; } public static TestOptimizingInput successInput(int inputId) { @@ -120,10 +126,18 @@ public static TestOptimizingInput failedInput(int inputId) { return new TestOptimizingInput(inputId, false); } + public static TestOptimizingInput slowSuccessInput(int inputId, long executionTimeMs) { + return new TestOptimizingInput(inputId, true, executionTimeMs); + } + private int inputId() { return inputId; } + private long executionTimeMs() { + return executionTimeMs; + } + public OptimizingTask toTask(long processId, int taskId) { OptimizingTask optimizingTask = new OptimizingTask(new OptimizingTaskId(processId, taskId)); optimizingTask.setTaskInput(SerializationUtil.simpleSerialize(this)); @@ -157,6 +171,14 @@ private TestOptimizingExecutor(TestOptimizingInput input) { @Override public TestOptimizingOutput execute() { + if (input.executionTimeMs() > 0) { + try { + Thread.sleep(input.executionTimeMs()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Task was interrupted before completion", e); + } + } if (input.executeSuccess) { return new TestOptimizingOutput(input.inputId()); } else { diff --git a/amoro-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizer.java b/amoro-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizer.java index 2eb38c0283..0e67830bf3 100644 --- a/amoro-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizer.java +++ b/amoro-optimizer/amoro-optimizer-spark/src/main/java/org/apache/amoro/optimizer/spark/SparkOptimizer.java @@ -22,12 +22,15 @@ import org.apache.amoro.optimizer.common.OptimizerConfig; import org.apache.amoro.optimizer.common.OptimizerToucher; import org.apache.amoro.resource.Resource; +import org.apache.hadoop.util.ShutdownHookManager; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + /** The {@code SparkOptimizer} acts as an entrypoint of the spark program */ public class SparkOptimizer extends Optimizer { private static final Logger LOG = LoggerFactory.getLogger(SparkOptimizer.class); @@ -65,6 +68,29 @@ public static void main(String[] args) throws Exception { OptimizerToucher toucher = optimizer.getToucher(); toucher.withRegisterProperty(Resource.PROPERTY_JOB_ID, spark.sparkContext().applicationId()); + // Register with Hadoop's ShutdownHookManager so that our hook runs to completion + // before downstream cleanup. ShutdownHookManager runs hooks in priority descending + // order, sequentially. Two cleanups must come AFTER our graceful shutdown: + // - Hadoop FileSystem cache close (priority FS_CACHE = 10) — would otherwise + // close in-flight HDFS writers and cause ClosedChannelException on flush. + // - SparkContext.stop (Spark's SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50) — would + // otherwise tear down executors mid-task, failing in-flight RDD actions. + // Use SPARK_CONTEXT_SHUTDOWN_PRIORITY + 10 to guarantee both ordering constraints + // in a single value (60 > 50 > 10). + // Pass an explicit timeout — the 2-arg overload uses hadoop.service.shutdown.timeout + // (default 30s), which would cancel our hook well before stopOptimizing's + // shutdownTimeoutMs deadline. + int shutdownPriority = + org.apache.spark.util.ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY() + 10; + ShutdownHookManager.get() + .addShutdownHook( + () -> { + LOG.info("Received shutdown signal, initiating graceful shutdown..."); + optimizer.stopOptimizing(); + }, + shutdownPriority, + config.getShutdownTimeoutMs() + 10_000L, + TimeUnit.MILLISECONDS); LOG.info("Starting the spark optimizer with configuration:{}", config); optimizer.startOptimizing(); } diff --git a/amoro-optimizer/amoro-optimizer-standalone/src/main/java/org/apache/amoro/optimizer/standalone/StandaloneOptimizer.java b/amoro-optimizer/amoro-optimizer-standalone/src/main/java/org/apache/amoro/optimizer/standalone/StandaloneOptimizer.java index b38d0fc42b..cc7723a80a 100644 --- a/amoro-optimizer/amoro-optimizer-standalone/src/main/java/org/apache/amoro/optimizer/standalone/StandaloneOptimizer.java +++ b/amoro-optimizer/amoro-optimizer-standalone/src/main/java/org/apache/amoro/optimizer/standalone/StandaloneOptimizer.java @@ -21,13 +21,20 @@ import org.apache.amoro.optimizer.common.Optimizer; import org.apache.amoro.optimizer.common.OptimizerConfig; import org.apache.amoro.resource.Resource; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.util.ShutdownHookManager; import org.kohsuke.args4j.CmdLineException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; +import java.util.concurrent.TimeUnit; public class StandaloneOptimizer { + private static final Logger LOG = LoggerFactory.getLogger(StandaloneOptimizer.class); + public static void main(String[] args) throws CmdLineException { OptimizerConfig optimizerConfig = new OptimizerConfig(args); Optimizer optimizer = new Optimizer(optimizerConfig); @@ -39,6 +46,23 @@ public static void main(String[] args) throws CmdLineException { RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); String processId = runtimeMXBean.getName().split("@")[0]; optimizer.getToucher().withRegisterProperty(Resource.PROPERTY_JOB_ID, processId); + + // Register with Hadoop's ShutdownHookManager (priority > FS_CACHE) so that our hook + // runs to completion before Hadoop closes cached FileSystems. JVM Runtime shutdown + // hooks fire concurrently and lose this race, causing in-flight writers to hit + // ClosedChannelException during row-group flush. + // Pass an explicit timeout — the 2-arg overload uses hadoop.service.shutdown.timeout + // (default 30s), which would cancel our hook well before stopOptimizing's + // shutdownTimeoutMs deadline. + ShutdownHookManager.get() + .addShutdownHook( + () -> { + LOG.info("Received shutdown signal, initiating graceful shutdown..."); + optimizer.stopOptimizing(); + }, + FileSystem.SHUTDOWN_HOOK_PRIORITY + 10, + optimizerConfig.getShutdownTimeoutMs() + 10_000L, + TimeUnit.MILLISECONDS); optimizer.startOptimizing(); } } diff --git a/dist/src/main/amoro-bin/bin/optimizer.sh b/dist/src/main/amoro-bin/bin/optimizer.sh index 8e01c619c6..30591bd907 100755 --- a/dist/src/main/amoro-bin/bin/optimizer.sh +++ b/dist/src/main/amoro-bin/bin/optimizer.sh @@ -96,7 +96,7 @@ start() { } start-foreground() { - $CMDS + exec $CMDS } #0:pid bad and proc OK; 1:pid ok and proc bad; 2:pid bad