扩展知识:RocketMQ 如何开启 ACL 验证
RocketMQ 在 4.4.0 版本开始支持 ACL 功能,ACL 验证的主要作用就是保证消息的安全性,实现权限控制功能,比如控制可以发送和订阅消息的群体,如某些主题只能被订阅,某些主题只有指定的IP,或者只有携带账号密码才可以订阅和发布等。
了解 ACL 配置
在 RocketMQ 的 conf 目录下有一个 plain_acl.yml 文件,打开这个文件就可以看到有如下默认配置:
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*accounts:
- accessKey: RocketMQsecretKey: 12345678whiteRemoteAddress:admin: falsedefaultTopicPerm: DENYdefaultGroupPerm: SUBtopicPerms:- topicA=DENY- topicB=PUB|SUB- topicC=SUBgroupPerms:# the group should convert to retry topic- groupA=DENY- groupB=PUB|SUB- groupC=SUB- accessKey: rocketmq2secretKey: 12345678whiteRemoteAddress: 192.168.1.*# if it is admin, it could access all resourcesadmin: true
下面详细解释上述 plain_acl.yml 配置文件中的各项配置及其作用:
- globalWhiteRemoteAddresses
- 这是全局的白名单IP地址列表。
- 配置中的10.10.103.*和192.168.0.*表示所有以这些前缀开头的IP地址都可以访问RocketMQ服务,无需进一步的身份验证。
- 注意:这里定义的是全局白名单,意味着这些IP地址将绕过后续的账户验证,直接可以访问RocketMQ服务。
- accounts
- 这是一个账户列表,每个账户下面都定义了一系列的访问权限。
- accessKey 和 secretKey
- accessKey是账户的用户名。
- secretKey是账户的密码,与accessKey成对使用。
- whiteRemoteAddress
- 这个字段定义了账户的IP白名单。只有来自这些IP地址的客户端才能使用这个账户进行身份验证。
- 如果为空(如第一个账户),则没有IP限制。
- admin
- 如果设置为true,则该账户拥有管理员权限,可以访问所有资源。
- 如果设置为false或未设置,则账户将受到topicPerms和groupPerms中定义的权限限制。
- defaultTopicPerm 和 defaultGroupPerm
- 当某个主题或组在topicPerms或groupPerms中没有明确指定权限时,将使用这些默认值。
- 在上面的例子中,对于账户RocketMQ,如果某个主题没有在topicPerms中指定,那么默认是DENY(拒绝访问)。
- 同样,对于组也是这样的逻辑。
- topicPerms
- 这个字段定义了账户对各个主题的访问权限。
- 权限可以是PUB(发布)、SUB(订阅)或DENY(拒绝)。
- 可以使用|来组合多个权限,如PUB|SUB表示同时具有发布和订阅权限。
- 在上面的例子中,账户RocketMQ对topicA有DENY权限(即拒绝访问),对topicB有发布和订阅权限,对topicC只有订阅权限。
- groupPerms
- 这个字段定义了账户对各个消费者组的访问权限。
- 权限与topicPerms中的类似。
- 在上面的例子中,账户RocketMQ对groupA有DENY权限,对groupB有发布和订阅权限,对groupC只有订阅权限。但请注意,在RocketMQ中,消费者组通常只涉及订阅操作,所以发布权限可能在这里没有实际意义。
对于第二个账户rocketmq2,由于设置了admin: true,所以它将拥有对所有资源和主题的访问权限,无论是否在topicPerms或groupPerms中明确指定了权限。同时,它的IP白名单被限制为192.168.1.*。
如何开启 ACL
开启 ACL 只需要在 broker.conf 增加如下配置:aclEnable=true # 打开acl验证
如何使用
java">@Test
void producer() throws Exception{// 1.创建生产者,并配置 aclRPCHook acl = new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));DefaultMQProducer defaultMQProducer = new DefaultMQProducer("test-producer-group", acl);// 2.连接 name serverdefaultMQProducer.setNamesrvAddr("localhost:9876");// 3.启动生产者defaultMQProducer.start();// 4.创建消息并设置主题和消息体Message message = new Message("test-topic", "测试消息".getBytes());// 5.同步发送消息for (int i = 0; i < 3; i++) {System.out.println("开始发送第"+i+"条消息");SendResult send = defaultMQProducer.send(message);// 等待发送结果才能继续向下执行......System.out.println("第"+i+"条消息发送状态:" + send.getSendStatus());}// 等待发送结果才能继续向下执行......System.out.println("发送消息状态:" + send.getSendStatus());System.out.println("继续下面业务逻辑...");// 6.关闭生产者 defaultMQProducer.shutdown();
}
java">@Test
void consumer() throws Exception{// 1.创建 Push 消费,并设置组名和aclRPCHook acl = new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(acl);consumer.setConsumerGroup("test-consumer-group");// 2.连接 name serverconsumer.setNamesrvAddr("sitrmqsrv.axa.cn:9876");// 3.订阅主题 "*" 表示订阅这个主题的全部消息consumer.subscribe("test-topic","*");// 4.同步监听消息consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {MessageExt messageExt = list.get(0);System.out.println("收到的消息内容:" + new String(messageExt.getBody()));return ConsumeOrderlyStatus.SUCCESS;}});// 5.启动消费者consumer.start();// 挂在jvmSystem.in.read();
}
如果使用的是 SpringBoot 集成 RocketMQ 可以加上如下配置:
rocketmq: name-server: 127.0.0.1:9876 # RocketMQ NameServer地址 producer: group: my-producer-group # 生产者组名 access-key: RocketMQ # 生产者使用的accessKey secret-key: 12345678 # 生产者使用的secretKey consumer: group: my-consumer-group # 消费者组名 access-key: RocketMQ # 消费者使用的accessKey(可以与生产者相同或不同) secret-key: 12345678 # 消费者使用的secretKey(可以与生产者相同或不同)