OGG从Oracle备库同步数据至kafka

2024年 7月 17日 46.3k 0

1 目的

测试物理standby 作为ogg 源端的可行性,效率及安全性。

2 环境及规划

以下数据库及OGG版本是实际目的的最低版本要求。

  • 环境

    服务器ip 作用
    10.10.100.91 Oracle_primary (zookeeper kafka)
    10.10.100.92 Oracle_standby ogg (zookeeper kafka)
    10.10.100.98 zookeeper kafka ogg
  • 版本及路径

    软件 版本 路径
    oracle 11.2.0.4 /u01/app/oracle
    zookeeper 3.4.13 /opt/zookeeper-3.4.13
    kafka 2.12-2.1.1 /opt/kafka_2.12-2.1.1
    ogg for bigdata 12.3.2.1.1 /u01/app/ogg
    ogg for oracle 12.3.0.1.4 /u01/app/ogg
    jdk 1.8u181 /opt/jdk1.8.0_181

3 安装配置JDK

在所有节点安装。由于OGG 12以上的版本都要求jdk1.8以上。因此需要单独安装jdk。

3.1 安装jdk

tar -xzvf /opt/jdk-8u181-linux-x64.tar.gz -C /opt/jdk1.8.0_181

3.2 配置环境变量

根据规则,现在我们将每个环境中的环境变量进行配置。 按如下说明修改 $HOME/.bash_profile:

#添加一行
export JAVA_HOME=/opt/jdk1.8.0_181
# 修改PATH变量
PATH=$JAVA_HOME/bin:$PATH

4 安装Dataguard

4.1 安装备库软件

  • 上传软件并解压 上传文件请使用ftp或者类ftp方式等。解压安装包示例:

    unzip p13390677_112040_Linux-x86-64_1of7.zip
    unzip p13390677_112040_Linux-x86-64_2of7.zip

  • 编辑响应文件 由于参数过多,此处只列出需要调整的内容. :

    oracle.install.option=INSTALL_DB_SWONLY
    ORACLE_HOSTNAME=pmo2
    UNIX_GROUP_NAME=oinstall
    INVENTORY_LOCATION=/u01/app/oraInventory
    ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1
    ORACLE_BASE=/u01/app/oracle/
    oracle.install.db.DBA_GROUP=dba
    oracle.install.db.OPER_GROUP=oper
    oracle.install.db.config.starterdb.globalDBName=orcl
    oracle.install.db.config.starterdb.SID=orcl
    oracle.install.db.config.starterdb.characterSet=AL32UTF8
    oracle.install.db.config.starterdb.memoryOption=true
    oracle.install.db.config.starterdb.memoryLimit=1384
    oracle.install.db.config.starterdb.password.ALL=Sys123passwd
    oracle.install.db.config.starterdb.storageType=FILE_SYSTEM_STORAGE
    oracle.install.db.config.starterdb.fileSystemStorage.dataLocation=/u01/app/oracle/oradata
    oracle.install.db.config.starterdb.fileSystemStorage.recoveryLocation=/u01/app/oracle/arch

  • 安装软件

    cd ~/database/
    ./runInstaller -silent -force -ignoreSysprereqs -ignorePrereq -showProgress -responseFile /home/oracle/database/response/db_install.rsp

4.2 配置dataguard

4.2.1 主库

主库开启强制日志

alter database force logging;

主库调整参数

alter system set db_unique_name='orcl' scope=spfile;
alter system set log_archive_config='DG_CONFIG=(primary,standby)' scope=both;
alter system set log_archive_dest_1='LOCATION=/u01/app/oracle/arch VALID_FOR=(ALL_LOGFILES,ALL_ROLES) DB_UNIQUE_NAME=orcl' scope=both;
alter system set log_archive_dest_2='SERVICE=standby LGWR ASYNC VALID_FOR=(ONLINE_LOGFILES,PRIMARY_ROLE) DB_UNIQUE_NAME=orcl' scope=both;
alter system set log_archive_format='%t_%s_%r.arc' scope=spfile;

