ClickHouse and PromQL

TLDR

See the end-result here

Background

ClickHouse is a pretty great backend for metrics storage, especially at large scale. You get great compression, blazing fast query latency over large data sets, and you can use it as a single backend for metrics, logs and traces.

You do have to use SQL, of course, to query it - so with Grafana, for example, you would most likely use the excellent ClickHouse data source for Grafana. Or, if Grafana is not a requirement for you, you might go with the recently released ClickStack.

But what if you already have a lot of PromQL-based Grafana dashboards that you don’t really want to rewrite, while still using ClickHouse to store your logs, traces and metrics in one place?

ClickHouse TimeSeries table engine

ClickHouse experimental feature

TimeSeries is an experimental feature UNAVAILABLE in ClickHouse Cloud.

Turns out ClickHouse already has an experimental table engine that can be directly used with Prometheus remote write and remote read protocols - the TimeSeries table engine.

Basic configuration for a table is simple:

CREATE TABLE timeseries_table ENGINE=TimeSeries

The resulting table will not actually store any data directly - everything will be stored in its target tables. Once the table has been created, you can set up Prometheus remote read and write endpoints on your ClickHouse server:

<clickhouse>
  <prometheus>
    <port>9363</port>
    <handlers>
      <prometheus_write_rule>
        <url>/remote-write</url>
        <handler>
          <type>remote_write</type>
          <database>timeseries_db</database>
          <table>timeseries_table</table>
        </handler>
      </prometheus_write_rule>
      <prometheus_read_rule>
        <url>/remote-read</url>
        <handler>
          <type>remote_read</type>
          <database>timeseries_db</database>
          <table>timeseries_table</table>
        </handler>
      </prometheus_read_rule>
    </handlers>
  </prometheus>
</clickhouse>

Once you do, <clickhouse ip>:9363/remote-write and <clickhouse ip>:9363/remote-read become valid Prometheus remote write/read endpoints that you can target from another Prometheus instance, Thanos or an OpenTelemetry exporter.

For example, you could build something like this:

    architecture-beta
    group cluster(cloud)[Cluster]

    service ch(database)[ClickHouse] in cluster
    service chr(internet)['ClickHouse /remote-read'] in cluster
    service chw(internet)['ClickHouse /remote-write'] in cluster
    service prom(server)[Prometheus] in cluster
    service otel(server)[Otel Metrics Gateway] in cluster
    service grafana(server)[Grafana] in cluster

    prom:T --> B:chr
    grafana:T <--> B:prom
    otel:T --> B:chw
    chr:R <-- L:ch 
    chw:L --> R:ch
  • ClickHouse is set up with /remote-read and /remote-write endpoints
  • An OpenTelemetry collector (gateway) uses the prometheusremotewrite exporter to write metrics to /remote-write
  • A lightweight Prometheus instance with no local retention reads from /remote-read
  • Grafana is configured to use the Prometheus instance as a data source

Problems

The setup above will work fine for smaller data sets, but once you start running Grafana queries that compute rates or averages of high-cardinality metrics over 24+ hour periods, you will quickly see your queries start timing out.

Some of this can be mitigated at the ClickHouse level by improving the underlying database schema a little. Mostly, however, the problem is that, unlike Thanos for example, the TimeSeries engine does not do any kind of metric downsampling for you. In other words, no matter what time period your PromQL query covers, you will always receive raw, high-resolution sample data back - and the number of samples is more than Grafana can cope with.

One way to resolve this is to have ClickHouse cascade metric data as it is written into three separate downsampled “bucket” tables:

  • 1 minute resolution (high resolution)
  • 5 minute resolution (medium resolution)
  • 1 hour resolution (low resolution)

Then we could define separate ClickHouse remote read endpoints for each table and return data from one of those endpoints based on the time range of the incoming remote read request. For example:

Query time range    Source table
0 to 24h            raw data table
24h to 48h          high res table
48h to 120h         medium res table
120h +              low res table

