InfluxDB使用

 2017-06-10 23:09:22     InfluxDB  数据库  Grafana   4544


导读: 最近想对现有的数据可视化做一下调整,以便减少前端的开发工作量,尽量自动化,把主要精力放在数据上。发现 Grafana 非常适合,决定尝试一下。由于目前 Grafana4.2 版本不支持 Mysql 数据源(官方说到4.3版本开始支持),所以,决定再尝试一下最近比较火的时间序列数据库 InfluxDB(没有选择 ElasticSearch 是因为它本身太重,且重查询轻写入。最主要的是目前的数据量不是很大,可用服务器有限)。

InfluxDB 是一个开源分布式时序、事件和指标数据库,使用 Go 语言编写,无需外部依赖,其设计目标是实现分布式和水平伸缩扩展。

InfluxData 提供了 TICK 一套解决方案,不过使用比较多的是 InfluxDB,这里先从 InfluxDB 学习开始。

介绍


InfluxDBInfluxData 下的子项目。目前,InfluxData 由收集、存储、可视化、处理四个项目组成(简称TICK)。这些都是建立在时间序列数据基础之上的。从下图就可以看出它们的使用方式。

influxdata

这个和 ELK (Elasticsearch + Logstash + Kinaba)组合很像。但是 TICK 相对而言,更灵活、更丰富。每个项目又可以独立出来与其它框架组合使用。

安装InfluxDB


安装比较简单。

# 下载
wget https://dl.influxdata.com/influxdb/releases/influxdb_1.2.4_amd64.deb

# 安装
dpkg -i influxdb_1.2.4_amd64.deb

# 停止服务
service influxdb stop

# 启动服务
service influxdb start

#进入命令客户端
influx

安装后就可以直接使用。如果想修改默认参数,请编辑 /etc/influxdb/influxdb.conf 文件中对应的参数。

特性


  1. Time Series (时间序列):你可以使用与时间有关的相关函数(如最大,最小,求和等)
  2. Metrics(度量):你可以实时对大量数据进行计算
  3. Eevents(事件):它支持任意的事件数据

在存储上,采用 Schemaless ,列存储,压缩率高,可以存储任意数量的列。

概念说明及操作


Database

数据库,可以创建多个,不同数据库中的数据文件,存放在磁盘上的不同目录。

# 查看数据库
show databases

# 创建数据库
create database test

# 使用数据库
use test

# 删除数据库
drop database test

Measurement

相当于数据库中的表名,时间数据 timeTagField 组合成了 Measurement

# 查看所有的 measurement
show measurements

# 插入数据
insert cpu,ip=192.168.1.1 server="A",used=0.23
insert memory,host=192.168.1.1 service="A",used=26342

# 查看表中数据
> select * from cpu
name: cpu
time                ip          server used
----                --          ------ ----
1495440286179940657 192.168.1.1 1      0.23

# 删除measurement
drop measurement cpu

InfluxDB 没有创建表一说,直接 insert 就可以将数据插入 Measurement,可以理解为自动创建表。

InfluxDB 没有对 measurement 的修改操作。

写入数据库的数据采用 Line Protocol,详细参考Line Protocol Tutorial,格式如下:

insert <measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>[,<field2-key>=<field2-value>...] [timestamp]

其中,tagtimestamp 是可选项。

Point

表里的一行数据,由时间戳(timestamp)、标签(tag)、数据(field)组成。

timestamp 相当于 point 的时间戳,field 相当于point的值,tags 相当于 point 的属性。

Timestamp

数据的时间戳,相当于主键。不赋值时,默认为系统时间。

Tag

相当于索引。采用key=value形式,多个 tag 之间用 ',' 分隔。

<tag_key>=<tag_value>,<tag_key>=<tag_value>

数据结构中,Tag 是可选的。并不是一定要有,因为是索引,所以一般建议带上。

# 查看所有 tags 的 key
show tag keys from cpu

