【实现简单的逻辑】
Mysql数据同步到Hive,大致流程如下:
分为离线和实时两部分,我们先实现离线,需要以下内容:Flink,SeaTunnel,Mysql,Hive,Hadoop,Java。
离线Mysql到Hive数据同步
1)准备所需要的
2)开始
Mysql创建数据库及其内容
-- 创建数据库
create database seatunnel;
-- 进入seatunnel数据库
use seatunnel;
-- 创建表
create table day_test(
dname varchar(64),
dage int
);
-- 插入数据
insert into day_test values('张三',20);
insert into day_test values('李四',18);
insert into day_test values('王二',29);
insert into day_test values('麻子',22);
数据库有数据,没什么太大问题,懒得删。
Hive创建接收数据表
#打开hive
hive
#新建数据库
create database mydemo;
#进入库
use mydemo;
#新建表
create table hive_mysql(
hname varchar(64),
hage int
);
#查看当前表的内容
select * from hive_mysql;
Hive里有数据,这个没什么影响,不用管它!
错误一
Hive的开启顺序是:先启动Mysql,再启动Hadoop集群,再启动Hive。
错误二
如果出现以下错误:
这说明你的Hive服务器没开,新开个页面,输入:
hive --service metastore &
不关闭这个页面就行了,放后台,这时候就行了。
修改SeaTunnel配置文件
这是我的SeaTunnel安装路径,你们换成自己的就行!
#进入Seatunnel目录下的conf
cd seatunnel/apache-seatunnel-incubating-2.3.0/conf
#复制配置文件并改名,变为我们后面的启动文件
cp seatunnel/apache-seatunnel-incubating-2.3.0/conf/seatunnel.streaming.conf.template seatunnel/apache-seatunnel-incubating-2.3.0/conf/example01.conf
#打开文件修改
vi example01.conf
#保存并退出
:wq
example01.conf文件内容如下:
env {
execution.parallelism = 1
}
# 在source所属的块中配置数据源
source {
Jdbc {
driver = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seatunnel?serverTimezone=GMT%2b8&characterEncoding=utf-8"
user = "root"
password = "123456"
query = "select * from day_test"
}
}
# 在transform的块中声明转换插件
transform {
}
# 在sink块中声明要输出到哪
sink {
Hive {
table_name = "mydemo.hive_mysql"
metastore_uri = "thrift://127.0.0.1:9083"
schema {
fields {
hname = string
hage= int
}
}
}
}
使用Flink提交同步作业
cd seatunnel/apache-seatunnel-incubating-2.3.0
#用我们刚配置的文件去启动作业
./bin/start-seatunnel-flink-connector-v2.sh --config ./config/example01.conf
提交任务,任务完成。这时打开Flink的web页面:
这里就可以看见运行的结果,错误或者单纯查询可以显示在这,正确了没有显示,我这里是之前的测试,这时候再去Hive里查询字段:
select * from hive_mysql;
内容如下
自此离线完成,反过来也能同步,hive-->mysql
实时CDC挖取日志同步到Hive
Mysql CDC配置打开