需求
1、在linux日志文件/data/log/moreInfoRes.log中一直会产生如下JSON数据:
{"id":"14943445328940974601","uid":"840717325115457536","lat":"53.530598","lnt":"-2.5620373","hots":0,"title":"0","status":"1","topicId":"0","end_time":"1494344570","watch_num":0,"share_num":"1","replay_url":null,"replay_num":0,"start_time":"1494344544","timestamp":1494344571,"type":"video_info"}
{"uid":"861848974414839801","nickname":"mick","usign":"","sex":1,"birthday":"","face":"","big_face":"","email":"abc@qq.com","mobile":"","reg_type":"102","last_login_time":"1494344580","reg_time":"1494344580","last_update_time":"1494344580","status":"5","is_verified":"0","verified_info":"","is_seller":"0","level":1,"exp":0,"anchor_level":0,"anchor_exp":0,"os":"android","timestamp":1494344580,"type":"user_info"}
{"send_id":"834688818270961664","good_id":"223","video_id":"14943443045138661356","gold":"10","timestamp":1494344574,"type":"gift_record"}
2、需要根据数据中的type字段分目录存储,并且还要对type字段的值进行一定的处理(将下划线命名转换为驼峰命名),最终处理之后的数据需要存储到HDFS上的/moreInfoRes目录中。例如:
-
type:video_info 类型的数据需要存储到 /moreInfoRes/videoInfo 目录里面。
-
type:user_info 类型的数据需要存储到 /moreInfoRes/userInfo 目录里面。
-
type:gift_record 类型的数据需要存储到 /moreInfoRes/giftRecord 目录里面。
3、拦截器方面,使用 Search and Replace Interceptor + Regex Extractor Interceptor 也可以实现,但前者(Search and Replace Interceptor)效率较低,故采用 自定义 Interceptor + Regex Extractor Interceptor 实现。
实现
鉴于此,可以使用 Exec Source + Custom Interceptor + Regex Extractor Interceptor + File Channel + HDFS Sink 来实现。官方文档如下:
Exec Source:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#exec-source
Custom Interceptor:
可参考其他 Interceptor 的实现
Regex Extractor Interceptor:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#regex-extractor-interceptor
File Channel:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#file-channel
HDFS Sink:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#hdfs-sink
创建工程
引入依赖
主要是 flume-ng-core 和 jackson 依赖,其他可不引入。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>flume-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Flume core: provides the Interceptor, Event and Context types used by the custom interceptor. -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
        <!-- Jackson: used by JsonUtil to parse and re-serialize the event body. -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.4</version>
        </dependency>
        <!-- fastjson and hutool are intentionally disabled: the interceptor source notes that
             they worked locally but not in the deployed environment. -->
        <!-- <dependency>-->
        <!-- <groupId>com.alibaba</groupId>-->
        <!-- <artifactId>fastjson</artifactId>-->
        <!-- <version>2.0.25</version>-->
        <!-- </dependency>-->
        <!-- <dependency>-->
        <!-- <groupId>cn.hutool</groupId>-->
        <!-- <artifactId>hutool-core</artifactId>-->
        <!-- <version>5.8.27</version>-->
        <!-- </dependency>-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
    </dependencies>
