ClickHouse and PromQL
Querying metrics stored in ClickHouse with PromQL.
See the end-result here
Background
ClickHouse is a pretty great backend for metrics storage, especially at large scale. You get great compression, blazing fast query latency for large data sets and you can use it as a single backend for metrics, logs and traces.
You do have to use SQL to query it, of course - so in the case of Grafana, for example, you would most likely use the excellent ClickHouse data source for Grafana. Or, if Grafana is not a requirement for you, you might go with the recently released ClickStack.
But what if you already have a lot of Grafana dashboards built on PromQL, and still want to use ClickHouse to store your logs, traces and metrics in one place?
ClickHouse TimeSeries table engine
TimeSeries is an experimental feature and is unavailable in ClickHouse Cloud.
Turns out ClickHouse already has an experimental table engine that can be directly used with Prometheus remote write and remote read protocols - the TimeSeries table engine.
Basic configuration for a table is simple:
CREATE TABLE timeseries_table ENGINE=TimeSeries
The resulting table will not actually store any data directly - everything will be stored in its target tables. Once the table has been created, you can set up Prometheus remote read and write endpoints on your ClickHouse server:
<clickhouse>
    <prometheus>
        <port>9363</port>
        <handlers>
            <prometheus_write_rule>
                <url>/remote-write</url>
                <handler>
                    <type>remote_write</type>
                    <database>timeseries_db</database>
                    <table>timeseries_table</table>
                </handler>
            </prometheus_write_rule>
            <prometheus_read_rule>
                <url>/remote-read</url>
                <handler>
                    <type>remote_read</type>
                    <database>timeseries_db</database>
                    <table>timeseries_table</table>
                </handler>
            </prometheus_read_rule>
        </handlers>
    </prometheus>
</clickhouse>
Once you do, <clickhouse ip>:9363/remote-write and <clickhouse ip>:9363/remote-read become valid Prometheus remote write/read endpoints that you can target from another Prometheus instance, Thanos or an OpenTelemetry exporter.
For example, you could build something like this:
architecture-beta
group cluster(cloud)[Cluster]
service ch(database)[ClickHouse] in cluster
service chr(internet)['ClickHouse /remote-read'] in cluster
service chw(internet)['ClickHouse /remote-write'] in cluster
service prom(server)[Prometheus] in cluster
service otel(server)[Otel Metrics Gateway] in cluster
service grafana(server)[Grafana] in cluster
prom:T --> B:chr
grafana:T <--> B:prom
otel:T --> B:chw
chr:R <-- L:ch
chw:L --> R:ch
- ClickHouse is set up with `/remote-read` and `/remote-write`
- OpenTelemetry collector (gateway) is set up to use the `prometheusremotewrite` exporter to write metrics to `/remote-write`
- A lightweight Prometheus instance with no local retention is set up to read from `/remote-read`
- Grafana is configured to use the Prometheus instance as a data source
Problems
The setup above will work fine for smaller data sets, but once you start running Grafana queries that compute rates or averages of high-cardinality metrics over 24+ hour periods, you will quickly see your queries start timing out.
Some of this can be mitigated at the ClickHouse level by improving the underlying database schema a little. Mostly, however, the problem is that unlike Thanos, for example, the TimeSeries engine does not do any kind of metric downsampling for you. In other words - no matter what time period your PromQL query covers, you are always going to receive raw, high-resolution sample data back; and the number of samples is too much for Grafana to cope with.
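To get a feel for the scale, here is a back-of-the-envelope calculation (the series count and scrape interval are hypothetical, not taken from any real deployment): for 10,000 active series scraped every 15 seconds, a 7-day raw-resolution query returns roughly 400 million samples, versus about 1.7 million at 1-hour resolution.

```python
# Back-of-the-envelope sample counts for a remote read query.
# Series count and scrape interval are hypothetical.
def samples_returned(series: int, range_seconds: int, resolution_seconds: int) -> int:
    """Approximate number of samples a remote read endpoint returns."""
    return series * (range_seconds // resolution_seconds)

week = 7 * 24 * 3600
raw = samples_returned(10_000, week, 15)        # raw 15s scrapes
hourly = samples_returned(10_000, week, 3600)   # 1h downsampled bucket

print(raw, hourly)  # 403200000 1680000
```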
One way to resolve this is to have ClickHouse cascade metric data as it is written into three separate downsampled “bucket” tables:
- 1 minute resolution (high resolution)
- 5 minutes resolution (medium resolution)
- 1 hour resolution (low resolution)
Then we could define separate ClickHouse remote read endpoints for each table and return data from one of those endpoints based on the time range of the incoming remote read request. For example:
| 0 to 24h | 24h to 48h | 48h to 120h | 120h + |
|---|---|---|---|
| raw data table | high res table | medium res table | low res table |
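The bucket selection in the table above can be sketched as a small routing function, where the query's age is measured from "now" back to the start of the requested range (the endpoint paths here are illustrative, not the exact ones used later):

```python
from datetime import timedelta

# Map the age of the oldest requested sample to a remote read endpoint.
# Thresholds mirror the table above; endpoint paths are illustrative.
ROUTES = [
    (timedelta(hours=24),  "/remote-read/raw"),  # 0 to 24h: raw data table
    (timedelta(hours=48),  "/remote-read/1m"),   # 24h to 48h: high res table
    (timedelta(hours=120), "/remote-read/5m"),   # 48h to 120h: medium res table
]
FALLBACK = "/remote-read/1h"                     # 120h+: low res table

def route(query_age: timedelta) -> str:
    """Pick the endpoint for a query whose range starts `query_age` ago."""
    for threshold, endpoint in ROUTES:
        if query_age <= threshold:
            return endpoint
    return FALLBACK

print(route(timedelta(hours=6)))   # /remote-read/raw
print(route(timedelta(days=10)))   # /remote-read/1h
```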
Prometheus remote read endpoint configuration does not offer any sort of routing capabilities - if you define multiple remote read endpoints in the Prometheus configuration, it will simply return results from all of them. So we will need to build a proxy that routes remote read requests from Prometheus to the correct remote read endpoint in ClickHouse:
architecture-beta
group cluster(cloud)[Cluster]
service ch(database)[ClickHouse] in cluster
service chr(internet)['ClickHouse /rm-read/raw'] in cluster
service chr1m(internet)['ClickHouse /rm-read/1m'] in cluster
service chr5m(internet)['ClickHouse /rm-read/5m'] in cluster
service chr1h(internet)['ClickHouse /rm-read/1h'] in cluster
service chw(internet)['ClickHouse /rm-write'] in cluster
service prom(server)[Prometheus] in cluster
service otel(server)[Otel Metrics Gateway] in cluster
service grafana(server)[Grafana] in cluster
service proxy(server)[Routing proxy] in cluster
junction junctionChCenter in cluster
junction junctionChLeft in cluster
junction junctionChLeftMost in cluster
junction junctionChRight in cluster
junction junctionChRightMost in cluster
junction junctionProxyCenter in cluster
junction junctionProxyLeft in cluster
junction junctionProxyLeftMost in cluster
junction junctionProxyRight in cluster
junction junctionProxyRightMost in cluster
ch:B <-- T:junctionChCenter
junctionChCenter:L -- R:junctionChLeft
junctionChLeftMost:R -- L:junctionChLeft
junctionChCenter:R -- L:junctionChRight
junctionChRightMost:L -- R:junctionChRight
junctionChCenter:B --> T:chr1m
junctionChLeftMost:B --> T:chw
junctionChLeft:B --> T:chr
junctionChRight:B --> T:chr5m
junctionChRightMost:B --> T:chr1h
proxy:T -- B:junctionProxyCenter
junctionProxyLeftMost:R -- L:junctionProxyLeft
junctionProxyLeft:R -- L:junctionProxyCenter
junctionProxyRight:L -- R:junctionProxyCenter
junctionProxyRightMost:L -- R:junctionProxyRight
junctionProxyLeftMost:T --> B:chr
junctionProxyLeft:T --> B:chr1m
junctionProxyRight:T --> B:chr5m
junctionProxyRightMost:T --> B:chr1h
prom:T --> B:proxy
grafana:T --> B:prom
otel:T --> B:chw
Implementation
Database schema
First we’ll create our new ClickHouse database schema. The schema creates 4 TimeSeries engine tables:
- `timeseries_raw` - raw metric data
- `timeseries_1m` - metrics downsampled to 1m
- `timeseries_5m` - metrics downsampled to 5m
- `timeseries_1h` - metrics downsampled to 1h
TimeSeries engine tables don’t actually hold any data themselves - metric samples, label names, etc. are held in delegated tables that are created automatically when you call `CREATE TABLE timeseries_table ENGINE=TimeSeries`. However, you also have the option of creating them manually - which is what we will do in our schema. Among other things we’ll create:
- `timeseries_data_table` - delegated sample data table for `timeseries_raw`
- `timeseries_1m_table` - delegated sample data table for `timeseries_1m`
- `timeseries_5m_table` - delegated sample data table for `timeseries_5m`
- `timeseries_1h_table` - delegated sample data table for `timeseries_1h`
The idea here is that as samples are written to the `timeseries_raw` table via `/remote-write`, data will:
- Cascade to `timeseries_1m_table` via a materialized view that also downsamples data to 1m
- Further cascade from `timeseries_1m_table` to `timeseries_5m_table` via a materialized view that also downsamples data to 5m
- Further cascade from `timeseries_5m_table` to `timeseries_1h_table` via a materialized view that also downsamples data to 1h
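Carrying (sum, count) pairs through the cascade, rather than pre-computed averages, is what keeps the averages exact at every resolution. A quick sketch of why merging 1m buckets by summing sums and counts (as the materialized views below do) reproduces the true mean of the raw samples:

```python
# Cascading downsample: carry (sum, count) pairs instead of averages,
# so the mean at any resolution equals the mean of the raw samples.
raw = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0]  # hypothetical samples, one per 30s

# First stage: group raw samples into 1m buckets of (sum, count)
one_min = [(sum(raw[i:i + 2]), len(raw[i:i + 2])) for i in range(0, len(raw), 2)]

# Second stage: merge 1m buckets into one coarser bucket by summing
# sums and counts (mirrors sum(sum_val), sum(cnt) in the materialized views)
merged_sum = sum(s for s, _ in one_min)
merged_cnt = sum(c for _, c in one_min)

# The ALIAS column value = sum_val / cnt reproduces the true mean
assert merged_sum / merged_cnt == sum(raw) / len(raw)
print(merged_sum / merged_cnt)  # 7.0
```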
-- ============================================================================
-- ClickHouse TimeSeries Schema for Prometheus
-- ============================================================================
CREATE DATABASE IF NOT EXISTS timeseries_db;
-- ============================================================================
-- RAW DATA TABLE
-- ============================================================================
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_data_table
(
`id` UUID,
`timestamp` DateTime64(3) CODEC(Delta, ZSTD(1)),
`value` Float64 CODEC(Gorilla, ZSTD(1))
)
ENGINE = ReplacingMergeTree(timestamp)
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (id, timestamp)
TTL timestamp + INTERVAL 14 DAY DELETE
SETTINGS
index_granularity = 8192,
compress_marks = true,
compress_primary_key = true,
-- Optimized compression for time series data
min_compress_block_size = 65536,
max_compress_block_size = 1048576;
-- ============================================================================
-- 1-MINUTE AGGREGATION TABLE
-- ============================================================================
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_1m_table
(
`id` UUID,
`timestamp` DateTime64(3) CODEC(Delta, ZSTD(1)),
`sum_val` Float64 CODEC(Gorilla, ZSTD(1)),
`cnt` UInt64 CODEC(T64, ZSTD(1)),
`min_val` Float64 CODEC(Gorilla, ZSTD(1)),
`max_val` Float64 CODEC(Gorilla, ZSTD(1)),
`value` Float64 ALIAS (sum_val / NULLIF(cnt, 0))
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (id, timestamp)
TTL timestamp + INTERVAL 90 DAY DELETE
SETTINGS
index_granularity = 8192,
compress_marks = true,
compress_primary_key = true,
min_compress_block_size = 65536,
max_compress_block_size = 1048576;
-- Materialized view: Downsample from raw data to 1-minute buckets
CREATE MATERIALIZED VIEW IF NOT EXISTS timeseries_db.mv_downsample_1m
TO timeseries_db.timeseries_1m_table
AS
SELECT
id,
toStartOfMinute(timestamp) AS timestamp,
sum(value) AS sum_val,
count() AS cnt,
min(value) AS min_val,
max(value) AS max_val
FROM timeseries_db.timeseries_data_table
GROUP BY id, timestamp;
-- ============================================================================
-- 5-MINUTE AGGREGATION TABLE
-- ============================================================================
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_5m_table
(
`id` UUID,
`timestamp` DateTime64(3) CODEC(Delta, ZSTD(1)),
`sum_val` Float64 CODEC(Gorilla, ZSTD(1)),
`cnt` UInt64 CODEC(T64, ZSTD(1)),
`min_val` Float64 CODEC(Gorilla, ZSTD(1)),
`max_val` Float64 CODEC(Gorilla, ZSTD(1)),
`value` Float64 ALIAS (sum_val / NULLIF(cnt, 0))
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (id, timestamp)
TTL timestamp + INTERVAL 1 YEAR DELETE
SETTINGS
index_granularity = 8192,
compress_marks = true,
compress_primary_key = true,
min_compress_block_size = 65536,
max_compress_block_size = 1048576;
-- Materialized view: CASCADE from 1m table
CREATE MATERIALIZED VIEW IF NOT EXISTS timeseries_db.mv_downsample_5m
TO timeseries_db.timeseries_5m_table
AS
SELECT
id,
toStartOfFiveMinutes(timestamp) AS timestamp,
sum(sum_val) AS sum_val,
sum(cnt) AS cnt,
min(min_val) AS min_val,
max(max_val) AS max_val
FROM timeseries_db.timeseries_1m_table
GROUP BY id, timestamp;
-- ============================================================================
-- 1-HOUR AGGREGATION TABLE
-- ============================================================================
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_1h_table
(
`id` UUID,
`timestamp` DateTime64(3) CODEC(Delta, ZSTD(1)),
`sum_val` Float64 CODEC(Gorilla, ZSTD(1)),
`cnt` UInt64 CODEC(T64, ZSTD(1)),
`min_val` Float64 CODEC(Gorilla, ZSTD(1)),
`max_val` Float64 CODEC(Gorilla, ZSTD(1)),
`value` Float64 ALIAS (sum_val / NULLIF(cnt, 0))
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (id, timestamp)
TTL timestamp + INTERVAL 5 YEAR DELETE
SETTINGS
index_granularity = 8192,
compress_marks = true,
compress_primary_key = true,
min_compress_block_size = 65536,
max_compress_block_size = 1048576;
-- Materialized view: CASCADE from 5m table
CREATE MATERIALIZED VIEW IF NOT EXISTS timeseries_db.mv_downsample_1h
TO timeseries_db.timeseries_1h_table
AS
SELECT
id,
toStartOfHour(timestamp) AS timestamp,
sum(sum_val) AS sum_val,
sum(cnt) AS cnt,
min(min_val) AS min_val,
max(max_val) AS max_val
FROM timeseries_db.timeseries_5m_table
GROUP BY id, timestamp;
-- ============================================================================
-- TAGS/LABELS TABLE
-- ============================================================================
-- Engine: AggregatingMergeTree is required because:
-- 1. min_time and max_time use SimpleAggregateFunction type
-- 2. This is required by aggregate_min_time_and_max_time = true (default)
-- 3. TimeSeries engine will write aggregating inserts to update these columns
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_tags_table
(
`id` UUID DEFAULT reinterpretAsUUID(sipHash128(metric_name, all_tags)),
`metric_name` LowCardinality(String),
-- tags Map
`tags` Map(LowCardinality(String), String),
-- all_tags Map WILL contain all labels (EPHEMERAL - only for id calculation)
`all_tags` Map(String, String) EPHEMERAL,
`labels_hash` UInt64 MATERIALIZED cityHash64(
arrayStringConcat(
arraySort(arrayMap((k,v) -> concat(k, '=', v), mapKeys(tags), mapValues(tags))),
','
)
),
-- Time range columns - automatically maintained by TimeSeries engine
-- Using SimpleAggregateFunction as required by aggregate_min_time_and_max_time setting
`min_time` SimpleAggregateFunction(min, Nullable(DateTime64(3))),
`max_time` SimpleAggregateFunction(max, Nullable(DateTime64(3)))
)
ENGINE = AggregatingMergeTree
ORDER BY (metric_name, id)
SETTINGS
index_granularity = 8192,
allow_nullable_key = 1;
-- Indexes
ALTER TABLE timeseries_db.timeseries_tags_table
ADD INDEX IF NOT EXISTS idx_id id TYPE bloom_filter(0.01) GRANULARITY 4;
ALTER TABLE timeseries_db.timeseries_tags_table
ADD INDEX IF NOT EXISTS idx_labels_hash labels_hash TYPE set(1000000) GRANULARITY 4;
ALTER TABLE timeseries_db.timeseries_tags_table
ADD INDEX IF NOT EXISTS idx_metric_name metric_name TYPE bloom_filter(0.001) GRANULARITY 1;
ALTER TABLE timeseries_db.timeseries_tags_table
ADD INDEX IF NOT EXISTS idx_tags_keys mapKeys(tags) TYPE bloom_filter(0.01) GRANULARITY 1;
ALTER TABLE timeseries_db.timeseries_tags_table
ADD INDEX IF NOT EXISTS idx_tags_values mapValues(tags) TYPE bloom_filter(0.01) GRANULARITY 1;
-- ============================================================================
-- Create additional indexes on frequently queried labels if needed, like:
-- ALTER TABLE timeseries_db.timeseries_tags_table
-- ADD INDEX IF NOT EXISTS idx_job tags['job'] TYPE bloom_filter(0.01) GRANULARITY 1;
-- ============================================================================
-- ============================================================================
-- METRICS METADATA TABLE
-- ============================================================================
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_metrics_table
(
`metric_family_name` String,
`type` String,
`unit` String,
`help` String
)
ENGINE = ReplacingMergeTree
ORDER BY metric_family_name;
-- ============================================================================
-- TIMESERIES ENGINE TABLES
-- ============================================================================
-- Using built-in min_time/max_time tracking with aggregation
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_raw
ENGINE = TimeSeries
SETTINGS
store_min_time_and_max_time = true, -- Store min/max time per series
filter_by_min_time_and_max_time = true, -- Use for query optimization
aggregate_min_time_and_max_time = true, -- Use SimpleAggregateFunction (default)
tags_to_columns = {}
DATA timeseries_db.timeseries_data_table
TAGS timeseries_db.timeseries_tags_table
METRICS timeseries_db.timeseries_metrics_table;
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_1m
ENGINE = TimeSeries
SETTINGS
store_min_time_and_max_time = false,
filter_by_min_time_and_max_time = false,
aggregate_min_time_and_max_time = true, -- Use SimpleAggregateFunction (default)
tags_to_columns = {}
DATA timeseries_db.timeseries_1m_table
TAGS timeseries_db.timeseries_tags_table
METRICS timeseries_db.timeseries_metrics_table;
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_5m
ENGINE = TimeSeries
SETTINGS
store_min_time_and_max_time = false,
filter_by_min_time_and_max_time = false,
aggregate_min_time_and_max_time = true, -- Use SimpleAggregateFunction (default)
tags_to_columns = {}
DATA timeseries_db.timeseries_5m_table
TAGS timeseries_db.timeseries_tags_table
METRICS timeseries_db.timeseries_metrics_table;
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_1h
ENGINE = TimeSeries
SETTINGS
store_min_time_and_max_time = false,
filter_by_min_time_and_max_time = false,
aggregate_min_time_and_max_time = true, -- Use SimpleAggregateFunction (default)
tags_to_columns = {}
DATA timeseries_db.timeseries_1h_table
TAGS timeseries_db.timeseries_tags_table
METRICS timeseries_db.timeseries_metrics_table;
-- ============================================================================
-- CARDINALITY TRACKING
-- ============================================================================
CREATE MATERIALIZED VIEW IF NOT EXISTS timeseries_db.mv_cardinality_stats
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (metric_name, hour)
AS
SELECT
t.metric_name,
toStartOfHour(d.timestamp) AS hour,
uniqExact(d.id) AS unique_series,
count() AS data_points
FROM timeseries_db.timeseries_data_table d
INNER JOIN timeseries_db.timeseries_tags_table t ON d.id = t.id
GROUP BY t.metric_name, hour;
-- ============================================================================
-- HELPER VIEWS
-- ============================================================================
CREATE VIEW IF NOT EXISTS timeseries_db.v_series_list AS
SELECT
id,
metric_name,
tags
FROM timeseries_db.timeseries_tags_table;
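The `labels_hash` column in the tags table canonicalizes a label set by sorting its `k=v` pairs and joining them with commas before hashing, so equal label sets hash equally regardless of insertion order. A sketch of the same idea in Python - `cityHash64` has no stdlib equivalent, so a stand-in hash is used purely to illustrate the canonical form:

```python
import hashlib

def canonical_labels(tags: dict[str, str]) -> str:
    """Mirror arrayStringConcat(arraySort(...), ','): sorted k=v pairs."""
    return ",".join(f"{k}={v}" for k, v in sorted(tags.items()))

def labels_hash(tags: dict[str, str]) -> int:
    # Stand-in for cityHash64 - any stable 64-bit hash has the property
    # we need: equal label sets hash equally regardless of order.
    digest = hashlib.sha256(canonical_labels(tags).encode()).digest()
    return int.from_bytes(digest[:8], "little")

a = {"job": "node", "instance": "host1:9100"}
b = {"instance": "host1:9100", "job": "node"}  # same labels, different order
assert labels_hash(a) == labels_hash(b)
print(canonical_labels(a))  # instance=host1:9100,job=node
```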
ClickHouse protocol config
We’ll add ClickHouse Prometheus protocol handlers for remote write, the raw remote read table and all the downsampled tables:
<clickhouse>
    <logger>
        <level>debug</level>
        <console>true</console>
    </logger>
    <compression>
        <case>
            <method>zstd</method>
        </case>
    </compression>
    <disable_internal_dns_cache>true</disable_internal_dns_cache>
    <prometheus>
        <port>9363</port>
        <handlers>
            <prometheus_write_rule>
                <url>/remote-write</url>
                <handler>
                    <type>remote_write</type>
                    <database>timeseries_db</database>
                    <table>timeseries_raw</table>
                </handler>
            </prometheus_write_rule>
            <prometheus_read_rule_raw>
                <url>/remote-read/raw</url>
                <handler>
                    <type>remote_read</type>
                    <database>timeseries_db</database>
                    <table>timeseries_raw</table>
                </handler>
            </prometheus_read_rule_raw>
            <prometheus_read_rule_1m>
                <url>/prom-read/1m</url>
                <handler>
                    <type>remote_read</type>
                    <database>timeseries_db</database>
                    <table>timeseries_1m</table>
                </handler>
            </prometheus_read_rule_1m>
            <prometheus_read_rule_5m>
                <url>/prom-read/5m</url>
                <handler>
                    <type>remote_read</type>
                    <database>timeseries_db</database>
                    <table>timeseries_5m</table>
                </handler>
            </prometheus_read_rule_5m>
            <prometheus_read_rule_1h>
                <url>/prom-read/1h</url>
                <handler>
                    <type>remote_read</type>
                    <database>timeseries_db</database>
                    <table>timeseries_1h</table>
                </handler>
            </prometheus_read_rule_1h>
        </handlers>
    </prometheus>
</clickhouse>
Proxy implementation
The https://github.com/maratoid/tsdb-aggregate-proxy repository contains the complete proxy implementation. Run:
make build
bin/tsdb-aggregate-proxy --help
to see CLI help.
By default, the proxy will attempt to connect to ClickHouse via the native protocol and query the delegated sample data tables directly, translating remote read requests into SQL queries. To use the ClickHouse Prometheus protocol endpoints instead, use the --no-query-bypass flag.
With query bypass (the default), using the default ClickHouse address, table names and threshold values:
bin/tsdb-aggregate-proxy --chi-password=p@ssw0rd
Without query bypass, using the default ClickHouse endpoint and threshold values:
bin/tsdb-aggregate-proxy --no-query-bypass
Prometheus configuration
We’ll set up Prometheus with no local retention, pointing remote read at our proxy and remote write at the ClickHouse remote write endpoint:
global:
  scrape_interval: 15s
  # query_log_file: /prometheus/query.log
  # scrape_failure_log_file: /prometheus/scrape-failures.log
scrape_configs:
  - job_name: prom-self-monitor
    static_configs:
      - labels:
          job: prom-self-monitor
        targets:
          - localhost:9090
remote_read:
  - url: http://proxy:9091/api/v1/read
    name: clickhouse-remote-read
    remote_timeout: 10m
    read_recent: true
    filter_external_labels: false
    basic_auth:
      username: ingest
      password: p@ssw0rd
    enable_http2: true
remote_write:
  - url: http://clickhouse:9363/remote-write
    name: clickhouse-remote-write
    remote_timeout: 10m
    basic_auth:
      username: ingest
      password: p@ssw0rd
    enable_http2: true
Example OpenTelemetry collector configuration
If you are using the OpenTelemetry collector for metrics collection, you could use the prometheusremotewrite exporter to write scraped metrics to the ClickHouse remote write endpoint:
...
exporters:
  debug: {}
  prometheusremotewrite/clickhouse:
    endpoint: http://clickhouse:9363/remote-write
    auth:
      authenticator: basicauth/clickhouse
    resource_to_telemetry_conversion:
      enabled: true
    send_metadata: true
    timeout: 60s
    tls:
      insecure_skip_verify: true
...
extensions:
  ...
  basicauth/clickhouse:
    client_auth:
      username: ingest
      password: p@ssw0rd
...
service:
  extensions:
    ...
    - basicauth/clickhouse
  pipelines:
    metrics:
      exporters:
        - debug
        - prometheusremotewrite/clickhouse
      ...
Test docker compose setup
See https://github.com/maratoid/tsdb-aggregate-proxy for a complete docker compose setup that will stand up:
- ClickHouse instance, configured with the TimeSeries schema and Prometheus protocol handlers
- Routing proxy in `--query-bypass` mode
- Prometheus instance configured to remote read from the proxy
- Grafana instance provisioned to use Prometheus as the default data source
- Avalanche instance to continuously generate metrics
- OpenTelemetry collector set up to scrape metrics from Avalanche, the proxy and Prometheus and write them to the ClickHouse remote write endpoint