Flink消费Kafka实时写入Doris

ops/2024/10/22 17:11:16/

本文模拟实际生产环境,通过FileBeat采集日志信息到Kafka,再通过Flink消费Kafka实时写入Doris。

文章目录

    • Filebeat采集日志到Kafka
    • Flink消费Kafka实时写入Doris
    • 总结

在这里插入图片描述

Filebeat采集日志到Kafka

常见的日志采集工具有以下几种:Flume、Logstash和Filebeat

  • Flume采用Java编写,它是一个分布式、高度可靠且高度可用的工具,旨在高效地搜集、汇总和转移大量日志数据,该工具拥有一个简洁且灵活的流数据流架构,它配备了可调节的可靠性机制、故障切换以及恢复功能,此外,Flume通过简单且可扩展的数据模型支持在线分析应用程序。
  • Logstash是一个开源的日志管理和分析工具,它能够从多个数据源收集数据,对数据进行转换和清洗,并将处理后的数据传输到目标系统。
  • Filebeat是一款go语言编写的日志文件收集工具,当在服务器上部署其客户端后,它会持续监听特定的日志目录或日志文件,实时跟踪并读取这些文件的更新内容,并将这些数据发送到指定的输出目标,例如Elasticsearch或Kafka等。

这里选择Filebeat进行日志采集的主要原因在于其资源消耗极低,相较于Flume和Logstash,Filebeat占用的内存最少,对CPU的负载也最小。它的运行进程十分稳定,很少出现崩溃或宕机的情况。

首先下载Filebeat

java">curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.12.0-linux-x86_64.tar.gz

在这里插入图片描述
解压缩文件

java">tar xzvf filebeat-8.12.0-linux-x86_64.tar.gz

进入目录

java">cd filebeat-8.12.0-linux-x86_64

编写配置文件接入Kafka

java">vim filebeat.yaml

filebeat.yaml的文件内容

