go连接kafka基本操作

news/2025/3/13 16:12:35/

本博文源于笔者对kafka的学习,先遵循着对kafka的简单学习,然后go操作一下kafka,包括发送消息、消费消息、列出所有topic,与创建topic。内容都已经由笔者亲自测试过。

文章目录

  • 1.kafka的学习
    • 1.1 启动kafka与zookeeper
    • 1.2 创建topic
    • 1.3 生产消息
    • 1.4 消费之前的消息
    • 1.5 指定偏移量消费
    • 1.4 消费最新的信息
  • 2 go操作
    • 2.1 发送消息
    • 2.2 消费消息
    • 2.3 列出所有topic
    • 2.4 创建topic
  • 参考文档

kafka_2">1.kafka的学习

kafkazookeeper_3">1.1 启动kafka与zookeeper

kafka与zookeeper是相关联的

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

1.2 创建topic

bin/kafka-topics.sh --create --topic hello --bootstrap-server 主机名:9092

1.3 生产消息

bin/kafka-console-producer.sh --broker-list 主机名:9092 --topic hello

运行后可以发送多条,ctrl+c退出

1.4 消费之前的消息

bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --from-beginning --topic hello

1.5 指定偏移量消费

bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --partition 0 --offset 1 --topic hello

1.4 消费最新的信息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

2 go操作

2.1 发送消息

// Kafka 配置
const (KafkaBroker = "u8sMaster:9092" // 替换为你的 Kafka Broker 地址KafkaTopic  = "k8s-version"          // Kafka 主题
)func main() {sendMesgKafka()
}func sendMesgKafka() {w := kafka.NewWriter(kafka.WriterConfig{Brokers:  []string{KafkaBroker},Topic:    KafkaTopic,Balancer: &kafka.LeastBytes{},})err := w.WriteMessages(context.Background(),kafka.Message{Key:   []byte("Key-A"),Value: []byte("one!"),},kafka.Message{Key:   []byte("Key-B"),Value: []byte("two!"),},kafka.Message{Key:   []byte("Key-C"),Value: []byte("three!"),},)if err != nil {log.Fatal("failed to write messages:", err)}if err := w.Close(); err != nil {log.Fatal("failed to close writer:", err)}fmt.Println("Message sent successfully")}

2.2 消费消息

// to consume messages
topic := "test"
partition := 0conn, err := kafka.DialLeader(context.Background(), "tcp", "u8sMaster:9092", topic, partition)
if err != nil {log.Fatal("failed to dial leader:", err)
}conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB maxb := make([]byte, 10e3) // 10KB max per message
for {n, err := batch.Read(b)if err != nil {break}fmt.Println(string(b[:n]))
}if err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)
}if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)
}

2.3 列出所有topic

func main() {conn, err := kafka.Dial("tcp", "u8sMaster:9092")if err != nil {panic(err.Error())}defer conn.Close()partitions, err := conn.ReadPartitions()if err != nil {panic(err.Error())}m := map[string]struct{}{}for _, p := range partitions {m[p.Topic] = struct{}{}}for k := range m {fmt.Println(k)}
}

2.4 创建topic

func main() {conn, err := kafka.DialLeader(context.Background(), "tcp", "u9sMaster:9092", "topic2", 0)if err != nil {panic(err.Error())}
}

精准地创建topic

func main() {conn, err := kafka.Dial("tcp", "u8sMaster:9092")if err != nil {panic(err.Error())}defer conn.Close()controller, err := conn.Controller()if err != nil {panic(err.Error())}var connLeader *kafka.ConnconnLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))if err != nil {panic(err.Error())}defer connLeader.Close()
}

这里省略了kafka集群的配置,未来有机会补充

参考文档

参考文档一


http://www.ppmy.cn/news/1578825.html

相关文章

【技术方案设计】H5埋点方案设计以及实现(入门版)

文章目录 H5事件埋点方案设计文档1. 概述2. 需求分析3. 数据结构设计4. 技术选型5. 埋点实施5.1 页面浏览事件5.2 点击事件5.3 表单提交事件5.4 数据上报函数 6. 测试与验证7. 维护与优化 H5事件埋点方案设计文档 1. 概述 本方案旨在为H5页面设计一套完整的用户行为跟踪系统&…

postman通过json获取接口返回token,设置为全局变量

1、获取登录接口返回的json的token值 在scripts的post-reponse中写入javascript脚本 var jsonData pm.response.json();//解析响应体var token jsonData.data.loginEntityAdminByEmail.token;// 假设token在响应的JSON体中的"token"字段pm.globals.set("glo…

计算机网络开发(3)——端口复用、I\O多路复用

端口复用 由于有一个MSL,所以上一秒关闭的服务器,可能之前的端口还未释放;又或者是程序突然退出系统没有释放端口,导致端口被占用。 当有新的服务想要用这个端口的时候,会出现错误:服务会出现Bind error:A…

Vscode工具开发Vue+ts项目时vue文件ts语法报错-红波浪线等

Vscode工具开发Vuets项目时vue文件ts语法报错-红波浪线等 解决方案 问题如题描述,主要原因是开发工具使用的代码检查与项目的中的ts不一致导导致,解决办法,修改 vscode 中, 快捷键:command shift p, 输入&#xff…

CLR中的marshal_as 介绍

CLR中的marshal_as 介绍 CLR和CLI的关系CLI(Common Language Infrastructure)CLR(Common Language Runtime)marshal_as介绍marshal_as代码使用示例工程文件说明CLR和CLI的关系 CLR 和 CLI 是与 .NET 技术密切相关的两个概念,它们在编程语言和运行时环境中扮演着重要角色。…

三星首款三折叠手机被曝外屏6.49英寸:折叠屏领域的新突破

在智能手机的发展历程中,折叠屏手机的出现无疑是一次具有里程碑意义的创新。它打破了传统手机屏幕尺寸的限制,为用户带来了更加多元和便捷的使用体验。而三星,作为手机行业的巨头,一直以来都在折叠屏技术领域积极探索和创新。近日,三星首款三折叠手机的诸多细节被曝光,其…

QT通过DeepSeek API获取公式正常显示的方法

一.问题描述 QT开发程序连接网络版本DeepSeek API获取的内容可能有大量LaTeX公式,这样直接在QT控件(比如Text Browser)中显示会无法正常显示,本文会给出两种解决方法。 二.QT通过API连接DeepSeek 1.官网注册并申请key DeepSeek官网:https://www.deepseek.com/

计算机图形学交互式技术实验(鼠标、拾取操作和菜单)——绘制可用鼠标进行修改颜色的五角星和矩形

1、实验目的: 熟练OpenGL中的鼠标响应函数的使用方法、拾取操作实现和创建菜单的实现。 2、实验要求: (1)绘制两个及以上图元(在此我绘制了一个五角星和两个矩形) (2)创建窗体菜单用于改图元的颜色(菜单内写入若干…