A typical Kafka cluster deployment in a private-cloud DNAT environment

July 15, 2023

Today we set up a Kafka 2.5.0 cluster (newer Kafka releases are phasing out ZooKeeper; they are not covered here). The environment is a private cloud, and the business requirement is to send data to Kafka over the public internet with SCRAM-SHA-256 authentication, while keeping normal access from the internal network. The internal ports are exposed to the internet through DNAT on the firewall, and since that mapping is necessarily one port to one port, the rough topology looks like this:

If your setup is not like this, you can instead try separating Kafka's internal and external access traffic with multiple listeners to solve the problem.

(figure: network topology)

In actual production you will find that when internal clients connect via the internal IPs, Kafka negotiates and serves requests normally.

When clients on the public internet connect via ports 9092/9093/9094 on 6.78.5.32, however, a problem appears: request A is sent to 6.78.5.32:9092, passes the firewall's DNAT layer, and reaches the backend Kafka broker; but when the broker replies to the sender, the reply advertises 172.16.100.7:9092. Your client has no idea who 172.16.100.7 is, so sending fails.

(figure: DNAT request/response flow)

If you only send messages to Kafka and never care about the response, the code reports success at the application level, yet no data is actually written. Hence the alternative approach:

(figure: hostname-based advertising topology)

A sent message must get a response back. So instead of some fixed IP, both the brokers and the clients work with hostnames: the brokers advertise a domain name, and each side resolves it through its local hosts file. Whether the access comes from the public internet or from the intranet, the local hosts entries on each side point that name at an IP that is actually reachable from there, so the response completes.
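Concretely, with the proxy at 172.16.100.10 and the public DNAT address 6.78.5.32 used in this article, the split-horizon hosts entries for the advertised name might look like this (a sketch; substitute your own addresses):

```
# /etc/hosts on intranet clients and brokers
172.16.100.10 kafka.linuxea.com

# /etc/hosts on internet clients
6.78.5.32 kafka.linuxea.com
```

Both sides resolve the same advertised name, but each to an address it can actually reach.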

Some fields in the official documentation describe this form as "preventing man-in-the-middle attacks":

(figure: excerpt from the Kafka documentation)

  • version: kafka_2.12-2.5.0
  • jdk: 1.8.0_211

Prerequisites:

  • Time synchronization (via cron):
10 * * * * ntpdate ntp.aliyun.com
  • Set each node's hostname and the local hosts entries:
# on 172.16.100.7
hostnamectl set-hostname kafka1
# on 172.16.100.8
hostnamectl set-hostname kafka2
# on 172.16.100.9
hostnamectl set-hostname kafka3
172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3

Preparation

Install Java from the binary tarball (an RPM install works just as well):

tar xf jdk-8u211-linux-x64.tar.gz -C /usr/local/
cd /usr/local && ln -s jdk1.8.0_211 java
cat > /etc/profile.d/java.sh <<'EOF'
export JAVA_HOME=/usr/local/java
export PATH=$JAVA_HOME/bin:$PATH
EOF
source /etc/profile.d/java.sh

Then create the ZooKeeper unit file /etc/systemd/system/zookeeper.service, the broker configuration /usr/local/kafka/config/server-scram.properties on each node (the full file is listed below), the Kafka unit file /etc/systemd/system/kafka.service, and each node's ZooKeeper id file /u01/data/zookeeper/myid:

172.16.100.7

echo "1" > /u01/data/zookeeper/myid

172.16.100.8

echo "2" > /u01/data/zookeeper/myid

172.16.100.9

echo "3" > /u01/data/zookeeper/myid
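The zookeeper.service and kafka.service unit files referenced above are not reproduced here. A minimal sketch, assuming the paths and the kafka user used throughout this article (adjust to your layout):

```ini
# /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache ZooKeeper
After=network.target

[Service]
Type=simple
User=kafka
Environment=JAVA_HOME=/usr/local/java
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

# /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka
After=network.target zookeeper.service

[Service]
Type=simple
User=kafka
Environment=JAVA_HOME=/usr/local/java
Environment=KAFKA_OPTS=-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server-scram.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

The KAFKA_OPTS environment line matches the manual start command used later in this article.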

  • kafka

Edit server-scram.properties on each node; the entries below are the ones that differ per node:

172.16.100.8

broker.id=2
##### Socket Server Settings: listening protocol and port #####
listeners=SASL_PLAINTEXT://172.16.100.8:9093
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9093

172.16.100.9

broker.id=3
##### Socket Server Settings: listening protocol and port #####
listeners=SASL_PLAINTEXT://172.16.100.9:9094
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9094
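The corresponding entries for the first node, 172.16.100.7, are not reproduced in this article; following the same pattern (and the 172.16.100.7:9092 endpoint used in the intranet tests below), they would presumably be:

```
broker.id=1
listeners=SASL_PLAINTEXT://172.16.100.7:9092
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9092
```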

The full 172.16.100.8 Kafka configuration after the changes:

broker.id=2
##### Socket Server Settings: listening protocol and port #####
listeners=SASL_PLAINTEXT://172.16.100.8:9093
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9093
######### Log Basics ##########
# log path
log.dirs=/u01/data/kafka/
#num.partitions=16
######## ZooKeeper cluster ##########
zookeeper.connect=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
###### SCRAM Settings: authentication ########
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:markadmin;User:marksugar


num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

num.partitions=3
#auto.create.topics.enable=true
default.replication.factor=2

The full 172.16.100.9 Kafka configuration after the changes:

broker.id=3
##### Socket Server Settings: listening protocol and port #####
listeners=SASL_PLAINTEXT://172.16.100.9:9094
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9094
######### Log Basics ##########
# log path
log.dirs=/u01/data/kafka/
#num.partitions=16
######## ZooKeeper cluster ##########
zookeeper.connect=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
###### SCRAM Settings: authentication ########
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:markadmin;User:marksugar


num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

num.partitions=3
#auto.create.topics.enable=true
default.replication.factor=2

ZooKeeper authorization

Start the whole ZooKeeper cluster first, then create the two users.

  • If the environment variables do not take effect, you can add this at the top of /usr/local/kafka/bin/kafka-run-class.sh:
JAVA_HOME=/usr/local/java
systemctl start zookeeper
systemctl enable zookeeper
systemctl status zookeeper

Create the users

The creation commands. (Note: the second SCRAM-SHA-256 entry in --add-config overrides the first, which is why the describe output further down reports the default iterations=4096.)

/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=linuxea.com],SCRAM-SHA-256=[password=linuxea.com]' \
--entity-type users --entity-name marksugar

/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG],SCRAM-SHA-256=[password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG]' \
--entity-type users --entity-name markadmin
# create markadmin
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG],SCRAM-SHA-256=[password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG]' \
--entity-type users --entity-name markadmin
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'markadmin'

# create marksugar
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=linuxea.com],SCRAM-SHA-256=[password=linuxea.com]' \
--entity-type users --entity-name marksugar
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'marksugar'.

List all SCRAM credentials

/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users

Output:

[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'markadmin' are SCRAM-SHA-256=salt=MWtxOG56cHNybGhoank1Nmg2M3dsa2JwZGw=,stored_key=G6nlglpSF0uQDskBmV3uOrpuGwEcFKfeTOcaIpuqINY=,server_key=pBEXAihvOLqAGzns2fbu2p96LqLVLM78clUAyftpMjg=,iterations=4096
Configs for user-principal 'marksugar' are SCRAM-SHA-256=salt=MTJnbGxpMWRzajZoMXRvcnBxcXF3b241MDY=,stored_key=mCocSbPBI0yPp12Kr9131nFDA6GIP11p++FQwp0+Ri4=,server_key=vzMqKkT+ZwaVWOryD2owVlMk5gMEaSW2wZI+s1x9Fd8=,iterations=4096
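The salt, stored_key, server_key, and iterations fields in this output are the standard SCRAM credential material from RFC 5802/7677: the password is stretched with PBKDF2, and the stored/server keys are derived with HMAC. A small sketch of the derivation (illustrative only; the function name is ours, not part of Kafka):

```python
import base64
import hashlib
import hmac

def scram_sha256_credentials(password: str, salt: bytes, iterations: int = 4096) -> dict:
    """Derive the SCRAM-SHA-256 fields Kafka stores in ZooKeeper (RFC 5802)."""
    # SaltedPassword := Hi(password, salt, iterations)  -- PBKDF2-HMAC-SHA256
    salted = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    # ClientKey := HMAC(SaltedPassword, "Client Key"); StoredKey := H(ClientKey)
    client_key = hmac.new(salted, b"Client Key", hashlib.sha256).digest()
    stored_key = hashlib.sha256(client_key).digest()
    # ServerKey := HMAC(SaltedPassword, "Server Key")
    server_key = hmac.new(salted, b"Server Key", hashlib.sha256).digest()
    return {
        "salt": base64.b64encode(salt).decode(),
        "stored_key": base64.b64encode(stored_key).decode(),
        "server_key": base64.b64encode(server_key).decode(),
        "iterations": iterations,
    }
```

The broker never stores the plaintext password; authentication only needs stored_key and server_key, which is why the describe output above is safe to display.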

Inspect a single user's credentials

/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name marksugar
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name markadmin

Output:

[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name marksugar
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'marksugar' are SCRAM-SHA-256=salt=MTJnbGxpMWRzajZoMXRvcnBxcXF3b241MDY=,stored_key=mCocSbPBI0yPp12Kr9131nFDA6GIP11p++FQwp0+Ri4=,server_key=vzMqKkT+ZwaVWOryD2owVlMk5gMEaSW2wZI+s1x9Fd8=,iterations=4096
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name markadmin
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'markadmin' are SCRAM-SHA-256=salt=MWtxOG56cHNybGhoank1Nmg2M3dsa2JwZGw=,stored_key=G6nlglpSF0uQDskBmV3uOrpuGwEcFKfeTOcaIpuqINY=,server_key=pBEXAihvOLqAGzns2fbu2p96LqLVLM78clUAyftpMjg=,iterations=4096

Start Kafka

With the users created, start the first Kafka broker. First fix ownership of the directories:

chown -R kafka.kafka /usr/local/kafka*

Start it manually first to verify everything works:

sudo -u kafka KAFKA_OPTS=-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server-scram.properties
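The kafka_server_jaas.conf referenced in KAFKA_OPTS is not shown in this article. For SCRAM it would presumably look like the following, using one of the super users created earlier (the password placeholder is ours):

```
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="markadmin"
    password="<markadmin-password>";
};
```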

Watch /usr/local/kafka/logs/server.log; on a healthy start you should see a line like:

[2021-05-21 17:12:03,524] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)

At this point every Kafka node needs hosts entries covering all broker hostnames plus the proxy hostname:

172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
172.16.100.10 kafka.linuxea.com

If there are no problems, enable it to start on boot:

systemctl enable kafka
systemctl start kafka
systemctl status kafka

Then start the other two nodes in the same way.

Verify the users

Create a topic:

/usr/local/kafka/bin/kafka-topics.sh --zookeeper 172.16.100.7:2181 --create --topic test --partitions 12  --replication-factor 3

Produce messages

export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"
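The kafka_client_jaas.conf is likewise not reproduced in this article; a sketch using the marksugar user created earlier:

```
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="marksugar"
    password="linuxea.com";
};
```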

cat producer.conf

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="markadmin" password="MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG";

Intranet access:

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.16.100.7:9092,172.16.100.8:9093,172.16.100.9:9094 --topic test  --producer.config producer.conf
> hello

Remote access:

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9093,kafka3:9094 --topic test  --producer.config producer.conf

Consume messages

cat consumer.conf

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="markadmin" password="MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG";

Intranet access:

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.100.7:9092,172.16.100.8:9093,172.16.100.9:9094  --topic test --from-beginning --consumer.config consumer.conf
hello

Remote access:

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test --from-beginning --consumer.config consumer.conf

Build the proxy layer

The Nginx stream configuration:

stream {
    log_format proxy '$remote_addr [$time_local] '
                     '$protocol $status $bytes_sent $bytes_received '
                     '$session_time "$upstream_addr" '
                     '"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"';

    upstream kafka1 {
        server kafka1:9092 weight=1;
    }
    server {
        listen 9092;
        proxy_pass kafka1;
        access_log /data/logs/9092.log proxy ;
    }

    upstream kafka2 {
        server kafka2:9093 weight=1;
    }
    server {
        listen 9093;
        proxy_pass kafka2;
        access_log /data/logs/9093.log proxy ;
    }

    upstream kafka3 {
        server kafka3:9094 weight=1;
    }
    server {
        listen 9094;
        proxy_pass kafka3;
        access_log /data/logs/9094.log proxy ;
    }
}
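Before pointing clients at the proxy, it is worth confirming that each advertised endpoint is reachable at the TCP level. A small helper sketch (the function is ours, not part of Kafka):

```python
import socket

def tcp_check(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, or unresolvable
        return False

if __name__ == "__main__":
    # The endpoints advertised by this article's brokers, resolved via /etc/hosts.
    for port in (9092, 9093, 9094):
        print("kafka.linuxea.com:%d ->" % port, tcp_check("kafka.linuxea.com", port, timeout=1.0))
```

This only checks the DNAT/proxy path; SASL authentication is still exercised by the console producer/consumer above.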

Add the hosts entries (the Kafka nodes need the same entries as well):

172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
172.16.100.10 kafka.linuxea.com

Test Kafka connectivity

The test node also needs a hosts entry pointing at the proxy:

172.16.100.10 kafka.linuxea.com

Install Python 3.8, then install confluent_kafka:

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple confluent_kafka 

or:

pip install -i https://mirrors.aliyun.com/pypi/simple confluent_kafka

The script:

#!/usr/bin/python
# encoding=utf-8

from confluent_kafka import Producer

import json
from datetime import datetime

"""
def producer_demo():
    # Disabled alternative using kafka-python; it additionally assumes:
    #   from kafka import KafkaProducer, errors as kafka_errors
    #   import traceback
    # Messages are JSON-serialized (key/value pairs are optional).
    producer = KafkaProducer(bootstrap_servers=['IP:9092'],
                             security_protocol='SASL_PLAINTEXT',
                             #sasl_mechanism="SCRAM-SHA-256",
                             sasl_mechanism='PLAIN',
                             #sasl_kerberos_service_name='admin',
                             #sasl_kerberos_domain_name='hadoop.hadoop.com',
                             sasl_plain_username='admin',
                             sasl_plain_password="*admin.com",
                             #key_serializer=lambda k: json.dumps(k).encode('utf-8'),
                             value_serializer=lambda v: json.dumps(v).encode('utf-8')
                            # ,api_version=(0, 10)
                             )  # connect to kafka

    msg_dict = "Hello World".encode('utf-8')  # payload must be bytes

    for i in range(0, 3):
        #msg = json.dumps(msg_dict)
        future = producer.send('test', msg_dict, partition=0)

        try:
            future.get(timeout=10)  # block until the send is confirmed
        except kafka_errors.KafkaError:  # raised when the send fails
            traceback.format_exc()

    producer.close()

"""
def confluentKafkaDemo():
    topic_name = 'test'
    count = 100
    start = 0
    conf = {
        'bootstrap.servers': 'kafka.linuxea.com:9092,kafka.linuxea.com:9093,kafka.linuxea.com:9094',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': 'linuxea',  # username
        'sasl.password': 'MwMzA0MGFmOG'  # password
    }
    producer = Producer(**conf)
    data = {
        'name': 'test1 is ok',
        'time': str(datetime.now())
    }

    try:
        while start < count:
            producer.produce(topic_name, (json.dumps(data)).encode(), callback=delivery_report)
            producer.flush()
            start = start+1
    except Exception as e:
        print(e)

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


if __name__ == '__main__':
    #producer_demo()
    confluentKafkaDemo()

Run the script and check that the messages were written successfully:

(figure: script output showing successful delivery)

kafka-eagle

On a cluster with user authentication enabled, kafka-eagle does not work entirely normally; there are always some rough edges.

The kafka-eagle host also needs the hosts entries:

172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3

Download 2.0.5:

tar xf kafka-eagle-bin-2.0.5.tar.gz  -C /usr/local/
cd /usr/local/kafka-eagle-bin-2.0.5
tar xf kafka-eagle-web-2.0.5-bin.tar.gz
ln -s /usr/local/kafka-eagle-bin-2.0.5/ /usr/local/kafka-eagle
cp /usr/local/kafka-eagle/kafka-eagle-web-2.0.5/conf/system-config.properties /usr/local/kafka-eagle/kafka-eagle-web-2.0.5/conf/system-config.properties.bak
mkdir /data/kafka-eagle/db/ -p

For convenience, change the kafka-eagle host's hostname to its IP address:

hostnamectl set-hostname 192.168.3.6

Configure the environment variables:

cat > /etc/profile.d/kafka-eagle.sh <<'EOF'
export KE_HOME=/usr/local/kafka-eagle/kafka-eagle-web-2.0.5
export PATH=$KE_HOME/bin:$PATH
EOF
source /etc/profile.d/kafka-eagle.sh
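The system-config.properties changes for a SCRAM-protected cluster are not shown in this article. In the 2.0.x series the relevant keys looked roughly like the following; treat this as a sketch and verify the key names against the sample file shipped in conf/ (the password placeholder is ours):

```
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
cluster1.kafka.eagle.sasl.enable=true
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="markadmin" password="<markadmin-password>";
```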
