Spring Cloud Data Flow快速入门Demo

news/2024/11/23 8:07:16/

1.什么是Spring Cloud Data Flow?

Spring Cloud Data Flow 是一个用于构建和编排数据处理流水线的云原生框架。它提供了一种简化的方式来定义、部署和管理数据处理任务和流应用程序。以下是一些关键特性和组件:

关键特性

  1. 流处理

    • 支持实时数据流处理,可以通过定义源、处理器和接收器来构建数据流。
  2. 批处理任务

    • 支持批处理任务的调度和执行,适用于需要定期运行的任务。
  3. 可扩展性

    • 支持多种数据处理引擎,如 Apache Kafka、RabbitMQ 和 Apache Spark,允许用户根据需求选择合适的技术栈。
  4. 可视化界面

    • 提供基于 Web 的用户界面,用户可以通过拖放组件来设计数据流和任务。
  5. 监控和管理

    • 提供监控和管理工具,可以查看应用程序的运行状态、日志和指标。

组件

  1. Spring Cloud Data Flow Server

    • 核心组件,负责管理和协调数据流和任务的部署。
  2. Skipper

    • 用于应用程序的版本管理和滚动更新,确保在更新过程中最小化停机时间。
  3. 数据流应用程序

    • 由 Spring Boot 构建的微服务应用程序,分为源、处理器和接收器三种类型。
  4. 任务应用程序

    • 用于执行一次性任务或批处理作业。

Spring Cloud Data Flow 适用于需要处理大规模数据流和批处理任务的场景,特别是在分布式系统和云环境中。它简化了数据管道的开发和管理,使开发者能够专注于业务逻辑的实现

2.环境搭建

docker-compose.yml

version: '3'services:mysql:image: mysql:5.7.25container_name: dataflow-mysqlenvironment:MYSQL_DATABASE: dataflowMYSQL_USER: rootMYSQL_ROOT_PASSWORD: rootpwexpose:- 3306ports:- "3306:3306"volumes:- ./my.cnf:/etc/mysql/my.cnfkafka-broker:image: confluentinc/cp-kafka:5.3.1container_name: dataflow-kafkaexpose:- "9092"environment:- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181- KAFKA_ADVERTISED_HOST_NAME=kafka-broker- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1depends_on:- zookeeperzookeeper:image: confluentinc/cp-zookeeper:5.3.1container_name: dataflow-kafka-zookeeperexpose:- "2181"environment:- ZOOKEEPER_CLIENT_PORT=2181dataflow-server:image: springcloud/spring-cloud-dataflow-server:2.6.3container_name: dataflow-serverports:- "9393:9393"environment:- spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers=PLAINTEXT://kafka-broker:9092- spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.streams.binder.brokers=PLAINTEXT://kafka-broker:9092- spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.zkNodes=zookeeper:2181- spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.streams.binder.zkNodes=zookeeper:2181- spring.cloud.skipper.client.serverUri=http://skipper-server:7577/api- SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/dataflow- SPRING_DATASOURCE_USERNAME=root- SPRING_DATASOURCE_PASSWORD=rootpw- SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driverdepends_on:- kafka-brokerentrypoint: "./wait-for-it.sh mysql:3306 -- java -jar /maven/spring-cloud-dataflow-server.jar"volumes:- ${HOST_MOUNT_PATH:-.}:${DOCKER_MOUNT_PATH:-/root/scdf}app-import:image: springcloud/openjdk:2.0.0.RELEASEcontainer_name: dataflow-app-importdepends_on:- dataflow-servercommand: >/bin/sh -c "./wait-for-it.sh -t 180 dataflow-server:9393;wget -qO- 'http://dataflow-server:9393/apps' --post-data='uri=${STREAM_APPS_URI:-https://dataflow.spring.io/kafka-maven-latest&force=true}';echo 'Stream apps imported'wget -qO- 'http://dataflow-server:9393/apps' --post-data='uri=${TASK_APPS_URI:-https://dataflow.spring.io/task-maven-latest&force=true}';echo 'Task apps imported'"skipper-server:image: springcloud/spring-cloud-skipper-server:2.5.2container_name: skipperports:- "7577:7577"- "20000-20105:20000-20105"environment:- SPRING_CLOUD_SKIPPER_SERVER_PLATFORM_LOCAL_ACCOUNTS_DEFAULT_PORTRANGE_LOW=20000- SPRING_CLOUD_SKIPPER_SERVER_PLATFORM_LOCAL_ACCOUNTS_DEFAULT_PORTRANGE_HIGH=20100- SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/dataflow- SPRING_DATASOURCE_USERNAME=root- SPRING_DATASOURCE_PASSWORD=rootpw- SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driverentrypoint: "./wait-for-it.sh mysql:3306 -- java -jar /maven/spring-cloud-skipper-server.jar"volumes:- ${HOST_MOUNT_PATH:-.}:${DOCKER_MOUNT_PATH:-/root/scdf}

