golang rocketmq开发

ops/2024/11/1 15:46:14/

安装

下载

https://mxshop-files.oss-cn-hangzhou.aliyuncs.com/install.zip

新建文件夹

mkdir rocketmq

解压

unzip install.zip -d rocketmq/

修改配置文件

cd rocketmq/install/conf/
vim broker.conf

修改brokerIP1为当前IP,如果是本地电脑填对应IP地址,如果是虚拟机填虚拟机的IP地址,如果是服务器填服务器的公网IP并开放rocketmq使用的全部端口(启动后通过docker ps -a查看,否则容易出错)
在这里插入图片描述

启动

cd ..
docker-compose up

第一次拉取镜像如果出网络问题参考如下链接(ubuntu系统,其他操作系统自行解决)
ubuntu Error response from daemon: Get “https://registry-1.docker.io/v2/”: net/http
访问rocketmq网页http://39.103.59.35:8080/#/message

go包官网

go get   github.com/apache/rocketmq-client-go/v2 v2.1.0-rc3

rocketmq_41">rocketmq的基本概念

Producer:消息的发送者;举例:发信者
Consumer:消息接收者;举例:收信者
Broker:暂存和传输消息;举例:邮局
NameServer:管理Broker;举例:各个邮局的管理机构
Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
Message Queue:相当于是Topic的分区;用于并行发送和接收消息
在这里插入图片描述

rocketmq_49">rocketmq的消息类型

1按照发送的特点分:

1.1 同步发送

a. 同步发送,线程阻塞,投递completes阻塞结束
b. 如果发送失败,会在默认的超时时间3秒内进行重试,最多重试2次
c. 投递completes不代表投递成功,要check SendResult.sendStatus来判断是否投递成功
d. SendResult里面有发送状态的枚举:SendStatus,同步的消息投递有一个状态返回值的

1.2 异步发送

a. 异步调用的话,当前线程一定要等待异步线程回调结束再关闭producer啊,因为是异步的,不会阻塞,提前关闭producer会导致未回调链接就断开了
b. 异步消息不retry,投递失败回调onException()方法,只有同步消息才会retry,源码参考 DefaultMQProducerImpl.class
c. 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

1.3 单向发送

a. 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试也不关心是否发送成功
b. 此方式发送消息的过程耗时非常短,一般在微秒级别
在这里插入图片描述

2按照使用功能特点分:

2.1 普通消息(订阅)

普通消息是我们在业务开发中用到的最多的消息类型,生产者需要关注消息发送成功即可,消费者消费到消息即可,不需要保证消息的顺序,所以消息可以大规模并发地发送和消费,吞吐量很高,适合大部分场景。
producer

package mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)func main() {p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"39.103.59.35:9876"}))if err != nil {panic("生成producer失败")}if err = p.Start(); err != nil {panic("启动producer失败")}res, err := p.SendSync(context.Background(), primitive.NewMessage("imooc1", []byte("this is imooc1")))if err != nil {fmt.Printf("发送失败:%s\n", err)} else {fmt.Printf("发送成功:%s\n", res.String())}if err = p.Shutdown(); err != nil {fmt.Printf("关闭producer失败")}
}

consumer

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
)func main() {c, err := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"39.103.59.35:9876"}),consumer.WithGroupName("maxshop"),)if err != nil {panic("连接失败")}if err = c.Subscribe("imooc1", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgs {fmt.Printf("获取到值:%v\n", msgs[i])}return consumer.ConsumeSuccess, nil}); err != nil {fmt.Println("读取消息失败")}_ = c.Start()//不能让主goroutine退出time.Sleep(time.Hour)_ = c.Shutdown()
}
2.2 顺序消息

顺序消息分为分区顺序消息和全局顺序消息,全局顺序消息比较容易理解,也就是哪条消息先进入,哪条消息就会先被消费,符合我们的FIFO,很多时候全局消息的实现代价很大,所以就出现了分区顺序消息。分区顺序消息的概念可以如下图所示:
在这里插入图片描述
我们通过对消息的key,进行hash,相同hash的消息会被分配到同一个分区里面,当然如果要做全局顺序消息,我们的分区只需要一个即可,所以全局顺序消息的代价是比较大的。

3. 延时消息 - 订单超时库存归还

延迟的机制是在 服务端实现的,也就是Broker收到了消息,但是经过一段时间以后才发送
服务器按照1-N定义了如下级别: “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;若要发送定时消息,在应用层初始化Message消息对象之后,调用Message.setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取相应的延迟级别,例如level=2,则延迟为5s
应用于超时归还(支付-淘宝、12306、购票):1.时间一到就执行。2.消息中包含了订单编号,只需要查询订单编号。如果采用轮询(超时时间30分钟),时间设为1分钟,则导致29次是无用的,时间设置为30分钟,会导致用户等待支付结果时间过长。
producer

package mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)func main() {p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"39.103.59.35:9876"}))if err != nil {panic("生成producer失败")}if err = p.Start(); err != nil {panic("启动producer失败")}msg := primitive.NewMessage("imooc1", []byte("this is delay message"))msg.WithDelayTimeLevel(2)res, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("发送失败:%s\n", err)} else {fmt.Printf("发送成功:%s\n", res.String())}if err = p.Shutdown(); err != nil {fmt.Printf("关闭producer失败")}
}

