在SpringBoot中通过Canal实现MySQL与Redis的数据同步

2023年 9月 26日 162.8k 0

环境:Springboot2.7.12 + MySQL8

1 环境准备

master: 192.168.2.129

slave: 192.168.2.130

使用Docker安装MySQL,这里Docker安装省略,网上一堆教程。

Docker安装完成后,安装MySQL。

安装MySQL

步骤1:

[root@node150 kafka]# mkdir -p /root/software/mysql/conf /root/software/mysql/data
 [root@node150 kafka]# chmod -R 777 /root/software/mysql/

步骤2:

进入/root/software/mysql/conf 创建my.cnf 内容如下:

[client]
 #socket = /usr/mysql/mysqld.sock
 default-character-set = utf8mb4
 [mysqld]
 #pid-file        = /var/run/mysqld/mysqld.pid
 #socket          = /var/run/mysqld/mysqld.sock
 #datadir         = /var/lib/mysql
 #socket = /usr/mysql/mysqld.sock
 #pid-file = /usr/mysql/mysqld.pid 
 datadir = /var/lib/mysql
 character_set_server = utf8mb4
 collation_server = utf8mb4_bin
 secure-file-priv= NULL
 # Disabling symbolic-links is recommended to prevent assorted security risks
 symbolic-links=0
 # Custom config should go here
 !includedir /etc/mysql/conf.d/

步骤3:

docker run --name mysql8 --restart=always --privileged=true -v /root/software/mysql/conf/my.cnf:/etc/mysql/my.cnf -v /root/software/mysql/data:/var/lib/mysql -v /etc/localtime:/etc/localtime:ro -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123123 -d mysql --lower_case_table_names=1

完成分别在129,130上完成上面操作。

2 主从配置

129.168.2.129作为master节点,修改my.cnf配置,追加下面配置:

binlog_format=MIXED
log-bin=mysql-bin
server-id=1

129.168.2.130作为slave节点,修改my.cnf配置,追加下面配置:

log-bin=mysql-bin
server-id=2

修改完配置后分别重启mysql

3 配置从节点

3.1 查看master状态

mysql> show master status;
 +------------------+----------+--------------+------------------+-------------------+
 | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
 +------------------+----------+--------------+------------------+-------------------+
 | mysql-bin.000001 |      156 |              |                  |                   |
 +------------------+----------+--------------+------------------+-------------------+
 1 row in set (0.00 sec)

这里的File,Position列再配置从节点时需要用到。

3.2 配置从节点

CHANGE MASTER TO 
MASTER_HOST='192.168.2.129',
MASTER_PORT=3306,
MASTER_USER='root',
MASTER_PASSWORD='123123',
master_log_file='mysql-bin.000003',
master_log_pos=156,
master_connect_retry=60,
GET_MASTER_PUBLIC_KEY=1;

执行上面的命令如果报错如下:

This operation cannot be performed with a running slave io thread; run STOP SLAVE IO_THREAD FOR CHANNEL '' first.

这时需要执行stop slave

注意:上面的mysql-bin.000003 是首次配置的,如果master节点重启了,这个文件会递增变为mysql-bin.000004,这时我们的从节点会自动连上这mysql-bin.000004

查看从节点状态:

