ES传输带宽优化方案

server/2025/2/12 13:12:59/

背景:目前日志从kafka中消费后转存ES,是通过批量发送的方式打入ES,但是如果数据量很大那么就会占用很多的带宽,而目前正在降本增效,无法增加带宽或者服务节点。

源码在最下方!!

  • 限流牺牲磁盘作为代价
  1. 从kafka消费消息后,向ES发送数据时进行限流比如50M批量发送,但是这个时候遇到的问题就是消息消费出来100M 剩下的50M怎么办,存储在本机的Mysql中或者持久化文件中,后续再慢慢消费。也不用担心所谓的消息顺序的问题,因为流量、告警中时间类字段均是在落ES之前就已经设置好了。但是磁盘总有满的一天。此方案舍弃
  2. 减少kafka生产者发送量,因为探针一直在生成日志,所以会有数据堆积,如果放在mysql或者磁盘中也会有撑满的一天,此方案舍弃
  3. 通过代码中增加消费等待时间,降低消费速度,得到限流,但是这样会出现broker消息堆积,而且kafka目前设置的是5分钟删除旧数据,消费堆积会撑满磁盘,而且因为5分钟的设置 会出现删除未消费的数据,导致消息丢失。此方案舍弃
  • 限流牺牲内存作为代价
  1. 引用上面的解决方法,将限流出现的堆积数据存储在内存中或者引入redis,都会牺牲内存。此方案舍弃
  • 增加服务节点作为代价
  1. 新增服务器,用来存储限流后堆积的数据,不管是磁盘也好 内存也罢,都对业务系统所在的服务器无影响,但是会出现成本增加,此方案舍弃
  • 删除冗余字段、删除不必要的字段类型(text)
  1. 数据发送占用带宽,字段过多也有原因,如果一条数据10个字段与30个字段,所占用的字节肯定是有区别,所以删除掉索引中不必要的字段,可以测试一下能提升多少性能。
  2. 目前所有索引的mapping字段都支持两种方式(text、keyword)对于需要分词查询的字段可以沿用两种方式,但是对于其他不涉及分词、要是集中在精确查询、排序、聚合的字段 可以改为只有 keyword,因为text由于需要存储原始文本以及经过分词后的各个词项,因此通常比keyword占用更多的存储空间

  • 使用httpClient方式发送批量操作(bulk)请求,并使用gzip进行压缩

原理:通过gzip压缩,将50万条数据转为字节流,然后再转为字节数组,放入entity实体中,然后告诉ES服务器,是通过gzip的方式压缩的,ES接收到数据后会通过gzip的方式解压落库。

压缩数据部分代码:

压缩后测试结果 5万条数据 带宽占用最高 667KB!!

未压缩数据部分代码

未压缩测试结果 5万条数据 带宽占用达到了16M 最高达到19M

结论

对于5万数据来说  压缩与不压缩相差了30倍,但是目前5万条数据大部分字段都是相同的,所以在压缩的时候 压缩比很高。实际情况可能就是10倍  但也是显而易见压缩后带宽占用很少!!

源码贴出来了,兄弟们自取

