Setting Up the C++ Version of Kafka on Windows 10, with Publish and Subscribe Tests


The build configuration used throughout is Release x64.

Complete project: https://gitee.com/jiajingong/kafka-publisher

1. Download the required library files (.lib, .dll)

Reference links:

GitHub - eStreamSoftware/delphi-kafka

GitHub - cloader/KafkaCPP-win32-dll: KafkaCpp-win32-dll

2. Create a new command-line C++ project

After creating the project, switch the configuration to Release x64 and run Rebuild Solution from the Build menu; this creates an x64/Release folder under the project directory.

3. Configure the Additional Library Directories and Additional Dependencies in VS2017

All of the library files (.lib, .dll) go into the x64/Release directory created above.
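For orientation, after copying the downloaded binaries the output directory should look roughly like this (exact file names depend on which prebuilt librdkafka package you picked up):

x64/Release/
    librdkafka.lib       (import library for the C core)
    librdkafkacpp.lib    (import library for the C++ wrapper)
    librdkafka.dll
    librdkafkacpp.dll
    (plus any dependency DLLs shipped with the build, e.g. zlib)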

Add the following to Additional Dependencies: librdkafka.lib;librdkafkacpp.lib
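Alternatively, if you prefer not to touch the project settings, MSVC lets you request the same libraries from source. A minimal sketch, assuming the .lib files sit on the linker's search path:

// MSVC-specific: link librdkafka without editing Additional Dependencies
#pragma comment(lib, "librdkafka.lib")
#pragma comment(lib, "librdkafkacpp.lib")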

4. Publisher

Replace the contents of the main .cpp file with:

#include <iostream>
#include <thread>
#include <chrono>
#include "rdkafkacpp.h"

int main()
{
    std::string brokers = "172.18.4.96:9092";
    std::string errorStr;
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (!conf) {
        std::cout << "Create RdKafka Conf failed" << std::endl;
        return -1;
    }
    conf->set("message.max.bytes", "10240000", errorStr);        // maximum message size
    conf->set("replica.fetch.max.bytes", "20485760", errorStr);
    conf->set("bootstrap.servers", brokers, errorStr);

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errorStr);
    if (!producer) {
        std::cout << "Create Producer failed" << std::endl;
        return -1;
    }

    // Create the topic handle
    RdKafka::Topic *topic = RdKafka::Topic::create(producer, "koala-stqf-03", tconf, errorStr);
    if (!topic) {
        std::cout << "Create Topic failed" << std::endl;
    }

    int count = 0;
    while (true)
    {
        // Publish one message per second
        RdKafka::ErrorCode resCode = producer->produce(
            topic, RdKafka::Topic::PARTITION_UA,
            RdKafka::Producer::RK_MSG_COPY,
            (char *)"123456789", 10,
            nullptr, nullptr);
        std::cout << "Count:" << count << ", has published: " << "123456789" << std::endl;
        if (resCode != RdKafka::ERR_NO_ERROR) {
            std::cerr << "Produce failed: " << RdKafka::err2str(resCode) << std::endl;
        }
        producer->poll(0);  // serve delivery callbacks and internal events
        count += 1;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    // Unreachable while the loop above runs forever; kept for completeness
    delete topic;
    delete producer;
    delete conf;
    delete tconf;
    RdKafka::wait_destroyed(5000);
    return 0;
}
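The producer above fires and forgets. If you want confirmation that each message actually reached the broker, librdkafka supports registering a delivery report callback on the global config before creating the producer. A minimal sketch (the class name is my own; the DeliveryReportCb interface and the "dr_cb" property are librdkafka's):

// Delivery-report callback; invoked from producer->poll()
class MyDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message &message) override {
        if (message.err())
            std::cerr << "Delivery failed: " << message.errstr() << std::endl;
        else
            std::cout << "Delivered to partition " << message.partition()
                      << " at offset " << message.offset() << std::endl;
    }
};

// Register before RdKafka::Producer::create():
//   static MyDeliveryReportCb drCb;
//   conf->set("dr_cb", &drCb, errorStr);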

5. Subscriber

Create a second project for the subscriber in the same way, and likewise replace the main function's code with:

#include "rdkafkacpp.h"
#include <chrono>
#include <time.h>
#include <sstream>
#include <iomanip>
#include <iostream>
#include <algorithm>
#include <iterator>void consume_cb(RdKafka::Message &message, void *opaque)
{switch (message.err()) {case RdKafka::ERR__TIMED_OUT:std::cout << "RdKafka::ERR__TIMED_OUT" << std::endl;break;case RdKafka::ERR_NO_ERROR:/* Real message */RdKafka::MessageTimestamp ts;ts = message.timestamp();if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {std::string timeprefix;if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) {timeprefix = "created time";}else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) {timeprefix = "log append time";}unsigned long long milli = ts.timestamp + (unsigned long long)8 * 60 * 60 * 1000;//此处转化为东八区北京时间,如果是其它时区需要按需求修改auto mTime = std::chrono::milliseconds(milli);auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(mTime);auto tt = std::chrono::system_clock::to_time_t(tp);tm timeinfo;::gmtime_s(&timeinfo, &tt);//char s[60]{ 0 };//::sprintf(s, "%04d-%02d-%02d %02d:%02d:%02d", timeinfo.tm_year + 1900, timeinfo.tm_mon + 1, timeinfo.tm_mday, timeinfo.tm_hour, timeinfo.tm_min, timeinfo.tm_sec);// std::cout << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;
#if 0std::stringstream ss;std::string dateStr;ss << timeinfo.tm_year + 1900 << "-"<< timeinfo.tm_mon + 1 << "-"<< timeinfo.tm_mday;ss >> dateStr;ss.clear();ss << timeinfo.tm_hour << ":"<< timeinfo.tm_min << ":"<< timeinfo.tm_sec;std::string timeStr;ss >> timeStr;std::string dateTimeStr;dateTimeStr += dateStr;dateTimeStr.push_back(' ');dateTimeStr += timeStr;
#endif // 0//std::cout << "TimeStamp" << timeprefix << " " << s << std::endl;std::cout << "TimeStamp   " << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;}std::cout << message.topic_name() << " offset" << message.offset() << "  partion " << message.partition() << " message: " << reinterpret_cast<char*>(message.payload()) << std::endl;break;case RdKafka::ERR__PARTITION_EOF:/* Last message */std::cout << "EOF reached for" << std::endl;break;case RdKafka::ERR__UNKNOWN_TOPIC:case RdKafka::ERR__UNKNOWN_PARTITION:std::cout << "Consume failed: " << message.errstr();break;default:/* Errors */std::cout << "Consume failed: " << message.errstr();break;}
}
int main()
{std::string brokers = "172.18.4.96:9092";std::string errstr;std::vector<std::string> topics{ "koala-stqf-03","klai-seim-alert-koala-test-03"};std::string group_id = "whl-consumer-group";RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);if (conf->set("group.id", group_id, errstr)) {std::cout << errstr << std::endl;return -1;}conf->set("bootstrap.servers", brokers, errstr);conf->set("max.partition.fetch.bytes", "1024000", errstr);//conf->set("enable-auto-commit", "true", errstr);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);tconf->set("auto.offset.reset", "latest", errstr);conf->set("default_topic_conf", tconf, errstr);RdKafka::KafkaConsumer *m_consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!m_consumer) {std::cout << "failed to create consumer " << errstr << std::endl;return -1;}#if 0 //从上一次消费结束的位置开始消费RdKafka::ErrorCode err = m_consumer->subscribe(topics);if (err != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(err) << std::endl;return -1;}
#else //指定每个topic的每个分区开始消费的位置//基本思路为先获取server端的状态信息,将与订阅相关的topic找出来,根据分区,创建TopicPartion;最后使用assign消费RdKafka::Metadata *metadataMap{ nullptr };RdKafka::ErrorCode err = m_consumer->metadata(true, nullptr, &metadataMap, 2000);if (err != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(err) << std::endl;}const RdKafka::Metadata::TopicMetadataVector *topicList = metadataMap->topics();std::cout << "broker topic size: " << topicList->size() << std::endl;RdKafka::Metadata::TopicMetadataVector subTopicMetaVec;std::copy_if(topicList->begin(), topicList->end(), std::back_inserter(subTopicMetaVec), [&topics](const RdKafka::TopicMetadata* data) {return std::find_if(topics.begin(), topics.end(), [data](const std::string &tname) {return data->topic() == tname; }) != topics.end();});std::vector<RdKafka::TopicPartition*> topicpartions;std::for_each(subTopicMetaVec.begin(), subTopicMetaVec.end(), [&topicpartions](const RdKafka::TopicMetadata* data) {auto parVec = data->partitions();std::for_each(parVec->begin(), parVec->end(), [&](const RdKafka::PartitionMetadata *value) {std::cout << data->topic() << " has partion: " << value->id() << " Leader is : " << value->leader() << std::endl;topicpartions.push_back(RdKafka::TopicPartition::create(data->topic(), value->id(), RdKafka::Topic::OFFSET_END));});});m_consumer->assign(topicpartions);
#endif // 0RdKafka::ErrorCode errccc = m_consumer->subscribe(topics);if (errccc != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(errccc) << std::endl;return -1;}while (true){RdKafka::Message *msg = m_consumer->consume(6000);consume_cb(*msg, nullptr); //消息一条消息delete msg;}return 0;
}
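The consume loop above never exits. As a sketch of a cleaner shutdown (the Ctrl+C signal-flag pattern is my own addition; close() and wait_destroyed() are librdkafka's documented shutdown sequence), the loop can test a flag instead of `true`:

#include <csignal>

static volatile sig_atomic_t g_running = 1;
static void sigHandler(int) { g_running = 0; }

// In main(), before the loop:  signal(SIGINT, sigHandler);
while (g_running) {
    RdKafka::Message *msg = m_consumer->consume(6000);
    consume_cb(*msg, nullptr);
    delete msg;
}
m_consumer->close();            // leave the consumer group cleanly
delete m_consumer;
delete tconf;
delete conf;
RdKafka::wait_destroyed(5000);  // wait for librdkafka's background threads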

6. Publish/subscribe demo:
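With both programs running against the same broker, the publisher prints one counter line per second and the subscriber echoes each message with its timestamp. Output along these lines is expected (offsets, partitions, and times will differ per run):

Publisher:
    Count:0, has published: 123456789
    Count:1, has published: 123456789

Subscriber:
    TimeStamp created time 2025-2-26 4:42:51
    koala-stqf-03 offset 0  partition 0 message: 123456789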

