Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import org.apache.amoro.OptimizerProperties;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -68,13 +69,21 @@ public class KubernetesOptimizerContainer extends AbstractOptimizerContainer {
public static final String PODTEMPLATE = "podTemplate";
public static final String PULL_SECRETS = "imagePullSecrets";
public static final String KUBE_CONFIG_PATH = "kube-config-path";
public static final String SERVICE_ACCOUNT = "serviceAccount";

private static final String NAME_PREFIX = "amoro-optimizer-";

private static final String KUBERNETES_NAME_PROPERTIES = "name";

private static final String EXTRA_PROPERTY_PREFIX = "extra.";

/**
* Buffer added on top of optimizer's shutdown-timeout when deriving K8s
* terminationGracePeriodSeconds, to leave room for thread join, best-effort task result
* reporting, and log flush before SIGKILL.
*/
static final long TERMINATION_GRACE_BUFFER_SECONDS = 30L;

private static final Map<String, String> EXTRA_PROPERTY_DEFAULTS = new HashMap<>();

static {
Expand All @@ -86,6 +95,30 @@ private String getExtraProperty(Map<String, String> properties, String key) {
EXTRA_PROPERTY_PREFIX + key, EXTRA_PROPERTY_DEFAULTS.getOrDefault(key, null));
}

/**
 * Extracts the optimizer shutdown timeout, in milliseconds, from a start-up command line.
 *
 * <p>The command line is split on whitespace and scanned left to right for the short flag {@code
 * -st} or the long flag {@code "--" + OptimizerProperties.OPTIMIZER_SHUTDOWN_TIMEOUT_MS}. The
 * token that follows the first matching flag is parsed as a {@code long}. A flag that is the
 * very last token has no value and is therefore not considered.
 *
 * @param startUpArgs the start-up command line; may be {@code null}
 * @return the parsed timeout, or {@link OptimizerProperties#OPTIMIZER_SHUTDOWN_TIMEOUT_MS_DEFAULT}
 *     when the input is {@code null}, the flag is absent, or its value is not a valid long
 */
static long parseShutdownTimeoutMs(String startUpArgs) {
  final long fallback = OptimizerProperties.OPTIMIZER_SHUTDOWN_TIMEOUT_MS_DEFAULT;
  if (startUpArgs == null) {
    return fallback;
  }

  final String longOption = "--" + OptimizerProperties.OPTIMIZER_SHUTDOWN_TIMEOUT_MS;
  final String[] args = startUpArgs.split("\\s+");

  int position = 0;
  // Stop one short of the end: a flag needs a following token to carry its value.
  while (position + 1 < args.length) {
    String candidate = args[position];
    position++;
    boolean isTimeoutFlag = "-st".equals(candidate) || longOption.equals(candidate);
    if (!isTimeoutFlag) {
      continue;
    }
    // The first occurrence decides the result, even when its value is malformed.
    try {
      return Long.parseLong(args[position]);
    } catch (NumberFormatException e) {
      return fallback;
    }
  }
  return fallback;
}

/**
 * Derives the pod's {@code terminationGracePeriodSeconds} from the optimizer start-up arguments.
 *
 * <p>The shutdown timeout obtained from {@link #parseShutdownTimeoutMs(String)} is rounded up to
 * whole seconds and {@link #TERMINATION_GRACE_BUFFER_SECONDS} is added on top. A negative timeout
 * is treated as zero, so the result is never smaller than the buffer and never negative.
 *
 * @param startUpArgs the start-up command line; may be {@code null}
 * @return the grace period in seconds, always at least {@link #TERMINATION_GRACE_BUFFER_SECONDS}
 */
static long resolveTerminationGracePeriodSeconds(String startUpArgs) {
  // Clamp: a negative "-st" value would otherwise yield a negative grace period.
  long shutdownTimeoutMs = Math.max(0L, parseShutdownTimeoutMs(startUpArgs));
  // Ceiling division without "+ 999", which overflows for values close to Long.MAX_VALUE.
  long shutdownTimeoutSeconds = shutdownTimeoutMs / 1000L;
  if (shutdownTimeoutMs % 1000L != 0L) {
    shutdownTimeoutSeconds++;
  }
  return shutdownTimeoutSeconds + TERMINATION_GRACE_BUFFER_SECONDS;
}

private KubernetesClient client;

@Override
Expand Down Expand Up @@ -138,6 +171,7 @@ protected Map<String, String> doScaleOut(Resource resource) {
memory,
imagePullSecretsList);
} else {
String serviceAccount = groupProperties.get(SERVICE_ACCOUNT);
deployment =
initPodTemplateWithoutConfig(
image,
Expand All @@ -147,7 +181,8 @@ protected Map<String, String> doScaleOut(Resource resource) {
resourceId,
startUpArgs,
memory,
imagePullSecretsList);
imagePullSecretsList,
serviceAccount);
}