package com.event.util;import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;import javax.net.ssl.SSLContext;
import java.io.ByteArrayOutputStream;
import java.security.cert.X509Certificate;
import java.util.zip.GZIPOutputStream;
public class ElasticsearchBulkInsert {public static void main(String[] args) throws Exception {// 设置凭据提供者BasicCredentialsProvider credsProvider = new BasicCredentialsProvider();credsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials("root", "root"));// 创建允许所有证书的SSL上下文SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(null, (X509Certificate[] chain, String authType) -> true).build();Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create().register("https", new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE)).build();// 创建连接池管理器PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(registry);// 创建HttpClient,并设置默认请求头接受gzip编码CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(connectionManager).setDefaultCredentialsProvider(credsProvider).addInterceptorFirst((HttpResponseInterceptor) (request, context) -> request.setHeader("Accept-Encoding", "gzip")).build();try {// 准备多个文档组成的bulk请求体/*** 下面是通过gzip压缩的逻辑*/StringBuilder bulkRequestBody = new StringBuilder();for (int i=0;i<50000;i++){bulkRequestBody.append("{\"index\":{\"_index\":\"http20250208\",\"op_type\":\"create\"}}\n");bulkRequestBody.append("{\"agentId\":\"szhangsan-test-"+i+"\",\"requestByteRange6\":0,\"requestByteRange7\":0,\"client_city\":\"192.168.0.0/16\",\"db_position\":\"内对内\",\"vlanType\":0,\"totalPktps\":2,\"serverPayload\":84,\"responseByteRange3\":0,\"totalPayload\":84,\"server_country_code\":\"CN\",\"clientIp\":\"192.168.2.201\",\"logOffsets\":[],\"clientBitps\":0,\"dataOffsets\":[288236297029019500],\"serverBitps\":1344,\"totalPkts\":2,\"clientPkts\":0,\"deviceId\":\"037ef078-4ccc-480c-97c5-eb9c70dfdd79\",\"serverIpType\":0,\"id\":\"4615197014644469761kQ\",\"client_country_code\":\"CN\",\"client_province\":\"沈庄数据中心\",\"responseByteRange4\":0,\"responseByteRange5\":0,\"clientNetSegmentIds\":[],\"dataLen\":0,\"vlanId2\":0,\"eventType\":2,\"appProtocol\":0,\"requestByteRange3\":0,\"client_longitude\":\"116.400000\",\"dataLens\":[248],\"serverIp\":\"192.168.2.203\",\"alarmCount\":0,\"server_longitude\":\"116.400000\",\"totalBytes\":168,\"clientIpType\":0,\"serverTransRate\":50,\"requestByteRange4\":0,\"serverPktps\":2,\"transProtocol\":2,\"durationTimeNs\":1000000000,\"clientTransRate\":0,\"requestByteRange1\":1,\"appId\":249,\"kafkakey\":\"event\",\"probeIds\":[11],\"serverPkts\":2,\"serverPort\":26051,\"clientPayload\":0,\"clientPortList\":[],\"totalTransRate\":50,\"responseByteRange6\":0,\"responseByteRange2\":1,\"server_country\":\"中国\",\"dataOffset\":0,\"serverBytes\":168,\"flowBeginTimeNs\":1738914456332849000,\"statBeginTimeSec\":1738944831,\"requestByteRange5\":0,\"eventStatId\":461519701464447000,\"statEndTimeSec\":1738944831,\"requestByteRange2\":0,\"vai\":141,\"date\":\"2025-02-08\",\"logLens\":[],\"eventCount\":1,\"clientPktps\":0,\"responseByteRange7\":0,\"client_latitude\":\"39.900000\",\"server_city\":\"192.168.111.111\",\"totalBitps\":1344,\"server_province\":\"北京数据中心\",\"client_country\":\"中国\",\"server_latitude\":\"39.900000\",\"vlanId\":0,\"clientBytes\":0,\"flowEndTimeNs\":1738944831883650000,\"responseByteRange1\":0,\"serverNetSegmentIds\":[]}\n"); // 第一条记录}ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {gzipOutputStream.write(bulkRequestBody.toString().getBytes("UTF-8"));}HttpPost httpPost = new HttpPost("/_bulk");ByteArrayEntity entity = new ByteArrayEntity(byteArrayOutputStream.toByteArray());entity.setContentType("application/json");entity.setContentEncoding("gzip");httpPost.setEntity(entity);/*** 下面是不压缩的*//* StringBuilder bulkRequestBody = new StringBuilder();for (int i=0;i<50000;i++){bulkRequestBody.append("{\"index\":{\"_index\":\"ecs_http20250208\",\"op_type\":\"create\"}}\n");bulkRequestBody.append("{\"agentId\":\"shichuanzong-test-"+i+"\",\"requestByteRange6\":0,\"requestByteRange7\":0,\"client_city\":\"192.168.0.0/16\",\"db_position\":\"内对内\",\"vlanType\":0,\"totalPktps\":2,\"serverPayload\":84,\"responseByteRange3\":0,\"totalPayload\":84,\"server_country_code\":\"CN\",\"clientIp\":\"192.168.2.201\",\"logOffsets\":[],\"clientBitps\":0,\"dataOffsets\":[288236297029019500],\"serverBitps\":1344,\"totalPkts\":2,\"clientPkts\":0,\"deviceId\":\"037ef078-4ccc-480c-97c5-eb9c70dfdd79\",\"serverIpType\":0,\"id\":\"4615197014644469761kQ\",\"client_country_code\":\"CN\",\"client_province\":\"沈庄数据中心\",\"responseByteRange4\":0,\"responseByteRange5\":0,\"clientNetSegmentIds\":[],\"dataLen\":0,\"vlanId2\":0,\"eventType\":2,\"appProtocol\":0,\"requestByteRange3\":0,\"client_longitude\":\"116.400000\",\"dataLens\":[248],\"serverIp\":\"192.168.2.203\",\"alarmCount\":0,\"server_longitude\":\"116.400000\",\"totalBytes\":168,\"clientIpType\":0,\"serverTransRate\":50,\"requestByteRange4\":0,\"serverPktps\":2,\"transProtocol\":2,\"durationTimeNs\":1000000000,\"clientTransRate\":0,\"requestByteRange1\":1,\"appId\":249,\"kafkakey\":\"event\",\"probeIds\":[11],\"serverPkts\":2,\"serverPort\":26051,\"clientPayload\":0,\"clientPortList\":[],\"totalTransRate\":50,\"responseByteRange6\":0,\"responseByteRange2\":1,\"server_country\":\"中国\",\"dataOffset\":0,\"serverBytes\":168,\"flowBeginTimeNs\":1738914456332849000,\"statBeginTimeSec\":1738944831,\"requestByteRange5\":0,\"eventStatId\":461519701464447000,\"statEndTimeSec\":1738944831,\"requestByteRange2\":0,\"vai\":141,\"date\":\"2025-02-08\",\"logLens\":[],\"eventCount\":1,\"clientPktps\":0,\"responseByteRange7\":0,\"client_latitude\":\"39.900000\",\"server_city\":\"192.168.111.111\",\"totalBitps\":1344,\"server_province\":\"北京数据中心\",\"client_country\":\"中国\",\"server_latitude\":\"39.900000\",\"vlanId\":0,\"clientBytes\":0,\"flowEndTimeNs\":1738944831883650000,\"responseByteRange1\":0,\"serverNetSegmentIds\":[]}\n"); // 第一条记录}HttpPost httpPost = new HttpPost("/_bulk");StringEntity entity = new StringEntity(bulkRequestBody.toString(),"UTF-8");entity.setContentType("application/json");httpPost.setEntity(entity);*/// 目标主机HttpHost targetHost = new HttpHost("192.168.2.1", 19200, "https");// 执行请求CloseableHttpResponse response = httpClient.execute(targetHost, httpPost);try {HttpEntity entity1 = response.getEntity();if (entity1 != null) {// 打印响应体内容String responseString = EntityUtils.toString(entity1, "UTF-8");System.out.println(responseString); // 这里将显示具体的错误信息}} finally {response.close();}} catch (Exception e) {e.printStackTrace();} finally {httpClient.close(); // 关闭httpClient以释放资源}}
}


http://www.ppmy.cn/server/167064.html

相关文章

C语言基础11:分支结构以及if的使用

C语言基础 内容提要 分支结构 条件判断用if语句实现分支结构 分支结构 问题抛出 我们在程序设计往往会遇到如下问题&#xff0c;比如下面的函数的计算&#xff1a; y { 1 / x 当 x ≠ 0 时 10000 当 x 0 时 y \begin{cases} 1/x \quad当x\neq0时\\ \\ 10000 \quad当x0…

uni getLocation 公众号h5获取定位坐标没有返回

先看代码 //获取经纬度getLocation() {console.log("111")uni.getLocation({type: wgs84,success: function (res) {console.log(当前位置的经度&#xff1a; res.longitude);console.log(当前位置的纬度&#xff1a; res.latitude);},fail: function(err) {conso…

vue 134~152

认识Vue3 1. Vue2 选项式 API vs Vue3 组合式API <script> export default {data(){return {count:0}},methods:{addCount(){this.count}} } </script><script setup> import { ref } from vue const count ref(0) const addCount ()> count.value &l…

GitHub Pages + Jekyll 博客搭建指南(静态网站搭建)

目录 &#x1f680; 静态网站及其生成工具指南&#x1f30d; 什么是静态网站&#xff1f;&#x1f4cc; 静态网站的优势⚖️ 静态网站 VS 动态网站 &#x1f680; 常见的静态网站生成器对比&#x1f6e0;️ 使用 GitHub Pages Jekyll 搭建个人博客&#x1f4cc; 1. 创建 GitHu…

前端工程化与构建工具详解

四、项目设计与架构 1. 设计模式 观察者模式 vs 发布订阅模式 观察者模式&#xff1a; 直接依赖&#xff1a;观察者直接订阅目标对象&#xff0c;目标对象维护观察者列表。适用场景&#xff1a;简单的一对多依赖关系&#xff08;如事件监听&#xff09;。示例&#xff1a;cla…

CVE-2024-52046 Apache mina 反序列化漏洞简单分析

前言 最近披露了一个新的apache下属产品mina的CVE-2024-52046反序列化漏洞&#xff0c;首先查看cve官网公开的部分漏洞信息 https://www.cve.org/CVERecord?idCVE-2024-52046 感觉有点乱&#xff0c;像这种apache产品的漏洞一般在apache邮件列表归档网站中也会有信息&#x…

2025最新版Node.js下载安装~保姆级教程

1. node中文官网地址&#xff1a;http://nodejs.cn/download/ 2.打开node官网下载压缩包&#xff1a; 根据操作系统不同选择不同版本&#xff08;win7系统建议安装v12.x&#xff09; 我这里选择最新版win 64位 3.安装node ①点击对话框中的“Next”&#xff0c;勾选同意后点…

告别DeepSeek官方的服务器繁忙~腾讯云DeepSeek-V3/R1无限免费调用~不用安装任何东西~小白一学就会~

DeepSeek官方的服务经常崩溃&#xff0c;弄得我们也很崩溃。 还是腾讯云给力&#xff0c;DeepSeek 系列模型限时免费&#xff1a; 即日至北京时间2025年2月25日23:59:59&#xff0c;所有腾讯云用户均可享受 DeepSeek-V3、DeepSeek-R1 模型限时免费服务&#xff0c;单账号限制接…