重启主库, 使参数生效。

主库添加standby 日志

日志添加规则为原有日志组个数为N, 添加的standby 日志组个数为N+1. 默认数据库的日志为3个,大小为50M.

alter database add standby logfile group 4('/u01/app/oracle/oradata/redo04.log') size 50m;
alter database add standby logfile group 5('/u01/app/oracle/oradata/redo05.log') size 50m;
alter database add standby logfile group 6('/u01/app/oracle/oradata/redo06.log') size 50m;
alter database add standby logfile group 7('/u01/app/oracle/oradata/redo07.log') size 50m;

主库创建密码文件

orapwd file=$ORACLE_HOME/dbs/orapworcl entries=5 force=y ignorecase=y password=oracle

创建完密码文件后,记得在主库修改下sys用户密码。此时的密码,以简单便于使用为主。后期再调整为符合密码规则的密码。

sqlplus / as sysdba
alter user sys identified by oracle;
exit

TNS配置

TNS 文件: $ORACLE_HOME/network/admin/tnsnames.ora
文件内容如下:

primary =
(DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = 10.10.100.91)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = orcl)
)
)

standby =
(DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = 10.10.100.92)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = orcl)
(UA=R)
)
)

4.2.2 备库

参数

备库参数文件为$ORACLE_HOME/dbs/initorcl.ora. 内容如下:

*.db_name='orcl'
*.db_unique_name='orcls'
*.compatible='11.2.0.4.0'

*.fal_server='primary'
*.log_archive_config='DG_CONFIG=(orcls,orcl)'
*.log_archive_dest_1='LOCATION=/u01/app/oracle/oradata/arch VALID_FOR=(ALL_LOGFILES,ALL_ROLES) DB_UNIQUE_NAME=orcls'
*.log_archive_dest_state_1='ENABLE'
*.db_file_name_convert='/u01/app/oracle/oradata/orcl/','/u01/app/oracle/oradata/'
*.log_file_name_convert='/u01/app/oracle/oradata/orcl/','/u01/app/oracle/oradata/redo'
*.standby_file_management='AUTO'

启动实例

请注意,备库此时需要启动至nomount状态。

sqlplus / as sysdba
startup nomount;
exit

监听配置并启动

  • 监听配置

监听文件: $ORACLE_HOME/network/admin/listener.ora

sid_list_listener =
(sid_list =
(sid_desc =
(global_dbname=orcl )
(sid_name=orcl )
(oracle_home=/u01/app/oracle/product/11.2.0/dbhome_1)
)
)

LISTENER =
(DESCRIPTION_LIST =
(DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = 10.10.100.92)(PORT = 1521))
)
)

ADR_BASE_LISTENER = /u01/app/oracle

  • 启动监听

    lsnrctl start

密码文件

将主库的密码文件拷贝至备库的$ORACLE_HOME/dbs路径中。

scp 10.10.100.91:$ORACLE_HOME/dbs/orapworcl $ORACLE_HOME/dbs/

TNS配置

将主库的密码文件拷贝至备库的$ORACLE_HOME/network/admin路径中。 使用如下命令,或者通过复制粘贴待方式将文件内容复制到备库环境。

scp 10.10.100.91:$ORACLE_HOME/network/admin/tnsnames.ora $ORACLE_HOME/network/admin/

4.3 完成操作

在主库或者备库均可操作。

  • 连接 rman 环境

    rman target sys/oracle@primary auxiliary sys/oracle@setandby

  • 执行 rman 命令

    run
    {
    allocate channel cl1 type disk;
    allocate channel cl2 type disk;
    allocate channel cl3 type disk;
    allocate auxiliary channel c1 type disk;
    allocate auxiliary channel c2 type disk;
    allocate auxiliary channel c3 type disk;
    duplicate target database for standby from active database dorecover ;
    release channel c1;
    release channel c2;
    release channel c3;
    release channel cl1;
    release channel cl2;
    release channel cl3;
    }

