Canal 同步数据体验

2024年 4月 19日 54.7k 0

使用 Canal 同步数据

目标

  • 安装 canal-server、canal-admin、canal-adpater
  • 同步多个数据库的表到一个数据库
  • 操作步骤

  • 下载软件包
  • 按照官方教程进行安装
  • 进行必要的服务配置,启动服务,从canal-admin-web进行访问验证服务安装成功
  • 添加同步任务
  • 验证同步过程的正确性
  • 说明
    软件版本:canal.deployer-1.1.7、canal.admin-1.1.7、canal-adapter-1.1.7

    执行过程中发现该版本无法直接在 admin 启停 canal-server,确认为 bug,不想升级或更换版本的话,手动启停 canal-server 即可。

    本地安装方式

    1、安装启动canal-admin。

    成功后,可以通过 http://127.0.0.1:8089/ */ 访问,默认密码:admin/123456

    2、安装启动canal-server

    这里使用远程配置方式(即通过canal-admin进行配置的编辑管理,数据保存在数据库)。

    下载软件包到本地后,对 conf/canary.properties 进行必要的修改,如下:

    # 
    canal.register.ip = 127.0.0.1
    
    # canal admin config
    canal.admin.manager = 127.0.0.1:8089
    canal.admin.port = 11110
    canal.admin.user = admin
    # 对应密码123456
    canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
    #admin auto register
    # canal-server 启动后会从 canal-admin 拉取配置信息(canal.properties),如果为true,当配置不存在时会自动注册 canal-server 的信息,并自动在数据库中创建一个关联的配置记录
    canal.admin.register.auto = true
    canal.admin.register.cluster = 
    canal.admin.register.name = local_demo
    

    按照上面的配置,在 canal-server 启动成功后,会自动注册 server 到 canal-admin,然后就可以直接在 canal-admin 管理和配置该 canal-server 了。

    image.png

    注意:如果 canal-server 启动时拉取配置失败,记得检查 canal.register.ip 的配置,确保是 canal-admin 可以访问的 IP。

    3、安装启动 canal-adapter

    前面安装 canal-admin 时已经创建了 canal-manager 数据库,所以这里依然选择使用远程配置的方式:

    • 在 canal_config 表中创建 id=2 的数据对应 adapter 下的 application.yml 文件。
    • 在 canal_adapter_config 表对应每个adapter的子配置文件。

    注意: 目前这个远程配置功能似乎并没有实装,所以后面依然采用修改本地配置的方式。

    4、测试同步(全量+增量)

    准备两个数据库:test_a 和 test_b,在 test_a 创建一张 user 表并插入一批数据,在 test_b 创建同样的表但是不写入任何数据。现在需要将 test_a 中 user 表的数据同步到 test_b 的对应表,然后测试增量同步。

    操作步骤:

  • 准备数据库和表
  • 创建 instance,命名为 instance_test_a,表示同步源为 test_a
  • 添加 adapter(key 为 syncUser),用来将 test_a.user 中的数据同步到 test_b.user
  • 添加 adapter config,配置RDB映射关系,并通过 outerAdapterKey : syncUser 关联到 adapter
  • 调用接口查询所有的同步任务
  • 调用接口执行手动ETL
  • 验证结果
  • create schema test_a;
    -- 创建表
    CREATE TABLE test_a.user (
        id INT AUTO_INCREMENT PRIMARY KEY,
        username VARCHAR(50) NOT NULL,
        password VARCHAR(50) NOT NULL,
        email VARCHAR(100),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    -- 插入数据
    INSERT INTO test_a.user (username, password, email) VALUES
    ('john_doe', 'password123', 'john.doe@example.com'),
    ('jane_smith', 'securepass', 'jane.smith@example.com'),
    ('alice_jones', 'alice123', 'alice.jones@example.com'),
    ('bob_brown', 'password456', 'bob.brown@example.com'),
    ('carol_gray', 'carolpass', 'carol.gray@example.com');
    
    -- 创建备份库和表
    create schema test_b;
    
    CREATE TABLE test_b.user (
        id INT AUTO_INCREMENT PRIMARY KEY,
        username VARCHAR(50) NOT NULL,
        password VARCHAR(50) NOT NULL,
        email VARCHAR(100),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    

    application.yml

    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    canal.conf:
      mode: tcp #tcp kafka rocketMQ rabbitMQ
      flatMessage: true
      zookeeperHosts:
      syncBatchSize: 1000
      retries: -1
      timeout: 100
      accessKey:
      secretKey:
      consumerProperties:
        # canal tcp consumer
        canal.tcp.server.host: 127.0.0.1:11111
        canal.tcp.zookeeper.hosts:
        canal.tcp.batch.size: 500
        canal.tcp.username:
        canal.tcp.password:
        # kafka consumer
        kafka.bootstrap.servers: 127.0.0.1:9092
        kafka.enable.auto.commit: false
        kafka.auto.commit.interval.ms: 1000
        kafka.auto.offset.reset: latest
        kafka.request.timeout.ms: 40000
        kafka.session.timeout.ms: 30000
        kafka.isolation.level: read_committed
        kafka.max.poll.records: 1000
        # rocketMQ consumer
        rocketmq.namespace:
        rocketmq.namesrv.addr: 127.0.0.1:9876
        rocketmq.batch.size: 1000
        rocketmq.enable.message.trace: false
        rocketmq.customized.trace.topic:
        rocketmq.access.channel:
        rocketmq.subscribe.filter:
        # rabbitMQ consumer
        rabbitmq.host:
        rabbitmq.virtual.host:
        rabbitmq.username:
        rabbitmq.password:
        rabbitmq.resource.ownerId:
      srcDataSources:
        defaultDS:
          url: jdbc:mysql://127.0.0.1:3306/test_a?useUnicode=true
          username: test
          password: test
      canalAdapters: # 适配器列表。
        - instance: instance_test_a # canal instance Name or mq topic name
          groups: # 一份数据可以被多个group同时消费, group之间并行执行, 一个group内部串行执行多个outerAdapters
            - groupId: g1
              outerAdapters:
                - name: rdb # 指定为rdb类型同步,同步到test_b库
                  key: syncUser # 指定adapter的唯一key, 与表映射配置中outerAdapterKey对应
                  properties:
                    jdbc.driverClassName: com.mysql.jdbc.Driver
                    jdbc.url: jdbc:mysql://127.0.0.1:3306/test_b?useUnicode=true
                    jdbc.username: test
                    jdbc.password: test
                    druid.stat.enable: false
                    druid.stat.slowSqlMillis: 1000
    

    test_user.yml

    dataSourceKey: defaultDS
    destination: instance_test_a
    groupId: g1
    outerAdapterKey: syncUser
    concurrent: true
    dbMapping:
      database: test_a # 源数据库
      table: user # 源表
      targetTable: user #目标表
      targetPk:
        id: id
      mapAll: true
      commitBatch: 3000 # 批量提交的大小
    

    注意
    测试发现当前版本(v1.1.7)adapter 远程配置不生效,所以依然采用编辑本地配置文件(application.yml 和 conf/rdb/test_user.yml)的方式。

    手动执行ETL:

    curl http://127.0.0.1:8081/etl/rdb/test_user.yml -X POST
    > {"succeeded":true,"resultMessage":"导入RDB 数据:6 条"}
    

    后面直接在 test_a.user 表执行 DML 操作就会自动同步到 test_b.user 了。注意,DDL 不会同步哦!

    相关文章

    Oracle如何使用授予和撤销权限的语法和示例
    Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
    下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
    社区版oceanbase安装
    Oracle 导出CSV工具-sqluldr2
    ETL数据集成丨快速将MySQL数据迁移至Doris数据库

    发布评论