Querying InfluxDB with Flux
Flux is a functional data scripting language designed specifically for time series data exploration and data processing with InfluxDB.
Key concepts
- pipe-forward operator (|>): chains operations together by passing the tables produced by one step to the next step
- tables: Flux structures all data in tables and formats them as annotated CSV
- table group key: describes the table's contents as the list of columns for which every row in the table has the same value
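To make these three concepts concrete, here is a minimal sketch that runs on in-memory rows instead of a bucket. The rows, the host column, and its values are invented for illustration; array.from(), group(), and mean() are standard Flux functions.

```flux
import "array"

// Hypothetical sample rows; "host" plays the role of a tag.
array.from(rows: [
    {_time: 2021-01-01T00:00:00Z, host: "a", _value: 1.0},
    {_time: 2021-01-01T00:01:00Z, host: "a", _value: 2.0},
    {_time: 2021-01-01T00:00:00Z, host: "b", _value: 3.0},
])
    // Put "host" in the group key: one output table per distinct host.
    |> group(columns: ["host"])
    // mean() runs once per table, on the _value column by default.
    |> mean()
```

Each |> hands the tables from the previous step to the next one. After group(), every row in a given table shares the same host value, so mean() should return one row per host (1.5 for "a" and 3.0 for "b").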
Example query
- Queries data from the example-bucket bucket
- Returns the one-minute average CPU usage (filtered by measurement and tag) over the last hour
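// Read from the bucket and keep only the last hour of data
from(bucket: "example-bucket")
    |> range(start: -1h)
    // Keep rows of the "cpu" measurement whose cpu tag is "cpu-total"
    |> filter(fn: (r) =>
        r._measurement == "cpu" and
        r.cpu == "cpu-total"
    )
    // Average the values in one-minute windows
    |> aggregateWindow(every: 1m, fn: mean)
    |> yield()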
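To see what aggregateWindow() does without needing the bucket, the same windowing step can be tried on a few in-memory rows. This is a sketch under assumptions: the timestamps and values are made up, and range() is used only to give the rows the _start and _stop bounds that aggregateWindow() relies on.

```flux
import "array"

// Hypothetical readings spread over two one-minute windows.
array.from(rows: [
    {_time: 2021-01-01T00:00:10Z, _value: 10.0},
    {_time: 2021-01-01T00:00:40Z, _value: 20.0},
    {_time: 2021-01-01T00:01:20Z, _value: 40.0},
])
    // Absolute bounds that cover all three rows.
    |> range(start: 2021-01-01T00:00:00Z, stop: 2021-01-01T00:02:00Z)
    // Average per one-minute window.
    |> aggregateWindow(every: 1m, fn: mean)
```

The first two rows fall into the same window and are averaged together, while the third row ends up alone in the second window. By default each output row is stamped with the end time of its window.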
Elements of a Flux query
Every Flux query needs at least the following elements:
- data source: from(bucket: "bucket-name")
- time range: |> range(start: ..., stop: ...), where start and stop are each given as either
  - a timestamp: 2018-11-05T23:30:00Z
  - a duration relative to now: -15m (negative, because it precedes now)
- filter: filter(fn: (r) => ...). The only parameter is fn, which expects an anonymous function that takes r
  - r: the record (row) being evaluated
  - chain conditions with and
- result output: yield() (Flux assumes this automatically; yield() is necessary only when a script returns the results of multiple queries and each output table needs a name)
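The following sketch combines these elements: an absolute time range, a filter that chains conditions with and, and two named yield() calls. The bucket name comes from the example above; the usage_system field and the output names "mean" and "max" are assumptions made for illustration.

```flux
// Assumption: "example-bucket" holds a "cpu" measurement with a "usage_system" field.
cpu = from(bucket: "example-bucket")
    // Absolute timestamps instead of a duration relative to now.
    |> range(start: 2018-11-05T23:30:00Z, stop: 2018-11-06T00:00:00Z)
    |> filter(fn: (r) =>
        r._measurement == "cpu" and
        r._field == "usage_system" and
        r.cpu == "cpu-total"
    )

// Two results from one script: each yield() names its output.
cpu
    |> mean()
    |> yield(name: "mean")

cpu
    |> max()
    |> yield(name: "max")
```

With a single result, the yield() calls could be dropped. Here they are what keeps the two outputs apart.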
Exploring data with InfluxDB UI’s Data Explorer
The most straightforward way to explore InfluxDB is to use the UI's Data Explorer.
Here we use Flux scripting together with the interactive UI.
In the following Flux query, we do not pass all the parameters manually but let the UI set them dynamically:
- |> range(start: v.timeRangeStart, stop: v.timeRangeStop): here v.timeRangeStart and v.timeRangeStop are set with the time range selector dropdown or by selecting a range on the graph with the mouse
- |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false): the window size is taken from v.windowPeriod
from(bucket: "trial_bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "boltdb_reads_total")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
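Outside the Data Explorer the v record is not defined, so this query fails as written. One possible workaround, sketched here as an assumption rather than as an official recipe, is to define a stand-in v record by hand. The values -1h and 1m are arbitrary choices.

```flux
// Hypothetical stand-in for the variables the Data Explorer normally injects.
v = {timeRangeStart: -1h, timeRangeStop: now(), windowPeriod: 1m}

from(bucket: "trial_bucket")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "boltdb_reads_total")
    |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
```

Inside the UI the first line should be removed so that the time range selector stays in control.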