4.4 启动实时复制

alter database recover managed standby database using current logfile disconnect from session;

5 zookeeper集群

5.1 上传并解压

可使用ftp 等工具或者其他工具上传。上传后解压. 注意,所有节点都需要安装。所以要上传至所有服务器并解压。

tar -xzvf ~/zookeeper-3.4.13.tar.gz -C /opt/

5.2 配置

zookeeper 集群中所有节点都需要进行配置。

  • 规则相关路径

    # 日志文件路径
    mkdir -p /opt/zookeeper/zkdatalog
    # 数据存放路径
    mkdir -p /opt/zookeeper/zkdata

  • 配置内容 进入到目录中, 查看配置文件.相关的配置文件存储于

    /zookeeper-/conf 路径中。

    # cd /opt/zookeeper-3.4.13/conf
    # ls
    configuration.xsl log4j.properties zoo_sample.cfg
    # 说明:
    # zoo_sample.cfg 这个文件是官方给我们的zookeeper的样板文件,给他复制一份命名为zoo.cfg,zoo.cfg是官方指定的文件命名规则。\\

    文件内容如下:

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/opt/zookeeper/zkdata
    dataLogDir=/opt/zookeeper/zkdatalog
    # 3.4 and later
    autopurge.snapRetainCount=60
    autopurge.purgeInterval=24
    clientPort=12181
    server.1=10.10.100.91:12888:13888
    server.2=10.10.100.92:12888:13888
    server.3=10.10.100.98:12888:13888

5.3 创建myid文件

在每个服务器上,将server.N 中的N 写入dataDir 路径中的myid文件。

# server1
echo "1" > /opt/zookeeper/zkdata/myid
#server2
echo "2" > /opt/zookeeper/zkdata/myid
#server3
echo "3" > /opt/zookeeper/zkdata/myid

5.4 配置环境变量

为了日后管理方便,建议将zookeeper的位置信息配置到用户的环境变量中去。此处使用 .bash_profile ,在文件中添加以下内容。

export ZK_HOME=/opt/zookeeper-3.4.13
PATH=$JAVA_HOME:$ZK_HOME/bin:$PATH

通过 source ~/.bash_profile/ 使修改生效.

5.5 启动和查看服务

#cd $ZK_HOME/bin
zkServer.sh start
zkServer.sh status

经查看,zk 集中,server.2 为leader,server1,server3 为follower。 不同环境这个结果是不一样的。
如果没启动,可以使用./zkServer.sh start-foreground启动,屏幕上会显示日志信息,能看出哪块出了问题。

6 kafka集群

6.1 上传并解压

可使用ftp 等工具或者其他工具上传,然后用解压。所有节点都需要安装,所以要上传至所有服务器。

tar -xzvf kafka_2.12-2.1.1.tgz

6.2 配置

kafka集群所有节点都需要配置。

  • 规则相关路径

    #消息目录
    mkdir -p /opt/kafka_2.12-2.1.1/logs

  • 修改配置文件 kafka 的配置文件有很多,存放在

    /kafka-version/config中。
    对于本环境来讲,路径为:/opt/kafka_2.12-2.1.1/config。
    需要调整的配置文件为:server.properties

    # 每台服务器的broker.id都不能相同
    broker.id=0

    #hostname和port参数,在2.12 2.1.1 版本的server.properties 中已经找不到该参数配置了。
    #取而代之的是 listeners参数. 其中的IP 地址, 设置为本主机的IP。
    #host.name=10.10.100.91
    #port = 9092
    listeners=PLAINTEXT://10.10.100.92:9092
    # 日志路径
    log.dirs=/opt/kafka_2.12-2.1.1/logs
    #在log.retention.hours=168 下面新增下面三项
    message.max.byte=5242880
    default.replication.factor=2
    replica.fetch.max.bytes=5242880

    #设置zookeeper的连接端口,集群中所有节点都需要包含。
    zookeeper.connect=10.10.100.91:12181,10.10.100.92:12181,10.10.100.98:12181

