RabbitMQ Stream插件使用详解

ops/2024/12/1 8:32:56/

2.4版为RabbitMQ流插件引入了对RabbitMQStream插件Java客户端的初始支持。

  • RabbitStreamTemplate
  • StreamListener容器

将spring rabbit流依赖项添加到项目中:

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-stream</artifactId><version>3.1.4</version>
</dependency>

您可以使用RabbitAdmin bean,使用QueueBuilder.stream()方法指定队列类型,正常地配置队列。例如:

@Bean
Queue stream() {return QueueBuilder.durable("stream.queue1").stream().build();
}

然而,这仅在您还使用non-stream 组件(如SimpleMessageListenerContainer或DirectMessageListeneerContainer)时才有效,因为在打开AMQP连接时会触发管理员来声明定义的bean。如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应改为配置StreamAdmin:

@Bean
StreamAdmin streamAdmin(Environment env) {return new StreamAdmin(env, sc -> {sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();sc.stream("stream.queue2").create();});
}

一、Sending Messages

RabbitStreamTemplate提供RabbitTemplate(AMQP)功能的子集。

public interface RabbitStreamOperations extends AutoCloseable {CompletableFuture<Boolean> send(Message message);CompletableFuture<Boolean> convertAndSend(Object message);CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);MessageBuilder messageBuilder();MessageConverter messageConverter();StreamMessageConverter streamMessageConverter();@Overridevoid close() throws AmqpException;}

RabbitStreamTemplate实现具有以下构造函数和属性:

public RabbitStreamTemplate(Environment environment, String streamName) {
}public void setMessageConverter(MessageConverter messageConverter) {
}public void setStreamConverter(StreamMessageConverter streamConverter) {
}public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

MessageConverter在convertAndSend方法中用于将对象转换为Spring AMQP消息。

StreamMessageConverter用于将Spring AMQP消息转换为本机流消息。

您也可以直接发送本机流消息;使用messageBuilder()方法提供对生产者的消息生成器的访问。

ProducerCustomizer提供了一种机制,用于在生成生产者之前对其进行自定义。

 二、Receiving Messages

异步消息接收由StreamListenerContainer(以及使用@RabbitListener时的StreamRabbitListerContainerFactory)提供。

侦听器容器需要一个Environment以及一个流名称。

您可以使用经典的MessageListener接收Spring AMQP消息,也可以使用新接口接收本地流消息:

public interface StreamMessageListener extends MessageListener {void onStreamMessage(Message message, Context context);}

有关支持的属性的信息,请参阅消息侦听器容器配置。

与模板类似,容器具有ConsumerCustomizer属性。

有关自定义环境和使用者的信息,请参阅Java客户端文档。

使用@RabbitListener时,配置StreamRabbitListerContainerFactory;此时,大多数@RabbitListener属性(并发等)将被忽略。仅支持id、队列、autoStartup和containerFactory。此外,队列只能包含一个流名称。

三、Examples

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");template.setProducerCustomizer((name, builder) -> builder.name("test"));return template;
}@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {return new StreamRabbitListenerContainerFactory(env);
}@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {...
}@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);factory.setNativeListener(true);factory.setConsumerCustomizer((id, builder) -> {builder.name("myConsumer").offset(OffsetSpecification.first()).manualTrackingStrategy();});return factory;
}@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {...context.storeOffset();
}@Bean
Queue stream() {return QueueBuilder.durable("test.stream.queue1").stream().build();
}@Bean
Queue stream() {return QueueBuilder.durable("test.stream.queue2").stream().build();
}

2.4.5版将adviceChain属性添加到StreamListenerContainer(及其工厂)。还提供了一个新的工厂bean来创建一个无状态重试拦截器,该拦截器带有一个可选的StreamMessageRecoverer,用于在使用原始流消息时使用。

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {StreamRetryOperationsInterceptorFactoryBean rfb =new StreamRetryOperationsInterceptorFactoryBean();rfb.setRetryOperations(retryTemplate);rfb.setStreamMessageRecoverer((msg, context, throwable) -> {...});return rfb;
}

四、Super Streams

超级流是分区流的抽象概念,通过将多个流队列绑定到具有参数x-Super-Stream:true的交换来实现。

1、调配

为了方便起见,可以通过定义类型为SuperStream的单个bean来提供超级流。

