Kafka-go语言一命速通

embedded/2025/1/13 15:01:41/

记录

命令(终端操作kafka)

# 验证kafka是否启动
ps -ef | grep kafka # ps -ef 命令用于显示所有正在运行的进程的详细信息
lsof -i :9092# 启动kafka
brew services start zookeeper
brew services start kafka# 创建topic
kafka-topics --create --topic test --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
解释:kafka-topics:用于管理主题。–create:创建一个新的主题。–topic test:主题的名称为 test–partitions 1:有 1 个分区(partition)。–replication-factor 1:主题的副本因子为 1。表示没有冗余,数据仅存储在一个节点上。–bootstrap-server localhost:9092:localhost:9092 表示 Kafka 服务器运行在本地主机的 9092 端口。# 查看主题
kafka-topics --list --bootstrap-server localhost:9092#订阅(消费者) 新建一个终端,输入
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning#发布(生产者) 新建一个终端,输入
kafka-console-producer --bootstrap-server localhost:9092 --topic test# 删除Topic
kafka-topics --delete --topic test --bootstrap-server localhost:9092

代码操作Kafka

简单版本

生产者:

packagemainimport("github.com/IBM/sarama""log"
)funcmain() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Retry.Max = 5config.Producer.Return.Successes = trueproducer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)iferr != nil {log.Fatalf("Failed to start producer: %s", err)}deferproducer.Close()msg := &sarama.ProducerMessage{Topic: "test_topic",Value: sarama.StringEncoder("Hello, Kafka!"),}partition, offset, err := producer.SendMessage(msg)iferr != nil {log.Fatalf("Failed to send message: %s", err)}log.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", "test_topic", partition, offset)
}

消费者

package mainimport ("fmt""github.com/IBM/sarama""log"
)func main() {config := sarama.NewConfig()config.Consumer.Return.Errors = true// 创建消费者consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)if err != nil {log.Fatalf("Failed to start consumer: %s", err)}defer consumer.Close()// 订阅 Kafka 主题partitionConsumer, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)if err != nil {log.Fatalf("Failed to start partition consumer: %s", err)}defer partitionConsumer.Close()// 消费消息for msg := range partitionConsumer.Messages() {log.Printf("Consumed message: %s, from partition(%d), offset(%d)\n", string(msg.Value), msg.Partition, msg.Offset)}
}

多点配置版本

生产者

packagemainimport("fmt""github.com/IBM/sarama""log""time"
)funcmain() {// 配置生产者config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认config.Producer.Retry.Max = 5                    // 最大重试次数config.Producer.Return.Successes = true          // 返回成功的消息config.Producer.Return.Errors = true             // 返回失败的消息config.Producer.Timeout = 10 * time.Second       // 设置生产者的超时时间config.Net.MaxOpenRequests = 5                   // 控制最大请求数config.Version = sarama.V2_8_0_0                 // 配置 Kafka 版本(可根据实际 Kafka 版本调整)// 创建生产者实例producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)iferr != nil {log.Fatalf("Failed to start producer: %s", err)}deferproducer.Close()// 循环发送消息fori := 1; ; i++ {// 构造消息msg := &sarama.ProducerMessage{Topic: "test_topic",                                                                       // 目标主题Value: sarama.StringEncoder(fmt.Sprintf("Message #%d: Hello, Kafka! www.zpf0000.com", i)), // 动态生成消息内容}// 发送消息partition, offset, err := producer.SendMessage(msg)iferr != nil {// 错误处理:打印错误并继续发送下一条消息log.Printf("Failed to send message: %s", err)continue}// 成功发送消息后记录日志log.Printf("Message #%d is stored in topic(%s)/partition(%d)/offset(%d)", i, "test_topic", partition, offset)// 模拟消息生产间隔(例如每秒发送一条消息)time.Sleep(1 * time.Second)}
}

消费者

