【kafka】简单运用go语言操作kafka实现生产者和消费者功能的包,confluent-kafka-go和sarama

news/2024/12/17 13:35:25/

kafkagosarama_0">confluent-kafka-go和sarama对比

特性confluent-kafka-gosarama
底层实现基于 librdkafka C 库完全用 Go 实现
性能高吞吐量、低延迟吞吐量较低,适合常规应用
安装依赖需要 C 编译器和 librdkafka无需外部依赖,纯 Go 实现
功能支持 Kafka 所有功能,包括事务支持 Kafka 核心功能,事务支持较弱
使用难度配置复杂,需理解底层 C 库使用简便,快速上手
社区支持由 Confluent 官方支持社区驱动,文档丰富
错误处理和日志错误处理较为复杂,日志记录较为详细错误处理简单,日志记录清晰
适用场景高性能要求、高吞吐量的生产环境一般的生产和消费场景,快速开发

基础使用案例

  1. 使用 confluent-kafka-go 发送消息
    使用 confluent-kafka-go 库向 Kafka 发送消息。
package mainimport ("fmt""log""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {config := &kafka.ConfigMap{"bootstrap.servers": "localhost:9092", // Kafka服务器地址}producer, err := kafka.NewProducer(config)if err != nil {log.Fatal(err)}defer producer.Close()message := &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &"test_topic", Partition: kafka.PartitionAny},Value:          []byte("Hello Kafka from Go!"),}err = producer.Produce(message, nil)if err != nil {log.Fatal("Failed to produce message:", err)} else {fmt.Println("Message sent successfully!")}producer.Flush(15 * 1000)
}
  1. 使用 confluent-kafka-go 消费消息
    使用 confluent-kafka-go 库从 Kafka 中消费消息。
package mainimport ("fmt""log""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {config := &kafka.ConfigMap{"bootstrap.servers": "localhost:9092", // Kafka服务器地址"group.id":          "test-group",     // 消费者组ID"auto.offset.reset": "earliest",        // 自动从最早的消息开始消费}consumer, err := kafka.NewConsumer(config)if err != nil {log.Fatal(err)}defer consumer.Close()err = consumer.Subscribe("test_topic", nil)if err != nil {log.Fatal("Failed to subscribe:", err)}for {msg, err := consumer.ReadMessage(-1)if err == nil {fmt.Printf("Consumed message: %s\n", string(msg.Value))} else {fmt.Printf("Error while consuming: %v\n", err)}}
}
  1. 使用 sarama 发送消息
    使用 sarama 库向 Kafka 发送消息。
package mainimport ("fmt""log""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Producer.Return.Successes = trueproducer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)if err != nil {log.Fatal("Failed to create producer:", err)}defer producer.Close()message := &sarama.ProducerMessage{Topic: "test_topic",Value: sarama.StringEncoder("Hello Kafka from Go (Sarama)!"),}partition, offset, err := producer.SendMessage(message)if err != nil {log.Fatal("Failed to send message:", err)}fmt.Printf("Message sent to partition %d with offset %d\n", partition, offset)
}
  1. 使用 sarama 消费消息
    使用 sarama 库从 Kafka 中消费消息。
package mainimport ("fmt""log""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Consumer.Return.Errors = trueconsumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)if err != nil {log.Fatal("Failed to create consumer:", err)}defer consumer.Close()partitions, err := consumer.Partitions("test_topic")if err != nil {log.Fatal("Failed to get partitions:", err)}for _, partition := range partitions {pc, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetNewest)if err != nil {log.Fatal("Failed to start consumer for partition:", err)}defer pc.Close()for msg := range pc.Messages() {fmt.Printf("Consumed message: %s\n", string(msg.Value))}}
}

性能案例对比

  1. 性能
    confluent-kafka-go:
    由于底层使用了 librdkafka,confluent-kafka-go 通常在吞吐量、延迟和连接管理方面表现得更加优越。
    适合用于高吞吐量、低延迟的生产环境。
// 高吞吐量性能优化示例:
package mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {config := &kafka.ConfigMap{"bootstrap.servers": "localhost:9092",}producer, _ := kafka.NewProducer(config)defer producer.Close()for i := 0; i < 10000; i++ {producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &"test_topic", Partition: kafka.PartitionAny},Value:          []byte(fmt.Sprintf("Message %d", i)),}, nil)}producer.Flush(15 * 1000)
}

sarama:
虽然 sarama 的性能不及 confluent-kafka-go,但它对于大多数常规用途仍然足够快,特别是在吞吐量要求不是特别高的场景中。

// sarama 吞吐量示例:
package mainimport ("fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Producer.Return.Successes = trueproducer, _ := sarama.NewSyncProducer([]string{"localhost:9092"}, config)defer producer.Close()for i := 0; i < 10000; i++ {producer.SendMessage(&sarama.ProducerMessage{Topic: "test_topic",Value: sarama.StringEncoder(fmt.Sprintf("Message %d", i)),})}
}
  1. 功能
    confluent-kafka-go:
    提供了丰富的功能,支持 Kafka 的所有核心功能,如生产者、消费者、消费者组管理、消息传递、事务支持、数据压缩等。
    支持 Kafka 的最新特性,如消息事务、压缩、性能调优等。
    由于是 librdkafka 的封装,confluent-kafka-go 与 Kafka 的版本兼容性更好,能够快速支持 Kafka 的新功能。