6.3 配置环境变量

为了方便日后管理,需要配置kafka的环境变量。在$HOME/.bash_profile 文件中加入以下 两行:

export KFK_HOME=/opt/kafka_2.12-2.1.1/config
export PATCH=$KFK_HOME/bin:$PATH

或者直接修改为以下格式,包含zookeeper与kafka的环境变量:

export ZK_HOME=/opt/zookeeper-3.4.13
export KFK_HOME=/opt/kafka_2.12-2.1.1
export PATH=$JAVA_HOME:$ZK_HOME/bin:$KFK_HOME/bin:$PATH

配置完成后, 使用source 命令使配置生效。

6.4 服务管理

  • 启动服务

    kafka-server-start.sh -daemon $KFK_HOME/config/server.properties

  • 查看服务

    # 使用jps查看kafka进程是否有启动, 直接执行jps,正常如下:
    22707 QuorumPeerMain
    28439 Jps
    28314 Kafka

6.5 TOPIC

  • 创建Topic

    kafka-topics.sh --create --zookeeper 10.10.100.92:12181,10.10.100.91:12181,10.10.100.98:12181 --replication-factor 3 -partitions 3 --topic testogg

    成功时提示:Created topic “testogg”.

  • 查看Topic

    #列出所有可用的topics
    kafka-topics.sh --list --zookeeper 10.10.100.91:12181
    # 查看指定topic 信息
    kafka-topics.sh --describe --zookeeper 10.10.100.91:12181,10.10.100.92:12181,10.10.100.98:12181 --topic testogg

  • 创建consumer 这里创建一个consumer,以便在安装配置完ogg 后,进行验证。

    kafka-console-consumer.sh -–zookeeper 10.10.100.92:12181,10.10.100.91:12181,10.10.100.98:12181 -–from-beginning –-topic oggtest

6.6 测试集群

  1. 发布消息 利用kafka自身提供的发布消息的脚本, 来创建发布消息.

    kafka-console-producer.sh --broker-list 10.10.100.91:9092,10.10.100.92:9092,10.10.100.98:9092 --topic testogg

  2. 接收消息

    kafka-console-consumer.sh --topic testogg --bootstrap-server 10.10.100.92:9092,10.10.100.91:9092,10.10.100.98:9092 --group testogg1 --from-beginning

  3. 测试结果

    • 消息发送端

      [root@pmo02 config]# kafka-console-producer.sh --broker-list 10.10.100.91:9092,10.10.100.92:9092,10.10.100.98:9092 --topic testogg
      >1,'This is a test message', `date`
      >2,'Second test message',boooooooo
      >一人

    • 消息接收端

      [root@app-01 ~]# kafka-console-consumer.sh --topic testogg --bootstrap-server 10.10.100.92:9092,10.10.100.91:9092,10.10.100.98:9092 --group testogg1 --from-beginning
      2,'Second test message',boooooooo
      1�,'This is a test message', `date`
      一人

7 OGG 安装配置

OGG针对不同的软件有不同的版本,我们需要安装针对Oracle 版本和针对大数据的版本。 本次测试的是, oracle 从备库同步数据至kafka集群。因此我们需要将ogg for oracle 安装至备库(10.10.100.92)。

7.1 源端

7.1.1 准备

准备OGG软件

将文件上传至服务器oracle用户的家目录. 并保证Oracle用户对安装包有操作权限:

chown -R oracle:oinstall /home/oracle/123014_fbo_ggs_Linux_x64_shiphome.zip
unzip 123014_fbo_ggs_Linux_x64_shiphome.zip
# 软件解压后的路径为fbo_ggs_Linux_x64_shiphome

修改OGG安装的配置文件

软件安装的配置文件存放于 =

/fbo_ggs_Linux_x64_shiphome/Disk1/response/= 中. 名为oggcore.rsp. 请按下面要求进行设置:

