ActiveMQ高级特性和大厂面试常考重点

news/2025/1/11 4:20:18/

目录

一、引入消息队列之后该如何保证其高可用性

二、异步投递Async Sends

三、延迟投递和定时投递

四、ActiveMQ消费重试机制

五、死信队列

六、如何保证消息不被重复消费呢?幂等性问题你谈谈


一、引入消息队列之后该如何保证其高可用性

 ActiveMQ集群模式_zoeil的博客-CSDN博客
二、异步投递Async Sends

(一)官网

http://activemq.apache.org/async-sends

(二)解释

ActiveMQ支持同步、异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。ActiveMQ默认使用异步发送的模式;除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。


如果你没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻寒producer直到broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。

很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息


异步发送
它可以最大化produer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,
就是需要消耗较多的Client端内存同时也会导致broker端性能消耗增加;此外它不能有效的确保消息的发送成功。在useAsyncSend=true的情况下客户端需要突忍消息丢失的可能。

(三)开启方式

①Activemq_URL带参数

public static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616?jms.userAsyncSend=true";

②ActiveMQConnectionFactory的setUseAsyncSend

activeMQConnectionFactory.setUseAsyncSend(true);

(四)面试追问:异步发送如何确保发送成功 

异步发送丢失消息的场景是: 生产者设置UseAsyncSend=true,使用producer.send(msg)持续发送消息。
由于消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。
如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。
所以,正确的异步发送方法是需要接收回调的
同步发送和异步发送的区别就在此,
同步发送等send不阻塞了就表示一定发送成功了,异步发送需要接收回执并由客户端再判断一次是否发送成功。

回调测试

生产者

public class JMSProduce {public static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616";public static final String QUEUE_NAME = "Async";public static void main(String[] args) throws JMSException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);activeMQConnectionFactory.setUseAsyncSend(true);Connection connection = activeMQConnectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(QUEUE_NAME);ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);activeMQMessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);for(int i = 1 ; i <= 3; i ++ ) {TextMessage textMessage = session.createTextMessage("AsyncMessage--" + i);textMessage.setJMSMessageID(UUID.randomUUID().toString()+"order001");String messageID = textMessage.getJMSMessageID();activeMQMessageProducer.send(textMessage, new AsyncCallback() {@Overridepublic void onSuccess() {System.out.println("成功发送消息Id:"+messageID);}@Overridepublic void onException(JMSException e) {System.out.println("失败发送消息Id:"+messageID);}});}activeMQMessageProducer.close();session.close();connection.close();System.out.println("消息发布到MQ完成……");}
}

 点击Browse查看队列具体信息

(五)总结

  • 异步发送可以让生产者发的更快。
  • 如果异步投递不需要保证消息是否发送成功,发送者的效率会有所提高。如果异步投递还需要保证消息是否成功发送,并采用了回调的方式,发送者的效率提高不多,这种就有些鸡肋。

三、延迟投递和定时投递

(一)官网

ActiveMQ

  

(二)案例

1.配置文件修改并重启服务

添加 schedulerSupport=true

2.java代码里面封装的辅助消息类型:ScheduleMessage

生产者

public class JMSProduce {public static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616";public static final String QUEUE_NAME = "schedule";public static void main(String[] args) throws JMSException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);activeMQConnectionFactory.setUseAsyncSend(true);Connection connection = activeMQConnectionFactory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(QUEUE_NAME);ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);activeMQMessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);long delay =  10*1000;long period = 5*1000;int repeat = 3 ;for(int i = 1 ; i <= 3; i ++ ) {TextMessage textMessage = session.createTextMessage("scheduleMessage--" + i);// 延迟的时间textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);// 重复投递的时间间隔textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);// 重复投递的次数textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);// 此处的意思:该条消息,等待10秒,之后每5秒发送一次,重复发送3次。activeMQMessageProducer.send(textMessage);}activeMQMessageProducer.close();session.close();connection.close();System.out.println("消息发布到MQ完成……");}
}

消费者

public class JmsConsumer {public static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616";public static final String QUEUE_NAME = "schedule";public static void main(String[] args) throws JMSException, IOException {//1.创建链接工厂,按照给定的url地址,采用默认的用户和密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);//2.通过链接工厂,获得Connection并启动Connection connection = activeMQConnectionFactory.createConnection();connection.start();//3.创建会话session//3.1 两个参数 ①事务 ②签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//4.创建目的地(queue or topic)Queue queue = session.createQueue(QUEUE_NAME);//5.创建消费者MessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {if(null != message && message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;try {System.out.println("监听到队列消息:"+textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}});//连接AvtiveMQ需要等待时间System.in.read();consumer.close();session.close();connection.close();}
}

 
四、ActiveMQ消费重试机制

(一)官网

http://activemq.apache.org/redelivery-policy

(二)解释

消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。

那些情况会引起消息重发

  1. Client用了transactions且再session中调用了rollback
  2. Client用了transactions且再调用commit之前关闭或者没有commit
  3. Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover

(三)请说说消息重发时间间隔和重发次数

间隔:1

次数:6

(四)有毒消息Poison ACK

一个消息被redelivedred超过默认的最大重发次数(默认6次,可修改)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。 

(五)属性说明

(六)代码验证

模拟上面可能发生消费重试的第二种情况

