Spring Cloud Stream实现数据流处理

embedded/2024/11/19 6:31:52/

1.什么是Spring Cloud Stream?

我看很多回答都是“为了屏蔽消息队列的差异,使我们在使用消息队列的时候能够用统一的一套API,无需关心具体的消息队列实现”。 这样理解是有些不全面的,Spring Cloud Stream的核心是Stream,准确来讲Spring Cloud Stream提供了一整套数据流走向(流向)的API, 它的最终目的是使我们不关心数据的流入和写出,而只关心对数据的业务处理 我们举一个例子:你们公司有一套系统,这套系统由多个模块组成,你负责其中一个模块。数据会从第一个模块流入,处理完后再交给下一个模块。对于你负责的这个模块来说,它的功能就是接收上一个模块处理完成的数据,自己再加工加工,扔给下一个模块。

module

我们很容易总结出每个模块的流程:

  1. 从上一个模块拉取数据
  2. 处理数据
  3. 将处理完成的数据发给下一个模块

其中流程1和3代表两个模块间的数据交互,这种数据交互往往会采用一些中间件(middleware)。比如模块1和模块2间数据可能使用的是kafka,模块1向kafka中push数据,模块2向kafka中poll数据。而模块2和模块3可能使用的是rabbitMQ。很明显,它们的功能都是一样的:提供数据的流向,让数据可以流入自己同时又可以从自己流出发给别人。但由于中间件的不同,需要使用不同的API。 为了消除这种数据流入(输入)和数据流出(输出)实现上的差异性,因此便出现了Spring Cloud Stream。

2.环境准备

采用docker-compose搭建kafaka环境

version: '3'networks:kafka:ipam:driver: defaultconfig:- subnet: "172.22.6.0/24"services:zookepper:image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latestcontainer_name: zookeeper-serverrestart: unless-stoppedvolumes:- "/etc/localtime:/etc/localtime"environment:ALLOW_ANONYMOUS_LOGIN: yesports:- "2181:2181"networks:kafka:ipv4_address: 172.22.6.11kafka:image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1container_name: kafkarestart: unless-stoppedvolumes:- "/etc/localtime:/etc/localtime"environment:ALLOW_PLAINTEXT_LISTENER: yesKAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092ports:- "9092:9092"depends_on:- zookeppernetworks:kafka:ipv4_address: 172.22.6.12kafka-map:image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-mapcontainer_name: kafka-maprestart: unless-stoppedvolumes:- "./kafka/kafka-map/data:/usr/local/kafka-map/data"environment:DEFAULT_USERNAME: adminDEFAULT_PASSWORD: 123456ports:- "9080:8080"depends_on:                         - kafkanetworks:kafka:ipv4_address: 172.22.6.13

run

docker-compose -f docker-compose-kafka.yml -p kafka up -d

kafka-map

https://github.com/dushixiang/kafka-map

  • 访问:http://127.0.0.1:9080
  • 账号密码:admin/123456

3.代码工程

stream

 实验目标

  1. 生成UUID并将其发送到Kafka主题batch-in
  2. batch-in主题接收UUID的批量消息,移除其中的数字,并将结果发送到batch-out主题。
  3. 监听batch-out主题并打印接收到的消息。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springcloud-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>spring-cloud-stream-kafaka</artifactId><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target></properties><dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot Starter Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies></project>

处理流

