源码下载
RocketMq下载
运行
这是rocketmq源码大致的业务分层,本地调试主要是启动nameserver和broker,其他的发送和接收的实现代码可直接使用example包中的官方例子,也可以自己编码代码实现。
- 启动namesrv包下的启动类,NamesrvStartup
Connected to the target VM, address: '127.0.0.1:9351', transport: 'socket'
Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation
Disconnected from the target VM, address: '127.0.0.1:9351', transport: 'socket'
初次启动的时候它会提示这个异常信息,提示你没有配置ROCKETMQ_HOME变量。
从字面上理解其实就是rocketmq源码在我们磁盘上的一个路径而已。
只需要在启动配置里面添加上如上配置即可,ROCKETMQ_HOME=你的rocketmq磁盘路径,下面的broker模块启动的时候也要配置上去。
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
看到这个就是启动成功了。
- 启动broker,运行的是BrokerStartup
xxx boot success. serializeType=JSON
看到这个就是启动成功了。
- 运行producer和consumer
这里运行的不是源码包中的example样例,而是自己实现的producer和consumer。
producer如下:
public static void main(String[] args) {String nameServer = "localhost:9876";DefaultMQProducer defaultMQProducer = new DefaultMQProducer();defaultMQProducer.setNamesrvAddr(nameServer);defaultMQProducer.setProducerGroup("test-group");try {defaultMQProducer.start();String message = "hello";SendResult send = defaultMQProducer.send(new Message("test", message.getBytes()));System.out.println("send hello over");System.out.println(send.getSendStatus());defaultMQProducer.shutdown();} catch (Exception e) {e.printStackTrace();}}
consumer如下:
public static void main(String[] args) {String nameServer = "localhost:9876";DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();defaultMQPushConsumer.setNamesrvAddr(nameServer);defaultMQPushConsumer.setConsumerGroup("test-consumer-group");try {defaultMQPushConsumer.subscribe("test", (String) null);defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {String s = new String(msg.getBody());System.out.println(s);}return ConsumeOrderlyStatus.SUCCESS;}});defaultMQPushConsumer.start();TimeUnit.SECONDS.sleep(10);defaultMQPushConsumer.shutdown();} catch (Exception e) {e.printStackTrace();}}
运行producer提示如下异常信息:
**org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: test**
See https://rocketmq.apache.org/docs/bestPractice/06FAQ for further details.at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:718)at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1400)at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1342)at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:351)
从错误信息中可以看到,它是因为没有路由信息,其实说白了就是没有创建这个topic。
在BrokerStartup类中,如上图位置,添加nameserver的地址即可。
- 输出结果
这就是consumer消费的消息。
本地rocketmq可以正常启动以后,就可以按照rocketmq的业务逻辑进行debug调试了,便于我们更好的理解rocketmq的原理。