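This post configures a Kafka 2.5.0 cluster. Newer Kafka versions will drop ZooKeeper, but they are not covered here. The environment is a private cloud. The business requirement is to send data to Kafka over the public Internet with SCRAM-SHA-256 authentication, while internal clients must still be able to reach Kafka normally. The internal ports are exposed to the Internet through DNAT, and each DNAT rule necessarily maps one port to one port. The rough topology is as follows.
If your setup does not work this way, you can instead try the approach of separating internal and external access traffic in Kafka.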
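In practice, internal clients that connect through internal IPs can negotiate with Kafka and have their requests handled normally.
Access from the Internet through ports 9092, 9093 and 9094 on 6.78.5.32, however, runs into a problem. A client sends request A to 6.78.5.32:9092, and the firewall's DNAT layer forwards it to the backend Kafka broker. When the broker replies, the address it hands back is 172.16.100.7:9092, so the client is told to connect there. The client has no way to reach 172.16.100.7, so the send fails.
If you only send messages to Kafka and never check whether the broker responded, the code appears to succeed, yet no data is actually written. This leads to a different approach.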
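Because the client must be able to reach whatever address the broker sends back, both the servers and the clients get hosts entries that map hostnames to IPs, and Kafka advertises a hostname instead of a fixed IP. The name is resolved through DNS or the local hosts file, so requests go to the proxy server and responses come back to the client. Whether a request comes from the Internet or from the internal network, each side's own hosts file resolves the name to an IP that side can actually reach, and the response completes.
Some fields in the official documentation describe this design as protection against man-in-the-middle attacks.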
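The hostname trick can be modeled in a few lines. This is a minimal sketch, not part of the original setup: it assumes broker 1 advertises port 9092 (inferred from the 9093/9094 pattern used later) and that Internet clients map kafka.linuxea.com to the public DNAT address 6.78.5.32. `connect_target` is a hypothetical helper that shows which address a client would dial after reading the broker metadata.

```python
# Minimal model of hostname-based advertised listeners.
# Assumptions (not from the article): broker 1 advertises port 9092, and
# Internet clients map kafka.linuxea.com to the public address 6.78.5.32.
from typing import Dict, Tuple

# Every broker advertises the same proxy hostname on its own unique port.
ADVERTISED: Dict[int, Tuple[str, int]] = {
    1: ("kafka.linuxea.com", 9092),
    2: ("kafka.linuxea.com", 9093),
    3: ("kafka.linuxea.com", 9094),
}

# hosts entry seen by an internal client (the nginx proxy, from the article).
INTERNAL_HOSTS = {"kafka.linuxea.com": "172.16.100.10"}
# hosts entry seen by an Internet client (assumed public DNAT address).
EXTERNAL_HOSTS = {"kafka.linuxea.com": "6.78.5.32"}


def connect_target(hosts: Dict[str, str], broker_id: int) -> Tuple[str, int]:
    """Return the (ip, port) a client dials for a broker after reading metadata."""
    host, port = ADVERTISED[broker_id]
    try:
        return hosts[host], port
    except KeyError:
        raise LookupError(f"{host} has no hosts entry on this client") from None


if __name__ == "__main__":
    for broker_id in ADVERTISED:
        print(broker_id, connect_target(INTERNAL_HOSTS, broker_id),
              connect_target(EXTERNAL_HOSTS, broker_id))
```

The broker never hands out a 172.16.100.x address; the same advertised name simply resolves to a different, reachable IP depending on which hosts file the client reads.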
Environment:
- version: kafka_2.12-2.5.0
- jdk: 1.8.0_211
Prerequisites:
- Synchronize time (this cron entry runs ntpdate at minute 10 of every hour)
10 * * * * ntpdate ntp.aliyun.com
- Set the hostnames and add local hosts entries
#172.16.100.7
hostnamectl set-hostname kafka1
#172.16.100.8
hostnamectl set-hostname kafka2
#172.16.100.9
hostnamectl set-hostname kafka3
172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
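Because the same entries have to be added on brokers, the proxy and test clients, it helps to make the change idempotent. This is a sketch with a hypothetical `merge_hosts` helper: it appends an `IP hostname` line only for names that are not already mapped, and it deliberately leaves a name that is mapped to a different IP untouched rather than rewriting it.

```python
# Sketch: add missing "IP hostname" lines to hosts-file text without duplicates.
# merge_hosts is a hypothetical helper; applying it to /etc/hosts requires root.
from typing import Dict, Set


def mapped_names(hosts_text: str) -> Set[str]:
    names: Set[str] = set()
    for line in hosts_text.splitlines():
        fields = line.split("#", 1)[0].split()  # drop comments, split on whitespace
        if len(fields) >= 2:
            names.update(fields[1:])  # field 0 is the IP, the rest are names/aliases
    return names


def merge_hosts(hosts_text: str, entries: Dict[str, str]) -> str:
    """Return hosts_text with an 'ip name' line appended for each unmapped name."""
    known = mapped_names(hosts_text)
    lines = hosts_text.rstrip("\n").splitlines() if hosts_text.strip() else []
    for name, ip in entries.items():
        if name not in known:
            lines.append(f"{ip} {name}")
    return "\n".join(lines) + "\n"


KAFKA_HOSTS = {
    "kafka1": "172.16.100.7",
    "kafka2": "172.16.100.8",
    "kafka3": "172.16.100.9",
}
```

For example, read /etc/hosts, pass its text and `KAFKA_HOSTS` to `merge_hosts`, and write the result back; running it a second time changes nothing.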
Preparation
Install Java from the binary tarball (an RPM install also works):
tar xf jdk-8u211-linux-x64.tar.gz -C /usr/local/
cd /usr/local && ln -s jdk1.8.0_211 java
cat > /etc/profile.d/java.sh
/etc/systemd/system/zookeeper.service
/usr/local/kafka/config/server-scram.properties
/usr/local/kafka/config/server-scram.properties
/etc/systemd/system/kafka.service
/u01/data/zookeeper/myid
172.16.100.9
echo "2" > /u01/data/zookeeper/myid
- kafka
Edit server-scram.properties and change the following settings on each broker:
172.16.100.8
broker.id=2
##### Socket Server Settings: listener protocol and port #######
listeners=SASL_PLAINTEXT://172.16.100.8:9093
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9093
172.16.100.9
broker.id=3
##### Socket Server Settings: listener protocol and port #######
listeners=SASL_PLAINTEXT://172.16.100.9:9094
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9094
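The per-broker lines follow a fixed pattern: bind on the broker's internal IP, advertise the shared proxy hostname, and use the same unique port in both. A small generator makes that pattern explicit. This is a hypothetical sketch; the values for broker 1 (172.16.100.7, port 9092) are an assumption inferred from the other two brokers.

```python
# Hypothetical helper that renders the per-broker lines of server-scram.properties.
# Broker 1 (172.16.100.7:9092) is an assumption inferred from the port pattern.
from typing import List, Tuple

ADVERTISED_HOST = "kafka.linuxea.com"  # proxy hostname shared by all brokers

BROKERS: List[Tuple[int, str, int]] = [
    (1, "172.16.100.7", 9092),
    (2, "172.16.100.8", 9093),
    (3, "172.16.100.9", 9094),
]


def broker_listener_lines(broker_id: int, bind_ip: str, port: int,
                          advertised_host: str = ADVERTISED_HOST) -> List[str]:
    """Bind on the internal IP, advertise the proxy hostname on the same port."""
    return [
        f"broker.id={broker_id}",
        f"listeners=SASL_PLAINTEXT://{bind_ip}:{port}",
        f"advertised.listeners=SASL_PLAINTEXT://{advertised_host}:{port}",
    ]


if __name__ == "__main__":
    ports = [port for _, _, port in BROKERS]
    # One proxy/DNAT port per broker, so the ports must not collide.
    assert len(ports) == len(set(ports)), "each broker needs a unique port"
    for broker_id, ip, port in BROKERS:
        print(f"# {ip}")
        print("\n".join(broker_listener_lines(broker_id, ip, port)))
```

The unique port per broker is what lets the proxy and the DNAT rules route a connection to exactly one broker.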
Full Kafka configuration on 172.16.100.8 after the changes:
broker.id=2
##### Socket Server Settings: listener protocol and port #######
listeners=SASL_PLAINTEXT://172.16.100.8:9093
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9093
######### Log Basics ##########
# Log directory
log.dirs=/u01/data/kafka/
#num.partitions=16
######## ZooKeeper cluster ##########
zookeeper.connect=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
###### SCRAM Settings: authentication ########
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:markadmin;User:marksugar
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
num.partitions=3
#auto.create.topics.enable=true
default.replication.factor=2
Full Kafka configuration on 172.16.100.9 after the changes:
broker.id=3
##### Socket Server Settings: listener protocol and port #######
listeners=SASL_PLAINTEXT://172.16.100.9:9094
advertised.listeners=SASL_PLAINTEXT://kafka.linuxea.com:9094
######### Log Basics ##########
# Log directory
log.dirs=/u01/data/kafka/
#num.partitions=16
######## ZooKeeper cluster ##########
zookeeper.connect=172.16.100.7:2181,172.16.100.8:2181,172.16.100.9:2181
###### SCRAM Settings: authentication ########
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:markadmin;User:marksugar
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
num.partitions=3
#auto.create.topics.enable=true
default.replication.factor=2
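Before starting a broker it can be worth sanity-checking the edited file. This is a sketch under stated simplifications: `parse_properties` only understands plain `key=value` lines (no `:` separators, escapes or line continuations), and the equal-port check reflects this setup's convention that the proxy listens on the same port it forwards to, not a general Kafka requirement.

```python
# Sketch: sanity-check a broker's server-scram.properties before starting it.
# parse_properties handles only simple "key=value" lines, enough for this post.
from typing import Dict, List, Tuple


def parse_properties(text: str) -> Dict[str, str]:
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue  # skip blank lines and comments
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def parse_listeners(value: str) -> Dict[str, Tuple[str, int]]:
    """'SASL_PLAINTEXT://host:port,...' -> {'SASL_PLAINTEXT': ('host', port)}."""
    result: Dict[str, Tuple[str, int]] = {}
    for item in value.split(","):
        name, _, hostport = item.strip().partition("://")
        host, _, port = hostport.rpartition(":")
        result[name] = (host, int(port))
    return result


def check_broker(props: Dict[str, str]) -> List[str]:
    problems: List[str] = []
    bound = parse_listeners(props["listeners"])
    advertised = parse_listeners(props["advertised.listeners"])
    for name, (_, port) in bound.items():
        if name not in advertised:
            problems.append(f"{name} is not advertised")
        elif advertised[name][1] != port:
            # The proxy here listens on the same port it forwards to.
            problems.append(f"{name}: bound port {port} != advertised port {advertised[name][1]}")
    # Listener names equal protocol names in this setup.
    if props.get("security.inter.broker.protocol") not in bound:
        problems.append("security.inter.broker.protocol has no matching listener")
    enabled = [m.strip() for m in props.get("sasl.enabled.mechanisms", "").split(",")]
    if props.get("sasl.mechanism.inter.broker.protocol") not in enabled:
        problems.append("inter-broker SASL mechanism is not enabled")
    return problems
```

Usage: read /usr/local/kafka/config/server-scram.properties, pass its text through `parse_properties`, and an empty list from `check_broker` means these particular checks passed.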
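Creating users in ZooKeeper
Start the whole ZooKeeper cluster first, then create the two users.
- If the Java environment variables are not picked up, add the following line to the script /usr/local/kafka/bin/kafka-run-class.sh: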
JAVA_HOME=/usr/local/java
systemctl start zookeeper
systemctl enable zookeeper
systemctl status zookeeper
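The user-creation commands below talk to ZooKeeper directly, so every node's client port should be reachable first. This sketch only confirms that a TCP connection to port 2181 succeeds; it says nothing about quorum health, and `check_ports` is a hypothetical helper.

```python
# Sketch: confirm each ZooKeeper client port accepts TCP connections.
# This only proves the port is open; it does not check quorum health.
import socket
from typing import Dict, Iterable, Tuple

Endpoint = Tuple[str, int]


def check_ports(endpoints: Iterable[Endpoint], timeout: float = 3.0) -> Dict[Endpoint, bool]:
    results: Dict[Endpoint, bool] = {}
    for host, port in endpoints:
        try:
            # create_connection resolves the name and opens a TCP connection
            with socket.create_connection((host, port), timeout=timeout):
                results[(host, port)] = True
        except OSError:  # refused, timed out, unresolvable, ...
            results[(host, port)] = False
    return results


ZOOKEEPERS = [("172.16.100.7", 2181), ("172.16.100.8", 2181), ("172.16.100.9", 2181)]
```

Running `print(check_ports(ZOOKEEPERS))` from any node should show `True` for all three endpoints before you continue.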
Create the users with the following commands:
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=linuxea.com],SCRAM-SHA-256=[password=linuxea.com]' \
--entity-type users --entity-name marksugar
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG],SCRAM-SHA-256=[password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG]' \
--entity-type users --entity-name markadmin
# Create markadmin
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG],SCRAM-SHA-256=[password=MwMzA0MGIwZjMwMjg3MjY4NWE2ZGFmOG]' \
--entity-type users --entity-name markadmin
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'markadmin'
# Create marksugar
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --alter \
--add-config 'SCRAM-SHA-256=[iterations=8192,password=linuxea.com],SCRAM-SHA-256=[password=linuxea.com]' \
--entity-type users --entity-name marksugar
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'marksugar'.
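List all SCRAM credentials. Note that each command above passes SCRAM-SHA-256 twice. The output below shows iterations=4096 (the default) rather than 8192, so the second entry, which has no iterations setting, appears to be the one that took effect. To actually use 8192 iterations, pass a single entry such as 'SCRAM-SHA-256=[iterations=8192,password=linuxea.com]'.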
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users
Output:
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'markadmin' are SCRAM-SHA-256=salt=MWtxOG56cHNybGhoank1Nmg2M3dsa2JwZGw=,stored_key=G6nlglpSF0uQDskBmV3uOrpuGwEcFKfeTOcaIpuqINY=,server_key=pBEXAihvOLqAGzns2fbu2p96LqLVLM78clUAyftpMjg=,iterations=4096
Configs for user-principal 'marksugar' are SCRAM-SHA-256=salt=MTJnbGxpMWRzajZoMXRvcnBxcXF3b241MDY=,stored_key=mCocSbPBI0yPp12Kr9131nFDA6GIP11p++FQwp0+Ri4=,server_key=vzMqKkT+ZwaVWOryD2owVlMk5gMEaSW2wZI+s1x9Fd8=,iterations=4096
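The salt, stored_key, server_key and iterations fields come from the SCRAM scheme (RFC 5802, with SHA-256 per RFC 7677): the password itself is never stored, only keys derived from it. This is a sketch of that derivation, assuming an ASCII password (SASLprep normalization is skipped); it has not been checked byte-for-byte against the values Kafka stored above.

```python
# Sketch of SCRAM-SHA-256 credential derivation per RFC 5802 / RFC 7677.
# Assumes an ASCII password (SASLprep is skipped); not verified against
# the exact values Kafka stored in ZooKeeper.
import base64
import hashlib
import hmac
import os
from typing import Dict, Optional


def scram_sha256_credential(password: str, salt: Optional[bytes] = None,
                            iterations: int = 4096) -> Dict[str, str]:
    if salt is None:
        salt = os.urandom(16)  # random salt, as a server would generate
    # SaltedPassword = Hi(password, salt, i), which is PBKDF2-HMAC-SHA-256
    salted = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    client_key = hmac.new(salted, b"Client Key", hashlib.sha256).digest()
    stored_key = hashlib.sha256(client_key).digest()  # StoredKey = H(ClientKey)
    server_key = hmac.new(salted, b"Server Key", hashlib.sha256).digest()

    def b64(data: bytes) -> str:
        return base64.b64encode(data).decode("ascii")

    return {
        "salt": b64(salt),
        "stored_key": b64(stored_key),
        "server_key": b64(server_key),
        "iterations": str(iterations),
    }
```

The broker uses StoredKey to verify the client's proof and ServerKey to prove its own identity back to the client, which is why both are kept and the password is not.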
View a single user's credentials
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name marksugar
/usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name markadmin
Output:
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name marksugar
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'marksugar' are SCRAM-SHA-256=salt=MTJnbGxpMWRzajZoMXRvcnBxcXF3b241MDY=,stored_key=mCocSbPBI0yPp12Kr9131nFDA6GIP11p++FQwp0+Ri4=,server_key=vzMqKkT+ZwaVWOryD2owVlMk5gMEaSW2wZI+s1x9Fd8=,iterations=4096
[root@kafka1 config]# /usr/local/kafka/bin/kafka-configs.sh --zookeeper 172.16.100.9:2181 --describe --entity-type users --entity-name markadmin
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Configs for user-principal 'markadmin' are SCRAM-SHA-256=salt=MWtxOG56cHNybGhoank1Nmg2M3dsa2JwZGw=,stored_key=G6nlglpSF0uQDskBmV3uOrpuGwEcFKfeTOcaIpuqINY=,server_key=pBEXAihvOLqAGzns2fbu2p96LqLVLM78clUAyftpMjg=,iterations=4096
Starting Kafka
With the users created, start the first Kafka broker. First give the kafka user ownership of the directories:
chown -R kafka:kafka /usr/local/kafka*
Start it manually first to check that it works:
sudo -u kafka KAFKA_OPTS=-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server-scram.properties
Watch the log /usr/local/kafka/logs/server.log. Once the broker is up, you should normally see a line like this:
[2021-05-21 17:12:03,524] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)
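Instead of watching the log by eye, a script can poll it for the startup line. This is a sketch: the regex matches the message format shown above, and `wait_for_start` is a hypothetical helper. It rereads the whole file on each poll and would also match a "started" line left over from an earlier run, so rotate or truncate the log first if that matters.

```python
# Sketch: detect the "started" line in server.log and return the broker id.
import re
import time
from typing import Optional

STARTED = re.compile(r"\[KafkaServer id=(\d+)\] started \(kafka\.server\.KafkaServer\)")


def started_broker_id(log_text: str) -> Optional[int]:
    """Broker id from the last 'started' message in the log text, else None."""
    ids = STARTED.findall(log_text)
    return int(ids[-1]) if ids else None


def wait_for_start(path: str, timeout: float = 60.0, interval: float = 2.0) -> Optional[int]:
    """Poll the log file until a 'started' line appears or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with open(path, encoding="utf-8", errors="replace") as f:
                broker_id = started_broker_id(f.read())
        except FileNotFoundError:
            broker_id = None  # log file not created yet
        if broker_id is not None:
            return broker_id
        time.sleep(interval)
    return None
```

For example, `wait_for_start("/usr/local/kafka/logs/server.log")` returns the broker id once the broker reports that it has started.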
Kafka also needs hosts entries covering all broker hostnames and the proxy hostname:
172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
172.16.100.10 kafka.linuxea.com
If everything works, enable the service to start at boot:
systemctl enable kafka
systemctl start kafka
systemctl status kafka
Then start the other two brokers the same way, one after the other.
Verifying the users
Create a topic:
/usr/local/kafka/bin/kafka-topics.sh --zookeeper 172.16.100.7:2181 --create --topic test --partitions 12 --replication-factor 3
Produce messages:
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"
cat producer.conf
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="marksugar" password="linuxea.com";
Internal access:
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.16.100.7:9092,172.16.100.8:9093,172.16.100.9:9094 --topic test --producer.config producer.conf
> hello
Remote access:
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9093,kafka3:9094 --topic test --producer.config producer.conf
Consume messages:
cat consumer.conf
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="marksugar" password="linuxea.com";
Internal access:
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.100.7:9092,172.16.100.8:9093,172.16.100.9:9094 --topic test --from-beginning --consumer.config consumer.conf
hello
Remote access:
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test --from-beginning --consumer.config consumer.conf
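The console tools read Java client properties, while the Python test later in this post uses confluent_kafka (librdkafka), which spells the same settings differently: `sasl.mechanisms` instead of `sasl.mechanism`, and `sasl.username`/`sasl.password` instead of a JAAS string. This hypothetical converter maps only the keys used here, and its regex assumes the username and password contain no double quotes.

```python
# Sketch: translate producer.conf / consumer.conf (Java client properties)
# into a confluent_kafka (librdkafka) config dict. Only the keys used in this
# post are mapped; JAAS parsing assumes values contain no double quotes.
import re
from typing import Dict

JAAS_FIELD = re.compile(r'(\w+)="([^"]*)"')


def java_props_to_librdkafka(text: str) -> Dict[str, str]:
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")  # split on the first '=' only
        if sep:
            props[key.strip()] = value.strip()

    conf: Dict[str, str] = {}
    if "security.protocol" in props:
        conf["security.protocol"] = props["security.protocol"]
    if "sasl.mechanism" in props:
        conf["sasl.mechanisms"] = props["sasl.mechanism"]  # librdkafka's key name
    fields = dict(JAAS_FIELD.findall(props.get("sasl.jaas.config", "")))
    if "username" in fields:
        conf["sasl.username"] = fields["username"]
    if "password" in fields:
        conf["sasl.password"] = fields["password"]
    return conf
```

Add `bootstrap.servers` to the returned dict and it can be passed straight to `confluent_kafka.Producer`.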
Building the proxy layer
nginx stream configuration:
stream {
    log_format proxy '$remote_addr [$time_local] '
                     '$protocol $status $bytes_sent $bytes_received '
                     '$session_time "$upstream_addr" '
                     '"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"';

    upstream kafka1 {
        server kafka1:9092 weight=1;
    }
    server {
        listen 9092;
        proxy_pass kafka1;
        access_log /data/logs/9092.log proxy;
    }

    upstream kafka2 {
        server kafka2:9093 weight=1;
    }
    server {
        listen 9093;
        proxy_pass kafka2;
        access_log /data/logs/9093.log proxy;
    }

    upstream kafka3 {
        server kafka3:9094 weight=1;
    }
    server {
        listen 9094;
        proxy_pass kafka3;
        access_log /data/logs/9094.log proxy;
    }
}
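Each broker gets exactly one upstream/server pair, with the proxy listening on the broker's own port. A generator keeps that one-to-one mapping consistent when brokers are added. This is a hypothetical sketch; its output is meant to be pasted inside the `stream {}` block after the `log_format` line.

```python
# Hypothetical generator for the upstream/server pairs inside the stream {} block.
from typing import List, Tuple


def nginx_stream_servers(brokers: List[Tuple[str, int]], log_dir: str = "/data/logs") -> str:
    ports = [port for _, port in brokers]
    if len(ports) != len(set(ports)):
        raise ValueError("each broker needs its own proxy port (1:1 DNAT mapping)")
    parts = []
    for host, port in brokers:
        # The proxy listens on the broker's own port and forwards to it.
        parts.append(
            f"    upstream {host} {{\n"
            f"        server {host}:{port} weight=1;\n"
            f"    }}\n"
            f"    server {{\n"
            f"        listen {port};\n"
            f"        proxy_pass {host};\n"
            f"        access_log {log_dir}/{port}.log proxy;\n"
            f"    }}\n"
        )
    return "".join(parts)
```

For example, `print(nginx_stream_servers([("kafka1", 9092), ("kafka2", 9093), ("kafka3", 9094)]))` reproduces the three pairs above; the proxy host still needs hosts entries so that nginx can resolve kafka1 to kafka3.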
Add hosts entries on the proxy; the Kafka nodes need the same entries:
172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
172.16.100.10 kafka.linuxea.com
Testing Kafka connectivity
The test node also needs a hosts entry pointing to the proxy:
172.16.100.10 kafka.linuxea.com
Install Python 3.8 and confluent_kafka:
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple confluent_kafka
or
pip install -i https://mirrors.aliyun.com/pypi/simple confluent_kafka
The script:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
from datetime import datetime

from confluent_kafka import Producer

"""
Earlier attempt with kafka-python, kept for reference only (this string is not executed):

import traceback
from kafka import KafkaProducer
from kafka.errors import KafkaError

def producer_demo():
    # Values are serialized as JSON; key/value pairs are optional
    producer = KafkaProducer(bootstrap_servers=['IP:9092'],
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='SCRAM-SHA-256',  # the brokers only enable SCRAM-SHA-256
                             sasl_plain_username='admin',
                             sasl_plain_password="*admin.com",
                             # key_serializer=lambda k: json.dumps(k).encode('utf-8'),
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    msg = "Hello World"  # value_serializer turns this into bytes
    for i in range(0, 3):
        future = producer.send('test', msg, partition=0)
        try:
            future.get(timeout=10)  # block until the send is acknowledged
        except KafkaError:  # raised when the send fails
            print(traceback.format_exc())
    producer.close()
"""


def delivery_report(err, msg):
    """Delivery callback: reports success or failure for each message."""
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


def confluentKafkaDemo():
    topic_name = 'test'
    count = 100
    conf = {
        'bootstrap.servers': 'kafka.linuxea.com:9092,kafka.linuxea.com:9093,kafka.linuxea.com:9094',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': 'linuxea',  # username
        'sasl.password': 'MwMzA0MGFmOG'  # password
    }
    producer = Producer(conf)
    data = {
        'name': 'test1 is ok',
        'time': str(datetime.now())
    }
    try:
        for _ in range(count):
            producer.produce(topic_name, json.dumps(data).encode('utf-8'), callback=delivery_report)
            producer.poll(0)  # serve delivery callbacks for messages already sent
    except Exception as e:
        print(e)
    finally:
        # Wait for outstanding messages so delivery failures are reported
        # instead of the send silently appearing to succeed
        producer.flush()


if __name__ == '__main__':
    # producer_demo()
    confluentKafkaDemo()
Run the script and check whether the messages were written.
kafka-eagle
kafka-eagle can be used with a cluster that has user authentication enabled, but there are always some rough edges.
kafka-eagle also needs hosts entries:
172.16.100.7 kafka1
172.16.100.8 kafka2
172.16.100.9 kafka3
Download version 2.0.5:
tar xf kafka-eagle-bin-2.0.5.tar.gz -C /usr/local/
cd /usr/local/kafka-eagle-bin-2.0.5
tar xf kafka-eagle-web-2.0.5-bin.tar.gz
ln -s /usr/local/kafka-eagle-bin-2.0.5/ /usr/local/kafka-eagle
cp /usr/local/kafka-eagle/kafka-eagle-web-2.0.5/conf/system-config.properties /usr/local/kafka-eagle/kafka-eagle-web-2.0.5/conf/system-config.properties.bak
mkdir /data/kafka-eagle/db/ -p
For convenience, set the hostname of the kafka-eagle host to its IP address (this setup requires it):
hostnamectl set-hostname 192.168.3.6
Configure environment variables:
cat > /etc/profile.d/kafka-eagle.sh