Prometheus remote read endpoint configuration does not offer any sort of routing capability - if you define multiple remote read endpoints in the Prometheus configuration, it will simply return results from all of them. So we will need to build a proxy that routes remote read requests from Prometheus to the correct remote read endpoint in ClickHouse:

    architecture-beta
    group cluster(cloud)[Cluster]

    service ch(database)[ClickHouse] in cluster
    service chr(internet)['ClickHouse /rm-read/raw'] in cluster
    service chr1m(internet)['ClickHouse /rm-read/1m'] in cluster
    service chr5m(internet)['ClickHouse /rm-read/5m'] in cluster
    service chr1h(internet)['ClickHouse /rm-read/1h'] in cluster
    service chw(internet)['ClickHouse /rm-write'] in cluster
    service prom(server)[Prometheus] in cluster
    service otel(server)[Otel Metrics Gateway] in cluster
    service grafana(server)[Grafana] in cluster
    service proxy(server)[Routing proxy] in cluster
    junction junctionChCenter in cluster
    junction junctionChLeft in cluster
    junction junctionChLeftMost in cluster
    junction junctionChRight in cluster
    junction junctionChRightMost in cluster
    junction junctionProxyCenter in cluster
    junction junctionProxyLeft in cluster
    junction junctionProxyLeftMost in cluster
    junction junctionProxyRight in cluster
    junction junctionProxyRightMost in cluster
    ch:B <-- T:junctionChCenter
    junctionChCenter:L -- R:junctionChLeft
    junctionChLeftMost:R -- L:junctionChLeft
    junctionChCenter:R -- L:junctionChRight
    junctionChRightMost:L -- R:junctionChRight
    junctionChCenter:B --> T:chr1m
    junctionChLeftMost:B --> T:chw
    junctionChLeft:B --> T:chr
    junctionChRight:B --> T:chr5m
    junctionChRightMost:B --> T:chr1h
    proxy:T -- B:junctionProxyCenter
    junctionProxyLeftMost:R -- L:junctionProxyLeft
    junctionProxyLeft:R -- L:junctionProxyCenter
    junctionProxyRight:L -- R:junctionProxyCenter
    junctionProxyRightMost:L -- R:junctionProxyRight
    junctionProxyLeftMost:T --> B:chr
    junctionProxyLeft:T --> B:chr1m
    junctionProxyRight:T --> B:chr5m
    junctionProxyRightMost:T --> B:chr1h
    prom:T --> B:proxy
    grafana:T --> B:prom
    otel:T --> B:chw

Implementation

Database schema

First we’ll create our new ClickHouse database schema. The schema creates four TimeSeries engine tables:

  • timeseries_raw - raw metric data
  • timeseries_1m - metrics downsampled to 1m
  • timeseries_5m - metrics downsampled to 5m
  • timeseries_1h - metrics downsampled to 1h

TimeSeries engine tables don’t actually hold any data themselves - metric samples, label names, etc. are held in delegated tables that are created automatically when you call CREATE TABLE timeseries_table ENGINE=TimeSeries. You also have the option of creating them manually, which is what we will do in our schema. Among other things we’ll create:

  • timeseries_data_table - delegated sample data table for timeseries_raw
  • timeseries_1m_table - delegated sample data table for timeseries_1m
  • timeseries_5m_table - delegated sample data table for timeseries_5m
  • timeseries_1h_table - delegated sample data table for timeseries_1h

The idea here is that as samples are written to the timeseries_raw table via /remote-write, data will:

  1. Cascade to timeseries_1m_table via a materialized view that also downsamples data to 1m
  2. Further cascade from timeseries_1m_table to timeseries_5m_table via a materialized view that also downsamples data to 5m
  3. Further cascade from timeseries_5m_table to timeseries_1h_table via a materialized view that also downsamples data to 1h
-- ============================================================================
-- ClickHouse TimeSeries Schema for Prometheus
-- ============================================================================
CREATE DATABASE IF NOT EXISTS timeseries_db;