filebeat.inputs:
- type: logpaths:- /doc/input/*.log  # 更换为你的日志文件路径
processors:- include_fields:fields: ["message"]
output.kafka:# 更换为你的Kafka地址和主题.hosts: ["192.168.235.130:9092"]topic: k2ggcodec:format:string: '%{[message]}'

运行Filebeat采集日志

java">./filebeat -e -c ./filebeat.yaml

在这里插入图片描述

这是log日志的信息,现要求保持原始格式发送到Kafka
在这里插入图片描述Filebeat采集日志信息发送到Kafka的主题,消费者收到的信息如下,Filebeat会添加一些自带的数据,比如时间戳和元数据等,但是一般情况下只需要采集message里面的信息,通过filebeat.yaml中的processors和codec即可实现。
在这里插入图片描述processors处理只保留 message 的字段信息,其他字段将被丢弃,codec用于定义数据的编码格式,将 message 字段的值作为字符串发送到 kafka,这样就可以保留日志信息的原始格式发送到Kafka。
消费者消费原始格式的日志消息
在这里插入图片描述

Flink消费Kafka实时写入Doris

在写入之前,建立doris的数据表用于接收消费的信息

java">CREATE TABLE transactions (timestamp datetime,user_id INT,transaction_type VARCHAR(50),amount DECIMAL(15, 2),currency CHAR(3),status VARCHAR(20),description TEXT
)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES("replication_num"="1");

引入依赖

java">   <dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.16</artifactId><version>24.0.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.17.0</version></dependency>

主程序

java">package flink;import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;import java.util.Properties;public class DorisWrite {public static void main(String[] args) throws Exception {Properties props = new Properties();//Kafka broker的地址props.put("bootstrap.servers", "192.168.235.130:9092");props.put("group.id", "test");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000);env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//指定消费的主题FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("k2gg",new SimpleStringSchema(),props);DorisSink.Builder<String> builder = DorisSink.builder();DorisOptions.Builder dorisBuilder = DorisOptions.builder();//Doris的地址以及账号密码等信息dorisBuilder.setFenodes("192.168.235.130:8030").setTableIdentifier("test.transactions").setUsername("root").setPassword("1445413748");Properties pro = new Properties();pro.setProperty("format", "json");pro.setProperty("read_json_by_line", "true");DorisExecutionOptions  executionOptions = DorisExecutionOptions.builder().setLabelPrefix("label-doris12"+System.currentTimeMillis()) //streamload label prefix,.setStreamLoadProp(pro).build();builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionOptions).setSerializer(new SimpleStringSerializer()) //serialize according to string.setDorisOptions(dorisBuilder.build());DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);// 将Kafka数据转换为JSON格式DataStream<String> jsonStream = dataStreamSource.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {System.out.println("value"+value);// 分割字符串String[] parts = value.split(",");// 创建JSON字符串StringBuilder jsonString = new StringBuilder();jsonString.append("{");jsonString.append("\"timestamp\":\"").append(parts[0]).append("\",");jsonString.append("\"user_id\":").append(parts[1]).append(",");jsonString.append("\"transaction_type\":\"").append(parts[2]).append("\",");jsonString.append("\"amount\":").append(parts[3]).append(",");jsonString.append("\"currency\":\"").append(parts[4]).append("\",");jsonString.append("\"status\":\"").append(parts[5]).append("\",");jsonString.append("\"description\":\"").append(parts[6].replace("\"", "")).append("\"");jsonString.append("}");return jsonString.toString();}});jsonStream.print();jsonStream.sinkTo(builder.build());env.execute("flink kafka to doris by datastream");}
}

运行主程序通过Flink消费Kafka的信息写入doris
在这里插入图片描述log日志的信息
在这里插入图片描述
登录Doris进行验证

java">mysql -h k8s-master -P 9030 -uroot -p

这是没运行主程序之前doris的数据,没有2024-10-15这一天的数据。

java">select * from transactions where date(timestamp) = "2024-10-15";

在这里插入图片描述
运行主程序之后,Flink将Kafka主题的信息实时写入Doris。
在这里插入图片描述

总结

1.Filebeat格式问题
Filebeat采集日志格式会添加一些自带的额外信息,一般情况下只需要message里面的字段信息,那么yaml文件配置processors和codec属性即可。processors处理只保留 message 的字段信息,其他字段将被丢弃,codec用于定义数据的编码格式,将 message 字段的值作为字符串发送到 kafka,这样就可以保留日志信息的原始格式发送到Kafka。
2.Flink消费Kafka失败
Flink在消费Kafka主题的过程中,不要往该主题发送其他格式的数据,否则会解析失败,尽量新建一个新主题来接收Filebeat采集过来的日志信息。如果还是执行失败,可以尝试在setLabelPrefix添加一个时间戳,这样保证每次生成的标签前缀都不一样,这是因为客户端会生成一个唯一的标签来标识这次导入Doris的操作,Doris服务器会根据这个标签来跟踪导入的进度和状态,如果导入过程中出现问题,Doris会保留失败的数据,客户端就可以通过标签重新导入这些数据。
3.实时写入Doris失败
Flink处理字段的数据类型要与Doris匹配,可以参考官方文档Doris 和 Flink 列类型映射关系。


http://www.ppmy.cn/ops/127618.html

相关文章

Python基础和理论学习

Python作为一种高级编程语言&#xff0c;以其简洁的语法和广泛的应用&#xff0c;成为许多开发者和分析师首选的语言。无论是用来进行数据分析、机器学习、Web开发还是自动化任务&#xff0c;Python都具有强大的功能。本文将深入探讨Python的基础知识和理论&#xff0c;以帮助你…

计算机组成原理一句话

文章目录 计算机系统概述存储系统 计算机系统概述 指令和数据以同等地位存储在存储器中&#xff0c;形式上没有差别&#xff0c;但计算机应能区分他们。通过指令周期的不同阶段。 完整的计算机系统包括&#xff0c;1&#xff09;软件系统&#xff1a;程序、文档和数据&#xff…

vue day07(自定义创建项目)

基于 VueCli 自定义创建项目 1. ESLint 代码规范 如果代码不符合规范&#xff0c;ESLint 会提醒 解决方法&#xff1a; 手动修正 不认识命令行中报错语法是什么意思就去【ESLint 规则表】中查 自动修正 基于 vscode 插件 ESLint 高亮错误&#xff0c;并通过配置自动修复错误 自…

关于Qt中进行输出的方式及对比分析

本文详细介绍了在Qt框架和C标准库中可用的多种输出流方式&#xff0c;主要分为标准C输出和Qt输出两大类。 摘要 标准C输出 std::cout&#xff1a;用于标准输出&#xff0c;支持多种数据类型&#xff0c;是C最常用的输出流。结合std::string、QString&#xff08;需转换&#…

【Linux】进程优先级进程切换

文章目录 进程优先级查看进程优先级进程优先级的修改 进程切换进程切换的概念 总结 进程优先级 进程优先级是操作系统中用于决定进程调度顺序的重要属性。它表示一个进程在系统资源分配和 CPU 调度中的相对重要性。优先级越高的进程通常会获得更多的 CPU 时间和资源&#xff0…

讲一讲Redis五大数据类型的底层实现

讲一讲Redis五大数据类型的底层实现 Redis五大数据类型的底层实现 Redis的五大数据类型分别是字符串&#xff08;String&#xff09;、列表&#xff08;List&#xff09;、哈希&#xff08;Hash&#xff09;、集合&#xff08;Set&#xff09;和有序集合&#xff08;Zset&…

薪资管理系统原型PC端+移动端 Axure原型 交互设计 Axure实战项目

薪资管理系统原型PC端移动端 Salary Management System Prototype 薪资管理系统原型图是一种以图形化方式展示系统界面和功能交互的设计图形。该原型图旨在呈现薪资管理系统的整体架构、界面布局和用户交互流程&#xff0c;为开发团队和利益相关者提供一个清晰而具体的概念。…

ansible————ansible的文件管理

一、ansible文件管理常用的模块 file模块&#xff1a;创建文件/目录&#xff0c;删除/目录文件等 copy模块&#xff1a;将控制节点的文件送到被管理主机上 lineinfile模块&#xff1a;向文件输入内容 stat模块&#xff1a;显示文件的状态信息 fetch模块&#xff1a;从被管理…