# 查看指定 key 的 value
show tag values from cpu with key = "ip" where used=0.23

Field

相当于实际记录的数据值,也是采用key=value形式,多个 tag 之间用 ',' 分隔。

<field_key>=<field_value>,<field_key>=<field_value>
# 查看所有的 field
show field keys

注意:
每个 fieldvalue 类型必须保持一致,如果不确定,最好用 string 类型
字符串类型的 value"" 括起来
Booleans 类型,TRUEt, T, true, True, TRUEFALSEf, F, false, False, FALSE

Retention Policy

存储策略,InfluxDB 会自动清理数据,可以设置数据保留的时间,默认会创建存储策略 autogen (保留时间为永久),之后用户可以自己设置,例如保留最近 30 天的数据。

# 查看数据保持策略  
> show retention policies on "test"
name    duration shardGroupDuration replicaN default
----    -------- ------------------ -------- -------
autogen 0s       168h0m0s           1        true


# 创建新的策略
create retention policy "rp_day_30" on "test" duration 30d replication 1 default

# 修改策略
alter retention policy "rp_day_30" on "test" duration 20d replication 1 default

# 删除策略
drop retention policy "rp_day_30" on "test"

注意:
存储策略有点类似于分区数据块,修改了策略,新策略里不会有旧策略的数据。
要想查看旧策略下的数据,需要在 measurement 前加上策略名称。

# 查看默认策略中的数据
select * from "autogen".cpu

Series

Series 相当于表里的数据。

# 查看 cpu 表中的所有数据
show series from cpu
# 或
show series on test from cpu

下面是 Retention PolicyMeasurementSeries关系的大体结构图:

influxDB结构

用户授权


安装后,默认没有用户名和密码。可以通过客户端命令修改。

为了安全,建议根据使用情况,创建不同的用户。详细操作,可以查看官方文档用户授权

# 显示用户
show users

# 创建用户
create user "developer" with password '123456'

# 创建管理员权限的用户
create user "admin" with password 'admin123' with all privileges

# 给普通用户授予管理员权限
GRANT ALL PRIVILEGES TO "developer"

# 给用户指定数据库操作权限
GRANT [READ,WRITE,ALL] ON <database_name> TO <username>

GRANT ALL ON test TO "developer"

# 撤回用户的管理员权限
REVOKE ALL PRIVILEGES FROM "developer"

# 查看用户权限
show grants for "developer"

# 修改密码
set password for "developer" = '123456789'

# 删除用户
DROP USER "developer"

创建完用户后,需要开启权限验证功能才能生效。修改 /etc/influxdb/influxdb.conf

[http]
auth-enabled = true

重启服务,连接登录。

influx -host localhost -username admin -password 'admin123'

WEB-Admin


修改配置文件 /etc/influxdb/influxdb.conf,开启 WEB-Admin 操作界面。

[admin]
enabled = true
bind-address = ":8083"

UI: http://127.0.0.1:8083/

InfluxDB UI

连续查询


也就是 Continuous Queries,当数据超过保存策略里指定的时间之后,就会被删除;可以通过连续查询把原来的秒级数据,保存为分钟级或者小时级的数据,从而减小数据的占用空间。

# 查看连续查询
SHOW CONTINUOUS QUERIES;

## 创建连续查询
CREATE CONTINUOUS QUERY cq-name ON db-name BEGIN
    SELECT mean(tbl-name) INTO newtbl-name FROM tbl-name GROUP BY time(30m) END;

## 删除连续查询
DROP CONTINUOUS QUERY <cq-name> ON <db-name>;

API Client Libraries


InfluxDB 提供了丰富的多语言 API 。这里以 Java 使用为例。其它语言请看API Client Libraries

先在 Maven 加上资源库引用。

<dependency>
  <groupId>org.influxdb</groupId>
  <artifactId>influxdb-java</artifactId>
  <version>2.5</version>
</dependency>

简单以添加和查询操作为例。

