TiUP 源码初探

想必 TiDBer 都使用过 TiUP,TiUP 给部署带来了很大的便利。日常开发中也常会涉及分布式部署,难免要自己写一些工具。本文抛砖引玉,分享一下我阅读 TiUP 源码的步骤,希望能在您阅读源码或编写工具时多提供一个思考方向。环境部署可参考:启航TiDB:调试环境搭建(vscode+wsl+pd)

Cobra

Cobra 是一个用 Go 语言开发的命令行(CLI)框架,由 Go 团队成员 spf13 为 Hugo 项目创建,并已被许多流行的 Go 项目采用,如 Kubernetes、Helm、Docker (distribution)、Etcd 等。TiUP 也是基于 Cobra 开发的。网上已有大量介绍文章,这里不再赘述,直接给出一个简单的 demo。

demo
├── cmd
│   ├── root.go
│   └── version.go
├── go.mod
├── go.sum
└── main.go
// root.go

// rootCmd is the top-level "demo" command; subcommands are attached to it in init.
var (
    rootCmd *cobra.Command
)

func init() {
    rootCmd = &cobra.Command{
        Use:   "demo",
        Short: "Demo is a Cobra application",
        Long:  `This is a demo application to illustrate the use of Cobra library.`,
        Run: func(cmd *cobra.Command, args []string) {
            fmt.Println("Hello, Cobra!")
        },
    }
    // Attach subcommands here so the command tree is complete before Execute runs.
    rootCmd.AddCommand(newVersionCmd())
}

// Execute runs the root command and exits with status 1 if it fails.
// Cobra already prints the returned error (plus usage) itself unless
// SilenceErrors is set, so printing err again here would show it twice.
func Execute() {
    if err := rootCmd.Execute(); err != nil {
        os.Exit(1)
    }
}

// version.go

// newVersionCmd builds the `version` subcommand, which prints the demo's version.
// Registration happens exactly once, in root.go's init
// (rootCmd.AddCommand(newVersionCmd())); registering it again from this file
// would list `version` twice in the help output.
func newVersionCmd() *cobra.Command {
    cmd := &cobra.Command{
        Use:   "version",
        Short: "Print the version number of Demo",
        Long:  `All software has versions. This is Demo's`,
        Run: func(cmd *cobra.Command, args []string) {
            fmt.Println("Demo v0.1 -- HEAD")
        },
    }
    return cmd
}
// main.go
// main delegates to cmd.Execute, which runs the root command and exits
// non-zero on failure.
func main() {
    cmd.Execute()
}

编译后运行

> .\demo.exe -h
This is a demo application to illustrate the use of Cobra library.

Usage:
  demo [flags]
  demo [command]

Available Commands:
  completion  Generate the autocompletion script for the specified shell
  help        Help about any command
  version     Print the version number of Demo

Flags:
  -h, --help   help for demo

Use "demo [command] --help" for more information about a command.

同样,TiUP 的组织结构也很简单明确:cmd 文件夹下放着各种命令,root.go 是注册命令的地方。目录结构如下:

├── cmd
│   ├── env.go
│   ├── list.go
│   ├── mirror.go
│   ├── root.go   # 命令注册
├── components  # 组件
│   ├── bench
│   ├── cluster   
│   │   ├── command
│   │   │   ├── clean.go
│   │   │   ├── deploy.go
│   │   │   └── root.go # cluster 命令注册
│   │   └── main.go
│   └── playground
├── pkg
│   ├── cluster
│   │   ├── ansible
│   │   ├── api
│   │   ├── audit
│   │   ├── clusterutil
│   │   ├── ctxt
│   │   ├── executor   # 执行器,easyssh和nativessh
│   │   ├── manager    # 任务的生成,步骤的生成所有的管理都在这里
│   │   ├── module
│   │   ├── operation
│   │   ├── spec       # 拓扑结构--安装的说明书
│   │   ├── task       # 各种任务
│   │   └── template   # 
├── main.go

