kafka简介
Kafka是一个分布式流处理平台,最初由LinkedIn公司开发。它允许我们将消息从一个应用程序传递到另一个应用程序,同时提供可扩展性和容错性。Kafka的设计目标是处理大量数据流,并提供快速、可靠的数据传输。
Kafka的核心概念
Kafka的消息队列由以下几个核心概念组成:
Producer
生产者是一个应用程序,负责将消息发布到Kafka集群中的主题(Topic)中。生产者将消息发送到Kafka集群中的代理(Broker),代理负责将消息存储在磁盘上并将其传递给消费者。
Consumer
消费者是一个应用程序,负责从Kafka集群中的主题中读取消息。消费者从代理(Broker)中获取消息,并将其传递给应用程序进行处理。
Broker
代理是Kafka集群中的服务器,负责存储和传递消息。代理接收来自生产者的消息,并将其存储在磁盘上。消费者从代理中获取消息,并将其传递给应用程序进行处理。
Topic
主题是Kafka消息队列中的逻辑分类。生产者将消息发布到特定的主题中,消费者则从特定的主题中读取消息。主题可以被分区,每个分区都有自己的偏移量(Offset)。
Partition
分区是主题的一个子集,每个分区都有自己的偏移量(Offset)。分区允许Kafka在集群中进行数据并行处理,提高了系统的吞吐量和可扩展性
kafka的安装过程
我使用的是linux环境,所以直接执行yay -S kafka
即可
安装go的kafka库sarama
go get -u github.com/IBM/sarama
sarama的基本使用
package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
// 构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = "test"
msg.Value = sarama.StringEncoder("this is a test log")
// 连接kafka
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println("producer closed, err:", err)
return
}
defer client.Close()
// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%vn", pid, offset)
}
运行结果如图
offset 为4是因为截图之前运行了3次
总结
Kafka是一个分布式流处理平台,允许您将消息从一个应用程序传递到另一个应用程序,并提供可扩展性和容错性。Kafka具有高吞吐量、可扩展性、容错性和灵活性等优势,适用于处理大量数据流的场景。