public class Test {

    public static void main(String[] args) {
        InfluxDB influxDB = InfluxDBFactory.connect("http://127.0.0.1:8086", "admin", "admin123");
        String dbName = "test";
        String retentionPolicy = "rp_day_30";

        Point point1 = Point.measurement("cpu")
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .tag("ip", "192.168.1.22")
                .addField("service", "serviceB")
                .addField("used", 0.14)
                .build();

        Point point2 = Point.measurement("memory")
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .tag("host", "192.168.1.22")
                .addField("service", "serviceB")
                .addField("used", 67821)
                .build();

        influxDB.write(dbName, retentionPolicy, point1);
        influxDB.write(dbName, retentionPolicy, point2);


        Query query = new Query("SELECT * FROM cpu", dbName);
        QueryResult queryResult = influxDB.query(query);
        List<QueryResult.Result> list = queryResult.getResults();
        for (QueryResult.Result result : list) {
            System.out.println("------------------");

            List<QueryResult.Series> l = result.getSeries();

            for (QueryResult.Series series : l) {
                System.out.println("Measurement Name: " + series.getName());

                Map<String, String> map = series.getTags();
                if (map != null) {
                    for (Map.Entry<String, String> entry : map.entrySet()) {
                        System.out.println("tag: " + entry.getKey() + ", value: " + entry.getValue());
                    }
                }

                List<String> columns = series.getColumns();
                for (String column : columns) {
                    System.out.println("column name: " + column);
                }

                List<List<Object>> values = series.getValues();
                for (List<Object> ll : values) {
                    for (Object obj : ll) {
                        System.out.println("Object type: " + obj.getClass() + ", value: " + String.valueOf(obj));
                    }
                }
            }
        }
        //influxDB.deleteDatabase(dbName);
    }
}

结果:

------------------
Measurement Name: cpu
column: time
column: ip
column: service
column: used
Object type: class java.lang.String, value: 2017-05-23T04:10:15.004064854Z
Object type: class java.lang.String, value: 192.168.1.1
Object type: class java.lang.String, value: serviceA
Object type: class java.lang.Double, value: 0.2
Object type: class java.lang.String, value: 2017-05-23T07:53:52.872Z
Object type: class java.lang.String, value: 192.168.1.22
Object type: class java.lang.String, value: serviceB
Object type: class java.lang.Double, value: 0.14
Object type: class java.lang.String, value: 2017-05-23T07:55:01.114Z
Object type: class java.lang.String, value: 192.168.1.22
Object type: class java.lang.String, value: serviceB
Object type: class java.lang.Double, value: 0.14
Object type: class java.lang.String, value: 2017-05-23T08:02:13.67Z
Object type: class java.lang.String, value: 192.168.1.22
Object type: class java.lang.String, value: serviceB
Object type: class java.lang.Double, value: 0.14

Java 的详细用法请看influxdb-java

HTTP API


InfluxDB 提供了与 ElasticSearch 类似的 Restful API。由于我暂时不打算使用这种方式,就先不介绍了,想了解的请查看InfluxDB文档

curl -G 'http://localhost:8086/query?db=test&pretty=true' --data-urlencode 'q=SELECT * FROM "cpu"'

其它


其它命令操作。

# 元数据备份
$ influxd backup <path-to-backup>

# 数据备份
$ influxd backup -database <mydatabase> <path-to-backup>
$ influxd backup -database telegraf -retention autogen -since 2016-02-01T00:00:00Z /tmp/backup
$ influxd backup -database mydatabase -host 10.0.0.1:8088 /tmp/remote-backup

# 恢复
$ influxd restore -metadir /var/lib/influxdb/meta /tmp/backup
$ influxd restore -database telegraf -datadir /var/lib/influxdb/data /tmp/backup


# 指定时间范围,时间格式也可以为'2017-01-03 00:00:00'
SELECT * FROM cpu WHERE time >= '2017-01-03T12:40:38.708Z' AND time <= '2017-01-03T12:40:50.708Z'