-- ============================================================================
-- RAW DATA TABLE
-- ============================================================================
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_data_table
(
    `id` UUID,
    `timestamp` DateTime64(3) CODEC(Delta, ZSTD(1)),
    `value` Float64 CODEC(Gorilla, ZSTD(1))
)
ENGINE = ReplacingMergeTree(timestamp)
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (id, timestamp)
TTL timestamp + INTERVAL 14 DAY DELETE
SETTINGS 
    index_granularity = 8192,
    compress_marks = true,
    compress_primary_key = true,
    -- Optimized compression for time series data
    min_compress_block_size = 65536,
    max_compress_block_size = 1048576;

-- ============================================================================
-- 1-MINUTE AGGREGATION TABLE
-- ============================================================================
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_1m_table
(
    `id` UUID,
    `timestamp` DateTime64(3) CODEC(Delta, ZSTD(1)),
    `sum_val` Float64 CODEC(Gorilla, ZSTD(1)),
    `cnt` UInt64 CODEC(T64, ZSTD(1)),
    `min_val` Float64 CODEC(Gorilla, ZSTD(1)),
    `max_val` Float64 CODEC(Gorilla, ZSTD(1)),
    `value` Float64 ALIAS (sum_val / NULLIF(cnt, 0))
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (id, timestamp)
TTL timestamp + INTERVAL 90 DAY DELETE
SETTINGS 
    index_granularity = 8192,
    compress_marks = true,
    compress_primary_key = true,
    min_compress_block_size = 65536,
    max_compress_block_size = 1048576;

-- Materialized view: Downsample from raw data to 1-minute buckets
CREATE MATERIALIZED VIEW IF NOT EXISTS timeseries_db.mv_downsample_1m
TO timeseries_db.timeseries_1m_table
AS
SELECT
    id,
    toStartOfMinute(timestamp) AS timestamp,
    sum(value) AS sum_val,
    count() AS cnt,
    min(value) AS min_val,
    max(value) AS max_val
FROM timeseries_db.timeseries_data_table
GROUP BY id, timestamp;

-- ============================================================================
-- 5-MINUTE AGGREGATION TABLE
-- ============================================================================
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_5m_table
(
    `id` UUID,
    `timestamp` DateTime64(3) CODEC(Delta, ZSTD(1)),
    `sum_val` Float64 CODEC(Gorilla, ZSTD(1)),
    `cnt` UInt64 CODEC(T64, ZSTD(1)),
    `min_val` Float64 CODEC(Gorilla, ZSTD(1)),
    `max_val` Float64 CODEC(Gorilla, ZSTD(1)),
    `value` Float64 ALIAS (sum_val / NULLIF(cnt, 0))
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (id, timestamp)
TTL timestamp + INTERVAL 1 YEAR DELETE
SETTINGS 
    index_granularity = 8192,
    compress_marks = true,
    compress_primary_key = true,
    min_compress_block_size = 65536,
    max_compress_block_size = 1048576;

-- Materialized view: CASCADE from 1m table
CREATE MATERIALIZED VIEW IF NOT EXISTS timeseries_db.mv_downsample_5m
TO timeseries_db.timeseries_5m_table
AS
SELECT
    id,
    toStartOfFiveMinutes(timestamp) AS timestamp,
    sum(sum_val) AS sum_val,
    sum(cnt) AS cnt,
    min(min_val) AS min_val,
    max(max_val) AS max_val
FROM timeseries_db.timeseries_1m_table
GROUP BY id, timestamp;

-- ============================================================================
-- 1-HOUR AGGREGATION TABLE
-- ============================================================================
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_1h_table
(
    `id` UUID,
    `timestamp` DateTime64(3) CODEC(Delta, ZSTD(1)),
    `sum_val` Float64 CODEC(Gorilla, ZSTD(1)),
    `cnt` UInt64 CODEC(T64, ZSTD(1)),
    `min_val` Float64 CODEC(Gorilla, ZSTD(1)),
    `max_val` Float64 CODEC(Gorilla, ZSTD(1)),
    `value` Float64 ALIAS (sum_val / NULLIF(cnt, 0))
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (id, timestamp)
TTL timestamp + INTERVAL 5 YEAR DELETE
SETTINGS 
    index_granularity = 8192,
    compress_marks = true,
    compress_primary_key = true,
    min_compress_block_size = 65536,
    max_compress_block_size = 1048576;

-- Materialized view: CASCADE from 5m table
CREATE MATERIALIZED VIEW IF NOT EXISTS timeseries_db.mv_downsample_1h
TO timeseries_db.timeseries_1h_table
AS
SELECT
    id,
    toStartOfHour(timestamp) AS timestamp,
    sum(sum_val) AS sum_val,
    sum(cnt) AS cnt,
    min(min_val) AS min_val,
    max(max_val) AS max_val
FROM timeseries_db.timeseries_5m_table
GROUP BY id, timestamp;

-- ============================================================================
-- TAGS/LABELS TABLE
-- ============================================================================
-- Engine: AggregatingMergeTree is required because:
-- 1. min_time and max_time use SimpleAggregateFunction type
-- 2. This is required by aggregate_min_time_and_max_time = true (default)
-- 3. TimeSeries engine will write aggregating inserts to update these columns
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_tags_table
(
    `id` UUID DEFAULT reinterpretAsUUID(sipHash128(metric_name, all_tags)),
    `metric_name` LowCardinality(String),
    -- tags Map
    `tags` Map(LowCardinality(String), String),
    -- all_tags Map WILL contain all labels (EPHEMERAL - only for id calculation)
    `all_tags` Map(String, String) EPHEMERAL,
    `labels_hash` UInt64 MATERIALIZED cityHash64(
        arrayStringConcat(
            arraySort(arrayMap((k,v) -> concat(k, '=', v), mapKeys(tags), mapValues(tags))),
            ','
        )
    ),
    
    -- Time range columns - automatically maintained by TimeSeries engine
    -- Using SimpleAggregateFunction as required by aggregate_min_time_and_max_time setting
    `min_time` SimpleAggregateFunction(min, Nullable(DateTime64(3))),
    `max_time` SimpleAggregateFunction(max, Nullable(DateTime64(3)))
)
ENGINE = AggregatingMergeTree
ORDER BY (metric_name, id)
SETTINGS 
    index_granularity = 8192,
    allow_nullable_key = 1;

-- Indexes
ALTER TABLE timeseries_db.timeseries_tags_table
    ADD INDEX IF NOT EXISTS idx_id id TYPE bloom_filter(0.01) GRANULARITY 4;

ALTER TABLE timeseries_db.timeseries_tags_table
    ADD INDEX IF NOT EXISTS idx_labels_hash labels_hash TYPE set(1000000) GRANULARITY 4;

ALTER TABLE timeseries_db.timeseries_tags_table
    ADD INDEX IF NOT EXISTS idx_metric_name metric_name TYPE bloom_filter(0.001) GRANULARITY 1;

ALTER TABLE timeseries_db.timeseries_tags_table
    ADD INDEX IF NOT EXISTS idx_tags_keys mapKeys(tags) TYPE bloom_filter(0.01) GRANULARITY 1;

ALTER TABLE timeseries_db.timeseries_tags_table
    ADD INDEX IF NOT EXISTS idx_tags_values mapValues(tags) TYPE bloom_filter(0.01) GRANULARITY 1;

-- ============================================================================
-- Create additional indexes on frequently queried labels if needed, like:
-- ALTER TABLE timeseries_db.timeseries_tags_table
--    ADD INDEX IF NOT EXISTS idx_job tags['job'] TYPE bloom_filter(0.01) GRANULARITY 1;
-- ============================================================================

-- ============================================================================
-- METRICS METADATA TABLE
-- ============================================================================
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_metrics_table
(
    `metric_family_name` String,
    `type` String,
    `unit` String,
    `help` String
)
ENGINE = ReplacingMergeTree
ORDER BY metric_family_name;

-- ============================================================================
-- TIMESERIES ENGINE TABLES
-- ============================================================================
-- Using built-in min_time/max_time tracking with aggregation
CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_raw
ENGINE = TimeSeries
SETTINGS 
    store_min_time_and_max_time = true,           -- Store min/max time per series
    filter_by_min_time_and_max_time = true,       -- Use for query optimization
    aggregate_min_time_and_max_time = true,       -- Use SimpleAggregateFunction (default)
    tags_to_columns = {}
DATA timeseries_db.timeseries_data_table
TAGS timeseries_db.timeseries_tags_table
METRICS timeseries_db.timeseries_metrics_table;

CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_1m
ENGINE = TimeSeries
SETTINGS 
    store_min_time_and_max_time = false,
    filter_by_min_time_and_max_time = false,
    aggregate_min_time_and_max_time = true,       -- Use SimpleAggregateFunction (default)
    tags_to_columns = {}
DATA timeseries_db.timeseries_1m_table
TAGS timeseries_db.timeseries_tags_table
METRICS timeseries_db.timeseries_metrics_table;

CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_5m
ENGINE = TimeSeries
SETTINGS 
    store_min_time_and_max_time = false,
    filter_by_min_time_and_max_time = false,
    aggregate_min_time_and_max_time = true,       -- Use SimpleAggregateFunction (default)
    tags_to_columns = {}
DATA timeseries_db.timeseries_5m_table
TAGS timeseries_db.timeseries_tags_table
METRICS timeseries_db.timeseries_metrics_table;

CREATE TABLE IF NOT EXISTS timeseries_db.timeseries_1h
ENGINE = TimeSeries
SETTINGS 
    store_min_time_and_max_time = false,
    filter_by_min_time_and_max_time = false,
    aggregate_min_time_and_max_time = true,       -- Use SimpleAggregateFunction (default)
    tags_to_columns = {}
DATA timeseries_db.timeseries_1h_table
TAGS timeseries_db.timeseries_tags_table
METRICS timeseries_db.timeseries_metrics_table;

-- ============================================================================
-- CARDINALITY TRACKING
-- ============================================================================
CREATE MATERIALIZED VIEW IF NOT EXISTS timeseries_db.mv_cardinality_stats
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (metric_name, hour)
AS
SELECT
    t.metric_name,
    toStartOfHour(d.timestamp) AS hour,
    uniqExact(d.id) AS unique_series,
    count() AS data_points
FROM timeseries_db.timeseries_data_table d
INNER JOIN timeseries_db.timeseries_tags_table t ON d.id = t.id
GROUP BY t.metric_name, hour;

-- ============================================================================
-- HELPER VIEWS
-- ============================================================================
CREATE VIEW IF NOT EXISTS timeseries_db.v_series_list AS
SELECT
    id, 
    metric_name, 
    tags
FROM timeseries_db.timeseries_tags_table;

ClickHouse protocol config

We’ll add ClickHouse Prometheus protocol handlers for remote write, the raw remote read table and all of the downsampled tables:

<clickhouse>
    <logger>
        <level>debug</level>
        <console>true</console>
    </logger>
    <compression>
        <case>
            <method>zstd</method>
        </case>
    </compression>
    <disable_internal_dns_cache>true</disable_internal_dns_cache>
    
    <prometheus>
        <port>9363</port>
        <handlers>
            <prometheus_write_rule>
                <url>/remote-write</url>
                <handler>
                    <type>remote_write</type>
                    <database>timeseries_db</database>
                    <table>timeseries_raw</table>
                </handler>
            </prometheus_write_rule>
            <prometheus_read_rule_raw>
                <url>/remote-read/raw</url>
                <handler>
                    <type>remote_read</type>
                    <database>timeseries_db</database>
                    <table>timeseries_raw</table>
                </handler>
            </prometheus_read_rule_raw>
            <prometheus_read_rule_1m>
                <url>/prom-read/1m</url>
                <handler>
                    <type>remote_read</type>
                    <database>timeseries_db</database>
                    <table>timeseries_1m</table>
                </handler>
            </prometheus_read_rule_1m>
            <prometheus_read_rule_5m>
                <url>/prom-read/5m</url>
                <handler>
                    <type>remote_read</type>
                    <database>timeseries_db</database>
                    <table>timeseries_5m</table>
                </handler>
            </prometheus_read_rule_5m>
            <prometheus_read_rule_1h>
                <url>/prom-read/1h</url>
                <handler>
                    <type>remote_read</type>
                    <database>timeseries_db</database>
                    <table>timeseries_1h</table>
                </handler>
            </prometheus_read_rule_1h>
        </handlers>
    </prometheus>
</clickhouse>

Proxy implementation

The https://github.com/maratoid/tsdb-aggregate-proxy repository contains the complete proxy implementation. Run:

make build
bin/tsdb-aggregate-proxy --help

to see CLI help.

By default, the proxy will attempt to connect to ClickHouse via the native protocol and query the delegated sample data tables directly, translating remote read requests into SQL queries. To use the ClickHouse Prometheus protocol endpoints instead, use the --no-query-bypass flag.

With --query-bypass, using default ClickHouse address, table names and threshold values:

bin/tsdb-aggregate-proxy --chi-password=p@ssw0rd 

Without query bypass (--no-query-bypass), using the default ClickHouse endpoint and threshold values:

bin/tsdb-aggregate-proxy --no-query-bypass

Prometheus configuration

We’ll set up Prometheus with no local retention, pointing remote read at our proxy and remote write at the ClickHouse remote write endpoint:

global:
  scrape_interval: 15s
  # query_log_file: /prometheus/query.log
  # scrape_failure_log_file: /prometheus/scrape-failures.log
scrape_configs:
  - job_name: prom-self-monitor
    static_configs:
      - labels: 
          job: prom-self-monitor
        targets:
          - localhost:9090
remote_read:
  - url: http://proxy:9091/api/v1/read
    name: clickhouse-remote-read
    remote_timeout: 10m
    read_recent: true
    filter_external_labels: false
    basic_auth:
      username: ingest
      password: p@ssw0rd
    enable_http2: true
remote_write:
  - url: http://clickhouse:9363/remote-write
    name: clickhouse-remote-write
    remote_timeout: 10m
    basic_auth:
      username: ingest
      password: p@ssw0rd
    enable_http2: true

Example OpenTelemetry collector configuration

If you use the OpenTelemetry collector for metrics collection, you can use the prometheusremotewrite exporter to write scraped metrics to the ClickHouse remote write endpoint:

...
exporters:
  debug: {}
  prometheusremotewrite/clickhouse:
    endpoint: http://clickhouse:9363/remote-write
    auth:
      authenticator: basicauth/clickhouse
    resource_to_telemetry_conversion:
      enabled: true
    send_metadata: true
    timeout: 60s
    tls:
      insecure_skip_verify: true
...
extensions:
  ...
  basicauth/clickhouse:
    client_auth:
      username: ingest
      password: p@ssw0rd
...
service:
  extensions:
    ...
    - basicauth/clickhouse
  pipelines:
    metrics:
      exporters:
        - debug
        - prometheusremotewrite/clickhouse
...

Test Docker Compose setup

See https://github.com/maratoid/tsdb-aggregate-proxy for a complete Docker Compose setup that will stand up:

  • A ClickHouse instance, configured with the TimeSeries schema and Prometheus protocol handlers
  • The routing proxy in --query-bypass mode
  • A Prometheus instance configured to remote read from the proxy
  • A Grafana instance provisioned to use Prometheus as the default data source
  • An Avalanche instance to continuously generate metrics
  • An OpenTelemetry collector set up to scrape metrics from Avalanche, the proxy and Prometheus and write them to the ClickHouse remote write endpoint