Kafka+PostgreSql,构建一个总线服务

ops/2024/9/20 8:18:18/ 标签: kafka, postgresql, linq

之前开发的系统,用到了RabbitMQ和SQL Server作为总线服务的传输层和存储层,最近一直在看Kafka和PostgreSql相关的知识,想着是不是可以把服务总线的技术栈切换到这个上面。今天花了点时间试了试,过程还是比较顺利的,后续就是搭建基础服务的事情了。这里简单分享一下。

环境安装

安装Kafka

官方文档:Apache Kafka,可以直接参考,我这里简单介绍下我在本地搭建开发环境的过程,还是遇到了一个小坑。

我这里是在本地WSL 2环境下进行的安装,安装过程就参考官方文档的推荐流程即可

下载安装包

注意,这里要下载编译后的包,不嫌麻烦的话,可以下载源代码,编译后再使用。

wget -c https://downloads.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz

安装

tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0

这里安装完成后的路径是这样子的

重点关注的就是bin,config和logs这3个目录。

启动服务

官方提供了2中启动策略,一个是KRaft,一个是Zookeeper,我这里用的zookeeper

先启动zookeeper服务

bin/zookeeper-server-start.sh config/zookeeper.properties

在启动kafka服务

bin/kafka-server-start.sh config/server.properties

后面的zookeeper.properties和server.properties是配置文件,后续有配置需求的时候可以修改,比如监听地址,brokerid等等,长这样👇

启动后控制台的输出是这样

这样,一个kafka的服务节点就启动了。

对了,kafka是依赖java环境的,安装之前本地要安装jdk,我这里使用的是openjdk,也是ok的。

*端口转发(仅WSL2环境)

在WSL2环境下,需要配置下端口转发,不然宿主机连接不到broker,

netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=172.28.240.79

后面那个ip地址就写宿主机给WSL环境下发的地址

此外,宿主机和wsl环境都放开9092(或者你设置的)端口

链接测试

这里有很多客户端的ui工具或者插件可以连接Kafka,官方本身也提供了测试命令,比如官方文档里给的测试案例就是用这几个命令

本地开发的话,我这里用的vs code的tools for apache kafka@ 这个插件,在插件市场用关键字搜索完成,安装即可

至此,一个本地的Kafka节点就基本配置完成了

安装PostgreSql

这个我老早就装好了,一些安装过程没有截图,就忽略吧,大家有需求的可以问一下各种GPT

也可以用docker,快速部署一个节点做本地的测试。

docker run --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres

开发测试

新建项目

这里因为我是用的IDE做开发,所以直接创建个web项目就好,也可以用命令行来创建。

总之创建完成后,我的项目长这样

安装依赖

我这里是用的是dotnet.cap这个系列组件,然后为了测试方便,数据库的orm适用的是dapper,主要是图快,大家实际项目中可以用习惯的orm就好。

这里我的项目文件长这样

<Project Sdk="Microsoft.NET.Sdk.Web"><PropertyGroup><TargetFramework>net8.0</TargetFramework><Nullable>enable</Nullable><ImplicitUsings>enable</ImplicitUsings></PropertyGroup><ItemGroup><PackageReference Include="Dapper" Version="2.1.35" /><PackageReference Include="DotNetCore.CAP" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.Dashboard" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.Kafka" Version="8.2.0" /><PackageReference Include="DotNetCore.CAP.PostgreSql" Version="8.2.0" /></ItemGroup></Project>

注入服务

这里主要注入pg和Kafka

builder.Services.AddCap(x =>
{x.UsePostgreSql("User ID={pg用户名};Password={pg密码};Host={pg地址};Port=5432;Database=maigcTestDb;");x.UseKafka("localhost:9092");x.UseDashboard();
});

测试的业务代码

在常规的controller中注入服务

public class ValuesController(ICapPublisher producer) : Controller, ICapSubscribe
{/*业务代码*/
}
//上面这是最新的写法,以前那种构造函数的写法也是ok的
public class Values2Controller : Controller
{private ICapPublisher _capPublisher;public Values2Controller(ICapPublisher capPublisher){_capPublisher = capPublisher;}
}

写一个生产者接口

public async Task<IActionResult> Producer()
{Console.WriteLine("生产者发布消息: " + DateTime.Now);await producer.PublishAsync("sample.kafka.postgrsql", DateTime.Now);return Ok();
}

再写一个延时发送消息的生产者接口

public async Task<IActionResult> ProducerDelay()
{Console.WriteLine("生产者发布延时消息: " + DateTime.Now);await producer.PublishDelayAsync(TimeSpan.FromSeconds(delaySeconds), "sample.kafka.postgrsql", DateTime.Now);return Ok();
}

创建消费者

[CapSubscribe("sample.kafka.postgrsql")]
public void Test2(DateTime value)
{Console.WriteLine("订阅到消息: " + value);
}

