diff --git a/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md b/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md index 92d5199d559ab..ec199e13a4702 100644 --- a/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md +++ b/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md @@ -338,6 +338,116 @@ $ ./bin/kubernetes-session.sh -Dkubernetes.env.secretKeyRef=\ The env variable `SECRET_USERNAME` contains the username and the env variable `SECRET_PASSWORD` contains the password of the secret `mysecret`. For more details see the [official Kubernetes documentation](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables). +### 挂载持久卷声明(PVC) + +[Kubernetes PersistentVolumeClaims (PVCs)](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) 提供了一种在 Kubernetes 中请求和使用持久存储的方式。 +Flink on Kubernetes 支持通过配置选项直接将 PVC 挂载到 JobManager 和 TaskManager Pod 中。 + +#### 配置选项 + +Flink 提供了两个用于挂载 PVC 的配置选项: + + + + + + + + + + + + + + + + + + + + + + + + +
配置选项类型默认值描述
kubernetes.persistent-volume-claimsMap<String, String>(无)用户指定的 PersistentVolumeClaims,将被挂载到 Flink 容器中。 + 值的格式为 pvc-name:/mount/path。 + 多个 PVC 可以用逗号分隔。
kubernetes.persistent-volume-claims.read-onlyBooleanfalse是否以只读模式挂载 PersistentVolumeClaims。 + 设置为 true 时,所有 PVC 将以只读方式挂载。
+ +#### 使用示例 + +以下命令将 PVC `checkpoint-pvc` 挂载到启动 Pod 的 `/opt/flink/checkpoints` 路径: + +```bash +$ ./bin/kubernetes-session.sh \ + -Dkubernetes.cluster-id=my-session-cluster \ + -Dkubernetes.persistent-volume-claims=checkpoint-pvc:/opt/flink/checkpoints +``` + +您可以通过逗号分隔来挂载多个 PVC: + +```bash +$ ./bin/kubernetes-session.sh \ + -Dkubernetes.cluster-id=my-session-cluster \ + -Dkubernetes.persistent-volume-claims=checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data +``` + +以只读模式挂载 PVC(适用于共享参考数据): + +```bash +$ ./bin/kubernetes-session.sh \ + -Dkubernetes.cluster-id=my-session-cluster \ + -Dkubernetes.persistent-volume-claims=shared-data:/opt/flink/shared \ + -Dkubernetes.persistent-volume-claims.read-only=true +``` + +#### 示例:使用 PVC 进行 Checkpoint 存储 + +一个常见的用例是使用 PVC 存储 checkpoint。首先,在 Kubernetes 集群中创建一个 PVC: + +```yaml +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: flink-checkpoints-pvc +spec: + accessModes: + - ReadWriteMany + resources: + requests: + storage: 10Gi + storageClassName: standard +``` + +然后启动一个挂载 PVC 用于 checkpoint 存储的 Flink 集群: + +```bash +$ ./bin/flink run \ + --target kubernetes-application \ + -Dkubernetes.cluster-id=my-checkpoint-cluster \ + -Dkubernetes.container.image=flink:latest \ + -Dkubernetes.persistent-volume-claims=flink-checkpoints-pvc:/opt/flink/checkpoints \ + -Dstate.checkpoints.dir=file:///opt/flink/checkpoints \ + local:///opt/flink/usrlib/my-flink-job.jar +``` + +#### 前提条件 + +- PVC 必须在 Flink 集群部署之前存在于同一命名空间中。 +- PVC 必须具有适当的访问模式: + - **ReadWriteOnce (RWO)**:单个 Pod 访问。适用于独立的 JobManager 部署。 + - **ReadWriteMany (RWX)**:多个 Pod 访问同一存储。推荐用于高可用(HA)配置或当 JobManager 和 TaskManager 都需要写入同一存储时。 + - **ReadOnlyMany (ROX)**:多个 Pod 只读访问。适用于共享参考数据。 + +{{< hint warning >}} +对于具有多个 JobManager 的高可用(HA)配置,或当 JobManager 和 TaskManager 都需要对同一存储进行写入访问时,请确保您的 PVC 支持 ReadWriteMany (RWX) 访问模式。在此类场景下使用 ReadWriteOnce (RWO) 可能会导致挂载失败或 I/O 错误。 +{{< /hint >}} + +{{< hint info >}} +`kubernetes.persistent-volume-claims.read-only` 选项会全局应用于所有配置的 PVC。如果您需要为不同的 PVC 设置不同的访问模式(例如:checkpoint 使用读写模式,而参考数据使用只读模式),请使用 [Pod 模板]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}}#pod-template) 来定义细粒度的卷配置。 +{{< /hint >}} + ### High-Availability on Kubernetes For high availability on Kubernetes, you can use the [existing high availability services]({{< ref "docs/deployment/ha/overview" >}}). diff --git a/docs/content/docs/deployment/resource-providers/native_kubernetes.md b/docs/content/docs/deployment/resource-providers/native_kubernetes.md index 47f713cfb9df9..c8c06188296b3 100644 --- a/docs/content/docs/deployment/resource-providers/native_kubernetes.md +++ b/docs/content/docs/deployment/resource-providers/native_kubernetes.md @@ -350,6 +350,116 @@ $ ./bin/kubernetes-session.sh -Dkubernetes.env.secretKeyRef=\ The env variable `SECRET_USERNAME` contains the username and the env variable `SECRET_PASSWORD` contains the password of the secret `mysecret`. For more details see the [official Kubernetes documentation](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables). +### Mounting Persistent Volume Claims + +[Kubernetes PersistentVolumeClaims (PVCs)](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) provide a way to request and use persistent storage in Kubernetes. +Flink on Kubernetes supports mounting PVCs directly to JobManager and TaskManager pods via configuration options. + +#### Configuration Options + +Flink provides two configuration options for mounting PVCs: + + + + + + + + + + + + + + + + + + + + + + + + +
Configuration OptionTypeDefaultDescription
kubernetes.persistent-volume-claimsMap<String, String>(none)The user-specified PersistentVolumeClaims that will be mounted into Flink containers. + The value should be in the form of pvc-name:/mount/path. + Multiple PVCs can be specified by separating them with commas.
kubernetes.persistent-volume-claims.read-onlyBooleanfalseWhether to mount PersistentVolumeClaims as read-only. + When set to true, all PVCs will be mounted as read-only.
+ +#### Usage Examples + +The following command will mount the PVC `checkpoint-pvc` to the path `/opt/flink/checkpoints` in the started pods: + +```bash +$ ./bin/kubernetes-session.sh \ + -Dkubernetes.cluster-id=my-session-cluster \ + -Dkubernetes.persistent-volume-claims=checkpoint-pvc:/opt/flink/checkpoints +``` + +You can mount multiple PVCs by separating them with commas: + +```bash +$ ./bin/kubernetes-session.sh \ + -Dkubernetes.cluster-id=my-session-cluster \ + -Dkubernetes.persistent-volume-claims=checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data +``` + +To mount PVCs as read-only (useful for shared reference data): + +```bash +$ ./bin/kubernetes-session.sh \ + -Dkubernetes.cluster-id=my-session-cluster \ + -Dkubernetes.persistent-volume-claims=shared-data:/opt/flink/shared \ + -Dkubernetes.persistent-volume-claims.read-only=true +``` + +#### Example: Using PVC for Checkpoint Storage + +A common use case is to use PVC for storing checkpoints. First, create a PVC in your Kubernetes cluster: + +```yaml +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: flink-checkpoints-pvc +spec: + accessModes: + - ReadWriteMany + resources: + requests: + storage: 10Gi + storageClassName: standard +``` + +Then start a Flink cluster with the PVC mounted for checkpoint storage: + +```bash +$ ./bin/flink run \ + --target kubernetes-application \ + -Dkubernetes.cluster-id=my-checkpoint-cluster \ + -Dkubernetes.container.image=flink:latest \ + -Dkubernetes.persistent-volume-claims=flink-checkpoints-pvc:/opt/flink/checkpoints \ + -Dstate.checkpoints.dir=file:///opt/flink/checkpoints \ + local:///opt/flink/usrlib/my-flink-job.jar +``` + +#### Prerequisites + +- The PVCs must exist in the same namespace as the Flink cluster before deployment. +- The PVCs must have appropriate access modes: + - **ReadWriteOnce (RWO)**: For single pod access. Suitable for standalone JobManager deployments. + - **ReadWriteMany (RWX)**: For multiple pods accessing the same storage. Recommended for HA setups or when both JobManager and TaskManagers need to write to the same storage. + - **ReadOnlyMany (ROX)**: For read-only access from multiple pods. Suitable for shared reference data. + +{{< hint warning >}} +For high availability (HA) setups with multiple JobManagers or when both JobManager and TaskManagers need write access to the same storage, ensure your PVC supports ReadWriteMany (RWX) access mode. Using ReadWriteOnce (RWO) in such scenarios may cause mount failures or I/O errors. +{{< /hint >}} + +{{< hint info >}} +The `kubernetes.persistent-volume-claims.read-only` option applies globally to all configured PVCs. If you need different access modes for different PVCs (e.g., read-write for checkpoints but read-only for reference data), use [Pod Templates]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}}#pod-template) instead to define fine-grained volume configurations. +{{< /hint >}} + ### High-Availability on Kubernetes For high availability on Kubernetes, you can use the [existing high availability services]({{< ref "docs/deployment/ha/overview" >}}). diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index 5180b199160b9..c1b20abfdd532 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -586,6 +586,84 @@ public class KubernetesConfigOptions { "The node label whose value is the same as the node name. " + "Currently, this will only be used to set the node affinity of TM pods to avoid being scheduled on blocked nodes."); + /** + * The user-specified PersistentVolumeClaims (PVCs) that will be mounted into Flink containers. + * + *

