kfaka消息队列安装和使用 | 青训营

2023年 8月 28日 73.8k 0

kafka简介

Kafka是一个分布式流处理平台,最初由LinkedIn公司开发。它允许我们将消息从一个应用程序传递到另一个应用程序,同时提供可扩展性和容错性。Kafka的设计目标是处理大量数据流,并提供快速、可靠的数据传输。

Kafka的核心概念

Kafka的消息队列由以下几个核心概念组成:

Producer

生产者是一个应用程序,负责将消息发布到Kafka集群中的主题(Topic)中。生产者将消息发送到Kafka集群中的代理(Broker),代理负责将消息存储在磁盘上并将其传递给消费者。

Consumer

消费者是一个应用程序,负责从Kafka集群中的主题中读取消息。消费者从代理(Broker)中获取消息,并将其传递给应用程序进行处理。

Broker

代理是Kafka集群中的服务器,负责存储和传递消息。代理接收来自生产者的消息,并将其存储在磁盘上。消费者从代理中获取消息,并将其传递给应用程序进行处理。

Topic

主题是Kafka消息队列中的逻辑分类。生产者将消息发布到特定的主题中,消费者则从特定的主题中读取消息。主题可以被分区,每个分区都有自己的偏移量(Offset)。

Partition

分区是主题的一个子集,每个分区都有自己的偏移量(Offset)。分区允许Kafka在集群中进行数据并行处理,提高了系统的吞吐量和可扩展性

kafka的安装过程

我使用的是linux环境,所以直接执行yay -S kafka即可

swappy-20230825_051519.png

安装go的kafka库sarama

go get -u github.com/IBM/sarama

sarama的基本使用

package main

import (
	"fmt"
	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

	// 构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "test"
	msg.Value = sarama.StringEncoder("this is a test log")
	// 连接kafka
	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()
	// 发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%vn", pid, offset)
}

运行结果如图
image.png
offset 为4是因为截图之前运行了3次

总结

Kafka是一个分布式流处理平台,允许您将消息从一个应用程序传递到另一个应用程序,并提供可扩展性和容错性。Kafka具有高吞吐量、可扩展性、容错性和灵活性等优势,适用于处理大量数据流的场景。

相关文章

服务器端口转发,带你了解服务器端口转发
服务器开放端口,服务器开放端口的步骤
产品推荐:7月受欢迎AI容器镜像来了,有Qwen系列大模型镜像
如何使用 WinGet 下载 Microsoft Store 应用
百度搜索:蓝易云 – 熟悉ubuntu apt-get命令详解
百度搜索:蓝易云 – 域名解析成功但ping不通解决方案

发布评论