第三期【Demo教程】教你使用SeaTunnel把数据从MySQL导到Hive

2024年 6月 21日 50.0k 0

随着数据技术的快速发展,了解并掌握各种工具和技术变得尤为重要。为此,我们准备在Apache SeaTunnel社区发起如何使用连接器的Demo演示计划,邀请所有热爱数据同步技术的同学分享他们的知识和实操经验!

第三期【Demo教程】教你使用SeaTunnel把数据从MySQL导到Hive-1

我们第三期主题是:如何使用SeaTunnel从MySQL同步到Hive,如果您对此计划感兴趣,也欢迎联系社区运营同学(微信号seatunnel1)参与Demo录制!无论您是数据工程师、开发者还是技术爱好者,都欢迎您参与并展示您的技术才能。

敲重点~敲重点~如果你是用户,想看什么同步场景的Demo!请下滑到最底部留言,我们优先录制大家呼声最高的同步场景Demo!

Demo计划目标

我们的目标是创建一个共享和学习的平台,通过具体的Demo演示和对应的文档帮助社区成员更好地理解和应用各种数据连接器。这些Demo可以帮助新手快速学习,同时也为资深专家提供一个展示创新解决方案的舞台。

关于从MySQL同步到Hive,前段时间也有用户投稿,感兴趣的同学可以看看:

【最佳实践】2个步骤教你从Mysql同步到Hive

如何使用 SeaTunnel 同步 MySQL 数据到 Hive

Mysql Source连接器相关请参考之前的教程:

全方位解读SeaTunnel MySQL CDC连接器:实现数据高效同步的强大工具

历史Demo教程:

  • 【第一期Demo 视频教程】使用SeaTunnel从MySQL同步到Doris

  • 【第二期Demo教程】教你使用MySQL CDC连接器,把数据从MySQL导到Doris

如何使用 Hive Sink 连接器进行高效数据同步

需要参考的文档及代码原文链接:https://seatunnel.apache.org/docs/2.3.5/connector-v2/sink/Hive (预计2.3.6版本才能正式使用)

描述

将数据写入到 Hive。

要使用此连接器,您必须确保您的 Spark/Flink 集群已经集成了 Hive。

如果您使用 SeaTunnel Zeta Engine,则需要将 seatunnel-hadoop3-3.1.4-uber.jar
hive-exec-3.1.3.jar
libfb303-0.9.3.jar
放置在 $SEATUNNEL_HOME/lib/
目录下。:::

关键特性

  • [x] 精确一次

默认情况下,我们使用两阶段提交(2PC)来确保 精确一次

  • [x] 文件格式
    • [x] text
    • [x] csv
    • [x] parquet
    • [x] orc
    • [x] json
  • [x] 压缩编码
    • [x] lzo

选项

名称 类型 必需 默认值
table_name string -
metastore_uri string -
compress_codec string none
hdfs_site_path string -
hive_site_path string -
hive.hadoop.conf Map -
hive.hadoop.conf-path string -
krb5_path string /etc/krb5.conf
kerberos_principal string -
kerberos_keytab_path string -
abort_drop_partition_metadata boolean true
common-options -

table_name [string]

目标 Hive 表的名称,例如:db1.table1
。如果源是多模式的,您可以使用 ${database_name}.${table_name}
来生成表名,它会用源中生成的 CatalogTable 的值替换 ${database_name}
${table_name}

metastore_uri [string]

Hive Metastore 的 URI。

hdfs_site_path [string]

hdfs-site.xml
的路径,用于加载 namenodes 的高可用配置。

hive_site_path [string]

hive-site.xml
的路径。

hive.hadoop.conf [map]

Hadoop 配置文件中的属性(core-site.xml
hdfs-site.xml
hive-site.xml
)。

hive.hadoop.conf-path [string]

core-site.xml
hdfs-site.xml
hive-site.xml
文件的指定加载路径。

krb5_path [string]

krb5.conf
的路径,用于 Kerberos 认证。

kerberos_principal [string]

Kerberos 的 principal。

kerberos_keytab_path [string]

