golang封装调用kafka的工具包

devtools/2024/9/25 4:34:29/

封装一个golang调用kafka的工具包,包含了consumer,producer,auth,在自己的生产环境上做过验证。可以做参考作用,也可以直接使用。

部分代码

// Run 执行消费动作
func (cg *ConsumerGroup) Run(ctx context.Context) {defer cg.close()for {select {case err := <-cg.consumer.Errors():cg.logger.WithError(err).Errorln("Error channel")cg.handleConsumeError(err)case <-ctx.Done():err := ctx.Err()if err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {// 正常退出return}cg.logger.WithError(err).Errorln("上下文异常退出")default:if err := cg.consumer.Consume(ctx, cg.options.topics, cg.options); err != nil {cg.logger.WithError(err).Errorln("Consume Error channel")cg.handleConsumeError(err)}}}
}
func (kc *KafkaClient) CreateTopic(ctx context.Context, topicName string, ops ...TopicOption) (err error) {topicConf := &sarama.TopicDetail{NumPartitions:     3,ReplicationFactor: 1,ConfigEntries: map[string]*string{"cleanup.policy": &TopicTTLPolicy,"retention.ms":   &TopicTTLRetention,},}for _, op := range ops {op(topicConf)}return kc.cli.CreateTopic(topicName, topicConf, false)
}

代码太多,全部写出来不现实,详细代码在这个下载包里,有test文件可以查看是如何调用的

另外需要自己处理消费时的速度,比如用channel控制同时消费的数量
golang调用kafka下载


http://www.ppmy.cn/devtools/21985.html

相关文章

JMeter的下载安装与使用(Mac)

1、下载地址​​​​​​https://jmeter.apache.org/download_jmeter.cgi 2、下载Binaries 下的apache-jmeter5.5.tgz 3、解压 4、启动 在bin目录下打开终端&#xff0c;输入sh jmeter 出现jmeter首页界面&#xff0c;即为成功。 5、使用 5.1 语言选择 option选项卡&am…

go语言实现简单认证样例

目录 1、代码实现样例 2、postman调用 1、代码实现样例 package mainimport ("net/http""strings""github.com/dgrijalva/jwt-go""github.com/gin-gonic/gin" )var (// 密钥&#xff0c;用于验证 JWT 令牌signingKey []byte("…

高频面试题:解决Spring框架中的循环依赖问题

引言&#xff1a;什么是Spring框架与循环依赖&#xff1f; 在Spring框架中&#xff0c;循环依赖是指两个或多个bean相互依赖对方以完成自己的初始化。这种依赖关系形成了一个闭环&#xff0c;导致无法顺利完成依赖注入。比如&#xff0c;如果Bean A在其构造函数中需要Bean B&a…

mac 桌面不能右键 文件也不见了 但在finder的桌面上有

mac 桌面不能右键 文件也不见了 但在finder的桌面上有 出现该现象&#xff0c;可能是因为安装了带有隐藏桌面文件功能的软件&#xff0c;无意中操作引起的。可以利用终端轻松解决&#xff1a; 1、在Launchpad中找到终端并打开&#xff1a; 2、粘贴如下代码&#xff0c;回车即…

日期类的实现,const成员

目录 一&#xff1a;日期类实现 二&#xff1a;const成员 三&#xff1a;取地址及const取地址操作符重载 一&#xff1a;日期类实现 //头文件#include <iostream> using namespace std;class Date {friend ostream& operator<<(ostream& out, const Dat…

tsconfig.json 常用属性配置和注释

下面是一个详细的 tsconfig.json 文件示例&#xff0c;其中包含了许多常用的配置选项。这个配置适用于一个使用 TypeScript 进行前端和后端开发的通用项目。 {"compilerOptions": {"target": "es6", // 指定 ECMAScri…

Github 2024-04-21 开源项目日报 Top10

根据Github Trendings的统计,今日(2024-04-21统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Python项目4TypeScript项目3HTML项目1CSS项目1C++项目1Rust项目1Jupyter Notebook项目1Vue项目1Code Llama: 大型代码语言模型 创建周期:241 天…

GitHub 异常——无法连接22端口:Connection timed out

GitHub 异常——无法连接22端口&#xff1a;Connection timed out 问题描述原因分析&#xff1a;解决方案&#xff1a;参考 问题描述 正常配置并使用使用SSH方式&#xff0c;使用以下命令git clone、git pull、git push&#xff0c;报错如下&#xff1a; ssh: connect to host …