client.apps().deployments().inNamespace(namespace).resource(deployment).create();
Expand Down Expand Up @@ -218,7 +253,10 @@ public Deployment initPodTemplateWithoutConfig(
String resourceId,
String startUpArgs,
long memory,
List<LocalObjectReference> imagePullSecretsList) {
List<LocalObjectReference> imagePullSecretsList,
String serviceAccount) {

long terminationGraceSeconds = resolveTerminationGracePeriodSeconds(startUpArgs);

DeploymentBuilder deploymentBuilder =
new DeploymentBuilder()
Expand All @@ -234,11 +272,13 @@ public Deployment initPodTemplateWithoutConfig(
.addToLabels("AmoroResourceId", resourceId)
.endMetadata()
.withNewSpec()
.withTerminationGracePeriodSeconds(terminationGraceSeconds)
.withServiceAccountName(serviceAccount)
.addNewContainer()
.withName("optimizer")
.withImage(image)
.withImagePullPolicy(pullPolicy)
.withCommand("sh", "-c", startUpArgs)
.withCommand("sh", "-c", "exec " + startUpArgs)
.withResources(
new ResourceRequirementsBuilder()
.withLimits(
Expand Down Expand Up @@ -305,7 +345,7 @@ public Deployment initPodTemplateFromFrontEnd(
container.setName("optimizer");
container.setImage(image);
container.setImagePullPolicy(pullPolicy);
container.setCommand(new ArrayList<>(Arrays.asList("sh", "-c", startUpArgs)));
container.setCommand(new ArrayList<>(Arrays.asList("sh", "-c", "exec " + startUpArgs)));

ResourceRequirements resourceRequirements = new ResourceRequirements();
resourceRequirements.setLimits(
Expand All @@ -324,6 +364,13 @@ public Deployment initPodTemplateFromFrontEnd(
podTemplate.getTemplate().getSpec().setImagePullSecrets(imagePullSecretsList);
}

if (podTemplate.getTemplate().getSpec().getTerminationGracePeriodSeconds() == null) {
podTemplate
.getTemplate()
.getSpec()
.setTerminationGracePeriodSeconds(resolveTerminationGracePeriodSeconds(startUpArgs));
}

DeploymentSpec deploymentSpec = new DeploymentSpec();
deploymentSpec.setTemplate(podTemplate.getTemplate());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,189 @@ public void testBuildPodTemplateConfig() {
.toString());
}

/**
 * Builds a Deployment without a pod template and checks that the container command is {@code sh
 * -c "exec <startUpArgs>"}.
 */
@Test
public void testContainerCommandUsesExecForSignalForwarding() {
  Map<String, String> resourceProps = Maps.newHashMap();
  resourceProps.put("memory", "1024");
  ResourceType type = ResourceType.OPTIMIZER;
  Resource optimizerResource =
      new Resource.Builder("KubernetesContainer", "k8s", type)
          .setMemoryMb(1024)
          .setThreadCount(1)
          .setProperties(resourceProps)
          .build();
  groupProperties.putAll(optimizerResource.getProperties());

  // Resolve every argument the deployment factory needs from the generated start args.
  Map<String, Object> podArgs =
      kubernetesOptimizerContainer.generatePodStartArgs(optimizerResource, groupProperties);
  String imageName = podArgs.get(IMAGE).toString();
  String imagePullPolicy = podArgs.get(PULL_POLICY).toString();
  List<LocalObjectReference> pullSecrets = (List<LocalObjectReference>) podArgs.get(PULL_SECRETS);
  int cpu = (int) podArgs.get("cpuLimit");
  long memoryMb = (long) podArgs.get(MEMORY_PROPERTY);
  String group = podArgs.get("groupName").toString();
  String id = podArgs.get("resourceId").toString();
  String startUpArgs = podArgs.get("startUpArgs").toString();

  // No service account is configured for this scenario.
  Deployment built =
      kubernetesOptimizerContainer.initPodTemplateWithoutConfig(
          imageName, imagePullPolicy, cpu, group, id, startUpArgs, memoryMb, pullSecrets, null);

  List<String> expectedCommand = Arrays.asList("sh", "-c", "exec " + startUpArgs);
  List<String> actualCommand =
      built.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand();
  Assert.assertEquals(expectedCommand, actualCommand);
}

/**
 * Builds a Deployment from a locally initialised pod template and checks that the container
 * command is {@code sh -c "exec <startUpArgs>"}.
 */
@Test
public void testContainerCommandUsesExecWithPodTemplate() {
  // The template is created before the resource properties are merged into the group properties.
  PodTemplate template = kubernetesOptimizerContainer.initPodTemplateFromLocal(groupProperties);

  Map<String, String> resourceProps = Maps.newHashMap();
  resourceProps.put("memory", "1024");
  ResourceType type = ResourceType.OPTIMIZER;
  Resource optimizerResource =
      new Resource.Builder("KubernetesContainer", "k8s", type)
          .setMemoryMb(1024)
          .setThreadCount(1)
          .setProperties(resourceProps)
          .build();
  groupProperties.putAll(optimizerResource.getProperties());

  Map<String, Object> podArgs =
      kubernetesOptimizerContainer.generatePodStartArgs(optimizerResource, groupProperties);
  String imageName = podArgs.get(IMAGE).toString();
  String imagePullPolicy = podArgs.get(PULL_POLICY).toString();
  List<LocalObjectReference> pullSecrets = (List<LocalObjectReference>) podArgs.get(PULL_SECRETS);
  int cpu = (int) podArgs.get("cpuLimit");
  long memoryMb = (long) podArgs.get(MEMORY_PROPERTY);
  String group = podArgs.get("groupName").toString();
  String id = podArgs.get("resourceId").toString();
  String startUpArgs = podArgs.get("startUpArgs").toString();

  Deployment built =
      kubernetesOptimizerContainer.initPodTemplateFromFrontEnd(
          template, imageName, imagePullPolicy, cpu, group, id, startUpArgs, memoryMb, pullSecrets);

  List<String> expectedCommand = Arrays.asList("sh", "-c", "exec " + startUpArgs);
  List<String> actualCommand =
      built.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand();
  Assert.assertEquals(expectedCommand, actualCommand);
}

/**
 * Builds a Deployment without a pod template and expects a termination grace period of 630
 * seconds for the generated start-up arguments.
 */
@Test
public void testTerminationGracePeriodFromDefaultShutdownTimeout() {
  Map<String, String> resourceProps = Maps.newHashMap();
  resourceProps.put("memory", "1024");
  ResourceType type = ResourceType.OPTIMIZER;
  Resource optimizerResource =
      new Resource.Builder("KubernetesContainer", "k8s", type)
          .setMemoryMb(1024)
          .setThreadCount(1)
          .setProperties(resourceProps)
          .build();
  groupProperties.putAll(optimizerResource.getProperties());

  Map<String, Object> podArgs =
      kubernetesOptimizerContainer.generatePodStartArgs(optimizerResource, groupProperties);
  String imageName = podArgs.get(IMAGE).toString();
  String imagePullPolicy = podArgs.get(PULL_POLICY).toString();
  List<LocalObjectReference> pullSecrets = (List<LocalObjectReference>) podArgs.get(PULL_SECRETS);
  int cpu = (int) podArgs.get("cpuLimit");
  long memoryMb = (long) podArgs.get(MEMORY_PROPERTY);
  String group = podArgs.get("groupName").toString();
  String id = podArgs.get("resourceId").toString();
  String startUpArgs = podArgs.get("startUpArgs").toString();

  Deployment built =
      kubernetesOptimizerContainer.initPodTemplateWithoutConfig(
          imageName, imagePullPolicy, cpu, group, id, startUpArgs, memoryMb, pullSecrets, null);

  Long gracePeriod = built.getSpec().getTemplate().getSpec().getTerminationGracePeriodSeconds();
  Assert.assertNotNull(gracePeriod);
  // Default shutdown timeout of 600_000 ms is 600 s; adding the 30 s buffer gives 630 s.
  Assert.assertEquals(630L, gracePeriod.longValue());
}

/** An explicit {@code -st 120000} argument must yield 120 s plus the 30 s buffer. */
@Test
public void testTerminationGracePeriodFromShutdownTimeoutArg() {
  String commandLine = "/entrypoint.sh optimizer 1024 -a thrift://x:1261 -p 1 -st 120000";
  long expectedSeconds = 150L;
  Assert.assertEquals(
      expectedSeconds,
      KubernetesOptimizerContainer.resolveTerminationGracePeriodSeconds(commandLine));
}

/**
 * A termination grace period already set on the pod template (900 s here) must survive
 * deployment construction unchanged.
 */
@Test
public void testTerminationGracePeriodFromUserPodTemplateRespected() {
  PodTemplate template = kubernetesOptimizerContainer.initPodTemplateFromLocal(groupProperties);
  template.getTemplate().getSpec().setTerminationGracePeriodSeconds(900L);

  Map<String, String> resourceProps = Maps.newHashMap();
  resourceProps.put("memory", "1024");
  ResourceType type = ResourceType.OPTIMIZER;
  Resource optimizerResource =
      new Resource.Builder("KubernetesContainer", "k8s", type)
          .setMemoryMb(1024)
          .setThreadCount(1)
          .setProperties(resourceProps)
          .build();
  groupProperties.putAll(optimizerResource.getProperties());

  Map<String, Object> podArgs =
      kubernetesOptimizerContainer.generatePodStartArgs(optimizerResource, groupProperties);
  String imageName = podArgs.get(IMAGE).toString();
  String imagePullPolicy = podArgs.get(PULL_POLICY).toString();
  List<LocalObjectReference> pullSecrets = (List<LocalObjectReference>) podArgs.get(PULL_SECRETS);
  int cpu = (int) podArgs.get("cpuLimit");
  long memoryMb = (long) podArgs.get(MEMORY_PROPERTY);
  String group = podArgs.get("groupName").toString();
  String id = podArgs.get("resourceId").toString();
  String startUpArgs = podArgs.get("startUpArgs").toString();

  Deployment built =
      kubernetesOptimizerContainer.initPodTemplateFromFrontEnd(
          template, imageName, imagePullPolicy, cpu, group, id, startUpArgs, memoryMb, pullSecrets);

  Long expectedGrace = Long.valueOf(900L);
  Long actualGrace = built.getSpec().getTemplate().getSpec().getTerminationGracePeriodSeconds();
  Assert.assertEquals(expectedGrace, actualGrace);
}

@Test
public void testAMSWithConfigMap() throws Exception {
ConfigMap configMap = buildConfigMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,6 @@ public class OptimizerProperties {
public static final String OPTIMIZER_CACHE_TIMEOUT = "cache-timeout";
public static final String OPTIMIZER_CACHE_TIMEOUT_DEFAULT = "10min";
public static final String OPTIMIZER_MASTER_SLAVE_MODE_ENABLED = "master-slave-mode-enabled";
public static final String OPTIMIZER_SHUTDOWN_TIMEOUT_MS = "shutdown-timeout-ms";
public static final long OPTIMIZER_SHUTDOWN_TIMEOUT_MS_DEFAULT = 600_000L; // 10 min
}
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ protected void waitAShortTime(long waitTime) {
try {
TimeUnit.MILLISECONDS.sleep(waitTime);
} catch (InterruptedException e) {
// ignore
Thread.currentThread().interrupt();
}
}

Expand Down
Loading
Loading