📌 1. 项目介绍
本项目使用 Spring Boot + InfluxDB 2.x,主要介绍 批量写入数据 的三种方式:
- 同步写入(Blocking Write)
- 异步写入(Non-blocking Write)
- 带重试机制的写入(Handling Errors with Retry)
适用于 高并发数据写入、物联网(IoT)、实时监控 等场景。
📌 2. InfluxDB 连接配置
✅ application.yml
java"># InfluxDB 独立配置
influxdb:url: http://192.168.1.xxx:28086/ # InfluxDB 服务器地址token: _7FZlXGJJcd8Ayox-F-hVBDdXb_a5SI3530x1DdFKZfQ65uOhnpQciJWHpd7ULhpAOcgj5oV2JsR-Xf0qTtAxg==org: xxx # 组织名称bucket: xxx # 存储桶名称logLevel: BODY # 记录完整的 HTTP 请求和响应日志
✅ InfluxDBConfig.java
java">import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class InfluxDBConfig {@Value("${influxdb.url}")private String url;@Value("${influxdb.token}")private String token;@Value("${influxdb.org}")private String org;@Value("${influxdb.bucket}")private String bucket;@Beanpublic InfluxDBClient influxDBClient() {return InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);}
}
🔹 说明
- 通过
@Value("${influxdb.xxx}")
读取application.yml
的配置。 InfluxDBClientFactory.create(url, token, org, bucket)
创建 InfluxDB 客户端。
📌 3. Service 层:批量写入数据
在 TestServiceImpl.java
中,我们提供 三种批量写入方法。
✅ 3.1 同步批量写入
java">@Override
public void writeBatchDataSync(List<TemperatureDTO> temperatureDTOs) {// 获取 InfluxDB 同步写入 APIWriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();// 构建批量数据点List<Point> points = temperatureDTOs.stream().map(dto -> Point.measurement("temperature").addTag("location", dto.getLocation()) // 标签(索引).addField("value", dto.getValue()) // 字段(数据).time(Instant.now(), WritePrecision.NS)).collect(Collectors.toList());// **同步写入数据**writeApi.writePoints(points);System.out.println("✅ 批量数据写入成功(同步)");
}
🔹 说明
getWriteApiBlocking()
同步写入 API,主线程会阻塞,直到数据写入完成。writePoints(points)
直接写入所有数据点。
🔹 适用场景
- 小规模数据写入
- 确保数据立即存入数据库
✅ 3.2 异步批量写入
java">@Override
public void writeBatchDataAsync(List<TemperatureDTO> temperatureDTOs) {// 获取 InfluxDB **异步写入 API**WriteApi writeApi = influxDBClient.makeWriteApi();// 构建批量数据点List<Point> points = temperatureDTOs.stream().map(dto -> Point.measurement("temperature").addTag("location", dto.getLocation()).addField("value", dto.getValue()).time(Instant.now(), WritePrecision.NS)).collect(Collectors.toList());// **异步写入数据**CompletableFuture<Void> future = CompletableFuture.runAsync(() -> writeApi.writePoints(points));// **写入完成后的回调**future.whenComplete((result, error) -> {if (error != null) {System.err.println("🔥 异步写入失败:" + error.getMessage());} else {writeApi.close(); // **写入完成后关闭 API,避免资源泄漏**System.out.println("✅ 批量数据写入成功(异步)");}});
}
🔹 说明
- 异步写入 API (
makeWriteApi()
),不会阻塞主线程,数据写入后台执行。 CompletableFuture
处理回调,写入失败会打印日志。- 写入完成后
writeApi.close()
,避免资源泄漏。
🔹 适用场景
- 高吞吐量、大数据量写入
- 不需要立即确认写入结果
✅ 3.3 带重试机制的批量写入
java">@Override
public void writeBatchDataWithRetry(List<TemperatureDTO> temperatureDTOs) {// 获取 InfluxDB **异步写入 API**WriteApi writeApi = influxDBClient.makeWriteApi();// 构建批量数据点List<Point> points = temperatureDTOs.stream().map(dto -> Point.measurement("temperature").addTag("location", dto.getLocation()).addField("value", dto.getValue()).time(Instant.now(), WritePrecision.NS)).collect(Collectors.toList());// **设置最大重试次数**int maxRetries = 3;int attempt = 0;boolean success = false;while (attempt < maxRetries && !success) {try {writeApi.writePoints(points); // **尝试写入数据**success = true; // **写入成功,退出循环**System.out.println("✅ 批量数据写入成功(重试机制)");} catch (Exception e) {attempt++;System.err.println("⚠️ 第 " + attempt + " 次写入失败:" + e.getMessage());if (attempt == maxRetries) {System.err.println("❌ 达到最大重试次数,写入失败");}try {Thread.sleep(1000); // **延迟 1 秒后重试**} catch (InterruptedException ie) {Thread.currentThread().interrupt();}}}writeApi.close(); // **确保最终关闭 API**
}
🔹 说明
- 最大重试次数
maxRetries = 3
,写入失败会自动重试。 Thread.sleep(1000)
延迟 1 秒,防止频繁重试导致服务器压力过大。- 重试成功后
success = true
退出循环。
🔹 适用场景
- 网络不稳定(断网、超时)
- 避免短暂的数据库异常导致数据丢失
📌 4. 测试接口(Controller 层)
复制编辑
java">@RestController
@RequestMapping("/api/influxdb")
public class InfluxDBController {@Autowiredprivate TestService influxDBService;@PostMapping("/write-batch-sync")public String writeBatchSync(@RequestBody List<TemperatureDTO> temperatureDTOs) {influxDBService.writeBatchDataSync(temperatureDTOs);return "✅ 批量数据写入成功(同步)";}@PostMapping("/write-batch-async")public String writeBatchAsync(@RequestBody List<TemperatureDTO> temperatureDTOs) {influxDBService.writeBatchDataAsync(temperatureDTOs);return "✅ 批量数据写入成功(异步)";}@PostMapping("/write-batch-retry")public String writeBatchWithRetry(@RequestBody List<TemperatureDTO> temperatureDTOs) {influxDBService.writeBatchDataWithRetry(temperatureDTOs);return "✅ 批量数据写入成功(重试机制)";}
}
📌 5. 总结
✅ 同步写入:适合小规模数据
✅ 异步写入:适合高吞吐量、大规模数据
✅ 重试机制:适合网络不稳定场景
🚀 这样,你的批量写入功能更加健壮了! 🎯