Kerberos 的 keytab 路径。

abort_drop_partition_metadata [list]

决定在中止操作期间是否从 Hive Metastore 中删除分区元数据的标志。

注意:这仅影响 metastore 中的元数据,同步过程中生成的数据将始终被删除。

common options

Sink 插件的常用参数,请参阅 Sink Common Options 获取详细信息。

示例

Hive {
  table_name = "default.seatunnel_orc"
  metastore_uri = "thrift://namenode001:9083"
}

示例 1

我们有一个源表,如下所示:

create table test_hive_source(
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_int INT,
  test_bigint BIGINT,
  test_boolean BOOLEAN,
  test_float FLOAT,
  test_double DOUBLE,
  test_string STRING,
  test_binary BINARY,
  test_timestamp TIMESTAMP,
  test_decimal DECIMAL(8,2),
  test_char CHAR(64),
  test_varchar VARCHAR(64),
  test_date DATE,
  test_array ARRAY,
  test_map MAP,
  test_struct STRUCT
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);

我们需要从源表读取数据并写入到另一个表中:

create table test_hive_sink_text_simple(
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_int INT,
  test_bigint BIGINT,
  test_boolean BOOLEAN,
  test_float FLOAT,
  test_double DOUBLE,
  test_string STRING,
  test_binary BINARY,
  test_timestamp TIMESTAMP,
  test_decimal DECIMAL(8,2),
  test_char CHAR(64),
  test_varchar VARCHAR(64),
  test_date DATE
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);

作业配置文件如下:

env {
  parallelism = 3
  job.name="test_hive_source_to_hive"
}

source {
  Hive {
    table_name = "test_hive.test_hive_source"
    metastore_uri = "thrift://ctyun7:9083"
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_text_simple"
    metastore_uri = "thrift://ctyun7:9083"
    hive.hadoop.conf = {
      bucket = "s3a://mybucket"
    }
  }
}

Hive on S3

1、为 EMR 的 Hive 创建 lib 目录

mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib

2、从 Maven 中心获取 jar 到 lib 目录

cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar

3、从 EMR 环境中复制 jar 到 lib 目录

cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib

4、运行测试用例

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_on_s3"
    metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
    hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
    hive.hadoop.conf = {
       bucket="s3://ws-package"
    }
  }
}

Hive on OSS

1、为 EMR 的 Hive 创建 lib 目录

mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib

2、从 Maven 中心获取 jar 到 lib 目录

cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar

3、从 EMR 环境中复制 jar 到 lib 目录并删除冲突的 jar

cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar

4、运行测试用例

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_on_oss"
    metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
    hive.hadoop.conf-path = "/tmp/hadoop"
    hive.hadoop.conf = {
        bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
    }
  }
}

示例 2

我们有多个源表,如下所示:

create table test_1(
)
PARTITIONED BY (xx);

create table test_2(
)
PARTITIONED BY (xx);
...

我们需要从这些源表读取数据并写入到其他表中:

作业配置文件如下:

env {
  # 在这里设置 Flink 配置
  parallelism = 3
  job.name="test_hive_source_to_hive"
}

source {
  Hive {
    tables_configs = [
      {
        table_name = "test_hive.test_1"
        metastore_uri = "thrift://ctyun6:9083"
      },
      {
        table_name = "test_hive.test_2"
        metastore_uri = "thrift://ctyun7:9083"
      }
    ]
  }
}

sink {
  Hive {
    table_name = "${database_name}.${table_name}"
    metastore_uri = "thrift://ctyun7:9083"
  }
}

通过视频教程,我们探讨了如何使用 Apache SeaTunnel 的 Hive Sink Connector 将数据高效地写入 Hive 表。

无论是在本地环境还是云上部署,使用 Hive Sink Connector 都能够帮助企业构建高效、可靠的数据处理流程。希望通过本文的指导,您能更好地理解和应用这一强大的工具,以满足您的数据处理需求。

如果您对本文内容有任何疑问或建议,欢迎在评论区分享您的想法。让我们共同探讨和进步,不断推动数据技术的边界。

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论