2017-06-10 23:09:22 InfluxDB 数据库 Grafana 4038
导读:
最近想对现有的数据可视化做一下调整,以便减少前端的开发工作量,尽量自动化,把主要精力放在数据上。发现 Grafana 非常适合,决定尝试一下。由于目前 Grafana4.2 版本不支持 Mysql 数据源(官方说到4.3版本开始支持),所以,决定再尝试一下最近比较火的时间序列数据库 InfluxDB(没有选择 ElasticSearch 是因为它本身太重,且重查询轻写入。最主要的是目前的数据量不是很大,可用服务器有限)。
InfluxDB 是一个开源分布式时序、事件和指标数据库,使用 Go 语言编写,无需外部依赖,其设计目标是实现分布式和水平伸缩扩展。
InfluxData 提供了 TICK 一套解决方案,不过使用比较多的是 InfluxDB,这里先从 InfluxDB 学习开始。
InfluxDB
是 InfluxData
下的子项目。目前,InfluxData
由收集、存储、可视化、处理四个项目组成(简称TICK)。这些都是建立在时间序列数据基础之上的。从下图就可以看出它们的使用方式。
这个和 ELK
(Elasticsearch + Logstash + Kinaba)组合很像。但是 TICK
相对而言,更灵活、更丰富。每个项目又可以独立出来与其它框架组合使用。
安装比较简单。
# 下载
wget https://dl.influxdata.com/influxdb/releases/influxdb_1.2.4_amd64.deb
# 安装
dpkg -i influxdb_1.2.4_amd64.deb
# 停止服务
service influxdb stop
# 启动服务
service influxdb start
#进入命令客户端
influx
安装后就可以直接使用。如果想修改默认参数,请编辑 /etc/influxdb/influxdb.conf
文件中对应的参数。
在存储上,采用 Schemaless ,列存储,压缩率高,可以存储任意数量的列。
数据库,可以创建多个,不同数据库中的数据文件,存放在磁盘上的不同目录。
# 查看数据库
show databases
# 创建数据库
create database test
# 使用数据库
use test
# 删除数据库
drop database test
相当于数据库中的表名,时间数据 time
、Tag
、Field
组合成了 Measurement
。
# 查看所有的 measurement
show measurements
# 插入数据
insert cpu,ip=192.168.1.1 server="A",used=0.23
insert memory,host=192.168.1.1 service="A",used=26342
# 查看表中数据
> select * from cpu
name: cpu
time ip server used
---- -- ------ ----
1495440286179940657 192.168.1.1 1 0.23
# 删除measurement
drop measurement cpu
InfluxDB
没有创建表一说,直接 insert
就可以将数据插入 Measurement
,可以理解为自动创建表。
InfluxDB
没有对 measurement
的修改操作。
写入数据库的数据采用 Line Protocol
,详细参考Line Protocol Tutorial,格式如下:
insert <measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>[,<field2-key>=<field2-value>...] [timestamp]
其中,tag
和 timestamp
是可选项。
表里的一行数据,由时间戳(timestamp)、标签(tag)、数据(field)组成。
timestamp
相当于 point
的时间戳,field
相当于point的值,tags
相当于 point
的属性。
数据的时间戳,相当于主键。不赋值时,默认为系统时间。
相当于索引。采用key=value形式,多个 tag 之间用 ',' 分隔。
<tag_key>=<tag_value>,<tag_key>=<tag_value>
数据结构中,Tag
是可选的。并不是一定要有,因为是索引,所以一般建议带上。
# 查看所有 tags 的 key
show tag keys from cpu
# 查看指定 key 的 value
show tag values from cpu with key = "ip" where used=0.23
相当于实际记录的数据值,也是采用key=value形式,多个 tag 之间用 ',' 分隔。
<field_key>=<field_value>,<field_key>=<field_value>
# 查看所有的 field
show field keys
注意:
每个field
的value
类型必须保持一致,如果不确定,最好用string
类型
字符串类型的value
用""
括起来Booleans
类型,TRUE
用t
,T
,true
,True
,TRUE
。FALSE
用f
,F
,false
,False
,FALSE
存储策略,InfluxDB 会自动清理数据,可以设置数据保留的时间,默认会创建存储策略 autogen (保留时间为永久),之后用户可以自己设置,例如保留最近 30 天的数据。
# 查看数据保持策略
> show retention policies on "test"
name duration shardGroupDuration replicaN default
---- -------- ------------------ -------- -------
autogen 0s 168h0m0s 1 true
# 创建新的策略
create retention policy "rp_day_30" on "test" duration 30d replication 1 default
# 修改策略
alter retention policy "rp_day_30" on "test" duration 20d replication 1 default
# 删除策略
drop retention policy "rp_day_30" on "test"
注意:
存储策略有点类似于分区数据块,修改了策略,新策略里不会有旧策略的数据。
要想查看旧策略下的数据,需要在measurement
前加上策略名称。
# 查看默认策略中的数据
select * from "autogen".cpu
Series 相当于表里的数据。
# 查看 cpu 表中的所有数据
show series from cpu
# 或
show series on test from cpu
下面是 Retention Policy
、Measurement
、Series
关系的大体结构图:
安装后,默认没有用户名和密码。可以通过客户端命令修改。
为了安全,建议根据使用情况,创建不同的用户。详细操作,可以查看官方文档用户授权。
# 显示用户
show users
# 创建用户
create user "developer" with password '123456'
# 创建管理员权限的用户
create user "admin" with password 'admin123' with all privileges
# 给普通用户授予管理员权限
GRANT ALL PRIVILEGES TO "developer"
# 给用户指定数据库操作权限
GRANT [READ,WRITE,ALL] ON <database_name> TO <username>
GRANT ALL ON test TO "developer"
# 撤回用户的管理员权限
REVOKE ALL PRIVILEGES FROM "developer"
# 查看用户权限
show grants for "developer"
# 修改密码
set password for "developer" = '123456789'
# 删除用户
DROP USER "developer"
创建完用户后,需要开启权限验证功能才能生效。修改 /etc/influxdb/influxdb.conf
。
[http]
auth-enabled = true
重启服务,连接登录。
influx -host localhost -username admin -password 'admin123'
修改配置文件 /etc/influxdb/influxdb.conf
,开启 WEB-Admin 操作界面。
[admin]
enabled = true
bind-address = ":8083"
也就是 Continuous Queries,当数据超过保存策略里指定的时间之后,就会被删除;可以通过连续查询把原来的秒级数据,保存为分钟级或者小时级的数据,从而减小数据的占用空间。
# 查看连续查询
SHOW CONTINUOUS QUERIES;
## 创建连续查询
CREATE CONTINUOUS QUERY cq-name ON db-name BEGIN
SELECT mean(tbl-name) INTO newtbl-name FROM tbl-name GROUP BY time(30m) END;
## 删除连续查询
DROP CONTINUOUS QUERY <cq-name> ON <db-name>;
InfluxDB
提供了丰富的多语言 API 。这里以 Java
使用为例。其它语言请看API Client Libraries。
先在 Maven
加上资源库引用。
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.5</version>
</dependency>
简单以添加和查询操作为例。
public class Test {
public static void main(String[] args) {
InfluxDB influxDB = InfluxDBFactory.connect("http://127.0.0.1:8086", "admin", "admin123");
String dbName = "test";
String retentionPolicy = "rp_day_30";
Point point1 = Point.measurement("cpu")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.tag("ip", "192.168.1.22")
.addField("service", "serviceB")
.addField("used", 0.14)
.build();
Point point2 = Point.measurement("memory")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.tag("host", "192.168.1.22")
.addField("service", "serviceB")
.addField("used", 67821)
.build();
influxDB.write(dbName, retentionPolicy, point1);
influxDB.write(dbName, retentionPolicy, point2);
Query query = new Query("SELECT * FROM cpu", dbName);
QueryResult queryResult = influxDB.query(query);
List<QueryResult.Result> list = queryResult.getResults();
for (QueryResult.Result result : list) {
System.out.println("------------------");
List<QueryResult.Series> l = result.getSeries();
for (QueryResult.Series series : l) {
System.out.println("Measurement Name: " + series.getName());
Map<String, String> map = series.getTags();
if (map != null) {
for (Map.Entry<String, String> entry : map.entrySet()) {
System.out.println("tag: " + entry.getKey() + ", value: " + entry.getValue());
}
}
List<String> columns = series.getColumns();
for (String column : columns) {
System.out.println("column name: " + column);
}
List<List<Object>> values = series.getValues();
for (List<Object> ll : values) {
for (Object obj : ll) {
System.out.println("Object type: " + obj.getClass() + ", value: " + String.valueOf(obj));
}
}
}
}
//influxDB.deleteDatabase(dbName);
}
}
结果:
------------------
Measurement Name: cpu
column: time
column: ip
column: service
column: used
Object type: class java.lang.String, value: 2017-05-23T04:10:15.004064854Z
Object type: class java.lang.String, value: 192.168.1.1
Object type: class java.lang.String, value: serviceA
Object type: class java.lang.Double, value: 0.2
Object type: class java.lang.String, value: 2017-05-23T07:53:52.872Z
Object type: class java.lang.String, value: 192.168.1.22
Object type: class java.lang.String, value: serviceB
Object type: class java.lang.Double, value: 0.14
Object type: class java.lang.String, value: 2017-05-23T07:55:01.114Z
Object type: class java.lang.String, value: 192.168.1.22
Object type: class java.lang.String, value: serviceB
Object type: class java.lang.Double, value: 0.14
Object type: class java.lang.String, value: 2017-05-23T08:02:13.67Z
Object type: class java.lang.String, value: 192.168.1.22
Object type: class java.lang.String, value: serviceB
Object type: class java.lang.Double, value: 0.14
Java
的详细用法请看influxdb-java。
InfluxDB 提供了与 ElasticSearch
类似的 Restful API
。由于我暂时不打算使用这种方式,就先不介绍了,想了解的请查看InfluxDB文档。
curl -G 'http://localhost:8086/query?db=test&pretty=true' --data-urlencode 'q=SELECT * FROM "cpu"'
其它命令操作。
# 元数据备份
$ influxd backup <path-to-backup>
# 数据备份
$ influxd backup -database <mydatabase> <path-to-backup>
$ influxd backup -database telegraf -retention autogen -since 2016-02-01T00:00:00Z /tmp/backup
$ influxd backup -database mydatabase -host 10.0.0.1:8088 /tmp/remote-backup
# 恢复
$ influxd restore -metadir /var/lib/influxdb/meta /tmp/backup
$ influxd restore -database telegraf -datadir /var/lib/influxdb/data /tmp/backup
# 指定时间范围,时间格式也可以为'2017-01-03 00:00:00'
SELECT * FROM cpu WHERE time >= '2017-01-03T12:40:38.708Z' AND time <= '2017-01-03T12:40:50.708Z'
# 最近40min内的数据
SELECT * FROM cpu WHERE time >= now() - 40m
# 最近5分钟的秒级差值
SELECT derivative("queries", 1s) AS "queries" from "cpu" where time > now() - 5m
COUNT()
返回单个字段中非空值的数量。
SELECT COUNT(<field_key>) FROM <measurement_name> [WHERE <stuff>] [GROUP BY <stuff>]
DISTINCT()
返回唯一字段值的列表。
SELECT DISTINCT( [ * | <field_key> | /<regular_expression>/ ] ) FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
MEAN()
返回字段值的算术平均值。
SELECT MEAN( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
MEDIAN()
从排序的字段值列表中返回中间值。
SELECT MEDIAN( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
MODE()
返回字段值列表中最常用的值。
SELECT MODE( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
SPREAD()
返回最小和最大字段值之间的差异。
SELECT SPREAD( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
STDDEV()
返回字段值的标准偏差。
SELECT STDDEV( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
SUM()
返回字段值的总和。
SELECT SUM( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
BOTTOM()
返回最小的N个字段值。
SELECT BOTTOM(<field_key>[,<tag_key(s)>],<N> )[,<tag_key(s)>|<field_key(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
FIRST()
返回具有最早时间戳的字段值。
SELECT FIRST(<field_key>)[,<tag_key(s)>|<field_key(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
LAST()
返回具有最新时间戳的字段值。
SELECT LAST(<field_key>)[,<tag_key(s)>|<field_keys(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
MAX()
返回最大的字段值。
SELECT MAX(<field_key>)[,<tag_key(s)>|<field__key(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
MIN()
返回最小的字段值。
SELECT MIN(<field_key>)[,<tag_key(s)>|<field_key(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
PERCENTILE()
返回第N个百分位数字段值。
SELECT PERCENTILE(<field_key>, <N>)[,<tag_key(s)>|<field_key(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
SAMPLE()
返回N个字段值的随机抽样。 SAMPLE()使用库抽样来生成随机点。
SELECT SAMPLE(<field_key>, <N>)[,<tag_key(s)>|<field_key(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
TOP()
返回最大的N个字段值。
SELECT TOP( <field_key>[,<tag_key(s)>],<N> )[,<tag_key(s)>|<field_key(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
CUMULATIVE_SUM()
返回后续字段值的运行总计。
SELECT CUMULATIVE_SUM( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
DERIVATIVE()
返回后续字段值之间的变化率。
SELECT DERIVATIVE( [ * | <field_key> | /<regular_expression>/ ] [ , <unit> ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
DIFFERENCE()
返回后续字段值之间的减法结果。
SELECT DIFFERENCE( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
ELAPSED()
返回后续字段值的时间戳之间的差异。
SELECT ELAPSED( [ * | <field_key> | /<regular_expression>/ ] [ , <unit> ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
MOVING_AVERAGE()
返回后续字段值窗口中的滚动平均值。
SELECT MOVING_AVERAGE( [ * | <field_key> | /<regular_expression>/ ] , <N> ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
NON_NEGATIVE_DERIVATIVE()
返回后续字段值之间的非负变化率。非负变化率包括正变化率和等于零的变化率。
SELECT NON_NEGATIVE_DERIVATIVE( [ * | <field_key> | /<regular_expression>/ ] [ , <unit> ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]
HOLT_WINTERS()
返回一个数字使用霍尔特-温特斯季节性方法预测的字段值。
SELECT HOLT_WINTERS[_WITH-FIT](<function>(<field_key>),<N>,<S>) [INTO_clause] FROM_clause [WHERE_clause] GROUP_BY_clause [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]