CSI 驱动开发指南

2023年 7月 10日 54.1k 0

前言

外部存储接入 Kubernetes 的方式主要有两种:In-Tree 和 Out-of-Tree。其中 In-Tree 是指存储驱动的源码都在 Kubernetes 代码库中,与 Kubernetes 一起发布、迭代、管理,这种方式灵活性较差,且门槛较高。Out-of-Tree 是指存储插件由第三方编写、发布、管理,作为一种扩展与 Kubernetes 配合使用。Out-of-Tree 主要有 FlexVolume 和 CSI 两种实现方式,其中,FlexVolume 因为其命令式的特点,不易维护和管理,从 Kubernetes v1.23 版本开始已被弃用。因此 CSI 已经成为 Kubernetes 存储扩展( Out-of-Tree )的唯一方式。

CSI 组成

csi-architecture

csi-architecture

参考上图(图片出处),通常情况下:CSI Driver = DaemonSet + Deployment(StatefuleSet)。

其中:

  • 绿色部分:Identity、Node、Controller 是需要开发者自己实现的,被称为 Custom Components。
  • 粉色部分:node-driver-registrar、external-attacher、external-provisioner 组件是 Kubernetes 团队开发和维护的,被称为 External Components,它们都是以 sidecar 的形式与 Custom Components 配合使用的。

Custom Components

Custom Components 本质是3个 gRPC Services:

  • Identity Service

    顾名思义,主要用于对外暴露这个插件本身的信息,比如驱动的名称、驱动的能力等:

    service Identity {
      rpc GetPluginInfo(GetPluginInfoRequest)
        returns (GetPluginInfoResponse) {}
    
      rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
        returns (GetPluginCapabilitiesResponse) {}
    
      rpc Probe (ProbeRequest)
        returns (ProbeResponse) {}
    }
    
  • Controller Service

    主要定义一些无需在宿主机上执行的操作,这也是与下文的 Node Service 最根本的区别。以 CreateVolume 为例,k8s 通过调用该方法创建底层存储。比如底层使用了某云供应商的云硬盘服务,开发者在 CreateVolume 方法实现中应该调用云硬盘服务的创建/订购云硬盘的 API,调用 API 这个操作是不需要在特定宿主机上执行的。

    service Controller {
      rpc CreateVolume (CreateVolumeRequest)
        returns (CreateVolumeResponse) {}
    
      rpc DeleteVolume (DeleteVolumeRequest)
        returns (DeleteVolumeResponse) {}
    
      rpc ControllerPublishVolume (ControllerPublishVolumeRequest)
        returns (ControllerPublishVolumeResponse) {}
    
      rpc ControllerUnpublishVolume (ControllerUnpublishVolumeRequest)
        returns (ControllerUnpublishVolumeResponse) {}
    
      rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest)
        returns (ValidateVolumeCapabilitiesResponse) {}
    
      rpc ListVolumes (ListVolumesRequest)
        returns (ListVolumesResponse) {}
    
      rpc GetCapacity (GetCapacityRequest)
        returns (GetCapacityResponse) {}
    
      rpc ControllerGetCapabilities (ControllerGetCapabilitiesRequest)
        returns (ControllerGetCapabilitiesResponse) {}
    
      rpc CreateSnapshot (CreateSnapshotRequest)
        returns (CreateSnapshotResponse) {}
    
      rpc DeleteSnapshot (DeleteSnapshotRequest)
        returns (DeleteSnapshotResponse) {}
    
      rpc ListSnapshots (ListSnapshotsRequest)
        returns (ListSnapshotsResponse) {}
    
      rpc ControllerExpandVolume (ControllerExpandVolumeRequest)
        returns (ControllerExpandVolumeResponse) {}
    
      rpc ControllerGetVolume (ControllerGetVolumeRequest)
        returns (ControllerGetVolumeResponse) {
            option (alpha_method) = true;
        }
    }
    
  • Node Service

    定义了需要在宿主机上执行的操作,比如:mount、unmount。在前面的部署架构图中,Node Service 使用 Daemonset 的方式部署,也是为了确保 Node Service 会被运行在每个节点,以便执行诸如 mount 之类的指令。

    service Node {
      rpc NodeStageVolume (NodeStageVolumeRequest)
        returns (NodeStageVolumeResponse) {}
    
      rpc NodeUnstageVolume (NodeUnstageVolumeRequest)
        returns (NodeUnstageVolumeResponse) {}
    
      rpc NodePublishVolume (NodePublishVolumeRequest)
        returns (NodePublishVolumeResponse) {}
    
      rpc NodeUnpublishVolume (NodeUnpublishVolumeRequest)
        returns (NodeUnpublishVolumeResponse) {}
    
      rpc NodeGetVolumeStats (NodeGetVolumeStatsRequest)
        returns (NodeGetVolumeStatsResponse) {}
    
      rpc NodeExpandVolume(NodeExpandVolumeRequest)
        returns (NodeExpandVolumeResponse) {}
    
      rpc NodeGetCapabilities (NodeGetCapabilitiesRequest)
        returns (NodeGetCapabilitiesResponse) {}
    
      rpc NodeGetInfo (NodeGetInfoRequest)
        returns (NodeGetInfoResponse) {}
    }
    