The value should be in the form of {@code pvc-name:/mount/path} separated by commas. + * Multiple PVCs can be specified by separating them with commas. + * + *

Example: {@code checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data} + * + *

Prerequisites: + * + *

+ * + *

Common use cases: + * + *

+ */ + public static final ConfigOption> KUBERNETES_PERSISTENT_VOLUME_CLAIMS = + key("kubernetes.persistent-volume-claims") + .mapType() + .noDefaultValue() + .withDescription( + Description.builder() + .text( + "The user-specified %s that will be mounted into Flink containers. " + + "The value should be in the form of %s. " + + "Multiple PVCs can be specified, for example: %s. " + + "The PVCs must exist in the same namespace as the Flink cluster before deployment. " + + "For HA setups with multiple JobManagers or TaskManagers accessing the same storage, " + + "use PVCs with ReadWriteMany (RWX) or ReadOnlyMany (ROX) access modes.", + link( + "https://kubernetes.io/docs/concepts/storage/persistent-volumes/", + "PersistentVolumeClaims (PVCs)"), + code("pvc-name:/mount/path"), + code( + "checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data")) + .build()); + + /** + * Whether to mount PersistentVolumeClaims (PVCs) as read-only. + * + *

When set to true, all PVCs configured via {@link #KUBERNETES_PERSISTENT_VOLUME_CLAIMS} + * will be mounted as read-only. This is useful when the PVC contains shared data that should + * not be modified by Flink, such as reference datasets or pre-trained models. + * + *

Note: This setting applies globally to all PVCs configured via {@link + * #KUBERNETES_PERSISTENT_VOLUME_CLAIMS}. If you need different access modes for different PVCs, + * consider using pod templates instead. + * + *

Default: false (read-write mode) + */ + public static final ConfigOption KUBERNETES_PERSISTENT_VOLUME_CLAIM_READ_ONLY = + key("kubernetes.persistent-volume-claims.read-only") + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text( + "Whether to mount PersistentVolumeClaims (PVCs) as read-only. " + + "When set to true, all PVCs configured via '%s' will be mounted as read-only. " + + "This is useful for shared data that should not be modified by Flink, " + + "such as reference datasets or pre-trained models. " + + "Note: This setting applies globally to all configured PVCs.", + text(KUBERNETES_PERSISTENT_VOLUME_CLAIMS.key())) + .build()); + private static String getDefaultFlinkImage() { // The default container image that ties to the exact needed versions of both Flink and // Scala. diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/PersistentVolumeClaimMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/PersistentVolumeClaimMountDecorator.java new file mode 100644 index 0000000000000..c12aafc7a05e1 --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/PersistentVolumeClaimMountDecorator.java @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.kubeclient.decorators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.PersistentVolumeClaimVolumeSourceBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeBuilder; +import io.fabric8.kubernetes.api.model.VolumeMount; +import io.fabric8.kubernetes.api.model.VolumeMountBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Decorator for mounting Kubernetes PersistentVolumeClaims (PVCs) to JobManager and TaskManager + * pods. + * + *

This decorator allows users to attach pre-existing PVCs to Flink pods, enabling persistent + * storage for checkpoints, savepoints, or other data that needs to survive pod restarts. + * + *

Configuration

+ * + *

Users can configure PVC mounting through the following configuration options: + * + *

+ * + *

Usage Example

+ * + *
{@code
+ * # Mount a single PVC for checkpoint storage
+ * kubernetes.persistent-volume-claims: checkpoint-pvc:/opt/flink/checkpoints
+ *
+ * # Mount multiple PVCs
+ * kubernetes.persistent-volume-claims: checkpoint-pvc:/opt/flink/checkpoints,data-pvc:/opt/flink/data
+ *
+ * # Mount PVCs as read-only
+ * kubernetes.persistent-volume-claims.read-only: true
+ * }
+ * + *

Validation

+ * + *

This decorator validates that: + * + *

+ * + *

Volume Name Generation

+ * + *

Volume names are generated from PVC names by: + * + *

    + *
  1. Replacing '.' with '-' (DNS-1123 label requirement) + *
  2. Adding '-pvc' suffix + *
  3. Truncating to 63 characters if necessary (with hash suffix for uniqueness) + *
+ * + *

Important Notes

+ * + * + * + * @see KubernetesConfigOptions#KUBERNETES_PERSISTENT_VOLUME_CLAIMS + * @see KubernetesConfigOptions#KUBERNETES_PERSISTENT_VOLUME_CLAIM_READ_ONLY + */ +@Internal +public class PersistentVolumeClaimMountDecorator extends AbstractKubernetesStepDecorator { + + private static final Logger LOG = + LoggerFactory.getLogger(PersistentVolumeClaimMountDecorator.class); + + /** Suffix appended to sanitized PVC names to generate volume names. */ + @VisibleForTesting static final String VOLUME_NAME_SUFFIX = "-pvc"; + + /** + * Maximum length for Kubernetes volume names (DNS-1123 label standard). Must be 63 characters + * or less. + */ + @VisibleForTesting static final int MAX_VOLUME_NAME_LENGTH = 63; + + /** + * Maximum length for Kubernetes resource names (DNS-1123 subdomain standard). Must be 253 + * characters or less. + */ + @VisibleForTesting static final int MAX_PVC_NAME_LENGTH = 253; + + /** Length reserved for hash suffix when truncating volume names. */ + private static final int HASH_SUFFIX_LENGTH = 6; // "-" + 5 hex chars + + /** + * Pattern for validating DNS-1123 subdomain names. Used for PVC names. Must consist of lower + * case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric + * character. + */ + @VisibleForTesting + static final Pattern DNS_1123_SUBDOMAIN_PATTERN = + Pattern.compile("^[a-z0-9]([-a-z0-9.]*[a-z0-9])?$"); + + /** + * Pattern for validating DNS-1123 label names. Used for volume names. Must consist of lower + * case alphanumeric characters or '-', and must start and end with an alphanumeric character. + */ + @VisibleForTesting + static final Pattern DNS_1123_LABEL_PATTERN = + Pattern.compile("^[a-z0-9]([-a-z0-9]*[a-z0-9])?$"); + + private final Map pvcNamesToMountPaths; + private final boolean readOnly; + + /** + * Creates a new PersistentVolumeClaimMountDecorator. + * + * @param kubernetesComponentConf the Kubernetes parameters containing PVC configuration + * @throws IllegalArgumentException if the PVC configuration is invalid + */ + public PersistentVolumeClaimMountDecorator( + AbstractKubernetesParameters kubernetesComponentConf) { + checkNotNull(kubernetesComponentConf, "kubernetesComponentConf must not be null"); + this.pvcNamesToMountPaths = kubernetesComponentConf.getPersistentVolumeClaimsToMountPaths(); + this.readOnly = kubernetesComponentConf.isPersistentVolumeClaimReadOnly(); + + // Validate PVC configuration + validatePvcConfiguration(pvcNamesToMountPaths); + } + + /** + * Validates the PVC configuration. + * + * @param pvcNamesToMountPaths the map of PVC names to mount paths + * @throws IllegalArgumentException if validation fails + */ + @VisibleForTesting + static void validatePvcConfiguration(Map pvcNamesToMountPaths) { + if (pvcNamesToMountPaths.isEmpty()) { + return; + } + + Set mountPaths = new HashSet<>(); + Set volumeNames = new HashSet<>(); + + for (Map.Entry entry : pvcNamesToMountPaths.entrySet()) { + String pvcName = entry.getKey(); + String mountPath = entry.getValue(); + + // Validate PVC name (DNS-1123 subdomain) + validatePvcName(pvcName); + + // Validate mount path + validateMountPath(mountPath, mountPaths); + mountPaths.add(mountPath); + + // Check for volume name conflicts (after sanitization) + String volumeName = getVolumeName(pvcName); + checkArgument( + !volumeNames.contains(volumeName), + "Volume name conflict detected: PVC '%s' generates volume name '%s' which conflicts with another PVC. " + + "Please use more distinct PVC names.", + pvcName, + volumeName); + volumeNames.add(volumeName); + } + } + + /** + * Validates that a PVC name conforms to DNS-1123 subdomain standard. + * + *

DNS-1123 subdomain allows: + * + *

    + *
  • Lowercase alphanumeric characters, '-' and '.' + *
  • Must start and end with an alphanumeric character + *
  • Maximum length of 253 characters + *
+ * + * @param pvcName the PVC name to validate + * @throws IllegalArgumentException if the PVC name is invalid + */ + @VisibleForTesting + static void validatePvcName(String pvcName) { + checkArgument( + pvcName != null && !pvcName.trim().isEmpty(), + "PVC name must not be null or empty."); + + checkArgument( + pvcName.length() <= MAX_PVC_NAME_LENGTH, + "PVC name '%s' exceeds maximum length of %d characters.", + pvcName, + MAX_PVC_NAME_LENGTH); + + checkArgument( + DNS_1123_SUBDOMAIN_PATTERN.matcher(pvcName).matches(), + "PVC name '%s' is invalid. Must conform to DNS-1123 subdomain standard: " + + "lowercase alphanumeric characters, '-' or '.', must start and end with alphanumeric.", + pvcName); + } + + /** + * Validates the mount path. + * + * @param mountPath the mount path to validate + * @param existingPaths existing mount paths to check for duplicates + * @throws IllegalArgumentException if the mount path is invalid + */ + @VisibleForTesting + static void validateMountPath(String mountPath, Set existingPaths) { + checkArgument( + mountPath != null && !mountPath.trim().isEmpty(), + "Mount path must not be null or empty."); + + checkArgument( + mountPath.startsWith("/"), + "Mount path '%s' must be an absolute path starting with '/'.", + mountPath); + + checkArgument( + !existingPaths.contains(mountPath), + "Duplicate mount path detected: '%s'. Each PVC must have a unique mount path.", + mountPath); + } + + @Override + public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { + if (pvcNamesToMountPaths.isEmpty()) { + return flinkPod; + } + + // Check for conflicts with existing volumes (fail-fast for volume name conflicts) + checkVolumeConflicts(flinkPod); + + final Pod podWithVolumes = decoratePod(flinkPod.getPodWithoutMainContainer()); + final Container containerWithMounts = decorateMainContainer(flinkPod.getMainContainer()); + + return new FlinkPod.Builder(flinkPod) + .withPod(podWithVolumes) + .withMainContainer(containerWithMounts) + .build(); + } + + /** + * Checks for conflicts between PVC volumes and existing Pod volumes/mounts. + * + *

For volume name conflicts, this method throws an exception (fail-fast) because Kubernetes + * does not allow duplicate volume names in a Pod spec. For mount path conflicts, this method + * logs a warning since Kubernetes allows multiple volume mounts, though the behavior may be + * unexpected. + * + * @param flinkPod the FlinkPod to check + * @throws IllegalArgumentException if a volume name conflict is detected + */ + private void checkVolumeConflicts(FlinkPod flinkPod) { + // Get existing volume names from pod spec + Set existingVolumeNames = new HashSet<>(); + if (flinkPod.getPodWithoutMainContainer().getSpec() != null + && flinkPod.getPodWithoutMainContainer().getSpec().getVolumes() != null) { + flinkPod.getPodWithoutMainContainer().getSpec().getVolumes().stream() + .map(Volume::getName) + .forEach(existingVolumeNames::add); + } + + // Get existing mount paths from container + Set existingMountPaths = new HashSet<>(); + if (flinkPod.getMainContainer().getVolumeMounts() != null) { + flinkPod.getMainContainer().getVolumeMounts().stream() + .map(VolumeMount::getMountPath) + .forEach(existingMountPaths::add); + } + + // Check for conflicts + for (Map.Entry entry : pvcNamesToMountPaths.entrySet()) { + String volumeName = getVolumeName(entry.getKey()); + String mountPath = entry.getValue(); + + // Volume name conflict: fail-fast (K8s does not allow duplicate volume names) + checkArgument( + !existingVolumeNames.contains(volumeName), + "Volume name conflict: PVC '%s' generates volume name '%s' which already exists in the Pod spec. " + + "This conflict may be caused by Pod Template configuration. " + + "Please use a different PVC name or remove the conflicting volume from Pod Template.", + entry.getKey(), + volumeName); + + // Mount path conflict: warn only (K8s allows but behavior may be unexpected) + if (existingMountPaths.contains(mountPath)) { + LOG.warn( + "Mount path '{}' for PVC '{}' conflicts with an existing mount in the Pod spec. " + + "The PVC mount will be added alongside the existing mount. " + + "Consider using unique mount paths to avoid unexpected behavior.", + mountPath, + entry.getKey()); + } + } + } + + /** + * Decorates the main container with volume mounts for all configured PVCs. + * + * @param container the original main container + * @return a new container with PVC volume mounts added + */ + private Container decorateMainContainer(Container container) { + final List volumeMounts = buildVolumeMounts(); + return new ContainerBuilder(container).addAllToVolumeMounts(volumeMounts).build(); + } + + /** + * Decorates the pod specification with volumes for all configured PVCs. + * + * @param pod the original pod without main container + * @return a new pod with PVC volumes added + */ + private Pod decoratePod(Pod pod) { + final List volumes = buildVolumes(); + return new PodBuilder(pod).editOrNewSpec().addAllToVolumes(volumes).endSpec().build(); + } + + /** + * Builds the list of volume mounts for all configured PVCs. + * + * @return list of volume mounts + */ + @VisibleForTesting + List buildVolumeMounts() { + return pvcNamesToMountPaths.entrySet().stream() + .map(this::buildVolumeMount) + .collect(Collectors.toList()); + } + + /** + * Builds a single volume mount for a PVC. + * + * @param pvcNameToMountPath entry containing PVC name and mount path + * @return the volume mount + */ + private VolumeMount buildVolumeMount(Map.Entry pvcNameToMountPath) { + return new VolumeMountBuilder() + .withName(getVolumeName(pvcNameToMountPath.getKey())) + .withMountPath(pvcNameToMountPath.getValue()) + .withReadOnly(readOnly) + .build(); + } + + /** + * Builds the list of volumes for all configured PVCs. + * + * @return list of volumes + */ + @VisibleForTesting + List buildVolumes() { + return pvcNamesToMountPaths.keySet().stream() + .map(this::buildVolume) + .collect(Collectors.toList()); + } + + /** + * Builds a single volume for a PVC. + * + * @param pvcName the name of the PVC + * @return the volume + */ + private Volume buildVolume(String pvcName) { + return new VolumeBuilder() + .withName(getVolumeName(pvcName)) + .withPersistentVolumeClaim( + new PersistentVolumeClaimVolumeSourceBuilder() + .withClaimName(pvcName) + .withReadOnly(readOnly) + .build()) + .build(); + } + + /** + * Generates a valid Kubernetes volume name from the PVC name. + * + *

Volume names must conform to DNS-1123 label standard: + * + *

    + *
  • Lowercase alphanumeric characters or '-' + *
  • Must start and end with an alphanumeric character + *
  • Maximum 63 characters + *
+ * + *

The conversion process: + * + *

    + *
  1. Replace '.' with '-' (dots are allowed in PVC names but not in volume names) + *
  2. Append '-pvc' suffix + *
  3. If length exceeds 63, truncate and append hash for uniqueness + *
+ * + * @param pvcName the name of the PVC + * @return a valid volume name + */ + @VisibleForTesting + static String getVolumeName(String pvcName) { + // Sanitize: replace '.' with '-' to conform to DNS-1123 label + String sanitized = pvcName.replace('.', '-'); + + // Add suffix + String volumeName = sanitized + VOLUME_NAME_SUFFIX; + + // Truncate if exceeds max length + if (volumeName.length() > MAX_VOLUME_NAME_LENGTH) { + volumeName = truncateWithHash(sanitized, pvcName); + } + + // Final validation: ensure it doesn't end with '-' + while (volumeName.endsWith("-")) { + volumeName = volumeName.substring(0, volumeName.length() - 1); + } + + return volumeName; + } + + /** + * Truncates the volume name and appends a hash suffix for uniqueness. + * + *

Format: {truncated-name}-{hash}-pvc + * + * @param sanitized the sanitized PVC name (with '.' replaced by '-') + * @param original the original PVC name (used for hash calculation) + * @return truncated volume name with hash suffix + */ + private static String truncateWithHash(String sanitized, String original) { + // Calculate hash from original PVC name for uniqueness + String hash = String.format("%05x", original.hashCode() & 0xFFFFF); + + // Calculate available length for the base name + // Format: {base}-{hash}-pvc, where hash is 5 chars + int availableLength = + MAX_VOLUME_NAME_LENGTH - HASH_SUFFIX_LENGTH - VOLUME_NAME_SUFFIX.length(); + + // Truncate base name + String truncatedBase = + sanitized.substring(0, Math.min(sanitized.length(), availableLength)); + + // Remove trailing hyphens from truncated base + while (truncatedBase.endsWith("-")) { + truncatedBase = truncatedBase.substring(0, truncatedBase.length() - 1); + } + + return truncatedBase + "-" + hash.toLowerCase(Locale.ROOT) + VOLUME_NAME_SUFFIX; + } +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java index fd8f50b6476ca..48c36cc8e469d 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java @@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.PersistentVolumeClaimMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.PodTemplateMountDecorator; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesOwnerReference; @@ -72,6 +73,8 @@ public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecifi new InitJobManagerDecorator(kubernetesJobManagerParameters), new EnvSecretsDecorator(kubernetesJobManagerParameters), new MountSecretsDecorator(kubernetesJobManagerParameters), + new PersistentVolumeClaimMountDecorator( + kubernetesJobManagerParameters), new CmdJobManagerDecorator(kubernetesJobManagerParameters), new InternalServiceDecorator(kubernetesJobManagerParameters), new ExternalServiceDecorator(kubernetesJobManagerParameters))); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java index 368850d3ba81f..05ed8c91cbf4d 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java @@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator; import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator; +import org.apache.flink.kubernetes.kubeclient.decorators.PersistentVolumeClaimMountDecorator; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters; import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod; import org.apache.flink.util.Preconditions; @@ -55,6 +56,8 @@ public static KubernetesPod buildTaskManagerKubernetesPod( new InitTaskManagerDecorator(kubernetesTaskManagerParameters), new EnvSecretsDecorator(kubernetesTaskManagerParameters), new MountSecretsDecorator(kubernetesTaskManagerParameters), + new PersistentVolumeClaimMountDecorator( + kubernetesTaskManagerParameters), new CmdTaskManagerDecorator(kubernetesTaskManagerParameters))); Configuration configuration = kubernetesTaskManagerParameters.getFlinkConfiguration(); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java index da74e9d12585e..46c83b28b503c 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java @@ -205,4 +205,26 @@ public List> getEnvironmentsFromSecrets() { public boolean isHostNetworkEnabled() { return flinkConfig.get(KubernetesConfigOptions.KUBERNETES_HOSTNETWORK_ENABLED); } + + /** + * Gets the map of PVC names to their mount paths. + * + * @return a map where keys are PVC names and values are mount paths; empty map if not + * configured + */ + public Map getPersistentVolumeClaimsToMountPaths() { + return flinkConfig + .getOptional(KubernetesConfigOptions.KUBERNETES_PERSISTENT_VOLUME_CLAIMS) + .orElse(Collections.emptyMap()); + } + + /** + * Checks whether PVCs should be mounted as read-only. + * + * @return true if PVCs should be mounted as read-only, false otherwise + */ + public boolean isPersistentVolumeClaimReadOnly() { + return flinkConfig.get( + KubernetesConfigOptions.KUBERNETES_PERSISTENT_VOLUME_CLAIM_READ_ONLY); + } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/PersistentVolumeClaimMountDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/PersistentVolumeClaimMountDecoratorTest.java new file mode 100644 index 0000000000000..2b91839d4d5e6 --- /dev/null +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/PersistentVolumeClaimMountDecoratorTest.java @@ -0,0 +1,581 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.kubeclient.decorators; + +import org.apache.flink.kubernetes.VolumeTestUtils; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeBuilder; +import io.fabric8.kubernetes.api.model.VolumeMount; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** General tests for the {@link PersistentVolumeClaimMountDecorator}. */ +class PersistentVolumeClaimMountDecoratorTest extends KubernetesJobManagerTestBase { + + private static final String PVC_NAME_1 = "checkpoint-pvc"; + private static final String PVC_MOUNT_PATH_1 = "/opt/flink/checkpoints"; + private static final String PVC_NAME_2 = "data-pvc"; + private static final String PVC_MOUNT_PATH_2 = "/opt/flink/data"; + + private PersistentVolumeClaimMountDecorator pvcDecorator; + + @Override + protected void setupFlinkConfig() { + super.setupFlinkConfig(); + // Configure single PVC by default + this.flinkConfig.setString( + KubernetesConfigOptions.KUBERNETES_PERSISTENT_VOLUME_CLAIMS.key(), + PVC_NAME_1 + ":" + PVC_MOUNT_PATH_1); + } + + @Override + protected void onSetup() throws Exception { + super.onSetup(); + this.pvcDecorator = new PersistentVolumeClaimMountDecorator(kubernetesJobManagerParameters); + } + + @Test + void testNoPvcConfigured() { + // Reset config to have no PVC + this.flinkConfig.removeConfig(KubernetesConfigOptions.KUBERNETES_PERSISTENT_VOLUME_CLAIMS); + this.pvcDecorator = new PersistentVolumeClaimMountDecorator(kubernetesJobManagerParameters); + + final FlinkPod resultFlinkPod = pvcDecorator.decorateFlinkPod(baseFlinkPod); + + // Should return the same pod without modifications + assertThat(resultFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes()) + .isEqualTo(baseFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes()); + assertThat(resultFlinkPod.getMainContainer().getVolumeMounts()) + .isEqualTo(baseFlinkPod.getMainContainer().getVolumeMounts()); + } + + @Test + void testSinglePvcMount() { + final FlinkPod resultFlinkPod = pvcDecorator.decorateFlinkPod(baseFlinkPod); + + final String expectedVolumeName = + PVC_NAME_1 + PersistentVolumeClaimMountDecorator.VOLUME_NAME_SUFFIX; + + // Verify volume is added to pod + assertThat( + VolumeTestUtils.podHasVolume( + resultFlinkPod.getPodWithoutMainContainer(), expectedVolumeName)) + .isTrue(); + + // Verify volume mount is added to container + assertThat( + VolumeTestUtils.containerHasVolume( + resultFlinkPod.getMainContainer(), + expectedVolumeName, + PVC_MOUNT_PATH_1)) + .isTrue(); + + // Verify PVC claim name in volume + final Volume volume = + resultFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes().stream() + .filter(v -> v.getName().equals(expectedVolumeName)) + .findFirst() + .orElseThrow(() -> new AssertionError("Volume not found")); + assertThat(volume.getPersistentVolumeClaim().getClaimName()).isEqualTo(PVC_NAME_1); + } + + @Test + void testMultiplePvcMount() { + // Configure multiple PVCs + this.flinkConfig.setString( + KubernetesConfigOptions.KUBERNETES_PERSISTENT_VOLUME_CLAIMS.key(), + PVC_NAME_1 + ":" + PVC_MOUNT_PATH_1 + "," + PVC_NAME_2 + ":" + PVC_MOUNT_PATH_2); + this.pvcDecorator = new PersistentVolumeClaimMountDecorator(kubernetesJobManagerParameters); + + final FlinkPod resultFlinkPod = pvcDecorator.decorateFlinkPod(baseFlinkPod); + + final String expectedVolumeName1 = + PVC_NAME_1 + PersistentVolumeClaimMountDecorator.VOLUME_NAME_SUFFIX; + final String expectedVolumeName2 = + PVC_NAME_2 + PersistentVolumeClaimMountDecorator.VOLUME_NAME_SUFFIX; + + // Verify both volumes are added + assertThat( + VolumeTestUtils.podHasVolume( + resultFlinkPod.getPodWithoutMainContainer(), expectedVolumeName1)) + .isTrue(); + assertThat( + VolumeTestUtils.podHasVolume( + resultFlinkPod.getPodWithoutMainContainer(), expectedVolumeName2)) + .isTrue(); + + // Verify both volume mounts are added + assertThat( + VolumeTestUtils.containerHasVolume( + resultFlinkPod.getMainContainer(), + expectedVolumeName1, + PVC_MOUNT_PATH_1)) + .isTrue(); + assertThat( + VolumeTestUtils.containerHasVolume( + resultFlinkPod.getMainContainer(), + expectedVolumeName2, + PVC_MOUNT_PATH_2)) + .isTrue(); + } + + @Test + void testReadOnlyMount() { + // Configure PVC with read-only flag + this.flinkConfig.set( + KubernetesConfigOptions.KUBERNETES_PERSISTENT_VOLUME_CLAIM_READ_ONLY, true); + this.pvcDecorator = new PersistentVolumeClaimMountDecorator(kubernetesJobManagerParameters); + + final FlinkPod resultFlinkPod = pvcDecorator.decorateFlinkPod(baseFlinkPod); + + final String expectedVolumeName = + PVC_NAME_1 + PersistentVolumeClaimMountDecorator.VOLUME_NAME_SUFFIX; + + // Verify volume mount is read-only + final VolumeMount volumeMount = + resultFlinkPod.getMainContainer().getVolumeMounts().stream() + .filter(vm -> vm.getName().equals(expectedVolumeName)) + .findFirst() + .orElseThrow(() -> new AssertionError("VolumeMount not found")); + assertThat(volumeMount.getReadOnly()).isTrue(); + + // Verify PVC volume source is read-only + final Volume volume = + resultFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes().stream() + .filter(v -> v.getName().equals(expectedVolumeName)) + .findFirst() + .orElseThrow(() -> new AssertionError("Volume not found")); + assertThat(volume.getPersistentVolumeClaim().getReadOnly()).isTrue(); + } + + @Test + void testReadWriteMount() { + // Ensure read-only is false (default) + this.flinkConfig.set( + KubernetesConfigOptions.KUBERNETES_PERSISTENT_VOLUME_CLAIM_READ_ONLY, false); + this.pvcDecorator = new PersistentVolumeClaimMountDecorator(kubernetesJobManagerParameters); + + final FlinkPod resultFlinkPod = pvcDecorator.decorateFlinkPod(baseFlinkPod); + + final String expectedVolumeName = + PVC_NAME_1 + PersistentVolumeClaimMountDecorator.VOLUME_NAME_SUFFIX; + + // Verify volume mount is not read-only + final VolumeMount volumeMount = + resultFlinkPod.getMainContainer().getVolumeMounts().stream() + .filter(vm -> vm.getName().equals(expectedVolumeName)) + .findFirst() + .orElseThrow(() -> new AssertionError("VolumeMount not found")); + assertThat(volumeMount.getReadOnly()).isFalse(); + } + + @Test + void testVolumeNameGeneration() { + final String pvcName = "my-test-pvc"; + final String expectedVolumeName = + pvcName + PersistentVolumeClaimMountDecorator.VOLUME_NAME_SUFFIX; + + assertThat(PersistentVolumeClaimMountDecorator.getVolumeName(pvcName)) + .isEqualTo(expectedVolumeName); + } + + @Test + void testBuildVolumeMounts() { + final List volumeMounts = pvcDecorator.buildVolumeMounts(); + + assertThat(volumeMounts).hasSize(1); + assertThat(volumeMounts.get(0).getName()) + .isEqualTo(PVC_NAME_1 + PersistentVolumeClaimMountDecorator.VOLUME_NAME_SUFFIX); + assertThat(volumeMounts.get(0).getMountPath()).isEqualTo(PVC_MOUNT_PATH_1); + } + + @Test + void testBuildVolumes() { + final List volumes = pvcDecorator.buildVolumes(); + + assertThat(volumes).hasSize(1); + assertThat(volumes.get(0).getName()) + .isEqualTo(PVC_NAME_1 + PersistentVolumeClaimMountDecorator.VOLUME_NAME_SUFFIX); + assertThat(volumes.get(0).getPersistentVolumeClaim().getClaimName()).isEqualTo(PVC_NAME_1); + } + + // ==================== PVC Name Validation Tests (DNS-1123 Subdomain) ==================== + + @Test + void testValidatePvcName_ValidNames() { + // These should not throw - DNS-1123 subdomain allows '-' and '.' + PersistentVolumeClaimMountDecorator.validatePvcName("my-pvc"); + PersistentVolumeClaimMountDecorator.validatePvcName("pvc1"); + PersistentVolumeClaimMountDecorator.validatePvcName("a"); + PersistentVolumeClaimMountDecorator.validatePvcName("my-long-pvc-name-123"); + PersistentVolumeClaimMountDecorator.validatePvcName("a1b2c3"); + // DNS-1123 subdomain allows dots + PersistentVolumeClaimMountDecorator.validatePvcName("pvc.data-01"); + PersistentVolumeClaimMountDecorator.validatePvcName("my.pvc.name"); + // Long names up to 253 chars are allowed for subdomain + PersistentVolumeClaimMountDecorator.validatePvcName("a".repeat(253)); + } + + @Test + void testValidatePvcName_NullOrEmpty() { + assertThatThrownBy(() -> PersistentVolumeClaimMountDecorator.validatePvcName(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must not be null or empty"); + + assertThatThrownBy(() -> PersistentVolumeClaimMountDecorator.validatePvcName("")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must not be null or empty"); + + assertThatThrownBy(() -> PersistentVolumeClaimMountDecorator.validatePvcName(" ")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must not be null or empty"); + } + + @Test + void testValidatePvcName_InvalidDns1123Subdomain() { + // Uppercase letters + assertThatThrownBy(() -> PersistentVolumeClaimMountDecorator.validatePvcName("My-PVC")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("DNS-1123 subdomain standard"); + + // Starts with hyphen + assertThatThrownBy(() -> PersistentVolumeClaimMountDecorator.validatePvcName("-my-pvc")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("DNS-1123 subdomain standard"); + + // Ends with hyphen + assertThatThrownBy(() -> PersistentVolumeClaimMountDecorator.validatePvcName("my-pvc-")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("DNS-1123 subdomain standard"); + + // Contains underscore + assertThatThrownBy(() -> PersistentVolumeClaimMountDecorator.validatePvcName("my_pvc")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("DNS-1123 subdomain standard"); + + // Starts with dot + assertThatThrownBy(() -> PersistentVolumeClaimMountDecorator.validatePvcName(".my-pvc")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("DNS-1123 subdomain standard"); + + // Ends with dot + assertThatThrownBy(() -> PersistentVolumeClaimMountDecorator.validatePvcName("my-pvc.")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("DNS-1123 subdomain standard"); + } + + @Test + void testValidatePvcName_ExceedsMaxLength() { + String longName = "a".repeat(254); // 254 chars, exceeds 253 limit + assertThatThrownBy(() -> PersistentVolumeClaimMountDecorator.validatePvcName(longName)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("exceeds maximum length"); + } + + // ==================== Mount Path Validation Tests ==================== + + @Test + void testValidateMountPath_ValidPaths() { + HashSet existingPaths = new HashSet<>(); + // These should not throw + PersistentVolumeClaimMountDecorator.validateMountPath("/opt/flink", existingPaths); + PersistentVolumeClaimMountDecorator.validateMountPath("/", new HashSet<>()); + PersistentVolumeClaimMountDecorator.validateMountPath("/a/b/c/d", new HashSet<>()); + } + + @Test + void testValidateMountPath_NullOrEmpty() { + assertThatThrownBy( + () -> + PersistentVolumeClaimMountDecorator.validateMountPath( + null, new HashSet<>())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must not be null or empty"); + + assertThatThrownBy( + () -> + PersistentVolumeClaimMountDecorator.validateMountPath( + "", new HashSet<>())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must not be null or empty"); + } + + @Test + void testValidateMountPath_NotAbsolute() { + assertThatThrownBy( + () -> + PersistentVolumeClaimMountDecorator.validateMountPath( + "relative/path", new HashSet<>())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must be an absolute path"); + } + + @Test + void testValidateMountPath_Duplicate() { + HashSet existingPaths = new HashSet<>(); + existingPaths.add("/opt/flink"); + + assertThatThrownBy( + () -> + PersistentVolumeClaimMountDecorator.validateMountPath( + "/opt/flink", existingPaths)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Duplicate mount path"); + } + + // ==================== Volume Name Generation Tests ==================== + + @Test + void testVolumeNameSanitization_DotsReplacedWithHyphens() { + // PVC name with dots should have them replaced with hyphens in volume name + String pvcWithDots = "my.pvc.name"; + String volumeName = PersistentVolumeClaimMountDecorator.getVolumeName(pvcWithDots); + + // Dots should be replaced with hyphens + assertThat(volumeName).doesNotContain("."); + assertThat(volumeName).isEqualTo("my-pvc-name-pvc"); + } + + @Test + void testVolumeNameTruncationWithHash() { + // PVC name that would result in volume name > 63 chars + String longPvcName = "a".repeat(100); + + String volumeName = PersistentVolumeClaimMountDecorator.getVolumeName(longPvcName); + + // Should be truncated to max 63 chars + assertThat(volumeName.length()) + .isLessThanOrEqualTo(PersistentVolumeClaimMountDecorator.MAX_VOLUME_NAME_LENGTH); + + // Should contain hash suffix for uniqueness + assertThat(volumeName).endsWith("-pvc"); + + // Should not end with hyphen (before suffix) + assertThat(volumeName).doesNotEndWith("--pvc"); + } + + @Test + void testVolumeNameUniquenessWithHash() { + // Two long PVC names that would be truncated to the same prefix without hash + String pvc1 = "a".repeat(60) + "x"; + String pvc2 = "a".repeat(60) + "y"; + + String vol1 = PersistentVolumeClaimMountDecorator.getVolumeName(pvc1); + String vol2 = PersistentVolumeClaimMountDecorator.getVolumeName(pvc2); + + // Volume names should be different due to hash + assertThat(vol1).isNotEqualTo(vol2); + + // Both should be valid length + assertThat(vol1.length()) + .isLessThanOrEqualTo(PersistentVolumeClaimMountDecorator.MAX_VOLUME_NAME_LENGTH); + assertThat(vol2.length()) + .isLessThanOrEqualTo(PersistentVolumeClaimMountDecorator.MAX_VOLUME_NAME_LENGTH); + } + + @Test + void testVolumeNameValidDns1123Label() { + // Even with dots in PVC name, volume name should be valid DNS-1123 label + String pvcWithDots = "my.namespace.pvc-01"; + String volumeName = PersistentVolumeClaimMountDecorator.getVolumeName(pvcWithDots); + + // Should match DNS-1123 label pattern (no dots) + assertThat(volumeName) + .matches(PersistentVolumeClaimMountDecorator.DNS_1123_LABEL_PATTERN.pattern()); + } + + // ==================== Volume Conflict Detection Tests (Fail-Fast) ==================== + + @Test + void testVolumeNameConflict_FailFast() { + // Create a FlinkPod with existing volume that has the same name as our PVC would generate + String conflictingVolumeName = + PVC_NAME_1 + PersistentVolumeClaimMountDecorator.VOLUME_NAME_SUFFIX; + + Volume existingVolume = + new VolumeBuilder() + .withName(conflictingVolumeName) + .withNewEmptyDir() + .endEmptyDir() + .build(); + + Pod podWithConflictingVolume = + new PodBuilder(baseFlinkPod.getPodWithoutMainContainer()) + .editOrNewSpec() + .addToVolumes(existingVolume) + .endSpec() + .build(); + + FlinkPod flinkPodWithConflict = + new FlinkPod.Builder() + .withPod(podWithConflictingVolume) + .withMainContainer(baseFlinkPod.getMainContainer()) + .build(); + + // Should throw IllegalArgumentException due to volume name conflict + assertThatThrownBy(() -> pvcDecorator.decorateFlinkPod(flinkPodWithConflict)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Volume name conflict") + .hasMessageContaining(conflictingVolumeName); + } + + @Test + void testPvcNameWithDotsConflictDetection() { + // Configure PVC with dots + this.flinkConfig.setString( + KubernetesConfigOptions.KUBERNETES_PERSISTENT_VOLUME_CLAIMS.key(), + "my.pvc.name:/opt/flink/data"); + this.pvcDecorator = new PersistentVolumeClaimMountDecorator(kubernetesJobManagerParameters); + + // Create a FlinkPod with existing volume that matches sanitized name + String sanitizedVolumeName = "my-pvc-name-pvc"; // dots replaced with hyphens + suffix + + Volume existingVolume = + new VolumeBuilder() + .withName(sanitizedVolumeName) + .withNewEmptyDir() + .endEmptyDir() + .build(); + + Pod podWithConflictingVolume = + new PodBuilder(baseFlinkPod.getPodWithoutMainContainer()) + .editOrNewSpec() + .addToVolumes(existingVolume) + .endSpec() + .build(); + + FlinkPod flinkPodWithConflict = + new FlinkPod.Builder() + .withPod(podWithConflictingVolume) + .withMainContainer(baseFlinkPod.getMainContainer()) + .build(); + + // Should throw due to conflict with sanitized volume name + assertThatThrownBy(() -> pvcDecorator.decorateFlinkPod(flinkPodWithConflict)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Volume name conflict"); + } + + // ==================== Configuration Validation Tests ==================== + + @Test + void testValidatePvcConfiguration_ValidConfig() { + Map validPvcs = new HashMap<>(); + validPvcs.put("pvc1", "/path1"); + validPvcs.put("pvc2", "/path2"); + validPvcs.put("pvc.with.dots", "/path3"); + + // Should not throw + PersistentVolumeClaimMountDecorator.validatePvcConfiguration(validPvcs); + } + + @Test + void testValidatePvcConfiguration_EmptyMap() { + // Should not throw + PersistentVolumeClaimMountDecorator.validatePvcConfiguration(Collections.emptyMap()); + } + + @Test + void testInvalidPvcName_FailsOnConstruction() { + // Configure an invalid PVC name + this.flinkConfig.setString( + KubernetesConfigOptions.KUBERNETES_PERSISTENT_VOLUME_CLAIMS.key(), + "Invalid_PVC_Name:/opt/flink"); + + assertThatThrownBy( + () -> + new PersistentVolumeClaimMountDecorator( + kubernetesJobManagerParameters)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("DNS-1123 subdomain standard"); + } + + @Test + void testInvalidMountPath_FailsOnConstruction() { + // Configure a relative mount path + this.flinkConfig.setString( + KubernetesConfigOptions.KUBERNETES_PERSISTENT_VOLUME_CLAIMS.key(), + "my-pvc:relative/path"); + + assertThatThrownBy( + () -> + new PersistentVolumeClaimMountDecorator( + kubernetesJobManagerParameters)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must be an absolute path"); + } + + @Test + void testDuplicateMountPath_FailsOnConstruction() { + // Configure duplicate mount paths + this.flinkConfig.setString( + KubernetesConfigOptions.KUBERNETES_PERSISTENT_VOLUME_CLAIMS.key(), + "pvc1:/opt/flink,pvc2:/opt/flink"); + + assertThatThrownBy( + () -> + new PersistentVolumeClaimMountDecorator( + kubernetesJobManagerParameters)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Duplicate mount path"); + } + + @Test + void testPvcWithDotsValidAndMountsCorrectly() { + // Configure PVC with dots (valid DNS-1123 subdomain) + String pvcWithDots = "my.namespace.pvc-data"; + this.flinkConfig.setString( + KubernetesConfigOptions.KUBERNETES_PERSISTENT_VOLUME_CLAIMS.key(), + pvcWithDots + ":" + PVC_MOUNT_PATH_1); + this.pvcDecorator = new PersistentVolumeClaimMountDecorator(kubernetesJobManagerParameters); + + final FlinkPod resultFlinkPod = pvcDecorator.decorateFlinkPod(baseFlinkPod); + + // Volume name should have dots replaced with hyphens + String expectedVolumeName = "my-namespace-pvc-data-pvc"; + + // Verify volume is added with sanitized name + assertThat( + VolumeTestUtils.podHasVolume( + resultFlinkPod.getPodWithoutMainContainer(), expectedVolumeName)) + .isTrue(); + + // Verify PVC claim name is preserved (original with dots) + final Volume volume = + resultFlinkPod.getPodWithoutMainContainer().getSpec().getVolumes().stream() + .filter(v -> v.getName().equals(expectedVolumeName)) + .findFirst() + .orElseThrow(() -> new AssertionError("Volume not found")); + assertThat(volume.getPersistentVolumeClaim().getClaimName()).isEqualTo(pvcWithDots); + } +}