什么是InfluxDB?
InfluxDB介绍
InfluxDB是一款用Go语言编写的开源分布式时序、事件和指标数据库,无需外部依赖。
该数据库现在主要用于存储涉及大量的时间戳数据,如DevOps监控数据,APP metrics, loT传感器数据和实时分析数据。
InfluxDB特征:
– 无结构(无模式):可以是任意数量的列
– 可以设置metric的保存时间
– 支持与时间有关的相关函数(如min、max、sum、count、mean、median等),方便统计
– 支持存储策略:可以用于数据的删改。(influxDB没有提供数据的删除与修改方法)
– 支持连续查询:是数据库中自动定时启动的一组语句,和存储策略搭配可以降低InfluxDB的系统占用量。
– 原生的HTTP支持,内置HTTP API
– 支持类似sql语法
– 支持设置数据在集群中的副本数
– 支持定期采样数据,写入另外的measurement,方便分粒度存储数据。
– 自带web管理界面,方便使用(登入方式:http://:8083)
关键概念
InfluxDB关键概念列表:
| database | field key | field set | | field value | measurement | point | | retention policy | series | tag key | | tag set | tag value | timestamp |
下面举个例子进行概念介绍:
我们虚拟一组数据,其中有一张数据表(measurement)为census,该表记录了由两个科学家(langstroth和perpetua)在两个不同的位置(1和2),统计了butterflies和honeybees的数据,时间段是2015-08-18 00: 00:00 — 2015-08-18 06: 12:00. 我们假设这些数据属于叫my_database的数据库(database),且该数据存储在autogen的存储策略(retention policy)中。
数据展示如下:
name: census
time butterflies honeybees location scientist 2015-08-18T00:00:00Z 12 23 1 langstroth 2015-08-18T00:00:00Z 1 30 1 perpetua 2015-08-18T00:06:00Z 11 28 1 langstroth 2015-08-18T00:06:00Z 3 28 1 perpetua 2015-08-18T05:54:00Z 2 11 2 langstroth 2015-08-18T06:00:00Z 1 10 2 langstroth 2015-08-18T06:06:00Z 8 23 2 perpetua 2015-08-18T06:12:00Z 7 22 2 perpetua
我们针对数据来进行概念分析:
InfluxDB是时序数据库,所以怎么都绕不开时间,第一纵列time存储着时间戳,而时间戳是与数据进行关联,这样才能将时间和数据进行展示。
接下去两纵列(butterflies和honeybees),称为Fields。Fields由field keys和field values组成。butterflies和honeybees两个字符串就是field keys;而butterflies这个field key对应的field values就是12 — 7, honeybees这个field key对应的field values就是23 — 22。Field values就是你的数据,它们可以是string、float、int或者bool等。因为influxdb是时序数据库,所以field values总是要和timestamp关联。
field set是在数据层之上应用概念,由field key和field value组成了field set,如这里有8组field set数据:
butterflies = 12 honeybees = 23 butterflies = 1 honeybees = 30 butterflies = 11 honeybees = 28 butterflies = 3 honeybees = 28 butterflies = 2 honeybees = 11 butterflies = 1 honeybees = 10 butterflies = 8 honeybees = 23 butterflies = 7 honeybees = 22
field是InfluxDB的必要结构,但也需要注意field是没有索引的。
剩下的两个纵列是location和scientist,它们是tags。
Tags也是由键值对(tag keys和tag values)组成。
这里的tag keys是字符串location和scientist;location 这个tag key有两个tag values: 1和2;scientist这个tag key也有两个tag values:perpetua和langstroth。
tag set也是数据之上的概念,是不同的tag key-value组合,这里有4组tag sets数据:
location = 1, scientist = langstroth location = 2, scientist = langstroth location = 1, scientist = perpetua location = 2, scientist = perpetua
Tags是可选的参数,也就是说你存储的数据结构中不一定非要带tags,但是它非常好用,因为可以索引。一般都会通过tags来查询数据会快很多。
measurement包含了tags、fields和time,就类似于传统数据库的表。一个measurement可以属于不同的retention policy(存储策略),存储策略描述了InfluxDB怎么去保持数据(DURATION),需要在集群中存储多少份数据副本(REPLICATION)。
示例中的数据都属于census这个measurement,而该measurement又属于autogen这个存储策略。InfluxDB一般都会创建一个default存储策略,它有无限长的持续时间和等于1的副本数。
我们了解过了measurements、tag sets和retention policies的概念后,是时候该知道series了。
在同一个database中,series由retention policy、measurement、tag sets三部分组成,在我们上面的数据中有如下4个series:
|Arbitrary series number | Retention policy | Measurement |Tag set |
| —-| —– | —- | —- |
| series 1 | autogen | census | location = 1,scientist = langstroth |
| series 2 | autogen | census | location = 2,scientist = langstroth |
| series 3 | autogen | census | location = 1,scientist = perpetua |
| series 4 | autogen | census | location = 2,scientist = perpetua |
同一个Series的数据在物理上会按照时间顺序排列存储在一起。
Series的key为measurement + 所有tags的序列化字符串。
代码结构如下:
tyep Series struct { mu sync.RWMutex Key string Tags map[string]string id uint64 measurement *Measurement }
介绍完Series后,就可以解释point了。point是在一个series中有相同时间戳的field set,也可以理解如表里的一行数据。示例中一个Point:
{{{name: census
time butterflies honeybees location scientist
2015-08-18T00:00:00Z 1 30 1 perpetua}}}
上例中,series由retention policy(autogen), measurement(census)和tag set(location=1,scientist=perpetua)进行定义。而这个point的时间戳则是2015-08-18T 00: 00: 00Z。
InfluxDB Database可以有多个users、continuous queries、retention policy、measurement。因为InfluxDB是一个结构化的数据库,我们可以轻松的去新增measurements、tags、fields。
高级概念
Retention Policy
之前讲关键性概念时有简单介绍了RP,这里会进行较详细的介绍。
InfluxDB的数据保留策略(RP)是用来定义数据在数据库中存放的时间,或者定义保存某个期间的数据。
RP在InfluxDB中是比较重要的概念,因为InfluxDB本身是没有提供数据的删除操作,所以需要通过定义RP来控制数据量的问题。
(一个数据库可以有多个RP,但是每个RP必须是独一无二的。)
在具体介绍RP之前,先介绍下另外一个跟RP相关的基础概念(shard)。
shard:
每个RP下面会存在很多shard,每个shard都存储了实际编码和压缩数据,并且不重复。例如你在创建RP时指定了shard duration为1h,那么7–8点存入shard_group0,8–9点就会存入shard_group1中。所以shard才是真实存储InfluxDB数据的地方。
每个shard都属于唯一一个shard group,一个group中会有多个shard;而每个shard包含一组特定的series;所有的points都落在给定的series中,而series是都落在给定的shard group中;
问题1:每个shard group指定了一段时间区域,而且其中有多个shard;每个shard包含一组特定的series。那么shard中存的数据是怎么区分的?series是由RP、meansurement、tags组成,那么shard的区分是根据tags??
shard duration:
shard duration决定了每个shard group存放数据的时间区域。这段时间是在定义RP时由”SHARD DURATION”字段决定。
例如你创建RP时指定了SHARD DURATION为1w,那么每个shard group的时间跨度就为1w,它将包含所有在这一周时间戳内的points。
OK,大概了解了shard之后,继续回到Retention Policy。
当你创建一个数据库时,InfluxDB会自动给你创建一个叫”autogen”的retention Policy,这个RP的数据保留时间是无限。
1.创建RP语法:
CREATE RETETION POLICY {rp_name} ON {database_name} DURATION {duration} REPLICATION {n} [SHARD DURATION {duration}] [DEFAULT]
DURATION: 用于描述数据保留时间。可设置的时间区间是1h — INF(无穷大)。
REPLICATION: 用于指定数据的备份数量,n是表示数据节点的数量。
SHARD DURATION: 用于指定shard group的时间区域,这个字段的duration是不支持INF的。默认情况下,shard group的duration由RP的duration决定。
| Retention Policy’s DURATION | Shard Group Duration |
| —- | —– |
| = 2 days and 6 mouths | 7days |
DEFAULT: 可选参数,用于指定使用新的RP来作为数据库的默认RP。
2.修改RP语法:
ALTER RETENTION POLICY {rp_name} ON {database_name} DURATION {duration} REPLICATION {n} SHARD DURATION {duration} DEFAULT
注:
后面的参数字段都一样,主要差别就在于关键字段:ALTER RETENTION POLICY
3.删除RP语法:
DROP RETENTION POLICY {rp_name} ON {database_name}
注:
即使你企图去删除一个不存在的rp,命令返回值也是空,不会返回一个错误码。
Continuous Queries
之前我们介绍了数据保存策略,数据超过保存策略里指定的时间之后,就会被删除。但我们不想完全删除这些数据,比如我们想把每秒的监控数据至少保留成每小时,就需要用到连续查询(Continuous Queries)功能。
连续查询主要用在将数据归档,以降低系统空间的占用率,但这主要是以降低数据精度为代价。
基本语法:
CREATE CONTINUOUS QUERY {cq_name} ON {database_name} BEGIN {cq_query} END
注:cq_name表示创建的Continuous query的名字;database_name表示要操作的数据库。
cq_query是操作函数,如下:
SELECT {function[s]} INTO {destnation_measurement} FROM {measurement} [WHERE {stuff}] GROUP BY time({interval})[,{tag_key[s]}]
注:destnation_measurement表示新生成的数据存放的表;measurement表示数据查询的表;
GROUP BY time表示采样分析的数据时间,比如设置1h,如果当前是17:00,那么需要计算的数据时间就是16:00 — 16:59。
例子1: 自动降低精度来采样数据
CREATE CONTINUOUS QUERY "cq_basic" ON "transportation" BEGIN SELECT mean("passengers") INTO "average_passengers" FROM "bus_data" GROUP BY time(1h) END 查看结果: > SELECT * FROM "average_passengers"
name: average_passengers
time mean 2016-08-28T07:00:00Z 7 2016-08-28T08:00:00Z 13.75
连续查询(cq_basic)通过在数据库”transportation”中的”bus_data”表,计算每小时平均的旅客数,然后在该数据库中新建”average_passengers”表,并将数据存入该表中。该cq_basic每小时执行一遍,然后将每个小时的point写入表中。
例子2:自动降低精度来采样数据,并将数据存入另外一个Retention Policy(RP)
CREATE CONTINUOUS QUERY "cq_basic_rp" ON "transportation" BEGIN SELECT mean("passengers") INTO "transportation"."three_weeks"."average_passengers" FROM "bus_data" GROUP BY time(1h) END 查看结果: > SELECT * FROM "transportation"."three_weeks"."average_passengers"
name: average_passengers
time mean 2016-08-28T07:00:00Z 7 2016-08-28T08:00:00Z 13.75
连续查询(cq_basic_rp)通过在数据库”transportation”中的”bus_data”表,计算每小时平均的旅客数,然后将数据存入transportation数据库中的three_weeks(RP)的average_passengers表中。该cq_basic_rp每小时执行一遍,然后将每个小时的point写入表中。
例子3:采用通配符,自动降低精度采样数据
CREATE CONTINUOUS QUERY "cq_basic_br" ON "transportation" BEGIN SELECT mean(*) INTO "dowmsample_transportation"."autogen".:MEASUREMENT FROM /.*/ GROUP BY time(30m),* END 查看结果: > SELECT * FROM "downsample_transportation"."autogen"."bus_data"
name: bus_data
time mean_complaints mean_passengers 2016-08-28T07:00:00Z 9 6.5 2016-08-28T07:30:00Z 9 7.5 2016-08-28T08:00:00Z 8 11.5 2016-08-28T08:30:00Z 7 16
连续查询(cq_basic_br),计算数据库(transportation)中每张表(这里只有一张表”bus_data”),每30分钟平均的旅客数和投诉量,然后将数据存入downsample_transportation数据库中的autogen(RP)中。该cq_basic_br每30分钟执行一遍,然后将每个小时的point写入表中。
例子4:配置CQ的时间偏移,来采集数据:
CREATE CONTINUOUS QUERY "cq_basic_offset" ON "transportation" BEGIN SELECT mean("passengers") INTO "average_passengers" FROM "bus_data" GROUP BY time(1h,15m) END 查看结果: > SELECT * FROM "average_passengers"
name: average_passengers
time mean 2016-08-28T07:15:00Z 7.75 //注意时间是从7:15 -- 8:15 2016-08-28T08:15:00Z 16.75
该CQ(cq_basic_offset),设置了每整点往后偏移15分钟,再进行每小时的平均值计算。比如会将8 : 15–9: 15,来代替8: 00–9: 00。
高级语法:
CREATE CONTINUOUS QUERY {cq_name} ON {database_name} RESAMPLE EVERY {val1} FOR {val2} BEGIN {cq_query} END
注意:
cq_name、database_name、cq_query和之前的基本语法都一致。
EVERY后面带的时间,表示每val1点时间就触发一次数据采样,而数据的区间是和cq_query、FOR有关。在这段时间内每val1点时间再采集一次。比如cq_query设置1h,val1设置为30m,表示在1h内会有两次数据计算。比如8点–9点之间就会有两次数据的计算,第一次计算是8:30触发的,计算的区间是8:00–8:30,第二次计算是9:00触发的,计算的区间是8:00–9:00。在目的数据库中,默认第二次的计算结果会覆盖第一次的计算结果。
FOR后面带的时间,表示修改了cq_query计算的数据区间,比如cq_query时间设置为30m,val2设置的是1h。那么cq每30m会触发一次数据计算,计算的区间是(now-1h)–now。
示例数据: 给下面的例子使用
{{{name: bus_data
time passengers
2016-08-28T06:30:00Z 2
2016-08-28T06:45:00Z 4
2016-08-28T07:00:00Z 5
2016-08-28T07:15:00Z 8
2016-08-28T07:30:00Z 8
2016-08-28T07:45:00Z 7
2016-08-28T08:00:00Z 8
2016-08-28T08:15:00Z 15
2016-08-28T08:30:00Z 15
2016-08-28T08:45:00Z 17
2016-08-28T09:00:00Z 20}}}
例子1:配置执行间隔
CREATE CONTINUOUS QUERY "cq_advanced_every" ON "transportation" RESAMPLE EVERY 30m BEGIN SELECT mean("passengers") INTO "average_passengers" FROM "bus_data" GROUP BY time(1h) END 中间的执行过程: At 8:00, cq_advanced_every executes a query with the time range WHERE time >= '7:00' AND time = '8:00' AND time = '8:00' AND time SELECT * FROM "average_passengers"
name: average_passengers
time mean 2016-08-28T07:00:00Z 7 2016-08-28T08:00:00Z 13.75
cq_advanced_every在8点–9点执行了两次。第一次8:30触发,因为cq_query设置了1h,所以数据区间是8: 00–9: 00,但因为是在8:30触发的,8: 30–9: 00的数据还没产生呢,所以实际采集的数据区间是在8: 00–8: 30,即数据(8, 15, 15), 计算的平均值为12.6667;第二次9:00触发,计算的区间是8: 00–9: 00,即数据(8, 15, 15, 17),计算的平均值为13.75.
例子2:配置重采样的时间区间
CREATE CONTINUOUS QUERY "cq_advanced_for" ON "transportation" RESAMPLE FOR 1h BEGIN SELECT mean("passengers") INTO "average_passengers" FROM "bus_data" GROUP BY time(30m) END 采样过程: At 8:00 cq_advanced_for executes a query with the time range WHERE time >= '7:00' AND time = '7:30' AND time = '8:00' AND time SELECT * FROM "average_passengers"
name: average_passengers
time mean 2016-08-28T07:00:00Z 6.5 2016-08-28T07:30:00Z 7.5 2016-08-28T08:00:00Z 11.5 2016-08-28T08:30:00Z 16
该cq_advanced_for,每30m重采样一次,采样的区间是(now-1h — now), 也就是每触发一次执行,就会进行两次计算。因为采样的区间是1h,而需要计算的是每30m的平均值。
例子3:配置cq的执行区间和时间范围
CREATE CONTINUOUS QUERY "cq_advanced_every_for" ON "transportation" RESAMPLE EVERY 1h FOR 90m BEGIN SELECT mean("passengers") INTO "average_passengers" FROM "bus_data" GROUP BY time(30m) END 采样过程: At 8:00 cq_advanced_every_for executes a query with the time range WHERE time >= '6:30' AND time = '7:30' AND time SELECT * FROM "average_passengers"
name: average_passengers
time mean 2016-08-28T06:30:00Z 3 2016-08-28T07:00:00Z 6.5 2016-08-28T07:30:00Z 7.5 2016-08-28T08:00:00Z 11.5 2016-08-28T08:30:00Z 16
该cq_advanced_every_for,需要计算30m的平均值,每1小时触发一次cq执行,采样的数据区间是90m,所以每触发一次就会计算3次平均值。
例子4:配置CQ的采样时间区间,并且填充空结果
CREATE CONTINUOUS QUERY "cq_advanced_for_fill" ON "transportation" RESAMPLE FOR 2h BEGIN SELECT mean("passengers") INTO "average_passengers" FROM "bus_data" GROUP BY time(1h) fill(1000) END 采样过程: At 6:00, cq_advanced_for_fill executes a query with the time range WHERE time >= '4:00' AND time = '5:00' AND time = {start_time} AND time $field_value' --data-urlencode 'params={"field_value":9}' {"results":[{"series":[{"name":"mymeas","columns":["time","myfieldkey","mytagkey"],"values":[["2016-09-05T18:25:30.408555195Z",10,"mytagvalue1"],["2016-09-05T18:25:39.108978991Z",111,"mytagvalue1"],["2016-09-05T18:25:46.587728107Z",111,"mytagvalue2"]]}]}]}
示例6:通过WHERE字段指定多个条件
$ curl -G 'http://localhost:8086/query?db=mydb' --data-urlencode 'q=SELECT * FROM "mymeas" WHERE "mytagkey" = $tag_value AND "myfieldkey" > $field_value' --data-urlencode 'params={"tag_value":"mytagvalue2","field_value":9}' {"results":[{"series":[{"name":"mymeas","columns":["time","myfieldkey","mytagkey"],"values":[["2016-09-05T18:25:46.587728107Z",111,"mytagvalue2"]]}]}]}
/write
/wirte只支持POST的HTTP请求,使用该Endpoint可以写数据到已存在的数据库中。
定义:
POST http://localhost:8086/write
Query参数说明:
| 参数 | 是否可选 | 描述 |
| —- | —- | —- |
| consistency=[any,one,quorum,all] | 可选 | 设置point的写入一致性,默认是one.详细的请参考HERE |
| db={db_name} | 必选 | 设置数据库名 |
| precision=[h,m,s,ms,u,n] | 可选 | 指定时间戳的精度,默认是ns |
| p={password} | 可选 | 如果设置了认证,则需要用户密码 |
| rp={rp_name} | 可选 | 设置查询的rp。如果没有设置,则查询默认的rp |
| u={username} | 可选 | 如果设置了认证,则需要用户密码 |
示例1:使用秒级的时间戳,将一个point写入数据库mydb
$ curl -i -XPOST "http://localhost:8086/write?db=mydb&precision=s" --data-binary 'mymeas,mytag=1 myfield=90 1463683075'
示例2:将一个point写入数据库mydb,并指定RP为myrp
$ curl -i -XPOST "http://localhost:8086/write?db=mydb&rp=myrp" --data-binary 'mymeas,mytag=1 myfield=90'
示例3:使用HTTP认证的方式,将一个point写入数据库mydb
$ curl -i -XPOST "http://localhost:8086/write?db=mydb&u=myusername&p=mypassword" --data-binary 'mymeas,mytag=1 myfield=91'
示例4:使用基础认证的方式,将一个point写入数据库mydb
$ curl -i -XPOST -u myusername:mypassword "http://localhost:8086/write?db=mydb" --data-binary 'mymeas,mytag=1 myfield=91'
数据请求体:
--data-binary ''
所有写入的数据必须是二进制,且使用Line Protocol格式。
示例1:写多个points到数据库中,需要使用新的一行
$ curl -i -XPOST "http://localhost:8086/write?db=mydb" --data-binary 'mymeas,mytag=3 myfield=89 mymeas,mytag=2 myfield=34 1463689152000000000'
示例2:通过导入文件的形式,写入多个points。需要使用@来指定文件
$ curl -i -XPOST "http://localhost:8086/write?db=mydb" --data-binary @data.txt 文件内容如下 mymeas,mytag1=1 value=21 1463689680000000000 mymeas,mytag1=1 value=34 1463689690000000000 mymeas,mytag2=8 value=78 1463689700000000000 mymeas,mytag3=9 value=89 1463689710000000000
响应的状态码:
| HTTP状态码 | 描述 |
| ———-| —–|
| 204 No Content | 成功 |
| 400 Bad Request | 不能接受的请求。可能是Line Protocol语法错误;写入错误的field values类型;等。。 |
| 404 Not Fount | 不能接受的请求。可能是数据库不存在,或者别的原因 |
| 500 Internal Server Error | 系统超负荷了或者明显受损。可能是用户企图去写一个不存在的RP。或者别的原因 |
InfluxDB集群化
> InfluxDB v0.12及以上版本已经不再开源其集群部分代码,转为商业版本功能。
可以参考支持集群的最新版本v0.11。
InfluxDB支持任意大小的集群,支持任何大小的数据副本(从1到集群node数量)。
InfluxDB集群支持3中类型的node: 一致性Nodes、数据Nodes、混合Nodes。集群需要运行一致性服务的花,必须要保持奇数的Nodes来组成大量的consensus group,并且保持集群在一个健康状态。
集群设置
使用3个混合Nodes来搭建集群。
1.安装InfluxDB到各个Node上,先不启用服务。
2.修改节点上的InfluxDB配置:
配置文件是”/etc/influxdb/influxdb.conf”,配置如下:
[meta] enabled = true ... bind-address = ":8088" ... http-bind-address = ":8091" ... [data] enabled = true ... [http] ... bind-address = ":8086"
– 设置[meta] enabled = true, [data] enabled = true。
– [meta] bind-address 是集群IP,用于集群范围交互。
– [meta] http-bind-address Node的IP地址,用于Node级别交互。
– [http] bind-address 用于HTTP API。
3.在每个节点上指明所有的Node信息:
设置/etc/default/influxdb文件中的INFLUXD_OPTS:
INFLUXD_OPTS=”-join :8091,:8091,:8091″
IP1,IP2,IP3分别对应着各个Nodes的IP地址或者hostname。
4.开启每个节点的服务:
sudo service influxdb start
5.确认集群是否健康:
在influx的CLI命令行中,使用SHOW SERVERS命令查看健康情况:
> SHOW SERVERS
name: data_nodes
id http_addr tcp_addr 1 :8086 :8088 2 :8086 :8088 3 :8086 :8088
name: meta_nodes
id http_addr tcp_addr 1 :8091 :8088 2 :8091 :8088 3 :8091 :8088
向集群增加节点
只要你的集群是健康的运行,就可以向集群中增加节点。
步骤都基本一样,需要注意第4步,你需要把新增的节点都改到所有的INFLUXD_OPTS中去,包括自己的。
删除集群节点
语法:
drop_server_stmt = "DROP (META | DATA) SERVER" (server_id)
示例:
– 删除consensus node,在集群中的id是1:
DROP META SERVER 1
– 删除data node,在集群中的id是2:
DROP DATA SERVER 2
– 删除hybrid node,在集群中的meta node id是3,data node id是3:
DROP META SERVER 3 DROP DATA SERVER 3
参考资料:
- 官方概念介绍: https://docs.influxdata.com/in … epts/
- InfluxDB详解之TSM存储引擎解析(一): http://blog.fatedier.com/2016/ … -one/
- InfuxDB详解之TSM存储引擎解析(二):http://blog.fatedier.com/2016/ … -two/
- InfluxDB v0.11 Cluster Setup: https://docs.influxdata.com/in … etup/
- InfluxDB引擎浅析: https://segmentfault.com/a/1190000005977485