生产者代码同上

消费者

开启事务,却没有commit。重启消费者,前6次都能收到消息,到第7次,不会再收到消息。代码:

public class JmsPoisonConsumer {private static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616";private static final String ACTIVEMQ_QUEUE_NAME = "dead01";public static void main(String[] args) throws JMSException, IOException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);Connection connection = activeMQConnectionFactory.createConnection();connection.start();final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);MessageConsumer messageConsumer = session.createConsumer(queue);messageConsumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;try {System.out.println("***消费者接收到的消息:   " + textMessage.getText());//session.commit();}catch (Exception e){e.printStackTrace();}}}});//关闭资源System.in.read();
//        session.commit();  本应该commit,这里为演示重发机制,故意不提交事务messageConsumer.close();session.close();connection.close();}
}

(七)修改默认重试次数

消费者。除橙色代码外,其他代码都和之前一样。修改重试次数为3。更多的设置请参考官网文档

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 修改默认参数,设置消息消费重试3
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(3);

        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

(八)整合spring

五、死信队列

(一)官网

ActiveMQ

死信队列:异常消息规避处理的集合,主要处理失败的消息。

(二)生产环境

(三)死信队列的配置(一般采用默认)

 1.sharedDeadLetterStrategy

不管是queue还是topic,失败的消息都放到这个队列中。下面修改activemq.xml的配置,可以达到修改队列的名字。

 2.individualDeadLetterStrategy

可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。

 3.自动删除过期消息

过期消息是值生产者指定的过期时间,超过这个时间的消息。

 > 类似于mysql的*,指对所有的队列都起效

4.存放非持久消息到死信队列中

 

六、如何保证消息不被重复消费呢?幂等性问题你谈谈

 幂等性如何解决,根据messageid去查这个消息是否被消费了。

 


http://www.ppmy.cn/news/6966.html

相关文章

题:孤独的照片

Farmer John 最近购入了 NN 头新的奶牛&#xff0c;每头奶牛的品种是更赛牛&#xff08;Guernsey&#xff09;或荷斯坦牛&#xff08;Holstein&#xff09;之一。 奶牛目前排成一排&#xff0c;Farmer John 想要为每个连续不少于三头奶牛的序列拍摄一张照片。 然而&#xff0…

redis集群操作

Redis集群1 集群2 集群架构图3 集群细节4 集群搭建4.1.创建集群4.2.查看集群状态4.3.添加主节点4.4.添加从节点4.5.删除副本节点4.6.集群在线分片1 集群 Redis在3.0后开始支持Cluster(模式)模式&#xff0c;目前redis的集群支持节点的自动发现&#xff0c;支持slave-master选举…

微服务 Spring Boot Mybatis-Plus 整合 EasyPOI 实现 Excel 一对多 导入

文章目录⛄引言一、EasyPOI 实现Excel 的一对多导入 -- 代码实现⛅需求说明⚡核心源码实现二、Easy POI 实现一对多导入 -- 测试三、效果图展示⛵小结⛄引言 Excel导入 是 开发中 很常用的 功能 &#xff0c;本篇讲解 如何使用 Spring Boot MyBatis -Plus 整合 EasyPOI 实现E…

使用JDBC+javafx写一个简单功能齐全的图书管理系统

目录 1、JDBC的使用 2、对应包和Java文件的层级关系及对应的含义 3、数据库 4、相关代码 1&#xff09;、bookmanager包 Ⅰ、main函数 Ⅱ、utils包 Ⅲ、bean包 Ⅳ、controller包 2)resources(为资源文件包&#xff0c;可以看链接文章了解) Ⅰ、book包 Ⅱ、 login包…

微服务洞察,让微服务更透明

作者&#xff1a; 屿山 微服务作为云原生时代下一种开发软件的架构和组织方法&#xff0c;通过将明确定义的功能分成更小的服务&#xff0c;并让每个服务独立迭代&#xff0c;增加了应用程序的灵活性&#xff0c;允许开发者根据需要更轻松地更改部分应用程序。同时每个微服务可…

【安全漏洞】水平权限漏洞和垂直权限漏洞

前言 权限校验非常重要。如果不对水平、垂直权限做校验&#xff0c;就会发生泄漏用户数据的事故&#xff0c;造成P0故障。 一、水平权限漏洞 1、水平权限漏洞基本概念 什么是水平权限漏洞呢&#xff1f; 简单来说&#xff0c;水平权限漏洞是用户CURD了本不属于他的资源。以上图…

《MongoDB入门教程》第21篇 CRUD之删除文档

本文将会介绍如何利用 deleteOne() 和 deleteMany() 方法删除满足指定条件的文档。 deleteOne() 方法 deleteOne() 方法用于删除集合中的单个文档&#xff0c;语法如下&#xff1a; db.collection.deleteOne(filter, option)该方法包含两个参数&#xff1a; filter 是一个必…

项目沟通怎么才能不像在吵架?

项目沟通并非吵架&#xff0c;看起来却总是剑拔弩张。有效沟通才能真正解决问题&#xff0c;笔者给出了一些实用的建议&#xff0c;从对象到场景&#xff0c;再到方法与技巧&#xff0c;应该在沟通中有针对性地注意这些问题。 沟通是个老话题&#xff0c;在项目管理中有专门讲沟…