</project>
编写 Custom Interceptor
package com.example.flumedemo.interceptor;import com.example.flumedemo.constant.OptType;
import com.example.flumedemo.util.JsonUtil;
import com.example.flumedemo.util.NamingCaseUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.SearchAndReplaceInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Map;/*** 自定义拦截器,* 将字段的值按照指定操作转换,得到的结果替换原来的值。** @author liaorj* @date 2024/11/13*/
public class MyInterceptor implements Interceptor {private static final Logger logger = LoggerFactory.getLogger(SearchAndReplaceInterceptor.class);/*** json中需要处理的字段*/private final String jsonField;/*** 需要对字段的值进行什么操作*/private final String optType;private final Charset charset;public MyInterceptor(String jsonField, String optType, Charset charset) {this.jsonField = jsonField;this.optType = optType;this.charset = charset;}@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {try {logger.info("----event={}", JsonUtil.toJson(event));if (null == event || ArrayUtils.isEmpty(event.getBody())) {logger.info("----event or body is null");return event;}//将body转为map对象Map<String, Object> jsonObject;jsonObject = JsonUtil.toBean(event.getBody(), new TypeReference<Map<String, Object>>() {});logger.info("----jsonObject={}", jsonObject);//获取jsonField的值logger.info("----jsonField={}", this.jsonField);Object value = jsonObject.get(this.jsonField);logger.info("----jsonFieldValue={}", value);if (jsonObject.containsKey(this.jsonField)) {logger.info("----containsKey");} else {logger.info("----not containsKey");}logger.info("----jsonObject.keySet={}", jsonObject.keySet());if (jsonObject.keySet().contains(this.jsonField)) {logger.info("----keySet containsKey");} else {logger.info("----keySet not containsKey");}//如果含有下划线if (null != value) {String newValue = null;logger.info("-----opt={},code={}", this.optType, OptType.toCamelCase.getCode());logger.info("----opt equals={}", OptType.toCamelCase.getCode().equals(this.optType));if (OptType.toCamelCase.getCode().equals(this.optType)) {//将下划线字符串转为驼峰newValue = NamingCaseUtil.toCamelCase(value.toString());logger.info("----newValue={}", newValue);} else if (OptType.toKebabCase.getCode().equals(this.optType)) {//hutool和fastjson的类本地跑可以,上环境却用不了,执行到相关类就没有日志了,可能包冲突了,暂不用。
// newValue = NamingCase.toKebabCase(value.toString());} else if (OptType.toPascalCase.getCode().equals(this.optType)) {
// newValue = NamingCase.toPascalCase(value.toString());} else if (OptType.toUnderlineCase.getCode().equals(this.optType)) {newValue = NamingCaseUtil.toUnderlineCase2(value.toString());} else {newValue = value.toString();}//替换原来的值logger.info("----newValue2={}", newValue);jsonObject.put(this.jsonField, newValue);logger.info("----jsonObject2={}", jsonObject);event.setBody(JsonUtil.toJson(jsonObject).getBytes(charset));}} catch (Exception e) {throw new RuntimeException(e);}return event;}@Overridepublic List<Event> intercept(List<Event> events) {Iterator var2 = events.iterator();while (var2.hasNext()) {Event event = (Event) var2.next();this.intercept(event);}return events;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {private String jsonField;private String optType;private Charset charset;public Builder() {this.charset = Charsets.UTF_8;}@Overridepublic void configure(Context context) {String jsonObjField = context.getString("jsonField");Preconditions.checkArgument(!StringUtils.isEmpty(jsonObjField), "Must supply a valid jsonField (may not be empty)");this.jsonField = jsonObjField;String optType = context.getString("optType");Preconditions.checkArgument(!StringUtils.isEmpty(optType), "Must supply a valid opt (may not be empty)");this.optType = optType;if (context.containsKey("charset")) {// May throw IllegalArgumentException for unsupported charsets.charset = Charset.forName(context.getString("charset"));}}@Overridepublic Interceptor build() {Preconditions.checkNotNull(this.jsonField, "jsonField required");Preconditions.checkNotNull(this.optType, "opt required");return new MyInterceptor(this.jsonField, this.optType, this.charset);}}/*public static void main(String[] args) {String str = "{\"send_id\":\"834688818270961664\",\"good_id\":\"223\",\"video_id\":\"14943443045138661356\",\"gold\":\"10\",\"timestamp\":1494344574,\"type\":\"gift_record\"}";MyInterceptor myInterceptor = new MyInterceptor("type", "toCamelCase", 
Charsets.UTF_8);Event event = new SimpleEvent();event.setBody(str.getBytes(StandardCharsets.UTF_8));Event result = myInterceptor.intercept(event);System.out.println(JsonUtil.toJson(JsonUtil.toBean(result.getBody(), new TypeReference<Map<String, Object>>() {})));}*/
}
package com.example.flumedemo.constant;/*** @author liaorj* @date 2024/11/13*/
/**
 * Naming-case conversions that can be applied to a JSON field value.
 * Each constant carries the string code it is matched against in configuration.
 */