packagemainimport("github.com/IBM/sarama""log""os""os/signal""syscall""time"
)funcmain() {// 配置消费者config := sarama.NewConfig()config.Consumer.Return.Errors = true                  // 启用错误返回config.Consumer.Offsets.Initial = sarama.OffsetNewest // 从最新消息开始消费config.Version = sarama.V2_8_0_0                      // 配置 Kafka 版本(可根据实际 Kafka 版本调整)// 创建 Kafka 消费者实例consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)iferr != nil {log.Fatalf("Failed to start consumer: %s", err)}deferconsumer.Close()// 订阅主题的分区partitionConsumer, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)iferr != nil {log.Fatalf("Failed to start partition consumer: %s", err)}deferpartitionConsumer.Close()// 用于捕获系统信号(例如 Ctrl+C),在接收到信号时优雅地退出sigChan := make(chanos.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)// 用来跟踪消费者的状态,确保及时处理错误gofunc() {forerr := rangepartitionConsumer.Errors() {log.Printf("Error: %s", err.Error())}}()// 监听消息并处理log.Println("Consumer is ready, waiting for messages...")for{select{casemsg := <-partitionConsumer.Messages():// 打印收到的消息log.Printf("Received message: %s, from partition(%d), offset(%d)\n", string(msg.Value), msg.Partition, msg.Offset)// 处理消息(可以根据需求扩展处理逻辑)// 模拟消息处理时间time.Sleep(500 * time.Millisecond) // 例如处理消息需要 500 毫秒// 在这里,可以对消息进行确认或其他操作,例如处理完消息后将其存入数据库等case<-sigChan:// 捕获到退出信号,优雅退出log.Println("Received shutdown signal, exiting...")return}}
}

链接

https://cloud.tencent.com/developer/article/1547380 # 优质博客一篇

https://kafka1x.apachecn.org/documentation.html#producerapi # 官方中文文档


http://www.ppmy.cn/embedded/153582.html

相关文章

2024信息安全网络安全等安全意识(附培训PPT下载)

信息安全和网络安全是现代社会中至关重要的领域&#xff0c;它们涉及保护数据、系统和网络免受未经授权的访问、破坏和滥用。以下是一些关键的安全意识和概念&#xff1a; 信息安全意识 数据保护&#xff1a;意识到个人和组织数据的敏感性和价值&#xff0c;采取措施保护数据…

【Rust自学】12.1. 接收命令行参数

12.1.0. 写在正文之前 第12章要做一个实例的项目——一个命令行程序。这个程序是一个grep(Global Regular Expression Print)&#xff0c;是一个全局正则搜索和输出的工具。它的功能是在指定的文件中搜索出指定的文字。 这个项目分为这么几步&#xff1a; 接收命令行参数&am…

MATLAB安装Robotics Toolbox(机器人工具箱)插件

一、下载工具箱安装包http://petercorke.com/wordpress/toolboxes/robotics-toolbox 二、将文件夹放到MATLAB安装文件夹指定目录下 三、打开MATLAB&#xff0c;主页------设置路径-----选添加并包含子文件夹-------选择这个rvctools文件夹save&#xff08;保存&#xff09;-clo…

软件项目管理软件实现步骤

一、明确需求 在开发项目管理软件前&#xff0c;首先要明确用户的需求&#xff0c;软件管理软件的用户包括项目经理、团队成员、顾客等&#xff0c;他们的需求可能不相同。通过调研或者访谈的方式&#xff0c;了解用户的实际需求。 问卷调查&#xff1a;通过问卷的方式&#…

CSS Grid 布局全攻略:从基础到进阶

文章目录 一.Grid 是什么二.示例代码1. 基础使用 - 固定宽高2.百分百宽高3.重复设置-repeat4.单位-fr5.自适应6.间距定义其他 一.Grid 是什么 CSS 中 Grid 是一种强大的布局方式&#xff0c;它可以同时处理行和列 Grid 和Flex有一些类似&#xff0c;都是由父元素包裹子元素使用…

深度学习与机器学习的关系和差别?

深度学习与机器学习既有紧密的联系&#xff0c;又存在明显的差别&#xff1a; 关系 深度学习是机器学习的分支&#xff1a;机器学习是一门多领域交叉学科&#xff0c;旨在让计算机通过数据学习模式&#xff0c;并利用这些模式进行预测或决策。深度学习则是机器学习中的一个特…

电商项目-基于ElasticSearch实现商品搜索功能(三)

本系列文章主要介绍基于 Spring Data Elasticsearch 实现商品搜索的后端代码&#xff0c;介绍代码逻辑和代码实现。 主要实现功能&#xff1a;根据搜索关键字查询、条件筛选、规格过滤、价格区间搜索、搜索查询分页、搜索查询排序、高亮查询。 主要应用技术:canal&#xff0c;…

线程安全问题介绍

文章目录 **什么是线程安全&#xff1f;****为什么会出现线程安全问题&#xff1f;****线程安全问题的常见场景****如何解决线程安全问题&#xff1f;**1. **使用锁**2. **使用线程安全的数据结构**3. **原子操作**4. **使用volatile关键字**5. **线程本地存储**6. **避免死锁*…