以上定义取自 csi.proto,使用的是 Protocol Buffers 描述语言。

External Components

External Components 都是以 sidecar 的方式提供使用的。当开发完三个 Custom Components 之后,开发者需要根据存储的特点,选择合适的 sidecar 容器注入到 Pod 中。这里的 External Components 除了前面图中提到的 node-driver-registrar、external-attacher、external-provisioner 还有很多,可以参考官方文档,这里对常用的 sidecars 做一些简单介绍:

  • livenessprobe

    liveessprobe 监视 CSI 驱动程序的运行状况,并将其报告给 Kubernetes。这使得 Kubernetes 能够自动检测驱动程序的问题,并重新启动 pod 来尝试修复问题。

  • node-driver-registrar

    node-driver-registrar 可从 CSI driver 获取驱动程序信息(通过 NodeGetInfo 方法),并使用 kubelet 插件注册机制在该节点上的 kubelet 中对其进行注册。

  • external-provisioner

    external-provisioner 组件对于块存储(如 ceph)非常关键。它监听 PersistentVolumeClaim 创建,调用 CSI 驱动的 CreateVolume 方法创建对应的底层存储(如 ceph image),一旦创建成功,provisioner 会创建一个 PersistentVolume 资源。当监听到 PersistentVolumeClaim 删除时,它会调用 CSI 的 DeleteVolume 方法删除底层存储,如果成功,则删除 PersistentVolume

  • external-attacher

    用于监听 Kubernetes VolumeAttachment 对象并触发 CSI 的 Controller[Publish|Unpublish]Volume 操作。

  • external-resizer

    监听 PersistentVolumeClaim 资源修改,调用 CSI ControllerExpandVolume 方法,来调整 volume 的大小。

External Components 与 Custom Components 共同组成部署 yaml ,可以参考 ceph-csi 的部署yaml:

  • csi-rbdplugin-provisioner.yaml
  • csi-rbdplugin.yaml

ps:其中 cephcsi 镜像是开发者实现的,包含所提的3个 gRPC 服务。

动态卷供应(Dynamic Volume Provisioning)执行过程

为了实现 Identity、Node、Controller 3个服务,需要清楚动态卷供应的执行过程。

dynamic-provisioning-timeline

dynamic-provisioning-timeline

(图片出处)

	 CreateVolume +------------+ DeleteVolume
 +------------->|  CREATED   +--------------+
 |              +---+----^---+              |
 |       Controller |    | Controller       v
+++         Publish |    | Unpublish       +++
|X|          Volume |    | Volume          | |
+-+             +---v----+---+             +-+
                | NODE_READY |
                +---+----^---+
               Node |    | Node
              Stage |    | Unstage
             Volume |    | Volume
                +---v----+---+
                |  VOL_READY |
                +---+----^---+
               Node |    | Node
            Publish |    | Unpublish
             Volume |    | Volume
                +---v----+---+
                | PUBLISHED  |
                +------------+

(内容取自csi-spec)

CSI Dynamic Volume Provisioning 大致流程如下:

  • 用户创建 pod + pvc。
  • VolumeController 的 PersistentVolumeController 控制循环监听到 pvc 创建,发现使用的是 Out-of-Tree 模式,跳过。该控制循环主要负责 In-Tree 模式下 pv 和 pvc 的绑定。
  • external-provisioner 监听到 pvc 创建:
    • 调用 Controller Service 的 CreateVolume 方法创建底层存储 Volume。此时 Volume 属于 CREATED 状态,仅在存储系统中存在,对于所有的 Node 或者 Container 都是不可感知的。
    • 创建 pv。
    • 将 pv 与 pvc 绑定(绑定:将这个 pv 对象的名字填在 pvc 对象的 spec.volumeName 字段上)。
  • VolumeController 的 AttachDetachController 控制循环发现 Volume 未被挂载到宿主机,需要 Attach 操作,于是创建 VolumeAttachment 对象。
  • external-attacher 监听到 VolumeAttachment 资源创建后,调用 Controller Service 的 ControllerPublishVolume 方法。此时,Volume 处于 NODE_READY 状态,即:Node 可以感知到 Volume,但是容器内依然不可见。
  • kubelet 的 VolumeManagerReconciler 控制循环:
    • 执行 MountDevice 操作,调用 Node Service 的 NodeStageVolume 方法。该方法主要实现对 Volume 格式化,然后挂载到一个临时目录(Staging 目录)上,经过此操作后,Volume 进入 VOL_READY 状态。
    • 执行 SetUp 操作,调用 Node Service 的 NodePublishVolume 方法:将 Staging 目录,绑定挂载到 Volume 对应的宿主机目录上,Volume 进入 PUBLISHED 状态,用户此时可以正常使用。

    CSI 开发

    从零开始实现一个 nfs-csi

    直接实现 ceph-csi 代码量比较多,且需要基于 ceph 集群调试,对新手不太友好。所以本文先从一个简单的 nfs-csi 入手,再分析 ceph-csi 的核心代码。首先是框架搭建,因为逻辑并不复杂,所以采用一种比较扁平的目录设计:

    .
    ├── Dockerfile
    ├── LICENSE
    ├── Makefile
    ├── README.md
    ├── driver
    │   ├── controller_server.go
    │   ├── driver.go
    │   ├── identity_server.go
    │   ├── node_server.go
    │   ├── server.go  // 对 gRPC Server 的一个简单封装
    │   └── utils.go   // 工具函数
    ├── yamls    // 存放部署和测试yaml
    ├── go.mod
    ├── go.sum
    └── main.go  // 入口文件
    

    起一个 gRPC Server,把3个服务注册到 CSI:

    func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
    
    	proto, addr, err := ParseEndpoint(endpoint)
    	if err != nil {
    		klog.Fatal(err.Error())
    	}
    
    	if proto == "unix" {
    		addr = "/" + addr
    		if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
    			klog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
    		}
    	}
    
    	listener, err := net.Listen(proto, addr)
    	if err != nil {
    		klog.Fatalf("Failed to listen: %v", err)
    	}
    
    	opts := []grpc.ServerOption{
    		grpc.UnaryInterceptor(LogGRPC),
    	}
    	server := grpc.NewServer(opts...)
    	s.server = server
    
    	if ids != nil {
    		csi.RegisterIdentityServer(server, ids)
    	}
    	if cs != nil {
    		csi.RegisterControllerServer(server, cs)
    	}
    	if ns != nil {
    		csi.RegisterNodeServer(server, ns)
    	}
    
    	klog.Infof("Listening for connections on address: %#v", listener.Addr())
    	reflection.Register(server)
    
    	err = server.Serve(listener)
    	if err != nil {
    		klog.Fatalf("Failed to serve grpc server: %v", err)
    	}
    }
    

    不同于平时写 gRPC 服务所用的 endpoint tcp://127.0.0.1:10000,csi gRPC 是基于 UDS(UNIX Domain Socket)unix://tmp/csi.sock。UDS 是一种 IPC 通信机制,不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,在同一台主机的通信中具有更高的性能。

    关于 Debug

    因为开发 csi 驱动本质是开发 gPRC 服务,常用的 gPRC 调试工具:grpcui、grpcurl 都是可以用来调试的,下图是 grpcui 使用界面 :

    grpcui

    grpcui

    除此之外还有一款 csc 工具,配置一些简单的环境变量后,可以通过命令行的方式调用接口(使用文档):

    $ csc identity plugin-info --endpoint "$endpoint"
    "nfs.csi.k8s.io"    "v2.0.0"
    

    Identity 实现

    gPRC 架子搭好后可以开始实现接口,从最简单的 Identity 服务开始:

    func (ids *IdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
    	return &csi.GetPluginInfoResponse{
    		Name:          ids.Driver.name,
    		VendorVersion: ids.Driver.version,
    	}, nil
    }
    
    func (ids *IdentityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
    	return &csi.ProbeResponse{Ready: &wrappers.BoolValue{Value: true}}, nil
    }
    
    func (ids *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
    	return &csi.GetPluginCapabilitiesResponse{
    		Capabilities: []*csi.PluginCapability{
    			{
    				Type: &csi.PluginCapability_Service_{
    					Service: &csi.PluginCapability_Service{
    						Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
    					},
    				},
    			},
    		},
    	}, nil
    }
    

    其中:

    • GetPluginInfo 接口返回驱动的名称和版本信息,比如 ceph-csi 的名称:rbd.csi.ceph.com,该名称与 StorageClass yaml 中的 provisioner 字段对应:

      ---
      apiVersion: storage.k8s.io/v1
      kind: StorageClass
      metadata:
         name: csi-rbd-sc
         provisioner: rbd.csi.ceph.com
      parameters:
      	 ...
      reclaimPolicy: Delete
      allowVolumeExpansion: true
      mountOptions:
         - discard
      
    • GetPluginCapabilities 接口返回插件的能力,具体可以阅读 csi.proto 中关于 PluginCapability 的注释

    Controller 实现

    接下来实现 Controller 服务。在 nfs CSI Controller 中,我们只需要实现 CreateVolumeDeleteVolumeControllerGetCapabilities 即可。细心的伙伴可能会想到,nfs 其实也不用创建存储,直接挂载就可以使用:

    mount -t nfs server:/root/nfsroot /root/mnt/nfs
    

    但是,为什么需要实现 CreateVolume 方法呢?

    因为我们不能将同一个 nfs 目录挂载给多个 pod 使用,不是功能上不允许,而是权限上不应该。我们应该给每个 pv 建立一个独立的目录,这样互相之间不会影响。具体做法类似于执行:

    # 先把 nfs root mount 到宿主机某一临时目录
    mount -t nfs nfsServer:/root/nfs /tmp/nfs-root-78y88
    
    # 进入 root 目录创建子目录
    mkdir /tmp/nfs-root-78y88/pvc-31bf63ad-80c2-451c-a9c3-f80b9bad302c
    
    # 卸载
    umount /tmp/nfs-root-78y88
    

    这样,在后续使用时直接挂载 nfsServer:/root/nfs/pvc-31bf63ad-80c2-451c-a9c3-f80b9bad302c 即可

    CreateVolume 核心代码如下:

    func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
      ...
    	nfsVol, err := cs.newNFSVolume(name, reqCapacity, req.GetParameters())
    	if err != nil {
    		return nil, status.Error(codes.InvalidArgument, err.Error())
    	}
    
      ...
    	// 把nfs root挂载到一个临时目录
    	if err = cs.internalMount(ctx, nfsVol, volCap); err != nil {
    		return nil, status.Errorf(codes.Internal, "failed to mount nfs server: %v", err.Error())
    	}
    	// 取消挂载(defer)
    	defer func() {
    		if err = cs.internalUnmount(ctx, nfsVol); err != nil {
    			klog.Warningf("failed to unmount nfs server: %v", err.Error())
    		}
    	}()
    
      // 获取权限和子目录名
    	fileMode := os.FileMode(cs.Driver.mountPermissions)
    	internalVolumePath := cs.getInternalVolumePath(nfsVol)
    
      // 在nfs root目录中创建子目录
    	if err = os.Mkdir(internalVolumePath, fileMode); err != nil && !os.IsExist(err) {
    		return nil, status.Errorf(codes.Internal, "failed to make subdirectory: %v", err.Error())
    	}
    	
    	...
    	return &csi.CreateVolumeResponse{Volume: cs.nfsVolToCSI(nfsVol)}, nil
    }
    

    DeleteVolume 基本同 CreateVolume,只是 mkdir 换成 rm 即可。

    ControllerGetCapabilities 返回 Controller 的能力:

    func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
    	return &csi.ControllerGetCapabilitiesResponse{
    		Capabilities: []*csi.ControllerServiceCapability{
    			{
    				Type: &csi.ControllerServiceCapability_Rpc{
    					Rpc: &csi.ControllerServiceCapability_RPC{
    						Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
    					},
    				},
    			},
    			{
    				Type: &csi.ControllerServiceCapability_Rpc{
    					Rpc: &csi.ControllerServiceCapability_RPC{
    						Type: csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
    					},
    				},
    			},
    		},
    	}, nil
    }
    

    Node 实现

    对于 nfs,Node 服务需要实现:NodePublishVolumeNodeUnpublishVolumeNodeGetInfoNodeGetCapabilities

    NodeGetInfo 返回当前 node 服务运行在哪个节点,其中,nodeId 是通过环境变量 fieldRef 注入的:

    nfs-csi-yaml

    nfs-csi-yaml

    func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
    	return &csi.NodeGetInfoResponse{
    		NodeId: ns.Driver.nodeID,
    	}, nil
    }
    

    NodeGetCapabilities 需要配置以下 3 种能力:

    csi.NodeServiceCapability_RPC_GET_VOLUME_STATS
    csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER
    csi.NodeServiceCapability_RPC_UNKNOWN
    

    Node 服务的核心是 NodePublishVolume 方法,需要执行 mount 操作,等价于执行:

    mount -t nfs nfsServer:/root/nfs/pvc-31bf63ad-80c2-451c-a9c3-f80b9bad302c /var/lib/kubelet/pods/aad09eed-e4a2-42ca-84e4-d8301c7d6257/volumes/kubernetes.io~csi/pvc-31bf63ad-80c2-451c-a9c3-f80b9bad302c/mount
    

    NodePublishVolume 核心代码如下:

    // NodePublishVolume mount the volume
    func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
    	 // 根据req获取source、targetPath
       ...
       
       // 把 nfsServer:/root/nfs/pvc-31bf63ad-80c2-451c-a9c3-f80b9bad302c 挂载到 /var/lib/kubelet/pods/aad09eed-e4a2-42ca-84e4-d8301c7d6257/volumes/kubernetes.io~csi/pvc-31bf63ad-80c2-451c-a9c3-f80b9bad302c/mount
       err = ns.mounter.Mount(source, targetPath, "nfs", mountOptions)
       if err != nil {
    			// 错误处理
    			// ...
       }
    	 
    	 // 修改目录权限
       if err := os.Chmod(targetPath, os.FileMode(ns.Driver.mountPermissions)); err != nil {
          return nil, status.Error(codes.Internal, err.Error())
       }
       return &csi.NodePublishVolumeResponse{}, nil
    }
    

    至此,一个简单的 nfs-csi 就实现完成了。

    Ceph-csi 核心源码解析

    nfs 与 ceph 这类块存储不太相同,不需要创建磁盘(Provision)、把磁盘挂载到宿主机(Attach)操作,比如 ceph 中的:

    rbd create -p pool-01 --image rbd-demo.img --size 10G
    rbd map pool-01/rbd-demo.img
    

    接下来我们简单看一下 ceph-csi 是如何用代码实现上述操作的。首先,在之前的 Dynamic Volume Provisioning 执行过程中已经讲过,Provision 应该是在 CreateVolume 方法中实现的:

    // CreateVolume creates the volume in backend
    func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
    
    	// 请求参数校验
    	if err := cs.validateVolumeReq(ctx, req); err != nil {
    		return nil, err
    	}
    
    	// 根据secret构建ceph请求凭证
    	cr, err := util.NewUserCredentials(req.GetSecrets())
    	if err != nil {
    		return nil, status.Error(codes.Internal, err.Error())
    	}
    	defer cr.DeleteCredentials()
    
    	// 处理请求参数,并转换为rbdVol结构体
    	rbdVol, err := cs.parseVolCreateRequest(ctx, req)
    	if err != nil {
    		return nil, err
    	}
    	defer rbdVol.Destroy()
    
    	// Existence and conflict checks
    	// 检查并获取锁(同名存储在同一时间,只能做创建、删除等操作中的一个)
    	if acquired := cs.VolumeLocks.TryAcquire(req.GetName()); !acquired {
    		klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), req.GetName())
    		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, req.GetName())
    	}
    	defer cs.VolumeLocks.Release(req.GetName())
    
    	// 幂等性
    	found, err := checkVolExists(ctx, rbdVol, cr)
    	if err != nil {
    		if _, ok := err.(ErrVolNameConflict); ok {
    			return nil, status.Error(codes.AlreadyExists, err.Error())
    		}
    
    		return nil, status.Error(codes.Internal, err.Error())
    	}
    	// 已经存在同名卷
    	if found {
    		if rbdVol.Encrypted {
    			err = ensureEncryptionMetadataSet(ctx, cr, rbdVol)
    			if err != nil {
    				klog.Errorf(util.Log(ctx, err.Error()))
    				return nil, err
    			}
    		}
    
    		volumeContext := req.GetParameters()
    		volumeContext["pool"] = rbdVol.Pool
    		volumeContext["journalPool"] = rbdVol.JournalPool
    		volume := &csi.Volume{
    			VolumeId:      rbdVol.VolID,
    			CapacityBytes: rbdVol.VolSize,
    			VolumeContext: volumeContext,
    			ContentSource: req.GetVolumeContentSource(),
    		}
    		if rbdVol.Topology != nil {
    			volume.AccessibleTopology =
    				[]*csi.Topology{
    					{
    						Segments: rbdVol.Topology,
    					},
    				}
    		}
    		return &csi.CreateVolumeResponse{Volume: volume}, nil
    	}
    
    	// 快照
    	rbdSnap, err := cs.checkSnapshotSource(ctx, req, cr)
    	if err != nil {
    		return nil, err
    	}
    
    	// 预定 rbdVolume name ,并创建 volumeID
    	err = reserveVol(ctx, rbdVol, rbdSnap, cr)
    	if err != nil {
    		return nil, status.Error(codes.Internal, err.Error())
    	}
    	defer func() {
    		if err != nil {
    			errDefer := undoVolReservation(ctx, rbdVol, cr)
    			if errDefer != nil {
    				klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"), req.GetName(), errDefer)
    			}
    		}
    	}()
    
    	// 创建image
    	err = createBackingImage(ctx, cr, rbdVol, rbdSnap)
    	if err != nil {
    		return nil, err
    	}
    
    	if rbdVol.Encrypted {
    		err = ensureEncryptionMetadataSet(ctx, cr, rbdVol)
    		if err != nil {
    			klog.Errorf(util.Log(ctx, "failed to save encryption status, deleting image %s"),
    				rbdVol.RbdImageName)
    			if deleteErr := deleteImage(ctx, rbdVol, cr); err != nil {
    				klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s/%s with error: %v"),
    					rbdVol.Pool, rbdVol.RbdImageName, deleteErr)
    				return nil, deleteErr
    			}
    			return nil, err
    		}
    	}
    
    	volumeContext := req.GetParameters()
    	volumeContext["pool"] = rbdVol.Pool
    	volumeContext["journalPool"] = rbdVol.JournalPool
    	volume := &csi.Volume{
    		VolumeId:      rbdVol.VolID,
    		CapacityBytes: rbdVol.VolSize,
    		VolumeContext: volumeContext,
    		ContentSource: req.GetVolumeContentSource(),
    	}
    	if rbdVol.Topology != nil {
    		volume.AccessibleTopology =
    			[]*csi.Topology{
    				{
    					Segments: rbdVol.Topology,
    				},
    			}
    	}
    	return &csi.CreateVolumeResponse{Volume: volume}, nil
    }
    

    创建磁盘镜像:

    import librbd "github.com/ceph/go-ceph/rbd"
    
    func createImage(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) error {
    	...
    
    	err = librbd.CreateImage(ioctx, pOpts.RbdImageName,
    		uint64(util.RoundOffVolSize(pOpts.VolSize)*util.MiB), options)
    	if err != nil {
    		return errors.Wrapf(err, "failed to create rbd image")
    	}
    
    	return nil
    }
    

    可以看到 ceph-csi 中是使用 librbd 库连接 ceph 集群进行 image 创建。除此之外,还需要注意 CreateVolume 实现的幂等性。因为 k8s 可能因为一些其他原因(比如网络超时重试)重复发送请求,实现中需要对此进行判断,不可重复创建。

    接下来看 Attach,理论上 Attach 操作应该是在 ControllerPublishVolume 方法中实现的,但实际上 ceph 的 map 命令需要到对应的宿主机上才能执行,所以实现是在 NodeStageVolume 方法:

    func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
    	
      ...
    
    	// perform the actual staging and if this fails, have undoStagingTransaction
    	// cleans up for us
      transaction, err = ns.stageTransaction(ctx, req, volOptions, staticVol)
    	if err != nil {
    		return nil, status.Error(codes.Internal, err.Error())
    	}
    
    	...
    
    	return &csi.NodeStageVolumeResponse{}, nil
    }
    
    func (ns *NodeServer) stageTransaction(ctx context.Context, req *csi.NodeStageVolumeRequest, volOptions *rbdVolume, staticVol bool) (stageTransaction, error) {
    	...
    
    	// Mapping RBD image
    	var devicePath string
    	devicePath, err = attachRBDImage(ctx, volOptions, cr)
      ...  
    
    	stagingTargetPath := getStagingTargetPath(req)
      ...
    
    	// 挂载到 nodeStage
    	err = ns.mountVolumeToStagePath(ctx, req, staticVol, stagingTargetPath, devicePath)
    	...
    
    	// 设置权限
    	err = os.Chmod(stagingTargetPath, 0777)
    
    	return transaction, err
    }
    
    func attachRBDImage(ctx context.Context, volOptions *rbdVolume, cr *util.Credentials) (string, error) {
    	...
    
    	// 判断 image 是否已经 map 到 node 上
    	devicePath, found := waitForPath(ctx, volOptions.Pool, image, 1, useNBD)
    	if !found {
    		backoff := wait.Backoff{
    			Duration: rbdImageWatcherInitDelay,
    			Factor:   rbdImageWatcherFactor,
    			Steps:    rbdImageWatcherSteps,
    		}
    
    		err = waitForrbdImage(ctx, backoff, volOptions, cr)
    
    		if err != nil {
    			return "", err
    		}
    
    		// 执行 map 操作
    		devicePath, err = createPath(ctx, volOptions, cr)
    	}
    
    	return devicePath, err
    }
    
    func createPath(ctx context.Context, volOpt *rbdVolume, cr *util.Credentials) (string, error) {
    
    	// Map options
    	mapOptions := []string{
    		"--id", cr.ID,
    		"-m", volOpt.Monitors,
    		"--keyfile=" + cr.KeyFile,
    		"map", imagePath,
    	}
      
      ...
    
    	// Execute map
    	output, err := execCommand(rbd, mapOptions)
      
      ...
    
    	return devicePath, nil
    }
    

    可以看到,attach实际就是执行 rbd map 指令,与之前的设想一致。函数嵌套比较深,此处只保留了核心代码,完整代码可以参考 ceph-csi。

    参考资料

    • https://github.com/kubernetes-csi/csi-driver-nfs
    • https://kubernetes-csi.github.io/docs/introduction.html
    • https://github.com/container-storage-interface/spec/blob/master/spec.md
    • https://kingjcy.github.io/post/cloud/paas/base/kubernetes/k8s-store-csi/
    • https://github.com/ceph/ceph-csi
    • 《深入剖析 Kubernetes》— 张磊

    相关文章

    KubeSphere 部署向量数据库 Milvus 实战指南
    探索 Kubernetes 持久化存储之 Longhorn 初窥门径
    征服 Docker 镜像访问限制!KubeSphere v3.4.1 成功部署全攻略
    那些年在 Terraform 上吃到的糖和踩过的坑
    无需 Kubernetes 测试 Kubernetes 网络实现
    Kubernetes v1.31 中的移除和主要变更

    发布评论