与 demo 不同的是,TiUP 的二级命令(组件)是通过调用对应的可执行文件来实现的,这些可执行文件位于 $TIUP_HOME/components/cluster/vx.x.x/ 下。执行时如果本地没有对应的组件,就会先下载(例如只用过 tiup-cluster,本地就不会有 tiup-dm)。这样做的另一个好处是每个组件都是一个独立的命令行工具,即可插拔组件。

// root.go
func init() {
    cobra.EnableCommandSorting = false
    _ = os.Setenv(localdata.EnvNameTelemetryEventUUID, eventUUID)

    rootCmd = &cobra.Command{
        Use: `tiup [flags]  [args...]`
      // ...
        },
        PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
            // InitEnv
            return nil
        },
        RunE: func(cmd *cobra.Command, args []string) error {
            if len(args) == 0 {
                return cmd.Help()
            }
            env := environment.GlobalEnv()
            // ...
            return tiupexec.RunComponent(env, tag, componentSpec, binPath, args)
        },
    }
}

而这些命令最小的执行单位是任务。

任务

任务有执行(Execute)和回滚(Rollback)两个操作;多个任务可以按串行(Serial)或并行(Parallel)两种方式组合,由 Builder 负责构建。

// Task is one unit of work in a TiUP operation: it can describe itself
// (fmt.Stringer), run (Execute), and undo what it did (Rollback).
type Task interface {
    fmt.Stringer
    Execute(ctx context.Context) error
    Rollback(ctx context.Context) error
}

// Serial groups tasks that are executed one after another.
type Serial struct {
    // Per-group flags; how they are honoured lives in Serial's methods.
    ignoreError, hideDetailDisplay bool
    inner                          []Task
}

// Parallel groups tasks that are executed concurrently.
type Parallel struct {
    // Per-group flags; how they are honoured lives in Parallel's methods.
    ignoreError, hideDetailDisplay bool
    inner                          []Task
}

任务也可以作为一种方法被其它任务调用,比如 CopyComponent 会调用 InstallPackage。任务最终都通过执行器来完成,而执行器执行的其实就是远程主机上的 Linux 命令。

// Execute creates the requested directories on m.host, using the executor an
// earlier task (e.g. RootSSH) stored in the context for that host.
func (m *Mkdir) Execute(ctx context.Context) error {
    exec, found := ctxt.GetInner(ctx).GetExecutor(m.host)
    if !found {
        // Same guard as InstallPackage.Execute. Without it, `found` is unused
        // (a Go compile error), and the call below would go through a missing
        // (presumably nil) executor.
        return ErrNoExecutor
    }
    // ... (the mkdir command `cmd` is built here; elided)
    _, _, err := exec.Execute(ctx, cmd, m.sudo) // use root to create the dir
    if err != nil {
        return errors.Trace(err)
    }
    // ...
    return nil
}

// Execute copies the component to c.host by delegating the actual work to an
// InstallPackage task for the same host and destination directory.
// srcPath is derived in the elided part (not shown).
func (c *CopyComponent) Execute(ctx context.Context) error {
    // ...
    install := &InstallPackage{
        srcPath: srcPath,
        host:    c.host,
        dstDir:  c.dstDir,
    }
    return install.Execute(ctx)
}