启动

docker-compose -f .\docker-compose.yml up -d

dashboard

http://localhost:9393/dashboard/

dashboard

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • https://github.com/Harries/springcloud-demo(Spring Cloud Data Flow)

3.使用指南

Task

任务处理用于一次性或批量数据处理,适合处理需要在特定时间点或周期性执行的任务。任务应用程序通常具有以下特性:

  1. 一次性执行

    • 任务在被触发时执行一次,完成后即停止。
    • 适用于需要定期运行的批处理作业,如数据迁移、报告生成和数据清理。
  2. 批处理支持

    • 可以处理大量数据,通常与 Spring Batch 集成以支持复杂的批处理需求。
    • 支持事务管理、重试机制和并行处理。

任务应用程序适用于需要在后台执行的长时间运行作业,特别是在需要处理大量数据的情况下

任务列表

task1

创建任务

task2

streams

流处理主要用于实时数据处理,适合处理持续不断的数据流。流应用程序通常由以下三种组件组成:

  1. 源(Source)

    • 负责从外部系统(如消息队列、数据库、文件系统等)读取数据并将其发送到流中。
    • 例如,从 Kafka 主题中读取消息。
  2. 处理器(Processor)

    • 接收源发送的数据,对其进行处理或转换,然后将结果发送到下一个组件。
    • 例如,对数据进行过滤、聚合或格式转换。
  3. 接收器(Sink)

    • 负责将处理后的数据输出到目标系统(如数据库、文件系统、消息队列等)。
    • 例如,将处理后的数据写入到数据库表中。

流应用程序通常用于需要低延迟和高吞吐量的场景,如实时数据分析、事件驱动架构和物联网数据处理。

streams列表

 

stream创建

4.引用

  • Spring Cloud Data Flow
  • Spring Cloud Data Flow快速入门Demo | Harries Blog™

http://www.ppmy.cn/news/1549247.html

相关文章

Postman之数据提取

系列文章目录 1.Postman之安装及汉化基本使用介绍 2.Postman之变量操作 3.Postman之数据提取 4.Postman之pm.test断言操作 5.Postman之newman Postman之数据提取 1. 提取请求头\request中的数据2. 提取响应消息\response中的数据3. 通过正在表达式提取4. 提取cookies数据 本文主…

【大数据知识】ClickHouse入门

ClickHouse入门 概述一、主要应用场景二、技术特点三、性能表现四、限制与不足五、使用建议 分布式架构一、架构特点二、核心组件三、数据组织方式四、分布式查询原理五、优势与局限性 核心架构一、ClickHouse执行过程架构二、ClickHouse数据存储架构 为什么速度这么快存储层&a…

时序论文23|ICML24谷歌开源零样本时序大模型TimesFM

论文标题:A DECODER - ONLY FOUNDATION MODEL FOR TIME - SERIES FORECASTING 论文链接:https://arxiv.org/abs/2310.10688 论文链接:https://github.com/google-research/timesfm 前言 谷歌这篇时间序列大模型很早之前就在关注&#xff…

【Python入门第九讲】 集合(set)

_集合(Set)_是 Python 中的一种基本数据结构,它是由不重复元素组成的无序集合。集合对象支持多种数学运算,如并集、交集、差集和对称差等。 集合的特点包括: 无序性: 集合中的元素没有顺序,不能…

Java二分查找+冒泡排序

二分查找在编程中是用来查找目标元素在有序数组中的位置,并返回目标元素的索引 先给定一个有序数组,在创建一个方法来进行二分 主要思想是:根据数组具有下标的特点来分别计算,最左边的索引,以及最右边的索引,在判断目标元素与中间元素的大小,如果目标元素小于中间元素,我们可…

呼叫中心大模型的各项应用?

呼叫中心大模型的各项应用? 作者:开源呼叫中心系统 FreeIPCC,Github地址:https://github.com/lihaiya/freeipcc 呼叫中心大模型主要指的是在呼叫中心系统中应用的大型机器学习模型,特别是深度学习领域中的大型神经网络…

Linux 上查看和转换 .bin 文件为二进制格式

1. 查看 .bin 文件内容 使用 hexdump 查看二进制文件 hexdump 是一个非常有用的工具,可以将二进制文件转换为十六进制格式,便于查看。 hexdump -C filename.bin 将其内容以十六进制格式导出并保存到 bluetooth_hex.txt 文件中,可以执行以…

SAP PI/PO Proxy2JDBC SQL_QUERY动态接口示例

目录 背景: 完整demo步骤: IR: ID: SPROXY: 测试代码: 注意点: 背景: 中途临时帮客户项目做其他功能,项目上有部分开发项需要通过PO去第三方数据库取数,项目上的开发对PO不太熟&#xf…