我们访问下接口看下控制台的打印效果

可以看到,订阅到的时间和生产者发送的实际是一致的。

再试下延时发送

我们延时了10秒发布,这里生产者执行生产消息后,过了10秒,被消费者订阅到。

我们延时了10秒发布,这里生产者执行生产消息后,过了10秒,被消费者订阅到。

再看下PostG里保存的消息记录

这是生产记录

这是消费记录

注意,在CAP的机制里,这些持久化的消息记录是可以设置过期时间的,也就是如果我们每天的并发量很高,产生的消息非常多,可以设置一个过期时间,比如7天,一个月,到期后,这些持久化的数据就会自动清除掉。

CAP的官方文档里,还有更多案例,大家感兴趣也可以去试试,当然除了CAP还有MediatR,MassTransit这类组件,也可以轻松实现消息总线的机制。

好了,到此我们的测试就结束了,从安装Kafka,到创建这个新项目并跑通这个测试服务,也就2个小时,所以,这个迁移成本应该还是非常高效的。

小总结

实际上,我们的生产环境中,正正常运行的一套总线服务,依赖的是RabbitMQ和SQL Server,RabbitMQ还好,SQL Server在以后应该不会是做项目的首选数据库了,尤其是做一些高并发的项目,不是说它性能不够,而是成本太高,社区版的限制有太多,还是要早做规划,提前准备更加适合未来发展的方案,而PostgreSql是目前最受全球开发者欢迎的关系数据库,社区活跃度非常高,开源协议对企业也十分友好,即便是面对国内高标准的信创要求,也完全没问题,是绝佳的首选。

至于Kafka,这是目前世界上最为流行的消息队列,性能,可用性,可扩展性等各方面都比其他消息队列要好上一点。阿里后来推出的RocketMQ,也是基于Kafka的设计原理做了简化和更加适应国内环境的一些调整,根骨还是来自Kafka。而且就生态环境而言,无论国内还是国外,Kafka都是遥遥领先,对dotnet框架的支持,Kafka也远比RocketMQ更好(RocketMQ更多的还是用在java环境里),所以我们再选型的时候,优先考虑的还是Kafka。

更多关于这些内容的知识,大家感兴趣可以去搜一下或者找个AI问一下。

好了,就这些吧。


http://www.ppmy.cn/ops/113317.html

相关文章

C++返回值优化(Return Value Optimization, RVO)与移动语义(Move Semantics)

在C编程中&#xff0c;返回值优化&#xff08;Return Value Optimization, RVO&#xff09;与移动语义&#xff08;Move Semantics&#xff09;是提高程序效率、减少不必要的对象复制的重要机制。 一、返回值优化&#xff08;RVO&#xff09; 基本概念 返回值优化是一种编译器…

Milvus - 构建向量数据库并进行相似度查询

向量相似度检索在大规模数据分析和机器学习应用中是一个非常关键的任务&#xff0c;特别是在处理文本、图像或其他嵌入向量时。Milvus 是一个高性能的开源向量数据库&#xff0c;专为存储和检索大规模向量数据设计。本文将介绍如何在 Docker 中安装 Milvus&#xff0c;并展示如…

GO主流开源框架

GO主流开源框架 Go 语言有着丰富的开源框架生态&#xff0c;涵盖了多种应用场景&#xff0c;如 Web 开发、数据库操作、微服务、日志处理等。以下是一些常见的 Go 框架及其典型作用场景&#xff1a; 1. Web 框架 Gin: 作用&#xff1a;一个高性能的轻量级 Web 框架&#xff…

今天不写项目,聊聊后端面试吧

首先感谢大家之前的观看呀~兄弟们~ 这边把我去过几家公司面试的题目都写一下哈&#xff0c;像我大二下&#xff0c;就是前两个月7-9进了公司进行后端实习&#xff0c;哎.....反正就是学学学..话不多说~ 1.Frist 1.HashMap实现原理 HashMap是基于哈希表的Map接口的非同步实现…

网站在线客服插件配置

使用工具&#xff1a;百度爱番番 下载地址&#xff1a; 百度爱番番—企业的一站式智能营销管家 一、下载百度爱番番APP&#xff0c;注册账号 二、 登录app 三、点击设置——站点设置——新建站点 四、设置站点名称——站点地址——PC站点——确定 五、点击配置好的站点的获取代…

leetcode73矩阵置零

思路 想到的就是需要一个数组来记录是不是这行或者这列是不是有零&#xff0c;然后最后再扫描一遍这个矩阵 题解 借助第0行第0列来记录这个行是不是有0&#xff0c;这个列是不是有0 另外&#xff0c;这个矩阵不大&#xff0c;所以可能有重复的置0应该也没事。 class Soluti…

力扣232:用栈实现队列

请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作&#xff08;push、pop、peek、empty&#xff09;&#xff1a; 实现 MyQueue 类&#xff1a; void push(int x) 将元素 x 推到队列的末尾int pop() 从队列的开头移除并返回元素int peek() 返回队列开头…

签署《AI安全国际对话威尼斯共识》 智源持续推动人工智能安全发展

近日&#xff0c;由AI安全国际论坛&#xff08;Safe AI Forum&#xff09;和博古睿研究院&#xff08;Berggruen Institute) 共同举办的第三届国际AI安全对话&#xff08;International Dialogues on AI Safety&#xff09;在威尼斯举办。图灵奖得主Yoshua Bengio、姚期智教授&…

一、编译原理(引论)

目录 【一】、引论 一、编译器 1、编译器 2、编译器与解释器 3、编译器结构 【一】、引论 一、编译器 1、编译器 &#xff08;1&#xff09;编译器&#xff1a;将人类易懂的 高级语言 翻译成 硬件可执行的目标机器语言 &#xff08;2&#xff09; 高级语言 ⚫ 直接面…

聊一聊测试用例的重要性

对于测试从业人员&#xff0c;测试用例术语应该不会陌生&#xff0c;在工作中用到的概率就像医生的药方&#xff0c;厨师心中的菜配方等等。 不过前者对项目组内人员都是公开的&#xff0c;后者的药方和配方大概率不会公开&#xff1b;前者项目内公开为了让测试用例覆盖率更高…

网络安全(黑客技术)2024年三个月自学计划

&#x1f91f; 基于入门网络安全/黑客打造的&#xff1a;&#x1f449;黑客&网络安全入门&进阶学习资源包 前言 什么是网络安全 网络安全可以基于攻击和防御视角来分类&#xff0c;我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术&#xff0c;而“蓝队”…

如何为子域名配置 Nginx 反向代理到 Flask 应用

在这篇博客中&#xff0c;我将介绍如何为你的域名添加子域名&#xff0c;并使用 Nginx 反向代理将子域名请求转发到 Flask 应用。我们将以子域名 app1.example.com 为例&#xff0c;并通过 Nginx 将请求转发到 Flask 应用的 5000 端口。 1. 前提条件 你已经拥有一个域名&…

向日葵好用吗?4款稳定的远程控制软件推荐。

远程控制技术现在已经被应用于很多个领域&#xff0c;像企业办公&#xff0c;远程协助&#xff0c;智能家居&#xff0c;工业控制等等。我们常常会用到的时前两种。而实现远程控制的方式也有多种&#xff0c;但是最方便高效的还是使用第三方软件。我最常使用的是向日葵&#xf…

Flutter - Win32程序是如何执行main函数

Win32程序的主体结构 int APIENTRY wWinMain(_In_ HINSTANCE instance, _In_opt_ HINSTANCE prev,_In_ wchar_t *command_line, _In_ int show_command) {// Attach to console when present (e.g., flutter run) or create a// new console when running with a debugger.if …

Linux 防火墙:Firewalld 常用命令行操作命令

firewalld命令行操作管理 按增删改查分类&#xff0c;前面加上 firewall-cmd &#xff1a; ### 查询操作--get-default-zone 查看当前默认区域 --get-zones 查看所有可用的区域 --get-active-zones …

科技赋能司法:易保全如何重塑法律文书签署与庭审流程

在这个数字化飞速发展的时代&#xff0c;司法领域也迎来了前所未有的变革。随着人工智能、区块链与互联网技术的深度融合&#xff0c;正以前所未有的力量变革着司法服务的格局。 易保全通过将“区块链人工智能互联网司法”相融合&#xff0c;推动公证系统逐步迈向智能化、高效…

C++学习笔记 —— 内存分配 new

//创建数值 int *pi new int; //pi指向动态分配的&#xff0c;未初始化的无名对象 delete pi; int *pi new int(10); //pi指向动态分配的&#xff0c;初始化10 delete pi;//创建数组 int *a new int[5]; //创建一个数组&#xff0c;未初始化数值 delete []a; // new 和 de…

二叉树进阶--AVL树

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 二叉树进阶--AVL树 收录于专栏 [C进阶学习] 本专栏旨在分享学习C的一点学习笔记&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目录 前提提示: 1 AVL树的…

Leetcode Hot 100刷题记录 -Day17(搜索二维矩阵II)

搜索二维矩阵II 问题描述&#xff1a; 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性&#xff1a; 每行的元素从左到右升序排列。每列的元素从上到下升序排列。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,4,7,11,15],[2,5,8,…

win/mac常用命令

这里写目录标题 windows&#xff08;powershell&#xff09;mac windows&#xff08;powershell&#xff09; Get-ChildItem | ForEach-Object { $_.Name }&#xff1a;打印当前目录中所有文件的名字Get-ChildItem | ForEach-Object { $_.Name } > file_list.txt&#xff1…