/** Copyright 2023 the original author or authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**      https://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.et;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;/*** @author Steven Gantz*/
@SpringBootApplication
public class CloudStreamsFunctionBatch {public static void main(String[] args) {SpringApplication.run(CloudStreamsFunctionBatch.class, args);}@Beanpublic Supplier<UUID> stringSupplier() {return () -> {var uuid = UUID.randomUUID();System.out.println(uuid + " -> batch-in");return uuid;};}@Beanpublic Function<List<UUID>, List<Message<String>>> digitRemovingConsumer() {return idBatch -> {System.out.println("Removed digits from batch of " + idBatch.size());return idBatch.stream().map(UUID::toString)// Remove all digits from the UUID.map(uuid -> uuid.replaceAll("\\d","")).map(noDigitString -> MessageBuilder.withPayload(noDigitString).build()).toList();};}@KafkaListener(id = "batch-out", topics = "batch-out")public void listen(String in) {System.out.println("batch-out -> " + in);}}
  • 定义一个名为stringSupplier的Bean,它实现了Supplier<UUID>接口。这个方法生成一个随机的UUID,并打印到控制台,表示这个UUID将被发送到batch-in主题。
  • 定义一个名为digitRemovingConsumer的Bean,它实现了Function<List<UUID>, List<Message<String>>>接口。这个方法接受一个UUID的列表,打印出处理的UUID数量,然后将每个UUID转换为字符串,移除其中的所有数字,最后将结果封装为消息并返回。
  • 使用@KafkaListener注解定义一个Kafka监听器,监听batch-out主题。当接收到消息时,调用listen方法并打印接收到的消息内容。

配置文件

spring:cloud:function:definition: stringSupplier;digitRemovingConsumerstream:bindings:stringSupplier-out-0:destination: batch-indigitRemovingConsumer-in-0:destination: batch-ingroup: batch-inconsumer:batch-mode: truedigitRemovingConsumer-out-0:destination: batch-outkafka:binder:brokers: localhost:9092bindings:digitRemovingConsumer-in-0:consumer:configuration:# Forces consumer to wait 5 seconds before polling for messagesfetch.max.wait.ms: 5000fetch.min.bytes: 1000000000max.poll.records: 10000000

参数解释

spring:cloud:function:definition: stringSupplier;digitRemovingConsumer
  • spring.cloud.function.definition:定义了两个函数,stringSupplierdigitRemovingConsumer。这两个函数将在应用程序中被使用。
    stream:bindings:stringSupplier-out-0:destination: batch-in
  • stream.bindings.stringSupplier-out-0.destination:将stringSupplier函数的输出绑定到Kafka主题batch-in
        digitRemovingConsumer-in-0:destination: batch-ingroup: batch-inconsumer:batch-mode: true
  • stream.bindings.digitRemovingConsumer-in-0.destination:将digitRemovingConsumer函数的输入绑定到Kafka主题batch-in
  • group: batch-in:指定消费者组为batch-in,这意味着多个实例可以共享这个组来处理消息。
  • consumer.batch-mode: true:启用批处理模式,允许消费者一次处理多条消息。
        digitRemovingConsumer-out-0:destination: batch-out
  • stream.bindings.digitRemovingConsumer-out-0.destination:将digitRemovingConsumer函数的输出绑定到Kafka主题batch-out

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • https://github.com/Harries/springcloud-demo(Spring Cloud Stream)

4.测试

启动弄Spring Boot应用,可以看到控制台输出日志如下:

291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in
c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in
a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in
db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in
b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in
Removed digits from batch of 5
batch-out -> eacc-ee-dfb-b-dead
batch-out -> cbae-e-f-c-acfb
batch-out -> ab-dd---adade
batch-out -> db-fb-f-bbb-bfdec
batch-out -> bdb--d-ad-bbbb

5.引用

  • https://github.com/spring-cloud/spring-cloud-stream-samples
  • Spring Cloud Stream 3.X - coderZoe的博客
  • Spring Cloud Stream实现数据流处理 | Harries Blog™

http://www.ppmy.cn/embedded/138708.html

相关文章

hadoop3.x 新特性

hadoop3.x 新特性 FeaturesHadoop 2.xHadoop 3.xMinimum Required Java VersionJDK 6 and above.JDK 8 is the minimum runtime version of JAVA required to run Hadoop 3.x as many dependency library files have been used from JDK 8.Fault ToleranceFault Tolerance is …

微服务day11-微服务面试

分布式事务 CAP和BASE AT模式的脏写问题 TCC模式 最大努力通知 注册中心 环境隔离 分级模型 Eureka与Nacos 远程调用 切换负载均衡算法 服务保护 线程隔离 滑动窗口算法 漏桶算法 令牌桶算法

【QT常用技术讲解】QSettings把中文输入到配置文件

前言 在 QT 中&#xff0c;使用 QSettings 时&#xff0c;默认是将字符串以 Unicode 格式存储&#xff0c;而不是以 UTF-8 编码直接写入配置文件。因为涉及到配置文件&#xff0c;有些时候&#xff0c;配置信息由界面端进行写操作&#xff0c;而后台服务进程进行读取并处理&…

Maven的下载安装及配置

一、下载Maven 1、访问Maven官网&#xff1a; 打开浏览器&#xff0c;访问Maven的官方网站&#xff1a;Download Apache Maven – Maven 2、选择Maven版本&#xff1a; 在下载页面上&#xff0c;选择适合您操作系统的Maven版本。通常&#xff0c;Maven提供二进制zip归档和tar…

Eclipse 任务管理

Eclipse 任务管理 Eclipse 是一个广泛使用的集成开发环境&#xff08;IDE&#xff09;&#xff0c;它提供了强大的任务管理功能&#xff0c;帮助开发人员有效地组织和管理他们的工作。本文将详细介绍 Eclipse 任务管理系统的功能和使用方法&#xff0c;以及如何利用它来提高开…

npm上传自己封装的插件(vue+vite)

一、npm账号及发包删包等命令 若没有账号&#xff0c;可在npm官网&#xff1a;https://www.npmjs.com/login 进行注册。 在当前项目根目录下打开终端命令窗口&#xff0c;常见命令如下&#xff1a; 1、登录命令&#xff1a;npm login&#xff08;不用每次都重新登录&#xff0…

已有账号,重装系统激活office后发现没有ppt,word,excel等

有时候重装系统后&#xff0c;登录windows结果右键没有word,excel等 点击进入office 进入右边的账户 找到设备和订阅 直接下载office 安装后就会出现了

matlab 读取csv

需要跳过第一行表头等信息 1、读取整个文件 csvread(FILENAME)%文件路径 文件名2、指定起始位置 csvread(FILENAME, R, C)%从文件的第R行和第C列开始读取数据 逗号分开3、指定数据范围 csvread(FILENAME, R, C, [R1 C1 R2 C2])%读取从(R1, C1)到(R2, C2)范围内的数据注意&am…