如何在Linux环境中的Qt项目中使用ActiveMQ-CPP

news/2024/11/1 11:51:53/

文章目录

  • 代码1:消费者
  • 代码2:生成者

之前在Linux下的qt程序中使用activeMQ的时候也是用了很多时间去研究,本来想的是好好记录一下,但是当时顾着写代码。很多细节也不想再去走一遍了。大概写一下怎么使用就行了。注意:一定要先开启服务器。

代码1:消费者

代码;这个代码其实是官网的源码里面提供的例子
我来对这个代码做个解释,也是作为自己的笔记。

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
class SimpleAsyncConsumer : public ExceptionListener,public MessageListener,public DefaultTransportListener {
private:Connection* connection;Session* session;Destination* destination;MessageConsumer* consumer;bool useTopic;std::string brokerURI;std::string destURI;bool clientAck;private:SimpleAsyncConsumer(const SimpleAsyncConsumer&);SimpleAsyncConsumer& operator=(const SimpleAsyncConsumer&);public:SimpleAsyncConsumer(const std::string& brokerURI,const std::string& destURI,bool useTopic = false,bool clientAck = false) :connection(NULL),session(NULL),destination(NULL),consumer(NULL),useTopic(useTopic),brokerURI(brokerURI),destURI(destURI),clientAck(clientAck) {}virtual ~SimpleAsyncConsumer() {this->cleanup();}void close() {this->cleanup();}void runConsumer() {try {// Create a ConnectionFactoryActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory(brokerURI);// Create a Connectionconnection = connectionFactory->createConnection();delete connectionFactory;ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection);if (amqConnection != NULL) {amqConnection->addTransportListener(this);}//如果不是使用的failover连接的话。如果服务器没有开启,执行这一步的时候会抛出错误//如果是使用failover连接,执行到这一步的时候,就不会往下继续执行了,就一直卡在这里,直到和服务器建立连接//failover连接是一个什么模式我也不是很清楚,我只知道在使用这个模式连接的时候,它是会自动重连的。即使服务器中途断了,服务器再		    	次开启的时候,它是能自动连接上的connection->start();//监听异常。有异常抛出的时候会执行相关的函数connection->setExceptionListener(this);// Create a Sessionif (clientAck) {session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);} else {session = connection->createSession(Session::AUTO_ACKNOWLEDGE);}// Create the destination (Topic or Queue)//使用queue模式还是topic模式//queue模式是一对一连接:一个生成者一个消费者。//topic模式是多对多连接;可以有多个生成者和多个消费者//如果你需要既发送有接收的消息的话,这个情况大概率是应该使用topic模式的//在使用topic模式的时候,你自己通过生成者发出去的消息,你自己的消费者也是会收到这个消息的//这个时候,你可以设置消息过滤,或者使用消息属性识别。if (useTopic) {destination = session->createTopic(destURI);} else {destination = session->createQueue(destURI);}// Create a MessageConsumer from the Session to the Topic or Queueconsumer = session->createConsumer(destination);consumer->setMessageListener(this);} catch (CMSException& e) {e.printStackTrace();}}// Called from the consumer since this class is a registered MessageListener.virtual void onMessage(const Message* message) {static int count = 0;try {count++;const TextMessage* textMessage = dynamic_cast<const TextMessage*>(message);string text = "";if (textMessage != NULL) {text = textMessage->getText();} else {text = "NOT A TEXTMESSAGE!";}if (clientAck) {message->acknowledge();}printf("Message #%d Received: %s\n", count, text.c_str());} catch (CMSException& e) {e.printStackTrace();}}// If something bad happens you see it here as this class is also been// registered as an ExceptionListener with the connection.virtual void onException(const CMSException& ex AMQCPP_UNUSED) {printf("CMS Exception occurred.  Shutting down client.\n");exit(1);}virtual void onException(const decaf::lang::Exception& ex) {printf("Transport Exception occurred: %s \n", ex.getMessage().c_str());}virtual void transportInterrupted() {std::cout << "The Connection's Transport has been Interrupted." << std::endl;}virtual void transportResumed() {std::cout << "The Connection's Transport has been Restored." << std::endl;}private:void cleanup(){//*************************************************// Always close destination, consumers and producers before// you destroy their sessions and connection.//*************************************************// Destroy resources.try{if( destination != NULL ) delete destination;}catch (CMSException& e) {}destination = NULL;try{if( consumer != NULL ) delete consumer;}catch (CMSException& e) {}consumer = NULL;// Close open resources.try{if( session != NULL ) session->close();if( connection != NULL ) connection->close();}catch (CMSException& e) {}// Now Destroy themtry{if( session != NULL ) delete session;}catch (CMSException& e) {}session = NULL;try{if( connection != NULL ) delete connection;}catch (CMSException& e) {}connection = NULL;}
};
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {activemq::library::ActiveMQCPP::initializeLibrary();std::cout << "=====================================================\n";std::cout << "Starting the example:" << std::endl;std::cout << "-----------------------------------------------------\n";// Set the URI to point to the IPAddress of your broker.// add any optional params to the url to enable things like// tightMarshalling or tcp logging etc.  See the CMS web site for// a full list of configuration options.////  http://activemq.apache.org/cms///// Wire Format Options:// =====================// Use either stomp or openwire, the default ports are different for each//// Examples://    tcp://127.0.0.1:61616                      default to openwire//    tcp://127.0.0.1:61616?wireFormat=openwire  same as above//    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead//std::string brokerURI ="failover:(tcp://127.0.0.1:61616"
//        "?wireFormat=openwire"
//        "&connection.useAsyncSend=true"
//        "&transport.commandTracingEnabled=true"
//        "&transport.tcpTracingEnabled=true"
//        "&wireFormat.tightEncodingEnabled=true"")";//============================================================// This is the Destination Name and URI options.  Use this to// customize where the consumer listens, to have the consumer// use a topic or queue set the 'useTopics' flag.//============================================================std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";//============================================================// set to true to use topics instead of queues// Note in the code above that this causes createTopic or// createQueue to be used in the consumer.//============================================================bool useTopics = false;//============================================================// set to true if you want the consumer to use client ack mode// instead of the default auto ack mode.//============================================================bool clientAck = false;// Create the consumerSimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );// Start it up and it will listen forever.consumer.runConsumer();// Wait to exit.std::cout << "Press 'q' to quit" << std::endl;while( std::cin.get() != 'q') {}// All CMS resources should be closed before the library is shutdown.consumer.close();std::cout << "-----------------------------------------------------\n";std::cout << "Finished with the example." << std::endl;std::cout << "=====================================================\n";activemq::library::ActiveMQCPP::shutdownLibrary();
}

代码2:生成者

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
class SimpleProducer : public Runnable {
private:Connection* connection;Session* session;Destination* destination;MessageProducer* producer;bool useTopic;bool clientAck;unsigned int numMessages;std::string brokerURI;std::string destURI;private:SimpleProducer( const SimpleProducer& );SimpleProducer& operator= ( const SimpleProducer& );public:SimpleProducer( const std::string& brokerURI, unsigned int numMessages,const std::string& destURI, bool useTopic = false, bool clientAck = false ) :connection(NULL),session(NULL),destination(NULL),producer(NULL),useTopic(useTopic),clientAck(clientAck),numMessages(numMessages),brokerURI(brokerURI),destURI(destURI) {}virtual ~SimpleProducer(){cleanup();}void close() {this->cleanup();}virtual void run() {try {// Create a ConnectionFactoryauto_ptr<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory( brokerURI ) );// Create a Connectiontry{connection = connectionFactory->createConnection();connection->start();} catch( CMSException& e ) {e.printStackTrace();throw e;}// Create a Sessionif( clientAck ) {session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );} else {session = connection->createSession( Session::AUTO_ACKNOWLEDGE );}// Create the destination (Topic or Queue)if( useTopic ) {destination = session->createTopic( destURI );} else {destination = session->createQueue( destURI );}// Create a MessageProducer from the Session to the Topic or Queueproducer = session->createProducer( destination );producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );// Create the Thread Id Stringstring threadIdStr = Long::toString( Thread::currentThread()->getId() );// Create a messagesstring text = (string)"Hello world! from thread " + threadIdStr;for( unsigned int ix=0; ix<numMessages; ++ix ){TextMessage* message = session->createTextMessage( text );//这里有一个setIntProperty,应该也会有一个对应的getIntProperty(我没有去看有没有)。你可以在消费者获取消息的时候调用这个函数,函数属性是否一致,如果一致说明是自己发送的消息,就可以不去处理它。//可以确认的是TextMessage是有一个setPropertyString和getPropertyString函数的,这个函数可以用了区别是否是自己的消息。message->setIntProperty( "Integer", ix );// Tell the producer to send the messageprintf( "Sent message #%d from thread %s\n", ix+1, threadIdStr.c_str() );producer->send( message );delete message;}}catch ( CMSException& e ) {e.printStackTrace();}}private:void cleanup(){// Destroy resources.try{if( destination != NULL ) delete destination;}catch ( CMSException& e ) { e.printStackTrace(); }destination = NULL;try{if( producer != NULL ) delete producer;}catch ( CMSException& e ) { e.printStackTrace(); }producer = NULL;// Close open resources.try{if( session != NULL ) session->close();if( connection != NULL ) connection->close();}catch ( CMSException& e ) { e.printStackTrace(); }try{if( session != NULL ) delete session;}catch ( CMSException& e ) { e.printStackTrace(); }session = NULL;try{if( connection != NULL ) delete connection;}catch ( CMSException& e ) { e.printStackTrace(); }connection = NULL;}
};
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {//这个函数只调用一次activemq::library::ActiveMQCPP::initializeLibrary();std::cout << "=====================================================\n";std::cout << "Starting the example:" << std::endl;std::cout << "-----------------------------------------------------\n";// Set the URI to point to the IPAddress of your broker.// add any optional params to the url to enable things like// tightMarshalling or tcp logging etc.  See the CMS web site for// a full list of configuration options.////  http://activemq.apache.org/cms///// Wire Format Options:// =====================// Use either stomp or openwire, the default ports are different for each//// Examples://    tcp://127.0.0.1:61616                      default to openwire//    tcp://127.0.0.1:61616?wireFormat=openwire  same as above//    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead//std::string brokerURI ="failover://(tcp://127.0.0.1:61616"
//        "?wireFormat=openwire"
//        "&connection.useAsyncSend=true"
//        "&transport.commandTracingEnabled=true"
//        "&transport.tcpTracingEnabled=true"
//        "&wireFormat.tightEncodingEnabled=true"")";//============================================================// Total number of messages for this producer to send.//============================================================unsigned int numMessages = 2000;//============================================================// This is the Destination Name and URI options.  Use this to// customize where the Producer produces, to have the producer// use a topic or queue set the 'useTopics' flag.//============================================================std::string destURI = "TEST.FOO";//============================================================// set to true to use topics instead of queues// Note in the code above that this causes createTopic or// createQueue to be used in the producer.//============================================================bool useTopics = false;// Create the producer and run it.SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );// Publish the given number of Messagesproducer.run();// Before exiting we ensure that all CMS resources are closed.producer.close();std::cout << "-----------------------------------------------------\n";std::cout << "Finished with the example." << std::endl;std::cout << "=====================================================\n";activemq::library::ActiveMQCPP::shutdownLibrary();
}

http://www.ppmy.cn/news/1543571.html

相关文章

Terraform Provider 加速方案

引言 Terraform 是一个安全和高效的用来构建、更改和管理基础架构的工具&#xff0c;其实现了基础设施即代码&#xff08;Infrastructure-as-Code&#xff0c;IaC&#xff09;&#xff0c;即通过代码来描述基础设施&#xff0c;可以通过代码来管理例如 AWS、阿里云等公有云资源…

呼吁中兴向全国免费开放专利许可,支持国产创新,你支持吗?

10月21日&#xff0c;联想向英格兰和威尔士高等法院起诉中兴专利侵权&#xff0c;引发网友热议。 10月30日&#xff0c;中兴在网上做出回应&#xff0c;表示虽难以理解但仍表示尊重。 至此&#xff0c;围观群众的目光也被两家大厂彻底吸引。联想、中兴为何对簿公堂&#xff0…

【17】 傅立叶分析

傅立叶分析(Fourier Analysis)是Excel数据分析工具库中的一种方法,用于将时间序列数据分解为不同频率的正弦波(sinusoidal components)。它特别适用于分析周期性数据或信号处理,帮助用户发现数据中的周期性模式、频率成分及其幅度。 傅立叶变换将复杂的时间序列数据转化…

第N5周:Pytorch文本分类入门

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 本周任务&#xff1a; 了解文本分类的基本流程学习常用数据清洗方法学习如何使用jieba实现英文分词学习如何构建文本向量 前期准备 加载数据 import torch…

LeetCode Hot 100:贪心算法

LeetCode Hot 100&#xff1a;贪心算法 121. 买卖股票的最佳时机 class Solution { public:int maxProfit(vector<int>& prices) {int minPrice INT_MAX;int maxProfit 0;for (int& price : prices) {minPrice min(minPrice, price);maxProfit max(maxProf…

AWS CDK 漏洞使黑客能够接管 AWS 账户

Aquasec 的安全研究人员最近在 AWS Cloud Development Kit &#xff08;CDK&#xff09; 中发现了一个关键漏洞&#xff0c;该漏洞可能允许攻击者获得对目标 AWS 账户的完全管理访问权限。 该问题于 2024 年 6 月报告给 AWS&#xff0c;影响使用版本 v2.148.1 或更早版本的 CD…

蓝牙BLE开发——红米手机无法搜索蓝牙设备?

解决 红米手机&#xff0c;无法搜索附近蓝牙设备 具体型号当时忘记查看了&#xff0c;如果你遇到有以下选项&#xff0c;记得打开~ 设置权限

初探Servlet

文章目录 1. Servlet概述1.1 定义1.2 作用 2. 主要知识点2.1 生命周期2.2 请求处理2.3 Servlet配置 3. 案例演示3.1 创建Web应用项目3.2 修改项目工件名3.3 重新部署Web项目3.4 创建WelcomeServlet3.5 编写doGet方法代码3.6 编写doPost方法代码3.7 访问WelcomeServlet 4. 小结 …