ELK+kafka日志采集

server/2024/10/21 11:41:57/

ElasticSeach(存储日志信息)
Logstash(搬运工)
Kibana 连接ElasticSeach图形化界面查询日志

ELK采集日志的原理:

  1. 在每个服务器上安装Logstash
  2. Logstash需要配置固定读取某个日志文件
  3. Logstash将日志文件格式化为json的格式输出到es中
  4. 开发者使用Kibana连接到ElasticSeach 查询存储日志内容

为什么将日志存储在ElasticSeach
其底层使用到倒排索引 搜索效率高

为什么需要使用elk+kafka
如果单纯的使用elk的话,服务器节点扩容时需要在每个服务器上安装 Logstash 步骤十分冗余。
Logstash读取本地日志文件,可能会对本地的磁盘io性能会有一定影响。

elkkafka_15">elk+kafka采集日志的原理:

  1. springboot项目基于aop的方式拦截系统中日志
  2. 将该日志投递到 kafka 中,该过程一定要采用异步的形式
  3. Logstash 订阅 kafka 的主题获取日志消息内容
  4. 在将日志消息内容输出到es中存放
  5. 开发者使用Kibana连接到ElasticSeach 查询存储日志内容

logstash

Logstash是一个开源数据收集引擎,具有实时管道功能。
Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地

进入 logstash 目录,执行命令安装输入输出插件

bin/logstash-plugin install logstash-input-kafka
bin/logstash-plugin install logstash-output-elasticsearch

添加配置文件:logstash/config/kafka.conf

# 输入
input {kafka {bootstrap_servers => "192.168.10.110:9091"topics => "主题名称"}
}
# 过滤排除一些不需要写入的日志
filter {#Only matched data are send to output.
}
# 输出
output {elasticsearch {action => "index"          #The operation on EShosts  => "192.168.10.110:9200"   #ElasticSearch host, can be array.index  => "索引名称"         #The index to write data to.}
}

启动logstash:./logstash -f …/config/kafka.conf

Aop拦截日志

@Aspect
@Component
public class AopLogAspect {@Value("${server.port}")private String serverPort;@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@Pointcut("execution(* com.example.service.*.*(..))")private void serviceAspect() {}@Autowiredprivate LogContainer logContainer;// 异常通知@AfterThrowing(pointcut = "serviceAspect()", throwing = "e")public void serviceAspect(JoinPoint point, Exception e) {ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = requestAttributes.getRequest();JSONObject jsonObject = new JSONObject();SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式jsonObject.put("request_time", df.format(new Date()));jsonObject.put("request_url", request.getRequestURL().toString());jsonObject.put("request_method", request.getMethod());jsonObject.put("signature", point.getSignature());jsonObject.put("request_args", Arrays.toString(point.getArgs()));jsonObject.put("error", e.toString());// IP地址信息jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);JSONObject requestJsonObject = new JSONObject();requestJsonObject.put("request", jsonObject);// 将日志信息投递到kafkaString log = requestJsonObject.toJSONString();logContainer.put(log);}
}
使用队列+线程实现异步
@Component
public class LogContainer {private static BlockingDeque<String> logDeque = new LinkedBlockingDeque<>();@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;public LogContainer() {new LogThreadKafka().start();}// 存入日志public void put(String log) {logDeque.offer(log);}// 只需要创建一次线程class LogThreadKafka extends Thread {@Overridepublic void run() {while (true) {String log = logDeque.poll();if (!StringUtils.isEmpty(log)) {// 将消息投递kafkakafkaTemplate.send("xxx-log", log);}}}}
}

http://www.ppmy.cn/server/38960.html

相关文章

汇编--栈和寄存器

栈 栈是一种运算受限的线性表&#xff0c;其限定仅在表尾进行插入和删除操作的线性表&#xff0c;表尾也被叫做栈顶。简单概括就是我们对于元素的操作只能够在栈顶进行&#xff0c;也造就了其先进后出的结构特性。 栈 这种内存空间其实本质上有两种操作&#xff1a;将数据放入…

Python sqlite3库 实现 数据库基础及应用 输入地点,可输出该地点的爱国主义教育基地名称和批次的查询结果。

目录 【第11次课】实验十数据库基础及应用1-查询 要求: 提示: 运行结果&#xff1a; 【第11次课】实验十数据库基础及应用1-查询 声明&#xff1a;著作权归作者所有。商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处。 1.简答题 数据库文件Edu_Base.db&#…

《21天学通C++》(第十七章)STL 动态数组类(vector和deque)

std::vector的特点 1.在数组末尾添加元素所需的时间是固定的&#xff0c;删除亦是如此 2.在数组中间添加或删除元素所需的时间&#xff0c;和该元素后面的元素个数成正比 3.动态存储 1.实例化vector 实例化vector时&#xff0c;要指定该动态数组中存储的对象类型 std::vector…

Hotcoin Research | 模块化将是大势所趋:拆解模块化区块链的现状和未来

关于模块化区块链叙事的讨论源于Celestia和其代币TIA的亮眼表现。实际上&#xff0c;模块化是未来区块链设计的主要发展方向和大势所趋。模块化区块链就像乐高积木一样&#xff0c;将区块链系统拆分为可重用的模块&#xff0c;通过定制组合可实现不同功能的区块链网络。这种灵活…

深入理解 ICMP 协议

目录 前言 1. 概述 特性与功能 报文封装与格式 2. ICMP差错报告 3. ICMP查询 4. ICMP应用 总结 前言 ICMP&#xff08;Internet Control Message Protocol&#xff09;是互联网控制报文协议&#xff0c;是TCP/IP协议族中的一个重要组成部分。作为网络层的协议之一&#…

初学python记录:力扣1652. 拆炸弹

题目&#xff1a; 你有一个炸弹需要拆除&#xff0c;时间紧迫&#xff01;你的情报员会给你一个长度为 n 的 循环 数组 code 以及一个密钥 k 。 为了获得正确的密码&#xff0c;你需要替换掉每一个数字。所有数字会 同时 被替换。 如果 k > 0 &#xff0c;将第 i 个数字用…

【微服务】网关(详细知识以及登录验证)

微服务网关 网关网关路由快速入门路由属性 路由断言网关登录校验自定义过滤器实现登录校验网关传递用户OpenFeign传递用户 网关 网络的关口&#xff0c;负责请求的路由&#xff0c;转发&#xff0c;身份校验 当我们把一个单体项目分成多个微服务并部署在多台服务器中&#xff…

Paddle 基于ANN(全连接神经网络)的GAN(生成对抗网络)实现

什么是GAN GAN是生成对抗网络&#xff0c;将会根据一个随机向量&#xff0c;实现数据的生成&#xff08;如生成手写数字、生成文本等&#xff09;。 GAN的训练过程中&#xff0c;需要有一个生成器G和一个鉴别器D. 生成器用于生成数据&#xff0c;鉴定器用于鉴定数据的准确性&…