// Execute installs a component package on c.host. It uploads the local archive
// c.srcPath to <c.dstDir>/bin on the remote host, extracts it there, and then
// deletes the uploaded archive.
func (c *InstallPackage) Execute(ctx context.Context) error {
    // Install package to remote server
    exec, found := ctxt.GetInner(ctx).GetExecutor(c.host)
    if !found {
        return ErrNoExecutor
    }
    // srcPath is on the local machine, so its base name is taken with the
    // OS-aware filepath package. dstDir/dstPath are on the remote host and are
    // used in the POSIX shell command below, so they are joined with the
    // slash-only path package (filepath.Join would emit '\' on Windows).
    dstDir := path.Join(c.dstDir, "bin")
    dstPath := path.Join(dstDir, filepath.Base(c.srcPath))

    // Upload direction (download=false), with limit=0 and compress=false.
    err := exec.Transfer(ctx, c.srcPath, dstPath, false, 0, false)
    if err != nil {
        return errors.Annotatef(err, "failed to scp %s to %s:%s", c.srcPath, c.host, dstPath)
    }
    // Extract without restoring the archive's recorded ownership, then remove
    // the archive.
    // NOTE(review): the paths are interpolated unquoted into a shell command;
    // paths containing spaces or shell metacharacters would break it.
    cmd := fmt.Sprintf(`tar --no-same-owner -zxf %s -C %s && rm %s`, dstPath, dstDir, dstPath)

    _, stderr, err := exec.Execute(ctx, cmd, false)
    if err != nil {
        return errors.Annotatef(err, "stderr: %s", string(stderr))
    }
    return nil
}

执行器

执行器保存在上下文(ctxt)中,其接口包含 Execute 和 Transfer 两个方法。执行器是在需要建立 SSH 连接时设置的,比如下面的 RootSSH 任务:

// Executor runs commands on a single host and transfers files to or from it.
type Executor interface {
    // Execute runs cmd on the host, with elevated privileges when sudo is true,
    // and returns its stdout and stderr. An optional timeout may be passed.
    Execute(ctx context.Context, cmd string, sudo bool, timeout ...time.Duration) (stdout []byte, stderr []byte, err error)
    // Transfer copies src to dst; download=false means local -> remote (upload).
    // limit and compress are transfer options (limit is presumably a rate
    // cap, 0 = none — confirm).
    Transfer(ctx context.Context, src, dst string, download bool, limit int, compress bool) error
}

// Execute implements the Task interface
// It creates an executor of kind s.sshType for s.host and stores it in the
// context, where later tasks (e.g. Mkdir, InstallPackage) look it up with
// GetExecutor. The connection config `sc` is built in the elided part (not shown).
func (s *RootSSH) Execute(ctx context.Context) error {
    // ...
    e, err := executor.New(s.sshType, s.sudo, sc)
    if err != nil {
        return err
    }

    ctxt.GetInner(ctx).SetExecutor(s.host, e)
    return nil
}

执行器有 easyssh 和 nativessh 两种实现。下面是 Manager.Deploy 的函数签名,后文会详细分析:

// Deploy a cluster.
// Signature only: the body is omitted in this excerpt (a Go func declaration
// without a body does not compile outside assembly-backed code); an abridged
// implementation appears under manager/deploy.go further down.
// afterDeploy is a hook that receives the task builder and the new topology;
// the deploy command passes its postDeployHook here.
func (m *Manager) Deploy(
    name string,
    clusterVersion string,
    topoFile string,
    opt DeployOptions,
    afterDeploy func(b *task.Builder, newPart spec.Topology, gOpt operator.Options),
    skipConfirm bool,
    gOpt operator.Options,
) error

操作

可以看到,TiUP 的基础是任务,任务最终落到执行器(SSH)上执行。多个任务合并成一个步骤,多个步骤再组合成一个操作,形如 task.NewBuilder(m.logger).TaskA().TaskB().BuildAsStep(...)。下面我们看一个实际的操作:Deploy。

每一个操作都放在 manager 文件夹下。以 Deploy 为例,可以看到任务组一般以 SSH 任务(RootSSH)开始,以构建展示步骤(BuildAsStep)结束。

// command/root.go
// init registers the cluster sub-commands (check, deploy, ...) on rootCmd.
func init() {
    rootCmd.AddCommand(
        newCheckCmd(),
        newDeploy(),
        // ...
        )
}
// command/deploy.go

// newDeploy builds the `tiup cluster deploy` sub-command. Its RunE passes the
// parsed arguments and flags to the cluster manager's Deploy.
func newDeploy() *cobra.Command {
    // By default the SSH connection uses the current user's ~/.ssh/id_rsa key.
    opt := manager.DeployOptions{
        IdentityFile: path.Join(utils.UserHome(), ".ssh", "id_rsa"),
    }
    cmd := &cobra.Command{
        // Cobra takes the command name from the first word and shows the rest
        // as the positional arguments in the usage line.
        Use:          "deploy <cluster-name> <version> <topology.yaml>",
        Short:        "Deploy a cluster for production",
        Long:         "Deploy a cluster for production. SSH connection will be used to deploy files, as well as creating system users for running the service.",
        SilenceUsage: true,
        RunE: func(cmd *cobra.Command, args []string) error {
            // ... (clusterName, version and topoFile are taken from args; elided)
            return cm.Deploy(clusterName, version, topoFile, opt, postDeployHook, skipConfirm, gOpt)
        },
    }
    // ...
    cmd.Flags().BoolVarP(&opt.NoLabels, "no-labels", "", false, "Don't check TiKV labels")
    return cmd
}
// manager/deploy.go
// Deploy deploys a new cluster (heavily abridged excerpt).
// Visible flow: for each target host, build one "Prepare host" step
// (RootSSH -> EnvInit -> Mkdir); then chain the top-level steps on a single
// builder: SSH key generation followed by the parallel download, env-init,
// deploy, certificate and config steps. Executing the builder is elided.
func (m *Manager) Deploy(
    name string,
    clusterVersion string,
    topoFile string,
    opt DeployOptions,
    afterDeploy func(b *task.Builder, newPart spec.Topology, gOpt operator.Options),
    skipConfirm bool,
    gOpt operator.Options,
) error {
// ...
var (
        envInitTasks      []*task.StepDisplay // tasks which are used to initialize environment
        downloadCompTasks []*task.StepDisplay // tasks which are used to download components
        deployCompTasks   []*task.StepDisplay // tasks which are used to copy components to remote host
    )
// ... (presumably a per-host loop, elided, encloses the statements below)
        // Open an SSH session to the host (RootSSH stores the executor in the
        // context), initialise the environment, create the directories, and
        // wrap the whole chain as one displayable step for this host.
        t := task.NewBuilder(m.logger).
            RootSSH(
                host,
                hostInfo.ssh,
                opt.User,
                sshConnProps.Password,
                // ...,
            ).
            EnvInit(host, globalOptions.User, globalOptions.Group, opt.SkipCreateUser || globalOptions.User == opt.User, sudo).
            Mkdir(globalOptions.User, host, sudo, dirs...).
            BuildAsStep(fmt.Sprintf("  - Prepare %s:%d", host, hostInfo.ssh))
        envInitTasks = append(envInitTasks, t)
    // ...
    // Top-level plan: generate the cluster's SSH key first, then run each
    // collected group of per-host steps in parallel. The second ParallelStep
    // argument is gOpt.Force for the certificate/config groups (presumably
    // ignoreError — confirm against ParallelStep).
    builder := task.NewBuilder(m.logger).
        Step("+ Generate SSH keys",
            task.NewBuilder(m.logger).
                SSHKeyGen(m.specManager.Path(name, "ssh", "id_rsa")).
                Build(),
            m.logger).
        ParallelStep("+ Download TiDB components", false, downloadCompTasks...).
        ParallelStep("+ Initialize target host environments", false, envInitTasks...).
        ParallelStep("+ Deploy TiDB instance", false, deployCompTasks...).
        ParallelStep("+ Copy certificate to remote host", gOpt.Force, certificateTasks...).
        ParallelStep("+ Init instance configs", gOpt.Force, refreshConfigTasks...).
        ParallelStep("+ Init monitor configs", gOpt.Force, monitorConfigTasks...)
        // ... (building and executing the plan is elided)
        m.logger.Infof("Cluster `%s` deployed successfully, you can start it with command: `%s`", name, hint)
    return nil
}