文章目录
一、简介
1、简介
关系型数据库也是支持时间戳的,也能够基于时间戳进行查询。但是,从我们的使用场景出发,需要注意数据库的写入性能。通常,关系型数据库会采用 B+树数据结构,在数据写入时,有可能会触发叶裂变,从而产生了对磁盘的随机读写,降低写入速度。
当前市面上的数据库>时序数据库通常都是采用 LSM Tree 的变种,顺序写磁盘来增强数据的写入能力。通常数据库>时序数据库都会保证在单点每秒数十万的写入能力
。
数据库>时序数据库一般用于指标监控场景
。这个场景的数据有一个非常明显的特点就是冷热差别明显。通常,指标监控只会使用近期一段时间的数据。
2、官网
官网:https://www.influxdata.com/
二、部署
1、安装
wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.4.0-linux-amd64.tar.gz
# 解压
tar -zxvf influxdb2-2.4.0-linux-amd64.tar.gz# 启动
./influxd
2、配置
(1)用户初始化
1、访问http://192.168.56.10:8086/
2、点击GET STARTED后,需要填写用户名、密码、组织和初始化的数据库(Bucket)
admin/admin123
下一步之后,点击CONFIGURE LATER
3、进入到初始化页面
三、入门(Web UI)
1、加载数据
(1)上传数据文件
在 Web UI 上,你可以用文件的方式上传数据,前提是文件中的数据符合 InfluxDB 支持的类型,包括 CSV、带 Flux 注释的 CSV 和 InfluxDB 行协议。
点开一个文件上传的页面,会有对应的示例:
(2)代码接入模板
在这里提供了一些代码相关的Demo,非常方便:
2、管理存储桶
可以将 InfluxDB 中的 bucket 理解为普通关系型数据库中的 database。
(1)创建桶
创建桶,可以选择自动删除超过多长时间的数据。
(2)修改桶
可以直接修改桶的数据过期时间。
修改桶的名称是敏感行为,谨慎操作!
3、演示:测试录入数据
4、Telegraf集成
Telegraf 是 InfluxDB 生态中的一个数据采集组件,它可以将各种时序数据自动采集到InfluxDB。现在,Telegraf 不仅仅是 InfluxDB 的数据采集组件了,很多数据库>时序数据库都支持与 Telegraf 进行协作,不少类似的时序数据收集组件选择在 Telegraf 的基础上二次开发。
这里先略过了。
5、管理抓取任务
抓取任务就是你给定一个 URL,InfluxDB 每隔一段时间去访问这个链接,把访问到的数据入库。
目标 URL 暴露出来的数据格式必须得是 Prometheus 数据格式
。
此处略了。
6、管理 API Token
token用于API的使用。
7、使用Data Explorer
(1)查询构造器
(2)FLUX 脚本编辑器
可以手动将查询构造器切换为 FLUX 脚本编辑器
。
(3)数据预览区
数据预览区可以将你的数据展示出来。下图是一个效果图。
(4)数据导出为CSV
8、使用Notebook
四、Flux语法
暂略。
五、HTTP API
使用API时,需要在Header头携带Authorization
,value值为Token空格 + 你的token
# 查询目前所有的 Token 信息,包括他们拥有什么权限
http://192.168.56.10:8086/api/v2/authorizations
官方文档:https://docs.influxdata.com/influxdb/v2/api/
六、Java操作Influxdb
客户端参考:https://github.com/influxdata/influxdb-client-java
1、导包
<dependency><groupId>com.influxdb</groupId><artifactId>influxdb-client-java</artifactId><version>6.5.0</version>
</dependency>
2、POJO
import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;import java.time.Instant;/*** @description: 这是一个POJO类,对应将POJO类写入InfluxDB*/
@Measurement(name="temperature")
public class DemoPOJO {/** 注意类上的@Measurement注解,我们既可以使用Measurement注解来指定一个POJO类的测量名称* 但是使用@Measurement注解会将测量名称直接写死在代码里* 当测量名称会在运行时发生改变时,我们可以使用@Column(menasurement=true)注解* 这样会将POJO类中被注解的值作为测量名称。* **///@Column(measurement = true)//String measurementName;/** 相当于InfluxDB行协议中的标签集,此处标签的名称将会是location **/@Column(tag = true)String location;/** 相当于InfluxDB行协议中的字段集,此处字段的名称将会是value **/@ColumnDouble value;/** 相当于InfluxDB行协议中的时间戳 **/@Column(timestamp = true)Instant timestamp;/** 全参构造器,方便调用者创建对象 **/public DemoPOJO(String location, Double value, Instant timestamp) {this.location = location;this.value = value;this.timestamp = timestamp;}
}
3、同步写入
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.internal.AbstractWriteClient;
import com.influxdb.client.internal.MeasurementMapper;
import com.influxdb.client.write.Point;import java.time.Instant;/*** @author: tony* @description: 这里是使用同步方式向InfluxDB写入数据的示例代码**/
public class Write1 {/** token 操作时需要换成自己的 **/private static char[] token = "token....".toCharArray();/** 组织名称 **/private static String org = "cxf";/** InfluxDB服务提供的url **/private static String url = "http://localhost:8086/";/** 存储桶名称 **/private static String bucket = "example_java";public static void main(String[] args) {InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org, bucket);WriteApiBlocking writeApiBlocking = influxDBClient.getWriteApiBlocking();// 0. 使用InflxuDB行协议写入
// writeApiBlocking.writeRecord(WritePrecision.MS,"temperature,location=north value=50");// 1. 使用Point写入
// Point point = Point.measurement("temperature")
// .addTag("location", "west")
// .addField("value", 38.0)
// .time(Instant.now(),WritePrecision.NS)
// ;
// writeApiBlocking.writePoint(point);// 2. 使用POJO类写入DemoPOJO demoPOJO = new DemoPOJO("east", 22.2, Instant.now());writeApiBlocking.writeMeasurement(WritePrecision.MS,demoPOJO);// 3. 调用close方法会关闭并释放一些比如守护线程之类的对象。influxDBClient.close();}
}
4、异步写入
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.WriteOptions;
import com.influxdb.client.domain.WritePrecision;/*** @description: 通过异步的方式向InfluxDB写入数据*/
public class AsyncWrite {/** token 操作时需要换成自己的 **/private static char[] token = "token....".toCharArray();/** 组织名称 **/private static String org = "cxf";/** InfluxDB服务提供的url **/private static String url = "http://localhost:8086/";/** 存储桶名称 **/private static String bucket = "example_java";public static void main(String[] args) {// 0.创建InfluxDB的客户端InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org, bucket);// 1.异步写入会创建一个守护线程,所以在makWriteApi时可以传递一些配置项,也就是WriteOptions对象WriteOptions options = WriteOptions.builder().batchSize(999).flushInterval(10000).build();// 2.使用makeWriteApi创建的WriteApi writeApi = influxDBClient.makeWriteApi(options);for (int i = 0; i < 999; i++) {writeApi.writeRecord(WritePrecision.MS,"temperature,location=south value=77");}// 3.关闭连接,此方法会触发一次刷写,将缓冲区中剩下的数据向InfluxDB写入一次。influxDBClient.close();}
}
5、查询
import com.influxdb.client.BucketsApi;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.QueryApi;
import com.influxdb.client.domain.File;
import com.influxdb.query.FluxColumn;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;import java.util.List;
import java.util.Map;/*** @description: 这是关于从InfluxDB查询数据的代码*/
public class Query {/** token 操作时需要换成自己的 **/private static char[] token = "token....".toCharArray();/** 组织名称 **/private static String org = "cxf";/** InfluxDB服务提供的url **/private static String url = "http://localhost:8086/";public static void main(String[] args) {// 0.创建InfluxDB客户端对象InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org);// 1.获取查询API对象QueryApi queryApi = influxDBClient.getQueryApi();// 2.这个flux脚本会查询test_init存储桶中的go_goroutines测量,这个测量下只有一个序列String flux = "from(bucket: \"test_init\")\n" +" |> range(start: -1h)\n" +" |> filter(fn: (r) => r[\"_measurement\"] == \"go_goroutines\")\n" +" |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)\n" +" |> yield(name: \"mean\")";// 3.这个flux脚本会查询example02存储桶中的cpu测量,指定字段名称为usage_user后,String flux2 = "from(bucket: \"example02\")\n" +" |> range(start: -1h)\n" +" |> filter(fn: (r) => r[\"_measurement\"] == \"cpu\")\n" +" |> filter(fn: (r) => r[\"_field\"] == \"usage_user\")\n" +" |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)\n" +" |> yield(name: \"mean\")";// 4.使用query方法将会得到一个List<FluxTable>对象,其中每一个FluxTable都对应着一个序列List<FluxTable> query = queryApi.query(flux);// 5.下面这个for循环会把遍历每个序列,并将这个序列中对应的每一行数据打印出来。for (FluxTable fluxTable : query) {List<FluxRecord> records = fluxTable.getRecords();for (FluxRecord record : records) {Map<String, Object> one = record.getValues();System.out.println(one);}}// 6.下面的queryRaw方法将会得到一个字符串,字符串中是FLUX拓展的CSV格式的数据String data = queryApi.queryRaw(flux2);System.out.println(data);}
}