public enum OptType {

    /** Converts an underscore-separated name to camel case. */
    toCamelCase("toCamelCase"),

    /** Converts a camel-case name to kebab case (hyphen-separated). */
    toKebabCase("toKebabCase"),

    /** Converts an underscore-separated name to Pascal case. */
    toPascalCase("toPascalCase"),

    /** Converts a camel-case name to underscore-separated form. */
    toUnderlineCase("toUnderlineCase");

    private final String code;

    OptType(String code) {
        this.code = code;
    }

    /**
     * @return the string code of this operation
     */
    public String getCode() {
        return code;
    }
}
package com.example.flumedemo.util;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ser.FilterProvider;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;import java.io.IOException;/*** @author liaorj* @date 2024/10/24*/
public class JsonUtil {

    /** Shared mapper; it is never reconfigured after creation, so it can be reused by all calls. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Serializes an object to JSON bytes.
     *
     * @param object the value to serialize
     * @return the JSON bytes
     * @throws RuntimeException wrapping the {@link JsonProcessingException} on failure
     */
    public static byte[] toBytes(Object object) {
        try {
            return OBJECT_MAPPER.writeValueAsBytes(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Serializes an object to a JSON string.
     *
     * @param object the value to serialize
     * @return the JSON text
     * @throws RuntimeException wrapping the {@link JsonProcessingException} on failure
     */
    public static String toJson(Object object) {
        try {
            return OBJECT_MAPPER.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Serializes an object while excluding the given properties, e.g. sensitive or overly long
     * fields such as ID numbers, phone numbers, e-mail addresses or passwords.
     * Reference: https://www.baeldung-cn.com/jackson-ignore-properties-on-serialization
     *
     * <p>The filter is registered under the id {@code "myFilter"} on a per-call writer, so the
     * shared mapper is not modified and concurrent calls with different exclusions do not
     * affect each other.
     *
     * @param object            the value to serialize
     * @param excludeProperties names of the properties to leave out
     * @return the JSON text
     * @throws RuntimeException wrapping the {@link JsonProcessingException} on failure
     */
    public static String toJson(Object object, String[] excludeProperties) {
        try {
            SimpleBeanPropertyFilter theFilter =
                    SimpleBeanPropertyFilter.serializeAllExcept(excludeProperties);
            FilterProvider filterProvider =
                    new SimpleFilterProvider().addFilter("myFilter", theFilter);
            return OBJECT_MAPPER.writer(filterProvider).writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Serializes an object to an indented, human-readable JSON string.
     *
     * @param object the value to serialize
     * @return the pretty-printed JSON text
     * @throws RuntimeException wrapping the {@link JsonProcessingException} on failure
     */
    public static String toPrettyJson(Object object) {
        try {
            return OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Parses a JSON string into an instance of the given class.
     *
     * @param json  the JSON text
     * @param clazz the target type
     * @return the parsed value
     * @throws RuntimeException wrapping the {@link JsonProcessingException} on failure
     */
    public static <T> T toBean(String json, Class<T> clazz) {
        try {
            return OBJECT_MAPPER.readValue(json, clazz);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Parses JSON bytes into an instance of the given class.
     *
     * @param json  the JSON bytes
     * @param clazz the target type
     * @return the parsed value
     * @throws RuntimeException wrapping the {@link IOException} on failure
     */
    public static <T> T toBean(byte[] json, Class<T> clazz) {
        try {
            return OBJECT_MAPPER.readValue(json, clazz);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Parses JSON bytes into a generic type described by a {@link TypeReference}.
     *
     * @param json          the JSON bytes
     * @param typeReference the target generic type
     * @return the parsed value
     * @throws RuntimeException wrapping the {@link IOException} on failure
     */
    public static <T> T toBean(byte[] json, TypeReference<T> typeReference) {
        try {
            return OBJECT_MAPPER.readValue(json, typeReference);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
package com.example.flumedemo.util;import java.util.regex.Matcher;
import java.util.regex.Pattern;/*** 字符串转化工具类* @author liaorj* @date 2024/11/14*/
public class NamingCaseUtil {

    /** Matches an underscore followed by one word character; group 1 is that character. */
    private static final Pattern linePattern = Pattern.compile("_(\\w)");

    /** Matches a single ASCII uppercase letter. */
    private static final Pattern humpPattern = Pattern.compile("[A-Z]");

    /**
     * Converts an underscore-separated name to camel case, e.g. {@code gift_record} to
     * {@code giftRecord}. The input is lower-cased first.
     *
     * <p>Case mapping uses {@link java.util.Locale#ROOT} so the result does not depend on the
     * JVM default locale (a Turkish default locale would otherwise map {@code I} to a dotless i).
     *
     * @param str the underscore-separated name, must not be null
     * @return the camel-case name
     */
    public static String toCamelCase(String str) {
        str = str.toLowerCase(java.util.Locale.ROOT);
        Matcher matcher = linePattern.matcher(str);
        // StringBuffer is required here: Matcher.appendReplacement(StringBuilder, ...) needs Java 9+.
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            // Group 1 is a single \w character, so it can never contain '$' or '\'.
            matcher.appendReplacement(sb, matcher.group(1).toUpperCase(java.util.Locale.ROOT));
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

    /**
     * Converts a camel-case name to underscore-separated form (simple variant of
     * {@link #toUnderlineCase2(String)}); the whole result is lower-cased.
     *
     * @param str the camel-case name, must not be null
     * @return the underscore-separated name
     */
    public static String toUnderlineCase(String str) {
        return humpPattern.matcher(str).replaceAll("_$0").toLowerCase(java.util.Locale.ROOT);
    }

    /**
     * Converts a camel-case name to underscore-separated form: every ASCII uppercase letter is
     * replaced by an underscore followed by its lowercase form; other characters are kept.
     *
     * @param str the camel-case name, must not be null
     * @return the underscore-separated name
     */
    public static String toUnderlineCase2(String str) {
        Matcher matcher = humpPattern.matcher(str);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase(java.util.Locale.ROOT));
        }
        matcher.appendTail(sb);
        return sb.toString();
    }
}
打包
打包前可以先用 MyInterceptor 类末尾的 main 函数做本地测试(该函数默认是注释掉的,使用前需取消注释,并补充 SimpleEvent、StandardCharsets 的 import)。
mvn clean
mvn package
打包好后,需要把当前jar包上传到linux上的flume目录下的lib目录中。
配置文件
然后在flume目录下的conf目录下创建配置文件:file-to-hdfs-customInterceptor.conf,内容如下,注意自定义拦截器所在包和HDFS主机ip要修改成自己的。
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/moreInfoRes.log

# Describe/configure the source interceptors
# i1 rewrites the value of the "type" field, then i2 extracts that value into the logType header
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.example.flumedemo.interceptor.MyInterceptor$Builder
a1.sources.r1.interceptors.i1.jsonField = type
a1.sources.r1.interceptors.i1.optType = toCamelCase
a1.sources.r1.interceptors.i2.type = regex_extractor
a1.sources.r1.interceptors.i2.regex = "type":"(\\w+)"
a1.sources.r1.interceptors.i2.serializers = s1
a1.sources.r1.interceptors.i2.serializers.s1.name = logType

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.163.130:9000/moreInfoRes/%{logType}
a1.sinks.k1.hdfs.filePrefix = data
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a file channel, which buffers events on local disk
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/moreInfoRes/checkpointDir
a1.channels.c1.dataDirs = /data/moreInfoRes/dataDirs

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume
切换到flume目录,执行:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-hdfs-customInterceptor.conf -Dflume.root.logger=INFO,console
测试结果
执行 hdfs dfs -ls -R / 命令查看 HDFS 上的 /moreInfoRes 目录文件信息,可以看到处理成功了: