Background
In KIP-500, Kafka introduced KRaft to replace ZooKeeper so that Kafka can manage its own metadata. For details, see the original proposal:
- KIP-500
What is KRaft
KRaft is the distributed coordination and metadata-management component that Kafka uses in place of ZooKeeper.
Architecture change
Previously, a controller was elected through ZooKeeper. Now the KRaft quorum elects a controller from within the cluster itself.
Advantages
- Kafka no longer depends on an external framework and can run entirely on its own
- Scaling a Kafka cluster is no longer limited by ZooKeeper's read/write capacity
We won't go deeper into the pros and cons here; this post focuses on deployment.
Deploying a 3-node Kafka cluster
KRaft supports running the controller and the broker in the same process, as well as deploying them separately. Separate deployment is recommended in production. Since this is a test cluster, we will run the controller and broker in the same process.
Remember to open ports 9092 and 9093 on all machines.
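How exactly you open the ports depends on your environment. As a minimal sketch, assuming a Linux host running firewalld (adapt this to your own firewall or cloud security group rules):
# hypothetical example: open the broker (9092) and controller (9093) ports, then reload
firewall-cmd --permanent --add-port=9092/tcp
firewall-cmd --permanent --add-port=9093/tcp
firewall-cmd --reload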
Download Kafka
wget https://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
The first attempt failed with an expired-certificate error, so add a flag to skip certificate verification:
wget --no-check-certificate https://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
Downloading from overseas was still very slow from a server in mainland China, so we switched to a domestic mirror.
- Aliyun Kafka mirror: mirrors.aliyun.com/apache/kafk…
Download via the domestic mirror:
wget http://mirrors.aliyun.com/apache/kafka/3.5.0/kafka_2.13-3.5.0.tgz
Run this on all three machines.
Extract
tar -xzf kafka_2.13-3.5.0.tgz
Run this on all three machines.
Generate a UUID for the cluster
Go into the bin directory of the extracted archive (here it is /data/kafka_2.13-3.5.0/bin) and run:
sh kafka-storage.sh random-uuid
Generate this on one machine only.
This prints a string such as xgK3spReSO7ijVK4rEbbbQ
Format the storage directories
sh kafka-storage.sh format -t xgK3spReSO7ijVK4rEbbbQ -c ../config/kraft/server.properties
Run this on all three machines, using the same UUID generated above: every node must share the same cluster ID.
Modify the configuration
We now edit the server.properties file on each of the three machines.
Here the path is /data/kafka_2.13-3.5.0/config/kraft/server.properties
- node1
node.id=1
controller.quorum.voters=1@192.168.1.1:9093,2@192.168.1.2:9093,3@192.168.1.3:9093
process.roles=broker,controller
listeners=PLAINTEXT://192.168.1.1:9092,CONTROLLER://192.168.1.1:9093
log.dirs=/data/kafka01/logs
- node2
node.id=2
controller.quorum.voters=1@192.168.1.1:9093,2@192.168.1.2:9093,3@192.168.1.3:9093
process.roles=broker,controller
listeners=PLAINTEXT://192.168.1.2:9092,CONTROLLER://192.168.1.2:9093
log.dirs=/data/kafka02/logs
- node3
node.id=3
controller.quorum.voters=1@192.168.1.1:9093,2@192.168.1.2:9093,3@192.168.1.3:9093
process.roles=broker,controller
listeners=PLAINTEXT://192.168.1.3:9092,CONTROLLER://192.168.1.3:9093
log.dirs=/data/kafka03/logs
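If you started from the shipped config/kraft/server.properties, the remaining KRaft-related settings should already be in place; the important ones look roughly like this (check your own file, as defaults can vary slightly between versions):
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL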
Start the cluster
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G" && nohup sh /data/kafka_2.13-3.5.0/bin/kafka-server-start.sh /data/kafka_2.13-3.5.0/config/kraft/server.properties &
Run this on all three machines.
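If you prefer not to manage nohup yourself, the start script also accepts a -daemon flag that backgrounds the process for you:
sh /data/kafka_2.13-3.5.0/bin/kafka-server-start.sh -daemon /data/kafka_2.13-3.5.0/config/kraft/server.properties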
Once all three are up, we have a three-node Kafka cluster.
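To check that the KRaft quorum actually formed, you can describe its status with the kafka-metadata-quorum.sh tool shipped with Kafka 3.3+ (run from the bin directory; the output includes the leader ID and current voters):
sh kafka-metadata-quorum.sh --bootstrap-server 192.168.1.1:9092 describe --status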
Testing
Create a topic
sh kafka-topics.sh --create --topic xiaozou --partitions 1 --replication-factor 1 --bootstrap-server 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
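A single replica is enough for this smoke test, but to actually exercise replication across the three nodes you could create the topic with a replication factor of 3 instead, e.g.:
sh kafka-topics.sh --create --topic xiaozou --partitions 3 --replication-factor 3 --bootstrap-server 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092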
List topics
sh kafka-topics.sh --list --bootstrap-server 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
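You can also describe the topic to confirm its partition leader and replicas:
sh kafka-topics.sh --describe --topic xiaozou --bootstrap-server 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092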
Code test
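The examples below assume the kafka-clients dependency is on the classpath; a Maven snippet might look like this (the version here is an assumption, match it to your broker):
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version>
</dependency>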
- Produce messages
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducer {

    private static final String TOPIC = "xiaozou";

    private static final String BOOTSTRAP_SERVERS = "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092";

    public static void main(String[] args) {
        // produce messages
        produceMessage();
    }

    private static void produceMessage() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // this class shares its name with the client class, so instantiate it fully qualified
        Producer<Integer, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
        try {
            for (int i = 0; i < 10; i++) {
                String message = "xiaozou message " + i;
                System.out.println("Start sending message");
                // send() is asynchronous; get() blocks until the broker acknowledges the record
                Future<RecordMetadata> send = producer.send(new ProducerRecord<>(TOPIC, message));
                RecordMetadata recordMetadata = send.get();
                System.out.println("Produced message: " + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
- Consume messages
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerExample {

    private static final String TOPIC_NAME = "xiaozou";

    private static final String GROUP_ID = "xiaozou_gid";

    private static final String BOOTSTRAP_SERVERS = "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // disable auto-commit so the commitSync() below is what advances the offsets
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // create the consumer instance
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
        // subscribe to the topic
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));
        // consume messages
        try {
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<Integer, String> record : records) {
                    System.out.println("Received message: key = " + record.key() + ", value = " + record.value() +
                            ", partition = " + record.partition() + ", offset = " + record.offset());
                }
                consumer.commitSync(); // manually commit offsets
            }
        } finally {
            consumer.close();
        }
    }
}
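Note that enable.auto.commit is set to false in the consumer properties, so the commitSync() after each poll is what actually records the group's progress; left at its default of true, the client would also commit offsets periodically in the background.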