# 保持不变
oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v12_1_2
# 与数据库版本保持一致,分为ORA12c与ORA11g
INSTALL_OPTION=ORA11g
# 配置ogg 的安装路径
SOFTWARE_LOCATION=/u01/app/ogg
# 如果需要启动mgr 需要配置以下 三项,如果不需要启动,则不需要配置,此处不进行配置。
START_MANAGER=
MANAGER_PORT=
DATABASE_LOCATION=
# windows 平台以下两项不需要配置。其他平台需要配置。
INVENTORY_LOCATION=/u01/app/oraInventory
UNIX_GROUP_NAME=oinstall

创建OGG安装路径

# 用Oracle用户创建
mkdir -p /u01/app/ogg

静默安装OGG

# 进入相关路径
cd ~/fbo_ggs_Linux_x64_shiphome/Disk1/
# 执行静默安装
./runInstaller -silent -responseFile `pwd`/response/oggcore.rsp
# 安装过程中会提示安装日志, 可通过日志查看安装过程是否正常。

配置环境变量

#添加OGG_HOME
export OGG_HOME=/u01/app/ogg
# 将OGG_HOME路径添加至PATH
export PATH=$PATH:$OGG_HOME

创建相关路径

ogg 提供了一个命令,用于创建相关的子目录 ,比如数据目录,配置文件存放目录等。

ggsci 'OGG',
privilege_type => 'CAPTURE',
grant_select_privileges => TRUE,
do_grants => TRUE);
END;
/

配置ddl同步

cd $GGATE
sqlplus / as sysdba
@marker_setup.sql;
@ddl_setup.sql;
@role_setup.sql;
grant GGS_GGSUSER_ROLE to ogg;
@ddl_enable.sql;

性能优化

cd $OGG_HOME
sqlplus / as sysdba
@?/rdbms/admin/dbmspool
-- ddl_pin将触发器用到的plsql包放进内存中
sqlplus / as sysdba
@ddl_pin ogg;
exit

7.1.3 配置OGG

配置MGR