consumer

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
)func main() {c, err := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"39.103.59.35:9876"}),consumer.WithGroupName("maxshop"),)if err != nil {panic("连接失败")}if err = c.Subscribe("imooc1", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgs {fmt.Printf("获取到值:%v\n", msgs[i])}return consumer.ConsumeSuccess, nil}); err != nil {fmt.Println("读取消息失败")}_ = c.Start()//不能让主goroutine退出time.Sleep(time.Hour)_ = c.Shutdown()
}
4. 事务消息

消息队列RocketMQ版提供的分布式事务消息适用于所有对数据最终一致性有强需求的场景。本文介绍消息队列RocketMQ版事务消息的概念、优势、典型场景、交互流程以及使用过程中的注意事项。
概念介绍
○ 事务消息:消息队列RocketMQ版提供类似X或Open XA的分布式事务功能,通过消息队列RocketMQ版事务消息能达到分布式事务的最终一致。
○ 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列RocketMQ版服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
○ 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
分布式事务消息的优势
消息队列RocketMQ版分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)type OrderListener struct{}// When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
func (o *OrderListener) ExecuteLocalTransaction(addr *primitive.Message) primitive.LocalTransactionState {// example1 本地成功不会调用回查// fmt.Println("开始执行本地逻辑")// time.Sleep(time.Second * 3)// fmt.Println("执行本地逻辑成功")// return primitive.CommitMessageState// example2 本地失败不会调用回查// fmt.Println("开始执行本地逻辑")// time.Sleep(time.Second * 3)// fmt.Println("执行本地逻辑失败")// return primitive.RollbackMessageState// example1 本地失败不会调用回查fmt.Println("开始执行本地逻辑")time.Sleep(time.Second * 3)fmt.Println("执行本地逻辑失败")//本地执行逻辑无缘无故失败 代码异常 宕机return primitive.UnknowState
}// When no response to prepare(half) message. broker will send check message to check the transaction status, and this
// method will be invoked to get local transaction status.
func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {fmt.Println("rocketmq的消息回查")time.Sleep(time.Second * 15)return primitive.CommitMessageState
}
func main() {p, err := rocketmq.NewTransactionProducer(&OrderListener{},producer.WithNameServer([]string{"39.103.59.35:9876"}),)if err != nil {panic("生成producer失败")}if err = p.Start(); err != nil {panic("启动producer失败")}res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("transTopic", []byte("this is transaction message")))if err != nil {fmt.Printf("发送失败:%s\n", err)} else {fmt.Printf("发送成功:%s\n", res.String())}//回查time.Sleep(time.Hour)if err = p.Shutdown(); err != nil {panic("关闭producer失败")}
}

http://www.ppmy.cn/ops/130169.html

相关文章

真题与解析 202309二级 青少年软件编程(Python)考级

青少年软件编程(Python)等级考试试卷(二级) 202309真题与解析 分数:100 题数:37 测试时长:60分钟

ReactNative 启动应用(2)

ReactNative 启动应用 简述 本节我们来看一下ReactNative在Android上启动Activity的流程,ReactNative在Android上也是一个Apk,它的实现全部都在应用层,所以它肯定也是符合我们Android应用的启动流程的,UI页面的载体也是一个Acti…

axios源码分析之请求adapter

axios源码分析之请求adapter axios changeLog 注:axios从 v1.7.0-beta.0 支持了fetch v1.7.0-beta.0 changgeLog Featuresadapter: add fetch adapter; (#6371) (a3ff99b)Contributors to this releaseavatar Dmitriy Mozgovoyavatar Jayv1.7.0-beta.0 之前的版…

内网穿透技术选型PPTP(点对点隧道协议)和 FRP(Fast Reverse Proxy)

PPTP(点对点隧道协议)和 FRP(Fast Reverse Proxy)是两种实现内网穿透的技术,但它们的工作原理、使用场景和特点有很大区别。以下是它们的详细比较: PPTP(Point-to-Point Tunneling Protocol&am…

C++接口集成、身份实名认证-游戏防沉迷,保障未成年人健康

随着互联网的快速发展,网络游戏在年轻人中越来越受欢迎。然而,未成年玩家长时间沉迷游戏的问题却引发了社会的广泛关注。为了应对这一现象,各大网络游戏平台纷纷引入翔云身份证实名认证接口,以有效辨别用户身份,建立完…

Xamarin 存档报错 XABLD7000 Xamarin.Tools.Zip.ZipException

Xamarin App 调试正常,存档时发生错误; XABLD7000: Xamarin.Tools.Zip.ZipException: Renaming temporary file failed: Permission denied 查了资料,说是要去掉 快速部署;如下图: 当我去掉勾选时,还是依然…

聚簇索引和非聚簇索引B+树的关系

在数据库系统中,聚簇索引和非聚簇索引通常都基于 B 树 实现(例如 MySQL 的 InnoDB 引擎)。尽管它们的数据存储方式有所不同,但其底层结构和 B 树 的特性相辅相成,适合于高效的查询操作。以下是聚簇索引、非聚簇索引和 …

Spring整合Mybatis过程

配置文件 springConfig --> [jdbcConfig mybatisConfig] jdbc配置文件进行基本的数据库连接池配置 mybatis配置文件进行SqlSessionFactory Bean 和 MapperScannerConfigurer Bean的创建 在Spring容器启动时,系统会根据配置创建并初始化所有MyBatis所需的Bean…