mysql> show slave status\G
 *************************** 1. row ***************************
                Slave_IO_State: Waiting for source to send event
                   Master_Host: 192.168.2.129
                   Master_User: root
                   Master_Port: 3306
                 Connect_Retry: 60
               Master_Log_File: mysql-bin.000005
           Read_Master_Log_Pos: 156
                Relay_Log_File: 85acaa370429-relay-bin.000014
                 Relay_Log_Pos: 371
              # 该文件会自动与master节点同步
         Relay_Master_Log_File: mysql-bin.000005
              # 主要看这里的两个Running是否为Yes
              Slave_IO_Running: Yes
             Slave_SQL_Running: Yes
               Replicate_Do_DB: 
           Replicate_Ignore_DB: 
            Replicate_Do_Table: 
        Replicate_Ignore_Table: 
       Replicate_Wild_Do_Table: 
   Replicate_Wild_Ignore_Table: 
                    Last_Errno: 0
                    Last_Error: 
                  Skip_Counter: 0
           Exec_Master_Log_Pos: 156
               Relay_Log_Space: 755
               Until_Condition: None
                Until_Log_File: 
                 Until_Log_Pos: 0
            Master_SSL_Allowed: No
            Master_SSL_CA_File: 
            Master_SSL_CA_Path: 
               Master_SSL_Cert: 
             Master_SSL_Cipher: 
                Master_SSL_Key: 
         Seconds_Behind_Master: 0
 Master_SSL_Verify_Server_Cert: No
                 Last_IO_Errno: 0
                 Last_IO_Error: 
                Last_SQL_Errno: 0
                Last_SQL_Error: 
   Replicate_Ignore_Server_Ids: 
              Master_Server_Id: 1
                   Master_UUID: 71bb9106-a9a9-11ed-9031-0242ac110002
              Master_Info_File: mysql.slave_master_info
                     SQL_Delay: 0
           SQL_Remaining_Delay: NULL
       Slave_SQL_Running_State: Replica has read all relay log; waiting for more updates
            Master_Retry_Count: 86400
                   Master_Bind: 
       Last_IO_Error_Timestamp: 
      Last_SQL_Error_Timestamp: 
                Master_SSL_Crl: 
            Master_SSL_Crlpath: 
            Retrieved_Gtid_Set: 
             Executed_Gtid_Set: 
                 Auto_Position: 0
          Replicate_Rewrite_DB: 
                  Channel_Name: 
            Master_TLS_Version: 
        Master_public_key_path: 
         Get_master_public_key: 1
             Network_Namespace: 
 1 row in set, 1 warning (0.00 sec)

3.3 测试

在主节点上执行创建数据库,建表等操作都会自动同步到slave节点上。

4 创建Canal

docker run --name canal -p 11111:11111 -v /opt/canal/conf:/home/admin/canal-server/conf -v /opt/canal/logs:/home/admin/canal-server/logs -d canal/canal-server

5 Springboot整合Canal

添加依赖


  
    org.springframework.boot
    spring-boot-starter-data-redis
  
  
    org.springframework.boot
    spring-boot-starter-web
  
  
    top.javatool
    canal-spring-boot-starter
    1.2.1-RELEASE
  

配置文件

spring:
  redis:
    host: localhost
    port: 6379
    password: 123123
    database: 8
    lettuce:
      pool:
        maxActive: 8
        maxIdle: 100
        minIdle: 10
        maxWait: -1
---
canal:
  server: 192.168.2.130:11111
  destination: redis #可设置*个,逗号隔开,对应需要创建*文件夹,且文件夹下有instance.properties文件

数据模型

public class Users {


  private Integer id ;
  private String name ;
  private Integer age ;
  @Override
  public String toString() {
    return "Users [id=" + id + ", name=" + name + ", age=" + age + "]";
  }
  
}

具体服务组件

@Component
@CanalTable(value = "users")
public class UserServiceImpl implements EntryHandler {
  
  private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);
  
  private final StringRedisTemplate stringRedisTemplate ;
  
  public UserServiceImpl(StringRedisTemplate stringRedisTemplate) {
    this.stringRedisTemplate = stringRedisTemplate ;
  }


  @Override
  public void insert(Users user) {
    logger.info("新增数据为{}", user);
    try {
      this.stringRedisTemplate.opsForValue().set("users:" + user.getId(), new ObjectMapper().writeValueAsString(user)) ;
    } catch (JsonProcessingException e) {
      e.printStackTrace();
    }
  }


  @Override
  public void update(Users before, Users after) {
    logger.info("原来数据为{}", before);
    logger.info("更新数据为{}", after);
    try {
      this.stringRedisTemplate.opsForValue().set("users:" + after.getId(), new ObjectMapper().writeValueAsString(after)) ;
    } catch (JsonProcessingException e) {
      e.printStackTrace();
    }
  }


  @Override
  public void delete(Users user) {
    logger.info("删除的数据为{}", user);
    this.stringRedisTemplate.delete("users:" + user.getId()) ;
  }


}

以上通过增,删,改数据就能同步到Redis中了。

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论