// 使用事务的生产者示例:
package mainimport ("fmt""log""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {config := &kafka.ConfigMap{"bootstrap.servers": "localhost:9092","acks":               "all",}producer, _ := kafka.NewProducer(config)defer producer.Close()// 开启事务producer.BeginTransaction()producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &"test_topic", Partition: kafka.PartitionAny},Value:          []byte("Transactional Message"),}, nil)// 提交事务producer.CommitTransaction()
}

sarama:
提供了 Kafka 的核心功能,但可能在一些高级特性上不如 confluent-kafka-go 丰富。例如,sarama 对事务支持相对较弱,尽管在常规的生产/消费场景中功能足够。
支持 Kafka 的基本功能,如生产者、消费者组、消息传递等,但对一些高级功能(如流控、集群管理等)的支持可能稍有不足。

// sarama 事务支持相对较弱,但基本生产和消费功能已足够:
package mainimport ("fmt""log""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Producer.Return.Successes = trueproducer, _ := sarama.NewSyncProducer([]string{"localhost:9092"}, config)defer producer.Close()// 模拟事务:sarama 本身不直接支持事务,通常通过事务标记和重试来实现message := &sarama.ProducerMessage{Topic: "test_topic",Value: sarama.StringEncoder("Transactional Message"),}partition, offset, _ := producer.SendMessage(message)fmt.Printf("Message sent to partition %d with offset %d\n", partition, offset)
}
  1. 总结:
    如果你需要高性能和 Kafka 高级特性,选择 confluent-kafka-go。
    如果你追求易用性和快速开发,或者不希望依赖 C 库,选择 sarama。
  • 选择 confluent-kafka-go:适用于高性能、高吞吐量的场景。

  • 选择 sarama:适用于不需要复杂配置和高级特性的场景。


http://www.ppmy.cn/news/1555866.html

相关文章

旅游资源系统|Java|SSM|VUE| 前后端分离

【技术栈】 1⃣️&#xff1a;架构: B/S、MVC 2⃣️&#xff1a;系统环境&#xff1a;Windowsh/Mac 3⃣️&#xff1a;开发环境&#xff1a;IDEA、JDK1.8、Maven、Mysql5.7 4⃣️&#xff1a;技术栈&#xff1a;Java、Mysql、SSM、Mybatis-Plus、VUE、jquery,html 5⃣️数据库可…

【C++】11___模板(1)

目录 一、模板的概念 二、函数模板 三、普通函数与函数模板 3.1区别 3.2调用规则 一、模板的概念 模板不能直接使用&#xff0c;它只是一个框架模板的通用并不是万能的分为两类&#xff1a;函数模板、类模板 二、函数模板 函数模板语法&#xff1a; template<typename T…

R环境配置 以及Debug方法 (VSCode, conda, 远程R)

生物信息学中的R环境配置 以及Debug方法 开始设置1、建议使用VSCode conda 远程R2、 VSCode配置安装插件安装好插件后&#xff0c;远程设置链接成功后&#xff0c;设置项目 3、 linux conda 和 远程R配置4、VScode 远程访问R环境下面配置远程R 5、开始Debug新建个R文件&#…

day13 python(1)——python基础

【没有所谓的运气&#x1f36c;&#xff0c;只有绝对的努力✊】 1、python简介 1.1 为什么学习python 1.2 python发展历史 python2.x和python3.x 版本里面有些是不兼容的。&#xff08;我自己本地版本 3.11&#xff09; 2、语言的分类 &#xff08;1&#xff09;编译型 …

MySQL的历史和地位

秋招之后&#xff0c;开始深入学习后端开发知识啦。把学到的东西分享给大家最开心啦。就从MySQL开始吧。 首先说一下MySQL的历史和地位。主要是看一下我们为什么要学习&#xff0c;而不是说让我们学什么我们就学什么。 地位 这张图是我从DB-Engines截取的2024年12月最新的数据…

百度智能云千帆AppBuilder升级,百度AI搜索组件上线,RAG支持无限容量向量存储!

百度智能云千帆 AppBuilder 发版升级&#xff01; 进一步降低开发门槛&#xff0c;落地大模型到应用的最后一公里。在千帆 AppBuilder 最新升级的 V1.1版本中&#xff0c;企业级 RAG 和 Agent 能力再度提升&#xff0c;同时组件生态与应用集成分发更加优化。 • 企业级 RAG&am…

前端项目初始化搭建(二)

一、使用 Vite 创建 Vue 3 TypeScript 项目 PS E:\web\cursor-project\web> npm create vitelatest yf-blog -- --template vue-ts> npx > create-vite yf-blog --template vue-tsScaffolding project in E:\web\cursor-project\web\yf-blog...Done. Now run:cd yf-…

LWIP数据包管理

一、LWIP数据包简介 具体流程为&#xff1a; 用户要发送的数据&#xff1b;申请pbuf内存&#xff1a;一般使用的是内存堆&#xff08;内存池也可以&#xff09;。内存堆包含了pbuf结构体、以及后面要拷贝的数据和三种层的首部&#xff1b;将数据拷贝到pbuf数据缓冲区&#xf…