Spring Boot + InfluxDB 批量写入(同步、异步、重试机制)

news/2025/3/16 21:42:19/

📌 1. 项目介绍

本项目使用 Spring Boot + InfluxDB 2.x,主要介绍 批量写入数据 的三种方式:

  1. 同步写入(Blocking Write)
  2. 异步写入(Non-blocking Write)
  3. 带重试机制的写入(Handling Errors with Retry)

适用于 高并发数据写入、物联网(IoT)、实时监控 等场景。

📌 2. InfluxDB 连接配置

✅ application.yml

java"># InfluxDB 独立配置
influxdb:url: http://192.168.1.xxx:28086/  # InfluxDB 服务器地址token: _7FZlXGJJcd8Ayox-F-hVBDdXb_a5SI3530x1DdFKZfQ65uOhnpQciJWHpd7ULhpAOcgj5oV2JsR-Xf0qTtAxg==org: xxx     # 组织名称bucket: xxx  # 存储桶名称logLevel: BODY  # 记录完整的 HTTP 请求和响应日志

✅ InfluxDBConfig.java

java">import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class InfluxDBConfig {@Value("${influxdb.url}")private String url;@Value("${influxdb.token}")private String token;@Value("${influxdb.org}")private String org;@Value("${influxdb.bucket}")private String bucket;@Beanpublic InfluxDBClient influxDBClient() {return InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);}
}

🔹 说明

  • 通过 @Value("${influxdb.xxx}") 读取 application.yml 的配置。
  • InfluxDBClientFactory.create(url, token, org, bucket) 创建 InfluxDB 客户端

📌 3. Service 层:批量写入数据

TestServiceImpl.java 中,我们提供 三种批量写入方法


✅ 3.1 同步批量写入

java">@Override
public void writeBatchDataSync(List<TemperatureDTO> temperatureDTOs) {// 获取 InfluxDB 同步写入 APIWriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();// 构建批量数据点List<Point> points = temperatureDTOs.stream().map(dto -> Point.measurement("temperature").addTag("location", dto.getLocation()) // 标签(索引).addField("value", dto.getValue()) // 字段(数据).time(Instant.now(), WritePrecision.NS)).collect(Collectors.toList());// **同步写入数据**writeApi.writePoints(points);System.out.println("✅ 批量数据写入成功(同步)");
}

🔹 说明

  • getWriteApiBlocking() 同步写入 API主线程会阻塞,直到数据写入完成
  • writePoints(points) 直接写入所有数据点。

🔹 适用场景

  • 小规模数据写入
  • 确保数据立即存入数据库

✅ 3.2 异步批量写入

java">@Override
public void writeBatchDataAsync(List<TemperatureDTO> temperatureDTOs) {// 获取 InfluxDB **异步写入 API**WriteApi writeApi = influxDBClient.makeWriteApi();// 构建批量数据点List<Point> points = temperatureDTOs.stream().map(dto -> Point.measurement("temperature").addTag("location", dto.getLocation()).addField("value", dto.getValue()).time(Instant.now(), WritePrecision.NS)).collect(Collectors.toList());// **异步写入数据**CompletableFuture<Void> future = CompletableFuture.runAsync(() -> writeApi.writePoints(points));// **写入完成后的回调**future.whenComplete((result, error) -> {if (error != null) {System.err.println("🔥 异步写入失败:" + error.getMessage());} else {writeApi.close();  // **写入完成后关闭 API,避免资源泄漏**System.out.println("✅ 批量数据写入成功(异步)");}});
}

🔹 说明

  • 异步写入 API (makeWriteApi()),不会阻塞主线程,数据写入后台执行。
  • CompletableFuture 处理回调,写入失败会打印日志。
  • 写入完成后 writeApi.close(),避免资源泄漏。

🔹 适用场景

  • 高吞吐量、大数据量写入
  • 不需要立即确认写入结果

✅ 3.3 带重试机制的批量写入

java">@Override
public void writeBatchDataWithRetry(List<TemperatureDTO> temperatureDTOs) {// 获取 InfluxDB **异步写入 API**WriteApi writeApi = influxDBClient.makeWriteApi();// 构建批量数据点List<Point> points = temperatureDTOs.stream().map(dto -> Point.measurement("temperature").addTag("location", dto.getLocation()).addField("value", dto.getValue()).time(Instant.now(), WritePrecision.NS)).collect(Collectors.toList());// **设置最大重试次数**int maxRetries = 3;int attempt = 0;boolean success = false;while (attempt < maxRetries && !success) {try {writeApi.writePoints(points); // **尝试写入数据**success = true; // **写入成功,退出循环**System.out.println("✅ 批量数据写入成功(重试机制)");} catch (Exception e) {attempt++;System.err.println("⚠️ 第 " + attempt + " 次写入失败:" + e.getMessage());if (attempt == maxRetries) {System.err.println("❌ 达到最大重试次数,写入失败");}try {Thread.sleep(1000); // **延迟 1 秒后重试**} catch (InterruptedException ie) {Thread.currentThread().interrupt();}}}writeApi.close(); // **确保最终关闭 API**
}

