diff --git a/pkg/nfs/nfs.go b/pkg/nfs/nfs.go index 919d078d1..ef71de669 100644 --- a/pkg/nfs/nfs.go +++ b/pkg/nfs/nfs.go @@ -113,6 +113,7 @@ func NewDriver(options *DriverOptions) *Driver { n.AddNodeServiceCapabilities([]csi.NodeServiceCapability_RPC_Type{ csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER, + csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, csi.NodeServiceCapability_RPC_UNKNOWN, }) n.volumeLocks = NewVolumeLocks() diff --git a/pkg/nfs/nodeserver.go b/pkg/nfs/nodeserver.go index eebab4832..fe6bb8d9d 100644 --- a/pkg/nfs/nodeserver.go +++ b/pkg/nfs/nodeserver.go @@ -55,6 +55,7 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV if len(targetPath) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path not provided") } + stagingTargetPath := req.GetStagingTargetPath() lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath) if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired { @@ -105,15 +106,6 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV if baseDir == "" { return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("%v is a required parameter", paramShare)) } - server = getServerFromSource(server) - source := fmt.Sprintf("%s:%s", server, baseDir) - if subDir != "" { - // replace pv/pvc name namespace metadata in subDir - subDir = replaceWithMap(subDir, subDirReplaceMap) - - source = strings.TrimRight(source, "/") - source = fmt.Sprintf("%s/%s", source, subDir) - } notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath) if err != nil { @@ -130,19 +122,72 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV return &csi.NodePublishVolumeResponse{}, nil } - klog.V(2).Infof("NodePublishVolume: volumeID(%v) source(%s) targetPath(%s) mountflags(%v)", volumeID, source, targetPath, mountOptions) - execFunc := func() error { - return ns.mounter.Mount(source, targetPath, "nfs", mountOptions) - } - timeoutFunc := func() error { return fmt.Errorf("time out") } - if err := WaitUntilTimeout(90*time.Second, execFunc, timeoutFunc); err != nil { - if os.IsPermission(err) { - return nil, status.Error(codes.PermissionDenied, err.Error()) + // If stagingTargetPath is provided, bind mount from staging to target + // and remount with security options to ensure they are applied + if stagingTargetPath != "" { + klog.V(2).Infof("NodePublishVolume: volumeID(%v) bind mounting from stagingPath(%s) to targetPath(%s) with mountflags(%v)", volumeID, stagingTargetPath, targetPath, mountOptions) + + // Perform bind mount + if err := ns.mounter.Mount(stagingTargetPath, targetPath, "", []string{"bind"}); err != nil { + if os.IsPermission(err) { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + return nil, status.Error(codes.Internal, err.Error()) } - if strings.Contains(err.Error(), "invalid argument") { - return nil, status.Error(codes.InvalidArgument, err.Error()) + + // Remount with security options to ensure they are applied to the bind mount + // Extract security-related mount options that need to be re-applied + securityOpts := []string{"remount"} + for _, opt := range mountOptions { + // Include security options and readonly flag + if opt == "noexec" || opt == "nosuid" || opt == "nodev" || opt == "ro" { + securityOpts = append(securityOpts, opt) + } + } + + // Only remount if there are security options to apply + if len(securityOpts) > 1 { + klog.V(2).Infof("NodePublishVolume: remounting targetPath(%s) with security options(%v)", targetPath, securityOpts) + if err := ns.mounter.Mount("", targetPath, "", securityOpts); err != nil { + // Attempt to cleanup the bind mount on failure + forceUnmounter, ok := ns.mounter.(mount.MounterForceUnmounter) + if ok { + mount.CleanupMountWithForce(targetPath, forceUnmounter, false, 30*time.Second) + } else { + mount.CleanupMountPoint(targetPath, ns.mounter, false) + } + if os.IsPermission(err) { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + return nil, status.Error(codes.Internal, err.Error()) + } + } + } else { + // Legacy path: direct NFS mount (for backward compatibility when staging is not used) + server = getServerFromSource(server) + source := fmt.Sprintf("%s:%s", server, baseDir) + if subDir != "" { + // replace pv/pvc name namespace metadata in subDir + subDir = replaceWithMap(subDir, subDirReplaceMap) + + source = strings.TrimRight(source, "/") + source = fmt.Sprintf("%s/%s", source, subDir) + } + + klog.V(2).Infof("NodePublishVolume: volumeID(%v) source(%s) targetPath(%s) mountflags(%v)", volumeID, source, targetPath, mountOptions) + execFunc := func() error { + return ns.mounter.Mount(source, targetPath, "nfs", mountOptions) + } + timeoutFunc := func() error { return fmt.Errorf("time out") } + if err := WaitUntilTimeout(90*time.Second, execFunc, timeoutFunc); err != nil { + if os.IsPermission(err) { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + if strings.Contains(err.Error(), "invalid argument") { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + return nil, status.Error(codes.Internal, err.Error()) } - return nil, status.Error(codes.Internal, err.Error()) } if mountPermissions > 0 { @@ -152,7 +197,7 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV } else { klog.V(2).Infof("skip chmod on targetPath(%s) since mountPermissions is set as 0", targetPath) } - klog.V(2).Infof("volume(%s) mount %s on %s succeeded", volumeID, source, targetPath) + klog.V(2).Infof("volume(%s) mounted to %s successfully", volumeID, targetPath) return &csi.NodePublishVolumeResponse{}, nil } @@ -286,13 +331,135 @@ func (ns *NodeServer) NodeGetVolumeStats(_ context.Context, req *csi.NodeGetVolu } // NodeUnstageVolume unstage volume -func (ns *NodeServer) NodeUnstageVolume(_ context.Context, _ *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { - return nil, status.Error(codes.Unimplemented, "") +func (ns *NodeServer) NodeUnstageVolume(_ context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + volumeID := req.GetVolumeId() + if len(volumeID) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + stagingTargetPath := req.GetStagingTargetPath() + if len(stagingTargetPath) == 0 { + return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request") + } + + lockKey := fmt.Sprintf("%s-%s", volumeID, stagingTargetPath) + if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired { + return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID) + } + defer ns.Driver.volumeLocks.Release(lockKey) + + klog.V(2).Infof("NodeUnstageVolume: unmounting volume %s on %s", volumeID, stagingTargetPath) + var err error + extensiveMountPointCheck := true + forceUnmounter, ok := ns.mounter.(mount.MounterForceUnmounter) + if ok { + klog.V(2).Infof("force unmount %s on %s", volumeID, stagingTargetPath) + err = mount.CleanupMountWithForce(stagingTargetPath, forceUnmounter, extensiveMountPointCheck, 30*time.Second) + } else { + err = mount.CleanupMountPoint(stagingTargetPath, ns.mounter, extensiveMountPointCheck) + } + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to unmount staging target %q: %v", stagingTargetPath, err) + } + klog.V(2).Infof("NodeUnstageVolume: unmount volume %s on %s successfully", volumeID, stagingTargetPath) + + return &csi.NodeUnstageVolumeResponse{}, nil } // NodeStageVolume stage volume -func (ns *NodeServer) NodeStageVolume(_ context.Context, _ *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { - return nil, status.Error(codes.Unimplemented, "") +func (ns *NodeServer) NodeStageVolume(_ context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + volCap := req.GetVolumeCapability() + if volCap == nil { + return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request") + } + volumeID := req.GetVolumeId() + if len(volumeID) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + stagingTargetPath := req.GetStagingTargetPath() + if len(stagingTargetPath) == 0 { + return nil, status.Error(codes.InvalidArgument, "Staging target path not provided") + } + + lockKey := fmt.Sprintf("%s-%s", volumeID, stagingTargetPath) + if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired { + return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID) + } + defer ns.Driver.volumeLocks.Release(lockKey) + + mountOptions := volCap.GetMount().GetMountFlags() + + var server, baseDir, subDir string + subDirReplaceMap := map[string]string{} + + for k, v := range req.GetVolumeContext() { + switch strings.ToLower(k) { + case paramServer: + server = v + case paramShare: + baseDir = v + case paramSubDir: + subDir = v + case pvcNamespaceKey: + subDirReplaceMap[pvcNamespaceMetadata] = v + case pvcNameKey: + subDirReplaceMap[pvcNameMetadata] = v + case pvNameKey: + subDirReplaceMap[pvNameMetadata] = v + case mountOptionsField: + if v != "" { + mountOptions = append(mountOptions, v) + } + } + } + + if server == "" { + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("%v is a required parameter", paramServer)) + } + if baseDir == "" { + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("%v is a required parameter", paramShare)) + } + server = getServerFromSource(server) + source := fmt.Sprintf("%s:%s", server, baseDir) + if subDir != "" { + // replace pv/pvc name namespace metadata in subDir + subDir = replaceWithMap(subDir, subDirReplaceMap) + + source = strings.TrimRight(source, "/") + source = fmt.Sprintf("%s/%s", source, subDir) + } + + notMnt, err := ns.mounter.IsLikelyNotMountPoint(stagingTargetPath) + if err != nil { + if os.IsNotExist(err) { + if err := os.MkdirAll(stagingTargetPath, os.FileMode(0755)); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + notMnt = true + } else { + return nil, status.Error(codes.Internal, err.Error()) + } + } + if !notMnt { + return &csi.NodeStageVolumeResponse{}, nil + } + + klog.V(2).Infof("NodeStageVolume: volumeID(%v) source(%s) stagingTargetPath(%s) mountflags(%v)", volumeID, source, stagingTargetPath, mountOptions) + execFunc := func() error { + return ns.mounter.Mount(source, stagingTargetPath, "nfs", mountOptions) + } + timeoutFunc := func() error { return fmt.Errorf("time out") } + if err := WaitUntilTimeout(90*time.Second, execFunc, timeoutFunc); err != nil { + if os.IsPermission(err) { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + if strings.Contains(err.Error(), "invalid argument") { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + return nil, status.Error(codes.Internal, err.Error()) + } + + klog.V(2).Infof("volume(%s) mount %s on %s succeeded", volumeID, source, stagingTargetPath) + return &csi.NodeStageVolumeResponse{}, nil } // NodeExpandVolume node expand volume diff --git a/pkg/nfs/nodeserver_test.go b/pkg/nfs/nodeserver_test.go index 0ae5bd47c..843f118c0 100644 --- a/pkg/nfs/nodeserver_test.go +++ b/pkg/nfs/nodeserver_test.go @@ -372,3 +372,159 @@ func TestNodeGetVolumeStats(t *testing.T) { err = os.RemoveAll(fakePath) assert.NoError(t, err) } + +func TestNodeStageVolume(t *testing.T) { + ns, err := getTestNodeServer() + if err != nil { + t.Fatalf("%v", err.Error()) + } + + params := map[string]string{ + "server": "server", + "share": "share", + } + + volumeCap := csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER} + stagingTargetPath := testutil.GetWorkDirPath("staging_target_test", t) + lockKey := fmt.Sprintf("%s-%s", "vol_1", stagingTargetPath) + + tests := []struct { + desc string + setup func() + req *csi.NodeStageVolumeRequest + expectedErr error + cleanup func() + }{ + { + desc: "[Error] Volume capabilities missing", + req: &csi.NodeStageVolumeRequest{}, + expectedErr: status.Error(codes.InvalidArgument, "Volume capability missing in request"), + }, + { + desc: "[Error] Volume ID missing", + req: &csi.NodeStageVolumeRequest{VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap}}, + expectedErr: status.Error(codes.InvalidArgument, "Volume ID missing in request"), + }, + { + desc: "[Error] Staging target path missing", + req: &csi.NodeStageVolumeRequest{ + VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap}, + VolumeId: "vol_1", + }, + expectedErr: status.Error(codes.InvalidArgument, "Staging target path not provided"), + }, + { + desc: "[Error] Volume operation in progress", + setup: func() { + ns.Driver.volumeLocks.TryAcquire(lockKey) + }, + req: &csi.NodeStageVolumeRequest{ + VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap}, + VolumeId: "vol_1", + VolumeContext: params, + StagingTargetPath: stagingTargetPath, + }, + expectedErr: status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "vol_1")), + cleanup: func() { + ns.Driver.volumeLocks.Release(lockKey) + }, + }, + { + desc: "[Success] Valid request", + req: &csi.NodeStageVolumeRequest{ + VolumeContext: params, + VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap}, + VolumeId: "vol_1", + StagingTargetPath: stagingTargetPath, + }, + expectedErr: nil, + }, + } + + // setup + _ = makeDir(stagingTargetPath) + + for _, tc := range tests { + if tc.setup != nil { + tc.setup() + } + _, err := ns.NodeStageVolume(context.Background(), tc.req) + if !reflect.DeepEqual(err, tc.expectedErr) { + t.Errorf("Desc:%v\nUnexpected error: %v\nExpected: %v", tc.desc, err, tc.expectedErr) + } + if tc.cleanup != nil { + tc.cleanup() + } + } + + // Clean up + err = os.RemoveAll(stagingTargetPath) + assert.NoError(t, err) +} + +func TestNodeUnstageVolume(t *testing.T) { + ns, err := getTestNodeServer() + if err != nil { + t.Fatalf("%v", err.Error()) + } + + stagingTargetPath := testutil.GetWorkDirPath("staging_target_test", t) + lockKey := fmt.Sprintf("%s-%s", "vol_1", stagingTargetPath) + + tests := []struct { + desc string + setup func() + req *csi.NodeUnstageVolumeRequest + expectedErr error + cleanup func() + }{ + { + desc: "[Error] Volume ID missing", + req: &csi.NodeUnstageVolumeRequest{StagingTargetPath: stagingTargetPath}, + expectedErr: status.Error(codes.InvalidArgument, "Volume ID missing in request"), + }, + { + desc: "[Error] Staging target path missing", + req: &csi.NodeUnstageVolumeRequest{VolumeId: "vol_1"}, + expectedErr: status.Error(codes.InvalidArgument, "Staging target path missing in request"), + }, + { + desc: "[Error] Volume operation in progress", + setup: func() { + ns.Driver.volumeLocks.TryAcquire(lockKey) + }, + req: &csi.NodeUnstageVolumeRequest{StagingTargetPath: stagingTargetPath, VolumeId: "vol_1"}, + expectedErr: status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "vol_1")), + cleanup: func() { + ns.Driver.volumeLocks.Release(lockKey) + }, + }, + { + desc: "[Success] Valid request", + req: &csi.NodeUnstageVolumeRequest{StagingTargetPath: stagingTargetPath, VolumeId: "vol_1"}, + expectedErr: nil, + }, + } + + // Setup + _ = makeDir(stagingTargetPath) + + for _, tc := range tests { + if tc.setup != nil { + tc.setup() + } + _, err := ns.NodeUnstageVolume(context.Background(), tc.req) + if !reflect.DeepEqual(err, tc.expectedErr) { + if err == nil || tc.expectedErr == nil || !strings.Contains(err.Error(), tc.expectedErr.Error()) { + t.Errorf("Desc:%v\nUnexpected error: %v\nExpected: %v", tc.desc, err, tc.expectedErr) + } + } + if tc.cleanup != nil { + tc.cleanup() + } + } + + // Clean up + err = os.RemoveAll(stagingTargetPath) + assert.NoError(t, err) +}