@Bean
SuperStream superStream() {return new SuperStream("my.super.stream", 3);
}

RabbitAdmin检测到这个bean,并将声明交换(my.super.stream)和3个队列(分区)-my.super-stream-n,其中n是0,1,2,绑定的路由密钥等于n。

如果您还希望通过AMQP向exchange 发布,您可以提供自定义路由密钥:

@Bean
SuperStream superStream() {return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i).mapToObj(j -> "rk-" + j).collect(Collectors.toList()));
}

key 的数量必须等于分区的数量。

2、向超级流生产消息

你必须向 RabbitStreamTemplate 添加一个 superStreamRoutingFunction

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");template.setSuperStreamRouting(message -> {// some logic to return a String for the client's hashing algorithm});return template;
}

你也可以通过AMQP发布,使用 RabbitTemplate


http://www.ppmy.cn/ops/10694.html

相关文章

「PHP系列」PHP Cookie/Session详解

文章目录 一、PHP Cookie1. Cookie的基本概念2. PHP中操作Cookie的常用函数3. Cookie案例代码设置Cookie读取Cookie删除Cookie 4. 注意事项 二、PHP Session1. PHP Session的基本概念2. PHP中操作Session的常用函数3. Session案例代码启动Session并设置数据读取Session数据销毁…

Llama3-8B+ LLaMA-Factory 中文微调

Llama3-8B LLaMA-Factory 中文微调 Llama3是目前开源大模型中最优秀的模型之一&#xff0c;但是原生的Llama3模型训练的中文语料占比非常低&#xff0c;因此在中文的表现方便略微欠佳&#xff01; 本教程就以Llama3-8B-Instruct开源模型为模型基座&#xff0c;通过开源程序LL…

代码随想录算法训练营day4 | 24. 两两交换链表中的节点、19.删除链表的倒数第N个节点、160.链表相交、142.环形链表II

24. 两两交换链表中的节点 使用哑结点&#xff0c;两个指针交换时多声明几个变量&#xff0c;不容易出错 class Solution:def swapPairs(self, head: Optional[ListNode]) -> Optional[ListNode]:dummy_head ListNode(0, head)cur dummy_headwhile cur.next and cur.nex…

Matplotlib官网查阅资料

Matplotlib官网详细的地址&#xff1a; 英文文档&#xff1a;https://matplotlib.org/stable/contents.html中文文档&#xff1a;https://www.matplotlib.org.cn/ Matplotlib英文官网: 查找属性&#xff1a; 1.进入官网。 2.查找参数属性。 Matplotlib中文官网: 查找属性:…

使用阿里云试用Elasticsearch学习:sentence-transformers 包使用

环境&#xff1a;centos8&#xff0c;windows坑太多。 一、检查linux环境openssl哪个版本&#xff08;如果是OpenSSL 1.1.1k 直接跳过&#xff09; [roothecs-334217 python39]# openssl version OpenSSL 1.0.2k-fips 26 Jan 2017原因后续会出麻烦&#xff0c;遇到这种情况最…

网络IO模型 select poll epoll的区别

epoll与select、poll的对比 1. 用户态将文件描述符传入内核的方式 select&#xff1a;创建3个文件描述符集并拷贝到内核中&#xff0c;分别监听读、写、异常动作。这里受到单个进程可以打开的fd数量限制&#xff0c;默认是1024。 poll&#xff1a;将传入的struct pollfd结构…

48-PCIE转串口和并口电路设计

视频链接 PCIE转串口和并口电路设计01_哔哩哔哩_bilibili PCIe转串口和并口电路设计 1、PCIe转串并口电路设计基本介绍 2、PCIe转串口和并口的方案(京东) 2.1、PCIe转串口 2.1.1、ASIX (亚信)MCS9922-PCIe转2路RS232扩展卡 2.1.2、ASIX (亚信)MCS9900-PCIe转4路RS232扩展卡…

论文阅读:BEVBert: Multimodal Map Pre-training for Language-guided Navigation

BEVBert&#xff1a;语言引导导航的多模态地图预训练 摘要 现存的问题&#xff1a;目前大多数现有的预训练方法都采用离散的全景图来学习视觉-文本关联。这要求模型隐式关联全景图中不完整、重复的观察结果&#xff0c;这可能会损害智能体的空间理解。 本文解决方案&#xf…