🔹 说明

  • 最大重试次数 maxRetries = 3,写入失败会自动重试。
  • Thread.sleep(1000) 延迟 1 秒,防止频繁重试导致服务器压力过大。
  • 重试成功后 success = true 退出循环

🔹 适用场景

  • 网络不稳定(断网、超时)
  • 避免短暂的数据库异常导致数据丢失

📌 4. 测试接口(Controller 层)

 

java

复制编辑

java">@RestController
@RequestMapping("/api/influxdb")
public class InfluxDBController {@Autowiredprivate TestService influxDBService;@PostMapping("/write-batch-sync")public String writeBatchSync(@RequestBody List<TemperatureDTO> temperatureDTOs) {influxDBService.writeBatchDataSync(temperatureDTOs);return "✅ 批量数据写入成功(同步)";}@PostMapping("/write-batch-async")public String writeBatchAsync(@RequestBody List<TemperatureDTO> temperatureDTOs) {influxDBService.writeBatchDataAsync(temperatureDTOs);return "✅ 批量数据写入成功(异步)";}@PostMapping("/write-batch-retry")public String writeBatchWithRetry(@RequestBody List<TemperatureDTO> temperatureDTOs) {influxDBService.writeBatchDataWithRetry(temperatureDTOs);return "✅ 批量数据写入成功(重试机制)";}
}

📌 5. 总结

同步写入:适合小规模数据
异步写入:适合高吞吐量、大规模数据
重试机制:适合网络不稳定场景

🚀 这样,你的批量写入功能更加健壮了! 🎯


http://www.ppmy.cn/news/1579662.html

相关文章

各省水资源平台 水资源遥测终端机都用什么协议

各个省水资源平台 水资源遥测终端机 的建设大部分从2012年开始启动&#xff0c;经过多年建设&#xff0c;基本都已经形成了稳定的通讯要求&#xff1b;河北瑾航科技 遥测终端机&#xff0c;兼容了大部分省市的通讯协议&#xff0c;如果需要&#xff0c;可以咨询和互相学习&…

get提交几个步骤

HTTP GET 请求是一种用于从服务器获取资源的请求方法&#xff0c;其主要步骤如下&#xff1a; 1. **客户端发起请求** 客户端&#xff08;如浏览器或客户端程序&#xff09;通过 URL 向服务器发起 GET 请求。请求中包含请求行、请求头&#xff0c;但通常没有请求体。 2.…

Java多线程与高并发专题——原子类和 volatile、synchronized 有什么异同?

原子类和 volatile异同 首先&#xff0c;通过我们对原子类和的了解&#xff0c;原子类和volatile 都能保证多线程环境下的数据可见性。在多线程程序中&#xff0c;每个线程都有自己的工作内存&#xff0c;当多个线程访问共享变量时&#xff0c;可能会出现一个线程修改了共享变…

计算机网络--访问一个网页的全过程

文章目录 访问一个网页的全过程应用层在浏览器输入URL网址http://www.aspxfans.com:8080/news/index.aspboardID5&ID24618&page1#r_70732423通过DNS获取IP地址生成HTTP请求报文应用层最后 传输层传输层处理应用层报文建立TCP连接传输层最后 网络层网络层对TCP报文进行处…

【强化学习】PPO算法代码详解

介绍 PPO&#xff08;Proximal Policy Optimization&#xff0c;近端策略优化&#xff09;是一种用于强化学习的策略优化算法&#xff0c;由OpenAI在2017年提出。PPO结合了策略梯度方法的优点和信任区域优化&#xff08;Trust Region Optimization&#xff09;的思想&#xff…

微信小程序实现根据不同的用户角色显示不同的tabbar并且可以完整的切换tabbar

直接上图上代码吧 // login/login.js const app getApp() Page({/*** 页面的初始数据*/data: {},/*** 生命周期函数--监听页面加载*/onLoad(options) {},/*** 生命周期函数--监听页面初次渲染完成*/onReady() {},/*** 生命周期函数--监听页面显示*/onShow() {},/*** 生命周期函…

外呼系统破局电话管控:AI电销机器人合规运营实战指南

随着运营商对电话卡管控日趋严格&#xff0c;某金融科技公司曾因单日外呼超限导致80%号码被封——这一案例暴露出AI电销机器人在效率与合规间的矛盾。但数据显示&#xff0c;采用合规策略的企业外呼接通率仍能保持38%以上&#xff0c;关键在于建立适配监管环境的智能外呼体系。…

基于SSM + JSP 的水果蔬菜商城

基于ssm的水果蔬菜商城系统前台和后台&#xff08;源码安装视频数据库环境&#xff09;计算机项目程序设计管理系统java小程序网站商城 一.相关技术 Java、Spring、Springboot、MVC、Mybatis、MySQL、SSM框架、Web、HTML、maven、JavaScript、css、vue 二.部署配置 1.IntelliJ …