# 最近40min内的数据
SELECT * FROM cpu WHERE time >= now() - 40m

# 最近5分钟的秒级差值
SELECT derivative("queries", 1s) AS "queries" from "cpu" where time > now() - 5m

函数


COUNT()

返回单个字段中非空值的数量。

SELECT COUNT(<field_key>) FROM <measurement_name> [WHERE <stuff>] [GROUP BY <stuff>]

DISTINCT()

返回唯一字段值的列表。

SELECT DISTINCT( [ * | <field_key> | /<regular_expression>/ ] ) FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

MEAN()

返回字段值的算术平均值。

SELECT MEAN( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

MEDIAN()

从排序的字段值列表中返回中间值。

SELECT MEDIAN( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

MODE()

返回字段值列表中最常用的值。

SELECT MODE( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

SPREAD()

返回最小和最大字段值之间的差异。

SELECT SPREAD( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

STDDEV()

返回字段值的标准偏差。

SELECT STDDEV( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

SUM()

返回字段值的总和。

SELECT SUM( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

BOTTOM()

返回最小的N个字段值。

SELECT BOTTOM(<field_key>[,<tag_key(s)>],<N> )[,<tag_key(s)>|<field_key(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

FIRST()

返回具有最早时间戳的字段值。

SELECT FIRST(<field_key>)[,<tag_key(s)>|<field_key(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

LAST()

返回具有最新时间戳的字段值。

SELECT LAST(<field_key>)[,<tag_key(s)>|<field_keys(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

MAX()

返回最大的字段值。

SELECT MAX(<field_key>)[,<tag_key(s)>|<field__key(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

MIN()

返回最小的字段值。

SELECT MIN(<field_key>)[,<tag_key(s)>|<field_key(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

PERCENTILE()

返回第N个百分位数字段值。

SELECT PERCENTILE(<field_key>, <N>)[,<tag_key(s)>|<field_key(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

SAMPLE()

返回N个字段值的随机抽样。 SAMPLE()使用库抽样来生成随机点。

SELECT SAMPLE(<field_key>, <N>)[,<tag_key(s)>|<field_key(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

TOP()

返回最大的N个字段值。

SELECT TOP( <field_key>[,<tag_key(s)>],<N> )[,<tag_key(s)>|<field_key(s)>] [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

CUMULATIVE_SUM()

返回后续字段值的运行总计。

SELECT CUMULATIVE_SUM( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

DERIVATIVE()

返回后续字段值之间的变化率。

SELECT DERIVATIVE( [ * | <field_key> | /<regular_expression>/ ] [ , <unit> ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

DIFFERENCE()

返回后续字段值之间的减法结果。

SELECT DIFFERENCE( [ * | <field_key> | /<regular_expression>/ ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

ELAPSED()

返回后续字段值的时间戳之间的差异。

SELECT ELAPSED( [ * | <field_key> | /<regular_expression>/ ] [ , <unit> ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

MOVING_AVERAGE()

返回后续字段值窗口中的滚动平均值。

SELECT MOVING_AVERAGE( [ * | <field_key> | /<regular_expression>/ ] , <N> ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

NON_NEGATIVE_DERIVATIVE()

返回后续字段值之间的非负变化率。非负变化率包括正变化率和等于零的变化率。

SELECT NON_NEGATIVE_DERIVATIVE( [ * | <field_key> | /<regular_expression>/ ] [ , <unit> ] ) [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

HOLT_WINTERS()

返回一个数字使用霍尔特-温特斯季节性方法预测的字段值。

SELECT HOLT_WINTERS[_WITH-FIT](<function>(<field_key>),<N>,<S>) [INTO_clause] FROM_clause [WHERE_clause] GROUP_BY_clause [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause]

参考:
InfluxDB 官方文档
InfluxDB介绍
InfluxDB使用笔记
InfluxDB CLI/Shell