edit params mgr
# 输入如下内容,第三行,一般不配置.
PORT 7809
DYNAMICPORTLIST 7810-7860
# AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3, RESETMINUTES 60
PURGEOLDEXTRACTS ./dirdat/*, usecheckpoints, minkeepdays 1
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45

数据初始化配置

数据初始化,指的是从源端Oracle 数据库将已存在的需要的数据同步至目标端.
此节中,操作都可在OGG 环境中进行。对OGG的内部逻辑了解的,也可以在shell环境中配置.

配置初始化进程

```
add extract init01, sourceistable
EDIT PARAMS init01
# 编辑参数内容如下:
EXTRACT init01
SETENV (NLS_LANG=AMERICAN_AMERICA.ZHS16GBK)
USERID ogg,PASSWORD ogg123
#USERID c##ggadmin,PASSWORD ggadmin # for oracle 12C,注意用户名格式
RMTHOST 10.10.100.98, MGRPORT 7809
RMTFILE ./dirdat/in,maxfiles 999, megabytes 500,format release 12.3
# SOURCECATALOG PDBNAME # FOR ORACLE 12C,11g does not need.
table oggtest.*;
```

生成表结构

GoldenGate 提供了一个名为 DEFGEN 的专用工具,用于生成数据定义,当源表和目标表中 的定义不同时,Oracle GoldenGate 进程将引用该专用工具。在运行 DEFGEN 之前,需要 为其创建一个参数文件:

```
edit param defgen
USERID ogg,PASSWORD ogg123
defsfile ./dirdef/defgen.def,format release 12.2 # format release 指的是OGG的版本
# SOURCECATALOG orclpdb # for oracle 12C 。11g及之前版本不需要
table oggtest.*;
```

- note

配置中每个 table 行尾都要加上分号。不然会报错。如上面配置:table oggtest.*;

生成表结构文件,需要执行shell命令,如果配置中的文件已经存在,执行下面命令会报错,所以 在执行前需要先删除:

```
rm -f $OGG_HOME/dirdef/defgen.def
defgen paramfile dirprm/defgen.prm
```

将生成的定义文件传送到目标端, 目标端的replicate进程会使用这个文件。

```
scp $OGG_HOME/dirdef/defgen.def 10.10.100.98:/u01/app/ogg/dirdef/
```

实时同步配置

关于集成模式

standby 实例 不支持集成模式 集成模式关键的三个命令:

```
add extract ext_kfk,integrated tranlog,threads 1,begin now
DBLOGIN USERID ,PASSWORD

# for 12C
register extract ext_kfk database container (orclpdb)
# for 11g/ 10G
register extract ext_kfk
```

配置抽取进程

添加抽取进程,OGG命令行:

```
add extract ext_kfk,tranlog,threads 1,begin now
add exttrail ./dirdat/kf, extract ext_kfk, megabytes 500
```

配置抽取进程参数,OGG命令行:

```
edit params ext_kfk
```

以下内容为参数明细

```
EXTRACT ext_kfk
USERID ogg, PASSWORD ogg123
Setenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
gettruncates
DISCARDFILE ./dirrpt/ext_kfk.dsc, APPEND, MEGABYTES 1024
# RMTHOST 10.10.100.98, MGRPORT 9178
# RMTFILE ./dirdat/kf,maxfiles 999, megabytes 500,format release 12.2
DBOPTIONS ALLOWUNUSEDCOLUMN
REPORTCOUNT EVERY 1 MINUTES, RATE
WARNLONGTRANS 2h,CHECKINTERVAL 300
FETCHOPTIONS NOUSESNAPSHOT
EXTTRAIL ./dirdat/kf
TRANLOGOPTIONS MINEFROMACTIVEDG
# TRANLOGOPTIONS altarchivelogdest primary instance orcl /u01/app/oracle/oradata/arch 配置此行时,将读取归档日志
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
table oggtest.*;
```

若配置rmthost rmtfile 参数, 可不配置分发进程(pump).

- TABLE 关键词,必须以分号结束。
- TRANLOGOPTIONS altarchivelogdest 是standby 端作为ogg 源端, ogg 读取归档的关键。也适用于RAC环境,不过RAC环境不建议配置。
- 一般情况下,ogg 同步数据有5秒左右的延迟. 为了解决这个问题,有 的同学可能会配置 EOFDELAY或者EOFDELAYCSECS,以及FLUSHCSECS. 有 些情况,配置确实可以减少延迟,但不是所有情况都有用。而且配置 以后,很容易产生control file sequence read 等待。

配置分发进程

OGG 环境添加分发进程:

```
add extract pmp_kfk,exttrailsource ./dirdat/kf
add rmttrail ./dirdat/kf,EXTRACT PMP_kfk,MEGABYTES 500
```

编辑分发进程参数:

```
edit param PMP_KAF1
#内容如下:
extract pmp_kfk
USERID ogg, password ogg123
PASSTHRU
RMTHOST 10.10.100.98, MGRPORT 7809
RMTTRAIL ./dirdat/kf,format release 12.3
# 下面一行适用于12C
# SOURCECATALOG orclpdb
table oggtest.*;
```

7.2 目标端(kafka)

7.2.1 安装

oracle 软件统一安装到/u01/app/路径中,方便日后统一管理.OGG解压到路径/u01/app/ogg.

unzip OGG_BigData_Linux_x64_12.3.2.1.1.zip
tar -xvf OGG_BigData_Linux_x64_12.3.2.1.1.tar -C /u01/app/ogg/

7.2.2 配置环境变量

#su - oracle
#vi .bash_profile
# 内容如下:
JAVA_HOME=/opt/jdk1.8.0_181/
OGG_HOME=/u01/app/ogg
LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server/:$JAVA_HOME/lib:$LD_LIBRARY_PATH
PATH=$JAVA_HOME/bin:$PATH:$HOME/bin:$OGG_HOME

export PATH JAVA_HOME ORACLE_HOME OGG_HOME LD_LIBRARY_PATH

修改完环境变量执行: source .bash_profile

7.2.3 目标端OGG配置

创建相关路径

ggsci
create subdirs

示例:

$ ggsci

Oracle GoldenGate Command Interpreter
Version 12.3.0.1.2 OGGCORE_OGGADP.12.3.0.1.2_PLATFORMS_180712.2305
Linux, x64, 64bit (optimized), Generic on Jul 13 2018 00:46:09
Operating system character set identified as UTF-8.

Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.

GGSCI (app-01) 1> create subdirs

Creating subdirectories under current directory /home/oracle

Parameter file /u01/app/ogg/dirprm: created.
Report file /u01/app/ogg/dirrpt: created.
Checkpoint file /u01/app/ogg/dirchk: created.
Process status files /u01/app/ogg/dirpcs: created.
SQL script files /u01/app/ogg/dirsql: created.
Database definitions files /u01/app/ogg/dirdef: created.
Extract data files /u01/app/ogg/dirdat: created.
Temporary files /u01/app/ogg/dirtmp: created.
Credential store files /u01/app/ogg/dircrd: created.
Masterkey wallet files /u01/app/ogg/dirwlt: created.
Dump files /u01/app/ogg/dirdmp: created.

复制参数文件

cd $OGG_HOME/AdapterExamples/big-data/kafka
cp * $OGG_HOME/dirprm

参数配置

custom_kafka_producer.properties

```
bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200 --只需要改动这一行就行,指定kafka的地址和端口号
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=16384
linger.ms=10000
```

kafka.props

该配置文件主要控制着消息输出格式。由参数: *gg.handler.kafkahandler.format* 控制。

- text mode

```
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate= testogg -- 此处指定为要同步到的目标testogg名字
gg.handler.kafkahandler.format=delimitedtext
gg.handler.kafkahandler.format.fieldDelimiter=|
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.SchemaTopicName= testogg --此处指定为要同步到的目标topic名字
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/ --指定classpath,这里很重要,必须有kafka安装文件的类库。
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
```

- json mode

```
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=xiamen_dev
#The following selects the message key using the concatenated primary keys
gg.handler.kafkahandler.keyMappingTemplate=xiamen_dev
gg.handler.kafkahandler.format=json
gg.handler.hdfs.format.jsonDelimiter=CDATA[]
#gg.handler.kafkahandler.format.fieldDelimiter=
#gg.handler.name.format.pkUpdateHandling=delete-insert
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.SchemaTopicName=xiamen_dev
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO

gg.report.time=30sec

#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/opt/kafka_2.12-2.1.1/libs/*:/u01/app/ogg/*:/u01/app/ogg/lib/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
```

配置MGR

edit params mgr
# 内容如下(第三行在实际配置时已删除,一般不配置成自动启动):
PORT 7809
DYNAMICPORTLIST 7810-7860
-- AUTORESTART REPLICAT *, RETRIES 5, WAITMINUTES 3, RESETMINUTES 60
PURGEOLDEXTRACTS ./dirdat/*, usecheckpoints, minkeepdays 1
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45

配置初始化进程

  • 添加并配置初始化进程

    # 添加进程
    ADD replicat init01, specialrun
    # 配置进程
    edit params init01

    参数内容:

    SPECIALRUN
    end runtime
    setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
    targetdb libfile libggjava.so set property=./dirprm/kafka.props
    SOURCEDEFS ./dirdef/defgen.def --- 如果有多个map ,则需要配置sourcedefs,如果只有一个则可以不配置。
    EXTFILE ./dirdat/in
    reportcount every 1 minutes, rate
    grouptransops 10000
    map oggtest.*,target oggtest.*;

    如果是12C 数据库, map 后面的格式为pdb.schema.table , target schema.table

  • 示例

    GGSCI (app-01) 1> ADD replicat init01, specialrun
    REPLICAT added.
    GGSIC (app-01) 2> exit
    cd $OGG_HOME/dirprm
    cat >> init01.prm rep1.prm

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论