From f1696bb1e2af985f51d234e1a3bf02f51ff241b6 Mon Sep 17 00:00:00 2001 From: Aaron Knudtson <87577305+knudtty@users.noreply.github.com> Date: Tue, 26 May 2026 14:34:01 -0400 Subject: [PATCH 01/10] feat: adds direct_read optimization by default for logs and traces --- .changeset/direct-read-map-default.md | 36 ++++ docker/clickhouse/local/init-db-e2e.sh | 4 + .../schema/seed/00002_otel_logs.sql | 9 +- .../schema/seed/00005_otel_traces.sql | 6 +- .../app/src/__tests__/searchFilters.test.ts | 3 + .../src/__tests__/clickhouseVersion.test.ts | 176 +++++++++++++++ .../src/__tests__/queryParser.test.ts | 202 ++++++++++++++++++ .../src/core/clickhouseVersion.ts | 114 ++++++++++ packages/common-utils/src/core/metadata.ts | 34 +++ packages/common-utils/src/queryParser.ts | 17 +- 10 files changed, 591 insertions(+), 10 deletions(-) create mode 100644 .changeset/direct-read-map-default.md create mode 100644 packages/common-utils/src/__tests__/clickhouseVersion.test.ts create mode 100644 packages/common-utils/src/core/clickhouseVersion.ts diff --git a/.changeset/direct-read-map-default.md b/.changeset/direct-read-map-default.md new file mode 100644 index 0000000000..c7435cb048 --- /dev/null +++ b/.changeset/direct-read-map-default.md @@ -0,0 +1,36 @@ +--- +"@hyperdx/common-utils": minor +"@hyperdx/app": patch +--- + +feat: default the direct_read map column optimization on supported ClickHouse versions + +The full-text-search logs schema (`00002_otel_logs.sql`) now ships with +`ResourceAttributeItems`, `ScopeAttributeItems`, and `LogAttributeItems` +ALIAS columns plus their `text(tokenizer='array')` skip indexes. The +traces schema (`00005_otel_traces.sql`) similarly gains +`ResourceAttributeItems` and `SpanAttributeItems` ALIAS columns with +matching items indexes. New installs and freshly migrated tables get +the optimization automatically — no manual `ALTER TABLE` required. 
+ +Note: the traces table previously used only `bloom_filter` skip indexes +and worked on any ClickHouse version. The added `text(tokenizer='array')` +items indexes raise the minimum ClickHouse version required to **create** +the traces table to **>= 26.2**. Existing tables on older clusters are +unaffected (`CREATE TABLE IF NOT EXISTS` is a no-op). + +At query time, the app gates the `Map['key'] = 'value'` → +`has(, concat('key', '=', 'value'))` rewrite on the connected +ClickHouse server version (`SELECT version()`, cached per connection). +The direct_read feature was backported into multiple stable 26.x release +lines, so the gate uses a per-branch minimum: + +- 26.2 line: >= 26.2.19.43 +- 26.3 line: >= 26.3.12.3 +- 26.4 line: >= 26.4.3.37 +- 26.5+ : always supported + +Servers below their branch's threshold continue to compile filters into +the original Map-subscript form, since they cannot perform the direct_read +against the Map's underlying tuple storage that makes the optimization +profitable. 
diff --git a/docker/clickhouse/local/init-db-e2e.sh b/docker/clickhouse/local/init-db-e2e.sh index b28676aa64..0ac945bf9e 100755 --- a/docker/clickhouse/local/init-db-e2e.sh +++ b/docker/clickhouse/local/init-db-e2e.sh @@ -81,12 +81,16 @@ CREATE TABLE IF NOT EXISTS ${DATABASE}.e2e_otel_traces \`Links.Attributes\` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)), \`__hdx_materialized_rum.sessionId\` String MATERIALIZED ResourceAttributes['rum.sessionId'] CODEC(ZSTD(1)), \`SampleRate\` UInt64 MATERIALIZED greatest(toUInt64OrZero(SpanAttributes['SampleRate']), 1) CODEC(T64, ZSTD(1)), + \`ResourceAttributeItems\` Array(String) ALIAS arrayMap((arr) -> concat(arr.1, '=', arr.2), ResourceAttributes::Array(Tuple(String, String))), + \`SpanAttributeItems\` Array(String) ALIAS arrayMap((arr) -> concat(arr.1, '=', arr.2), SpanAttributes::Array(Tuple(String, String))), INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_rum_session_id __hdx_materialized_rum.sessionId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_res_attr_items ResourceAttributeItems TYPE text(tokenizer = 'array'), INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_span_attr_items SpanAttributeItems TYPE text(tokenizer = 'array'), INDEX idx_duration Duration TYPE minmax GRANULARITY 1, INDEX idx_lower_span_name lower(SpanName) TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 8 ) diff --git a/docker/otel-collector/schema/seed/00002_otel_logs.sql b/docker/otel-collector/schema/seed/00002_otel_logs.sql index c5e96b260c..c329f59783 100644 --- a/docker/otel-collector/schema/seed/00002_otel_logs.sql +++ b/docker/otel-collector/schema/seed/00002_otel_logs.sql @@ -25,13 
+25,16 @@ CREATE TABLE IF NOT EXISTS ${DATABASE}.otel_logs `__hdx_materialized_k8s.pod.name` LowCardinality(String) MATERIALIZED ResourceAttributes['k8s.pod.name'] CODEC(ZSTD(1)), `__hdx_materialized_k8s.pod.uid` LowCardinality(String) MATERIALIZED ResourceAttributes['k8s.pod.uid'] CODEC(ZSTD(1)), `__hdx_materialized_deployment.environment.name` LowCardinality(String) MATERIALIZED ResourceAttributes['deployment.environment.name'] CODEC(ZSTD(1)), + `ResourceAttributeItems` Array(String) ALIAS arrayMap((arr) -> concat(arr.1, '=', arr.2), ResourceAttributes::Array(Tuple(String, String))), + `ScopeAttributeItems` Array(String) ALIAS arrayMap((arr) -> concat(arr.1, '=', arr.2), ScopeAttributes::Array(Tuple(String, String))), + `LogAttributeItems` Array(String) ALIAS arrayMap((arr) -> concat(arr.1, '=', arr.2), LogAttributes::Array(Tuple(String, String))), INDEX idx_trace_id TraceId TYPE text(tokenizer = 'array'), INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE text(tokenizer = 'array'), - INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE text(tokenizer = 'array'), + INDEX idx_res_attr_items ResourceAttributeItems TYPE text(tokenizer = 'array'), INDEX idx_scope_attr_key mapKeys(ScopeAttributes) TYPE text(tokenizer = 'array'), - INDEX idx_scope_attr_value mapValues(ScopeAttributes) TYPE text(tokenizer = 'array'), + INDEX idx_scope_attr_items ScopeAttributeItems TYPE text(tokenizer = 'array'), INDEX idx_log_attr_key mapKeys(LogAttributes) TYPE text(tokenizer = 'array'), - INDEX idx_log_attr_value mapValues(LogAttributes) TYPE text(tokenizer = 'array'), + INDEX idx_log_attr_items LogAttributeItems TYPE text(tokenizer = 'array'), INDEX idx_lower_body lower(Body) TYPE text(tokenizer = 'splitByNonAlpha') ) ENGINE = MergeTree diff --git a/docker/otel-collector/schema/seed/00005_otel_traces.sql b/docker/otel-collector/schema/seed/00005_otel_traces.sql index 853ea471fe..699e1d30d7 100644 --- a/docker/otel-collector/schema/seed/00005_otel_traces.sql +++ 
b/docker/otel-collector/schema/seed/00005_otel_traces.sql @@ -25,12 +25,14 @@ CREATE TABLE IF NOT EXISTS ${DATABASE}.otel_traces `Links.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)), `__hdx_materialized_rum.sessionId` String MATERIALIZED ResourceAttributes['rum.sessionId'] CODEC(ZSTD(1)), `SampleRate` UInt64 MATERIALIZED greatest(toUInt64OrZero(SpanAttributes['SampleRate']), 1) CODEC(T64, ZSTD(1)), + `ResourceAttributeItems` Array(String) ALIAS arrayMap((arr) -> concat(arr.1, '=', arr.2), ResourceAttributes::Array(Tuple(String, String))), + `SpanAttributeItems` Array(String) ALIAS arrayMap((arr) -> concat(arr.1, '=', arr.2), SpanAttributes::Array(Tuple(String, String))), INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_rum_session_id __hdx_materialized_rum.sessionId TYPE bloom_filter(0.001) GRANULARITY 1, INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_res_attr_items ResourceAttributeItems TYPE text(tokenizer = 'array'), INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, - INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_span_attr_items SpanAttributeItems TYPE text(tokenizer = 'array'), INDEX idx_duration Duration TYPE minmax GRANULARITY 1, INDEX idx_lower_span_name lower(SpanName) TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 8 ) diff --git a/packages/app/src/__tests__/searchFilters.test.ts b/packages/app/src/__tests__/searchFilters.test.ts index b108556838..92c82bd8b2 100644 --- a/packages/app/src/__tests__/searchFilters.test.ts +++ b/packages/app/src/__tests__/searchFilters.test.ts @@ -1043,6 +1043,9 @@ describe('searchFilters', () => { }, ]); metadata.getSetting = jest.fn().mockResolvedValue('0'); + metadata.getServerVersion = jest + .fn() + .mockResolvedValue([26, 5, 0, 0] as 
const); const serializer = new CustomSchemaSQLSerializerV2({ metadata, diff --git a/packages/common-utils/src/__tests__/clickhouseVersion.test.ts b/packages/common-utils/src/__tests__/clickhouseVersion.test.ts new file mode 100644 index 0000000000..3232cfdec6 --- /dev/null +++ b/packages/common-utils/src/__tests__/clickhouseVersion.test.ts @@ -0,0 +1,176 @@ +import { + compareClickHouseVersion, + isClickHouseVersionAtLeast, + parseClickHouseVersion, + supportsDirectReadMap, +} from '@/core/clickhouseVersion'; + +type ClickHouseVersionTuple = readonly [number, number, number, number]; + +describe('parseClickHouseVersion', () => { + it('parses a full 4-part version string', () => { + expect(parseClickHouseVersion('26.4.1.3')).toEqual([26, 4, 1, 3]); + }); + + it('parses a version with multi-digit components', () => { + expect(parseClickHouseVersion('25.12.0.123')).toEqual([25, 12, 0, 123]); + }); + + it('defaults missing trailing components to 0', () => { + expect(parseClickHouseVersion('26.4')).toEqual([26, 4, 0, 0]); + expect(parseClickHouseVersion('26.4.1')).toEqual([26, 4, 1, 0]); + }); + + it('trims surrounding whitespace', () => { + expect(parseClickHouseVersion(' 26.4.1.3 ')).toEqual([26, 4, 1, 3]); + }); + + it('ignores trailing build metadata after the 4th component', () => { + expect(parseClickHouseVersion('26.4.1.3-stable')).toEqual([26, 4, 1, 3]); + }); + + it('returns undefined for empty input', () => { + expect(parseClickHouseVersion('')).toBeUndefined(); + expect(parseClickHouseVersion(' ')).toBeUndefined(); + }); + + it('returns undefined when major or minor are not numeric', () => { + expect(parseClickHouseVersion('abc.4.1.3')).toBeUndefined(); + expect(parseClickHouseVersion('26.xx.1.3')).toBeUndefined(); + }); + + it('returns undefined when there are fewer than two components', () => { + expect(parseClickHouseVersion('26')).toBeUndefined(); + }); +}); + +describe('compareClickHouseVersion', () => { + it('returns 0 for equal versions', () => { + 
expect(compareClickHouseVersion([26, 4, 1, 3], [26, 4, 1, 3])).toBe(0); + }); + + it('compares by major first', () => { + expect( + compareClickHouseVersion([27, 0, 0, 0], [26, 99, 99, 99]), + ).toBeGreaterThan(0); + }); + + it('compares by minor when major is equal', () => { + expect( + compareClickHouseVersion([26, 4, 0, 0], [26, 3, 99, 99]), + ).toBeGreaterThan(0); + }); + + it('compares by patch when major and minor are equal', () => { + expect( + compareClickHouseVersion([26, 4, 2, 0], [26, 4, 1, 99]), + ).toBeGreaterThan(0); + }); + + it('compares by tweak as the lowest-priority component', () => { + expect(compareClickHouseVersion([26, 4, 1, 4], [26, 4, 1, 3])).toBe(1); + expect(compareClickHouseVersion([26, 4, 1, 2], [26, 4, 1, 3])).toBe(-1); + }); +}); + +describe('isClickHouseVersionAtLeast', () => { + const threshold = [26, 4, 1, 3] as const; + + it('accepts the exact threshold version', () => { + expect(isClickHouseVersionAtLeast([26, 4, 1, 3], threshold)).toBe(true); + }); + + it('accepts versions above the threshold', () => { + expect(isClickHouseVersionAtLeast([26, 4, 1, 4], threshold)).toBe(true); + expect(isClickHouseVersionAtLeast([27, 0, 0, 0], threshold)).toBe(true); + }); + + it('rejects versions below the threshold', () => { + expect(isClickHouseVersionAtLeast([26, 4, 1, 2], threshold)).toBe(false); + expect(isClickHouseVersionAtLeast([26, 4, 0, 99], threshold)).toBe(false); + expect(isClickHouseVersionAtLeast([26, 2, 0, 0], threshold)).toBe(false); + expect(isClickHouseVersionAtLeast([25, 99, 99, 99], threshold)).toBe(false); + }); + + it('returns false when the version is unknown', () => { + expect(isClickHouseVersionAtLeast(undefined, threshold)).toBe(false); + }); +}); + +describe('supportsDirectReadMap', () => { + describe('26.2 backport branch (min 26.2.19.43)', () => { + it.each([ + [[26, 2, 19, 43], true], + [[26, 2, 19, 44], true], + [[26, 2, 20, 0], true], + [[26, 2, 99, 99], true], + [[26, 2, 19, 42], false], + [[26, 2, 19, 0], 
false], + [[26, 2, 18, 99], false], + [[26, 2, 0, 0], false], + ])('%j → %s', (version, expected) => { + expect(supportsDirectReadMap(version)).toBe(expected); + }); + }); + + describe('26.3 backport branch (min 26.3.12.3)', () => { + it.each([ + [[26, 3, 12, 3], true], + [[26, 3, 12, 4], true], + [[26, 3, 13, 0], true], + [[26, 3, 99, 99], true], + [[26, 3, 12, 2], false], + [[26, 3, 12, 0], false], + [[26, 3, 11, 99], false], + [[26, 3, 0, 0], false], + ])('%j → %s', (version, expected) => { + expect(supportsDirectReadMap(version)).toBe(expected); + }); + }); + + describe('26.4 backport branch (min 26.4.3.37)', () => { + it.each([ + [[26, 4, 3, 37], true], + [[26, 4, 3, 38], true], + [[26, 4, 4, 0], true], + [[26, 4, 99, 99], true], + [[26, 4, 3, 36], false], + [[26, 4, 3, 0], false], + [[26, 4, 2, 99], false], + [[26, 4, 1, 3], false], + [[26, 4, 0, 99], false], + ])('%j → %s', (version, expected) => { + expect(supportsDirectReadMap(version)).toBe(expected); + }); + }); + + describe('26.5+ baseline (always supported)', () => { + it.each([ + [[26, 5, 0, 0], true], + [[26, 5, 0, 1], true], + [[26, 5, 99, 99], true], + [[26, 6, 0, 0], true], + [[26, 99, 99, 99], true], + [[27, 0, 0, 0], true], + [[30, 1, 2, 3], true], + ])('%j → %s', (version, expected) => { + expect(supportsDirectReadMap(version)).toBe(expected); + }); + }); + + describe('unsupported branches', () => { + it.each([ + [[26, 1, 99, 99], false], + [[26, 0, 0, 0], false], + [[25, 12, 0, 0], false], + [[25, 99, 99, 99], false], + [[24, 0, 0, 0], false], + ])('%j → %s', (version, expected) => { + expect(supportsDirectReadMap(version)).toBe(expected); + }); + }); + + it('returns false when the version is undefined', () => { + expect(supportsDirectReadMap(undefined)).toBe(false); + }); +}); diff --git a/packages/common-utils/src/__tests__/queryParser.test.ts b/packages/common-utils/src/__tests__/queryParser.test.ts index 540ad2fe20..d039219df6 100644 --- 
a/packages/common-utils/src/__tests__/queryParser.test.ts +++ b/packages/common-utils/src/__tests__/queryParser.test.ts @@ -2159,6 +2159,9 @@ describe('CustomSchemaSQLSerializerV2 - KV items index optimization', () => { }, ]); metadata.getSetting = jest.fn().mockImplementation(async () => '0'); + metadata.getServerVersion = jest + .fn() + .mockImplementation(async () => [26, 5, 0, 0] as const); const serializer = new CustomSchemaSQLSerializerV2({ metadata, @@ -2271,6 +2274,9 @@ describe('CustomSchemaSQLSerializerV2 - KV items with MATERIALIZED column', () = }, ]); metadata.getSetting = jest.fn().mockImplementation(async () => '0'); + metadata.getServerVersion = jest + .fn() + .mockImplementation(async () => [26, 5, 0, 0] as const); const serializer = new CustomSchemaSQLSerializerV2({ metadata, @@ -2292,6 +2298,78 @@ describe('CustomSchemaSQLSerializerV2 - KV items with MATERIALIZED column', () = }); }); +describe('CustomSchemaSQLSerializerV2 - KV items with ALIAS column (expanded index expr)', () => { + // ClickHouse expands the skip-index `expr` for ALIAS columns to the full + // default_expression, instead of keeping it as the bare column name like it + // does for MATERIALIZED columns. This matches what the FTS schema emits. 
+ const metadata = getMetadata( + new ClickhouseClient({ host: 'http://localhost:8123' }), + ); + const databaseName = 'default'; + const tableName = 'otel_logs'; + const connectionId = 'test'; + const aliasDefaultExpression = + "arrayMap(arr -> concat(arr.1, '=', arr.2), CAST(LogAttributes, 'Array(Tuple(String, String))'))"; + + metadata.getColumn = jest.fn().mockImplementation(async ({ column }) => { + if (column === 'LogAttributes') { + return { name: 'LogAttributes', type: 'Map(String, String)' }; + } else if (column === 'Body') { + return { name: 'Body', type: 'String' }; + } + return undefined; + }); + metadata.getMaterializedColumnsLookupTable = jest + .fn() + .mockResolvedValue(new Map()); + metadata.getColumns = jest.fn().mockResolvedValue([ + { + name: 'LogAttributes', + type: 'Map(String, String)', + default_type: '', + default_expression: '', + }, + { + name: 'LogAttributeItems', + type: 'Array(String)', + default_type: 'ALIAS', + default_expression: aliasDefaultExpression, + }, + ]); + metadata.getSkipIndices = jest.fn().mockResolvedValue([ + { + name: 'idx_log_attr_items', + type: 'text', + typeFull: 'text(tokenizer=array)', + expression: aliasDefaultExpression, + granularity: 1, + }, + ]); + metadata.getSetting = jest.fn().mockResolvedValue('0'); + metadata.getServerVersion = jest + .fn() + .mockResolvedValue([26, 5, 0, 0] as const); + + const serializer = new CustomSchemaSQLSerializerV2({ + metadata, + databaseName, + tableName, + connectionId, + implicitColumnExpression: 'Body', + }); + + it('matches the index by its expanded ALIAS expression', async () => { + const builder = new SearchQueryBuilder( + 'LogAttributes.error.message:"Failed to fetch"', + serializer, + ); + const sql = await builder.build(); + expect(sql).toBe( + "((has(`LogAttributeItems`, concat('error.message', '=', 'Failed to fetch'))))", + ); + }); +}); + describe('CustomSchemaSQLSerializerV2 - KV items fallback cases', () => { const databaseName = 'default'; const tableName = 
'otel_logs'; @@ -2301,6 +2379,7 @@ describe('CustomSchemaSQLSerializerV2 - KV items fallback cases', () => { columns?: any[]; skipIndices?: any[]; getColumn?: (opts: { column: string }) => any; + serverVersion?: readonly [number, number, number, number] | undefined; }) { const metadata = getMetadata( new ClickhouseClient({ host: 'http://localhost:8123' }), @@ -2325,6 +2404,15 @@ describe('CustomSchemaSQLSerializerV2 - KV items fallback cases', () => { .fn() .mockImplementation(async () => overrides.skipIndices ?? []); metadata.getSetting = jest.fn().mockImplementation(async () => '0'); + const serverVersion = Object.prototype.hasOwnProperty.call( + overrides, + 'serverVersion', + ) + ? overrides.serverVersion + : ([26, 5, 0, 0] as const); + metadata.getServerVersion = jest + .fn() + .mockImplementation(async () => serverVersion); return new CustomSchemaSQLSerializerV2({ metadata, @@ -2474,3 +2562,117 @@ describe('CustomSchemaSQLSerializerV2 - KV items fallback cases', () => { expect(sql).toContain('CAST'); }); }); + +describe('CustomSchemaSQLSerializerV2 - KV items version gate', () => { + const databaseName = 'default'; + const tableName = 'otel_logs'; + const connectionId = 'test'; + + function buildSerializer( + serverVersion: readonly [number, number, number, number] | undefined, + ) { + const metadata = getMetadata( + new ClickhouseClient({ host: 'http://localhost:8123' }), + ); + metadata.getColumn = jest.fn().mockImplementation(async ({ column }) => { + if (column === 'LogAttributes') { + return { name: 'LogAttributes', type: 'Map(String, String)' }; + } else if (column === 'Body') { + return { name: 'Body', type: 'String' }; + } + return undefined; + }); + metadata.getMaterializedColumnsLookupTable = jest + .fn() + .mockResolvedValue(new Map()); + metadata.getColumns = jest.fn().mockResolvedValue([ + { + name: 'LogAttributes', + type: 'Map(String, String)', + default_type: '', + default_expression: '', + }, + { + name: 'LogAttributeItems', + type: 
'Array(String)', + default_type: 'ALIAS', + default_expression: + "arrayMap((arr) -> concat(arr.1, '=', arr.2), LogAttributes::Array(Tuple(String, String)))", + }, + ]); + metadata.getSkipIndices = jest.fn().mockResolvedValue([ + { + name: 'idx_log_attr_items', + type: 'text', + typeFull: 'text(tokenizer=array)', + expression: 'LogAttributeItems', + granularity: 1, + }, + ]); + metadata.getSetting = jest.fn().mockResolvedValue('0'); + metadata.getServerVersion = jest.fn().mockResolvedValue(serverVersion); + + return new CustomSchemaSQLSerializerV2({ + metadata, + databaseName, + tableName, + connectionId, + implicitColumnExpression: 'Body', + }); + } + + async function buildSql( + serverVersion: readonly [number, number, number, number] | undefined, + ) { + const serializer = buildSerializer(serverVersion); + const builder = new SearchQueryBuilder( + 'LogAttributes.service.name:"my-app"', + serializer, + ); + return builder.build(); + } + + const HAS_FORM = + "has(`LogAttributeItems`, concat('service.name', '=', 'my-app'))"; + + describe('emits has() direct_read form on supported versions', () => { + it.each([ + [26, 2, 19, 43], + [26, 2, 20, 0], + [26, 3, 12, 3], + [26, 3, 13, 0], + [26, 4, 3, 37], + [26, 4, 99, 99], + [26, 5, 0, 0], + [26, 6, 0, 0], + [27, 0, 0, 0], + ])('%j', async (...version) => { + const sql = await buildSql(version); + expect(sql).toContain(HAS_FORM); + }); + }); + + describe('falls back to Map subscript on unsupported versions', () => { + it.each([ + [26, 2, 19, 42], + [26, 2, 0, 0], + [26, 3, 12, 2], + [26, 3, 0, 0], + [26, 4, 3, 36], + [26, 4, 1, 3], + [26, 4, 0, 99], + [26, 1, 99, 99], + [25, 12, 0, 0], + ])('%j', async (...version) => { + const sql = await buildSql(version); + expect(sql).not.toContain('has(`LogAttributeItems`'); + expect(sql).toContain("= 'my-app'"); + }); + }); + + it('falls back when server version is unknown', async () => { + const sql = await buildSql(undefined); + 
expect(sql).not.toContain('has(`LogAttributeItems`'); + expect(sql).toContain("= 'my-app'"); + }); +}); diff --git a/packages/common-utils/src/core/clickhouseVersion.ts b/packages/common-utils/src/core/clickhouseVersion.ts new file mode 100644 index 0000000000..0f781a2b10 --- /dev/null +++ b/packages/common-utils/src/core/clickhouseVersion.ts @@ -0,0 +1,114 @@ +export type ClickHouseVersion = readonly [ + major: number, + minor: number, + patch: number, + tweak: number, +]; + +/** + * Parses a ClickHouse `version()` string (e.g. "26.4.1.3" or "25.12.0.1") into a + * 4-tuple. Missing trailing components default to 0. Returns undefined if the + * leading major.minor cannot be parsed as integers. + */ +export function parseClickHouseVersion( + version: string, +): ClickHouseVersion | undefined { + const trimmed = version.trim(); + if (!trimmed) return undefined; + + const parts = trimmed.split('.', 5); + if (parts.length < 2) return undefined; + + const nums: number[] = []; + for (let i = 0; i < 4; i++) { + const raw = parts[i] ?? '0'; + const match = raw.match(/^\d+/); + if (!match) { + if (i < 2) return undefined; + nums.push(0); + continue; + } + const n = Number.parseInt(match[0], 10); + if (!Number.isFinite(n)) { + if (i < 2) return undefined; + nums.push(0); + continue; + } + nums.push(n); + } + + return [nums[0], nums[1], nums[2], nums[3]] as const; +} + +export function compareClickHouseVersion( + a: ClickHouseVersion, + b: ClickHouseVersion, +): number { + for (let i = 0; i < 4; i++) { + if (a[i] !== b[i]) return a[i] - b[i]; + } + return 0; +} + +export function isClickHouseVersionAtLeast( + version: ClickHouseVersion | undefined, + min: ClickHouseVersion, +): boolean { + if (!version) return false; + return compareClickHouseVersion(version, min) >= 0; +} + +/** + * Per-branch minimum versions required for the direct_read map column + * optimization that compiles `Map['key'] = 'value'` filters into + * `has(, concat('key', '=', 'value'))`. 
+ * + * ClickHouse backported the feature into multiple stable release lines, so + * there is no single threshold — each major.minor branch has its own cutoff: + * + * - 26.2 branch → first available at 26.2.19.43 + * - 26.3 branch → first available at 26.3.12.3 + * - 26.4 branch → first available at 26.4.3.37 + * - 26.5+ → always supported (feature shipped in mainline) + * + * Earlier 26.x branches (26.0, 26.1) and anything < 26 never received the + * backport and are considered unsupported. + * + * Listed in ascending order — the highest entry defines the last branch that + * required a backport; everything above `DIRECT_READ_MAP_BASELINE` is on by + * default. + */ +const DIRECT_READ_MAP_BACKPORT_MINS: ReadonlyArray<ClickHouseVersion> = [ + [26, 2, 19, 43], + [26, 3, 12, 3], + [26, 4, 3, 37], +]; + +/** + * First release where direct_read map support shipped unconditionally. Any + * server with major.minor at or above this is considered supported, even if + * its branch is not present in `DIRECT_READ_MAP_BACKPORT_MINS`. + */ +const DIRECT_READ_MAP_BASELINE: ClickHouseVersion = [26, 5, 0, 0]; + +/** + * Returns true when the connected ClickHouse server supports the direct_read + * map column optimization. Returns false when the version is undefined or + * predates every known backport. 
+ */ +export function supportsDirectReadMap( + version: ClickHouseVersion | undefined, +): boolean { + if (!version) return false; + + if (compareClickHouseVersion(version, DIRECT_READ_MAP_BASELINE) >= 0) { + return true; + } + + const [vMajor, vMinor] = version; + const branchMin = DIRECT_READ_MAP_BACKPORT_MINS.find( + ([major, minor]) => major === vMajor && minor === vMinor, + ); + if (!branchMin) return false; + return compareClickHouseVersion(version, branchMin) >= 0; +} diff --git a/packages/common-utils/src/core/metadata.ts b/packages/common-utils/src/core/metadata.ts index 89412830a3..a80feb1fad 100644 --- a/packages/common-utils/src/core/metadata.ts +++ b/packages/common-utils/src/core/metadata.ts @@ -21,6 +21,7 @@ import type { } from '@/types'; import { isLogSource, isTraceSource, SourceKind } from '@/types'; +import { ClickHouseVersion, parseClickHouseVersion } from './clickhouseVersion'; import { optimizeGetKeyValuesCalls, renderStartOfBucketExpr, @@ -926,6 +927,39 @@ export class Metadata { }); } + /** + * Returns the parsed ClickHouse server version (from `SELECT version()`). + * Returns undefined when the query fails or the value cannot be parsed; the + * result is cached per connection and callers should treat undefined as + * "unknown / assume older". 
+ */ + async getServerVersion({ + connectionId, + }: { + connectionId: string; + }): Promise<ClickHouseVersion | undefined> { + return this.cache.getOrFetch(`${connectionId}.serverVersion`, async () => { + try { + const json = await this.clickhouseClient + .query<'JSON'>({ + connectionId, + query: 'SELECT version() AS version', + query_params: undefined, + clickhouse_settings: this.getClickHouseSettings(), + shouldSkipApplySettings: true, + }) + .then(res => res.json<{ version: string }>()); + + const versionString = json.data[0]?.version; + if (!versionString) return undefined; + return parseClickHouseVersion(versionString); + } catch (e) { + console.warn('Error fetching ClickHouse server version:', e); + return undefined; + } + }); + } + async getSettings({ connectionId }: { connectionId: string }) { return this.cache.getOrFetch( `${connectionId}.availableSettings`, diff --git a/packages/common-utils/src/queryParser.ts b/packages/common-utils/src/queryParser.ts index de783790a0..0d5a50a2a4 100644 --- a/packages/common-utils/src/queryParser.ts +++ b/packages/common-utils/src/queryParser.ts @@ -9,6 +9,7 @@ import { extractInnerCHArrayJSType, JSDataType, } from '@/clickhouse'; +import { supportsDirectReadMap } from '@/core/clickhouseVersion'; import { Metadata, parseKeyPath, @@ -1131,6 +1132,13 @@ export class CustomSchemaSQLSerializerV2 extends SQLSerializer { * a text(tokenizer=array) skip index. 
*/ private async buildKvItemsLookup(): Promise { + const serverVersion = await this.metadata.getServerVersion({ + connectionId: this.connectionId, + }); + if (!supportsDirectReadMap(serverVersion)) { + return new Map(); + } + const [columns, skipIndices] = await Promise.all([ this.metadata.getColumns({ databaseName: this.databaseName, @@ -1161,15 +1169,14 @@ export class CustomSchemaSQLSerializerV2 extends SQLSerializer { if (!parsed) continue; // Check if this column has a text(tokenizer=array) skip index + const candidateName = normalizeChExpression(candidate.name); + const candidateExpr = normalizeChExpression(candidate.default_expression); const hasArrayTextIndex = skipIndices.some(idx => { if (idx.type !== 'text') return false; const tokenizer = parseTokenizerFromTextIndex(idx); if (tokenizer?.type !== 'array') return false; - // Require exact match: has() won't benefit from a transformed index like lower(col) - return ( - normalizeChExpression(idx.expression) === - normalizeChExpression(candidate.name) - ); + const idxExpr = normalizeChExpression(idx.expression); + return idxExpr === candidateName || idxExpr === candidateExpr; }); if (hasArrayTextIndex) { From a83332ffb2435f93b98869099ff832c513377890 Mon Sep 17 00:00:00 2001 From: Aaron Knudtson <87577305+knudtty@users.noreply.github.com> Date: Tue, 26 May 2026 16:22:36 -0400 Subject: [PATCH 02/10] fix: version guard only applies for ALIAS columns --- .changeset/direct-read-map-default.md | 20 +++++--- .../src/__tests__/queryParser.test.ts | 46 +++++++++++++++---- packages/common-utils/src/queryParser.ts | 25 ++++++---- 3 files changed, 67 insertions(+), 24 deletions(-) diff --git a/.changeset/direct-read-map-default.md b/.changeset/direct-read-map-default.md index c7435cb048..e528e5b259 100644 --- a/.changeset/direct-read-map-default.md +++ b/.changeset/direct-read-map-default.md @@ -22,15 +22,23 @@ unaffected (`CREATE TABLE IF NOT EXISTS` is a no-op). 
At query time, the app gates the `Map['key'] = 'value'` → `has(, concat('key', '=', 'value'))` rewrite on the connected ClickHouse server version (`SELECT version()`, cached per connection). -The direct_read feature was backported into multiple stable 26.x release -lines, so the gate uses a per-branch minimum: +The gate only applies to **ALIAS** items columns, which are computed at +query time and therefore depend on the server being able to perform a +direct_read against the underlying Map's tuple storage. The direct_read +feature was backported into multiple stable 26.x release lines, so the +gate uses a per-branch minimum: - 26.2 line: >= 26.2.19.43 - 26.3 line: >= 26.3.12.3 - 26.4 line: >= 26.4.3.37 - 26.5+ : always supported -Servers below their branch's threshold continue to compile filters into -the original Map-subscript form, since they cannot perform the direct_read -against the Map's underlying tuple storage that makes the optimization -profitable. +ALIAS items columns on servers below their branch's threshold continue +to compile filters into the original Map-subscript form. + +**MATERIALIZED items columns are always used when available**, regardless +of ClickHouse version. MATERIALIZED columns are physically stored on +disk, so `has(items, ...)` reads them directly and works on any +ClickHouse version that supports the text index itself. Operators who +want the optimization on servers below the backport cutoffs can +`ALTER TABLE` to materialize the items columns. 
diff --git a/packages/common-utils/src/__tests__/queryParser.test.ts b/packages/common-utils/src/__tests__/queryParser.test.ts index d039219df6..dbc79c3e54 100644 --- a/packages/common-utils/src/__tests__/queryParser.test.ts +++ b/packages/common-utils/src/__tests__/queryParser.test.ts @@ -2570,6 +2570,7 @@ describe('CustomSchemaSQLSerializerV2 - KV items version gate', () => { function buildSerializer( serverVersion: readonly [number, number, number, number] | undefined, + defaultType: 'ALIAS' | 'MATERIALIZED' = 'ALIAS', ) { const metadata = getMetadata( new ClickhouseClient({ host: 'http://localhost:8123' }), @@ -2595,7 +2596,7 @@ describe('CustomSchemaSQLSerializerV2 - KV items version gate', () => { { name: 'LogAttributeItems', type: 'Array(String)', - default_type: 'ALIAS', + default_type: defaultType, default_expression: "arrayMap((arr) -> concat(arr.1, '=', arr.2), LogAttributes::Array(Tuple(String, String)))", }, @@ -2623,8 +2624,9 @@ describe('CustomSchemaSQLSerializerV2 - KV items version gate', () => { async function buildSql( serverVersion: readonly [number, number, number, number] | undefined, + defaultType: 'ALIAS' | 'MATERIALIZED' = 'ALIAS', ) { - const serializer = buildSerializer(serverVersion); + const serializer = buildSerializer(serverVersion, defaultType); const builder = new SearchQueryBuilder( 'LogAttributes.service.name:"my-app"', serializer, @@ -2635,7 +2637,7 @@ describe('CustomSchemaSQLSerializerV2 - KV items version gate', () => { const HAS_FORM = "has(`LogAttributeItems`, concat('service.name', '=', 'my-app'))"; - describe('emits has() direct_read form on supported versions', () => { + describe('ALIAS items column - emits has() on supported versions', () => { it.each([ [26, 2, 19, 43], [26, 2, 20, 0], @@ -2647,12 +2649,12 @@ describe('CustomSchemaSQLSerializerV2 - KV items version gate', () => { [26, 6, 0, 0], [27, 0, 0, 0], ])('%j', async (...version) => { - const sql = await buildSql(version); + const sql = await buildSql(version, 
'ALIAS'); expect(sql).toContain(HAS_FORM); }); }); - describe('falls back to Map subscript on unsupported versions', () => { + describe('ALIAS items column - falls back on unsupported versions', () => { it.each([ [26, 2, 19, 42], [26, 2, 0, 0], @@ -2664,15 +2666,43 @@ describe('CustomSchemaSQLSerializerV2 - KV items version gate', () => { [26, 1, 99, 99], [25, 12, 0, 0], ])('%j', async (...version) => { - const sql = await buildSql(version); + const sql = await buildSql(version, 'ALIAS'); expect(sql).not.toContain('has(`LogAttributeItems`'); expect(sql).toContain("= 'my-app'"); }); }); - it('falls back when server version is unknown', async () => { - const sql = await buildSql(undefined); + it('ALIAS items column falls back when server version is unknown', async () => { + const sql = await buildSql(undefined, 'ALIAS'); expect(sql).not.toContain('has(`LogAttributeItems`'); expect(sql).toContain("= 'my-app'"); }); + + describe('MATERIALIZED items column - emits has() on EVERY version', () => { + it.each([ + [26, 2, 19, 43], + [26, 2, 19, 42], + [26, 2, 0, 0], + [26, 3, 12, 3], + [26, 3, 12, 2], + [26, 3, 0, 0], + [26, 4, 3, 37], + [26, 4, 3, 36], + [26, 4, 1, 3], + [26, 4, 0, 99], + [26, 1, 99, 99], + [26, 0, 0, 0], + [25, 12, 0, 0], + [26, 5, 0, 0], + [27, 0, 0, 0], + ])('%j', async (...version) => { + const sql = await buildSql(version, 'MATERIALIZED'); + expect(sql).toContain(HAS_FORM); + }); + }); + + it('MATERIALIZED items column emits has() even when server version is unknown', async () => { + const sql = await buildSql(undefined, 'MATERIALIZED'); + expect(sql).toContain(HAS_FORM); + }); }); diff --git a/packages/common-utils/src/queryParser.ts b/packages/common-utils/src/queryParser.ts index 0d5a50a2a4..ad4d617d66 100644 --- a/packages/common-utils/src/queryParser.ts +++ b/packages/common-utils/src/queryParser.ts @@ -1130,16 +1130,17 @@ export class CustomSchemaSQLSerializerV2 extends SQLSerializer { * A KV items column is an ALIAS/MATERIALIZED column whose 
expression is * arrayMap((k,v)->concat(k,'=',v), mapKeys(X), mapValues(X)) and which has * a text(tokenizer=array) skip index. + * + * The version gate (`supportsDirectReadMap`) only applies to ALIAS items + * columns: ALIAS columns are computed at query time, so `has(items, ...)` + * against an ALIAS only realizes its speedup when the server can perform a + * direct_read against the underlying Map's tuple storage. MATERIALIZED items + * columns are physically stored on disk, so `has()` reads them directly and + * is fast on any ClickHouse version that supports the text index itself. */ private async buildKvItemsLookup(): Promise { - const serverVersion = await this.metadata.getServerVersion({ - connectionId: this.connectionId, - }); - if (!supportsDirectReadMap(serverVersion)) { - return new Map(); - } - - const [columns, skipIndices] = await Promise.all([ + const [serverVersion, columns, skipIndices] = await Promise.all([ + this.metadata.getServerVersion({ connectionId: this.connectionId }), this.metadata.getColumns({ databaseName: this.databaseName, tableName: this.tableName, @@ -1148,9 +1149,10 @@ export class CustomSchemaSQLSerializerV2 extends SQLSerializer { this.skipIndicesPromise ?? 
Promise.resolve([]), ]); + const directReadSupported = supportsDirectReadMap(serverVersion); + const lookup: KvItemsLookup = new Map(); - // Find columns that are ALIAS or MATERIALIZED with KV items expressions const kvItemsCandidates = columns.filter( c => (c.default_type === 'ALIAS' || c.default_type === 'MATERIALIZED') && @@ -1158,6 +1160,10 @@ export class CustomSchemaSQLSerializerV2 extends SQLSerializer { ); for (const candidate of kvItemsCandidates) { + if (candidate.default_type === 'ALIAS' && !directReadSupported) { + continue; + } + const parsed = (() => { let parsed: { mapColumn: string; separator: string } | undefined; for (const strategy of KV_ITEMS_STRATEGIES) { @@ -1168,7 +1174,6 @@ export class CustomSchemaSQLSerializerV2 extends SQLSerializer { })(); if (!parsed) continue; - // Check if this column has a text(tokenizer=array) skip index const candidateName = normalizeChExpression(candidate.name); const candidateExpr = normalizeChExpression(candidate.default_expression); const hasArrayTextIndex = skipIndices.some(idx => { From 671029bfb9e337694fcb8145ed986ad5e2436d50 Mon Sep 17 00:00:00 2001 From: Aaron Knudtson <87577305+knudtty@users.noreply.github.com> Date: Tue, 26 May 2026 17:00:21 -0400 Subject: [PATCH 03/10] fix: add compat schema for traces --- .../schema/seed/00005_otel_traces_compat.sql | 45 +++++++++++++++++++ packages/otel-collector/cmd/migrate/main.go | 43 ++++++++++++++++-- 2 files changed, 85 insertions(+), 3 deletions(-) create mode 100644 docker/otel-collector/schema/seed/00005_otel_traces_compat.sql diff --git a/docker/otel-collector/schema/seed/00005_otel_traces_compat.sql b/docker/otel-collector/schema/seed/00005_otel_traces_compat.sql new file mode 100644 index 0000000000..e2279b6c15 --- /dev/null +++ b/docker/otel-collector/schema/seed/00005_otel_traces_compat.sql @@ -0,0 +1,45 @@ +-- +goose Up +-- Compatibility schema for ClickHouse < 26.2 (no full text search indexes). 
+-- The main schema (00005_otel_traces.sql) adds text(tokenizer='array') items +-- indexes that older ClickHouse cannot create; this file preserves the prior +-- bloom_filter-only schema so traces still works on those servers. +CREATE TABLE IF NOT EXISTS ${DATABASE}.otel_traces +( + `Timestamp` DateTime64(9) CODEC(Delta(8), ZSTD(1)), + `TraceId` String CODEC(ZSTD(1)), + `SpanId` String CODEC(ZSTD(1)), + `ParentSpanId` String CODEC(ZSTD(1)), + `TraceState` String CODEC(ZSTD(1)), + `SpanName` LowCardinality(String) CODEC(ZSTD(1)), + `SpanKind` LowCardinality(String) CODEC(ZSTD(1)), + `ServiceName` LowCardinality(String) CODEC(ZSTD(1)), + `ResourceAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), + `ScopeName` String CODEC(ZSTD(1)), + `ScopeVersion` String CODEC(ZSTD(1)), + `SpanAttributes` Map(LowCardinality(String), String) CODEC(ZSTD(1)), + `Duration` UInt64 CODEC(ZSTD(1)), + `StatusCode` LowCardinality(String) CODEC(ZSTD(1)), + `StatusMessage` String CODEC(ZSTD(1)), + `Events.Timestamp` Array(DateTime64(9)) CODEC(ZSTD(1)), + `Events.Name` Array(LowCardinality(String)) CODEC(ZSTD(1)), + `Events.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)), + `Links.TraceId` Array(String) CODEC(ZSTD(1)), + `Links.SpanId` Array(String) CODEC(ZSTD(1)), + `Links.TraceState` Array(String) CODEC(ZSTD(1)), + `Links.Attributes` Array(Map(LowCardinality(String), String)) CODEC(ZSTD(1)), + `__hdx_materialized_rum.sessionId` String MATERIALIZED ResourceAttributes['rum.sessionId'] CODEC(ZSTD(1)), + `SampleRate` UInt64 MATERIALIZED greatest(toUInt64OrZero(SpanAttributes['SampleRate']), 1) CODEC(T64, ZSTD(1)), + INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_rum_session_id __hdx_materialized_rum.sessionId TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_res_attr_key mapKeys(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_res_attr_value mapValues(ResourceAttributes) TYPE bloom_filter(0.01) GRANULARITY 
1, + INDEX idx_span_attr_key mapKeys(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_span_attr_value mapValues(SpanAttributes) TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_duration Duration TYPE minmax GRANULARITY 1, + INDEX idx_lower_span_name lower(SpanName) TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 8 +) +ENGINE = MergeTree +PARTITION BY toDate(Timestamp) +ORDER BY (ServiceName, SpanName, toDateTime(Timestamp)) +TTL toDate(Timestamp) + ${TABLES_TTL} +SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; diff --git a/packages/otel-collector/cmd/migrate/main.go b/packages/otel-collector/cmd/migrate/main.go index 8af809b190..3f09d48b03 100644 --- a/packages/otel-collector/cmd/migrate/main.go +++ b/packages/otel-collector/cmd/migrate/main.go @@ -410,6 +410,37 @@ func removeCompatLogsSchema(tempDir string) error { return nil } +// swapTracesSchemaForCompat replaces the full-text-search traces schema with +// the compatibility variant (bloom_filter indexes, no items columns) in the +// processed temp directory. It removes 00005_otel_traces.sql and renames +// 00005_otel_traces_compat.sql to take its place, so goose runs the compat +// schema instead. +func swapTracesSchemaForCompat(tempDir string) error { + fullTextPath := filepath.Join(tempDir, "00005_otel_traces.sql") + compatPath := filepath.Join(tempDir, "00005_otel_traces_compat.sql") + + if err := os.Remove(fullTextPath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove full text traces schema: %w", err) + } + + if err := os.Rename(compatPath, fullTextPath); err != nil { + return fmt.Errorf("failed to rename compat traces schema: %w", err) + } + + return nil +} + +// removeCompatTracesSchema removes the compat traces schema file from the temp +// directory when full text search is supported, so goose doesn't run both +// schemas. 
+func removeCompatTracesSchema(tempDir string) error { + compatPath := filepath.Join(tempDir, "00005_otel_traces_compat.sql") + if err := os.Remove(compatPath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove compat traces schema: %w", err) + } + return nil +} + // listSQLFiles lists all SQL files in a directory for logging purposes func listSQLFiles(dir string) ([]string, error) { entries, err := os.ReadDir(dir) @@ -478,16 +509,22 @@ func main() { } defer os.RemoveAll(tempDir) - // Select the appropriate logs schema based on ClickHouse version + // Select the appropriate logs and traces schemas based on ClickHouse version if supportsFullTextSearch(chMajor, chMinor) { if err := removeCompatLogsSchema(tempDir); err != nil { - log.Fatalf("Failed to remove compat schema: %v", err) + log.Fatalf("Failed to remove compat logs schema: %v", err) + } + if err := removeCompatTracesSchema(tempDir); err != nil { + log.Fatalf("Failed to remove compat traces schema: %v", err) } } else { - log.Printf("ClickHouse %d.%d < 26.2, falling back to compatibility logs schema", chMajor, chMinor) + log.Printf("ClickHouse %d.%d < 26.2, falling back to compatibility logs and traces schemas", chMajor, chMinor) if err := swapLogsSchemaForCompat(tempDir); err != nil { log.Fatalf("Failed to swap logs schema: %v", err) } + if err := swapTracesSchemaForCompat(tempDir); err != nil { + log.Fatalf("Failed to swap traces schema: %v", err) + } } // List SQL files From 6fe6c2e1341b3ceea98018211f5f62f981306668 Mon Sep 17 00:00:00 2001 From: Aaron Knudtson <87577305+knudtty@users.noreply.github.com> Date: Fri, 29 May 2026 16:11:41 -0400 Subject: [PATCH 04/10] feat: use text indexes to power filter and autocomplete --- .../src/__tests__/clickhouseVersion.test.ts | 22 + .../src/__tests__/metadata.test.ts | 742 +++++++++++++++++- .../src/core/clickhouseVersion.ts | 17 + packages/common-utils/src/core/kvItems.ts | 239 ++++++ packages/common-utils/src/core/metadata.ts | 585 
+++++++++++--- packages/common-utils/src/queryParser.ts | 233 +----- 6 files changed, 1480 insertions(+), 358 deletions(-) create mode 100644 packages/common-utils/src/core/kvItems.ts diff --git a/packages/common-utils/src/__tests__/clickhouseVersion.test.ts b/packages/common-utils/src/__tests__/clickhouseVersion.test.ts index 3232cfdec6..3abe4f41bf 100644 --- a/packages/common-utils/src/__tests__/clickhouseVersion.test.ts +++ b/packages/common-utils/src/__tests__/clickhouseVersion.test.ts @@ -3,6 +3,7 @@ import { isClickHouseVersionAtLeast, parseClickHouseVersion, supportsDirectReadMap, + supportsMergeTreeTextIndex, } from '@/core/clickhouseVersion'; type ClickHouseVersionTuple = readonly [number, number, number, number]; @@ -174,3 +175,24 @@ describe('supportsDirectReadMap', () => { expect(supportsDirectReadMap(undefined)).toBe(false); }); }); + +describe('supportsMergeTreeTextIndex', () => { + it.each([ + [[26, 3, 0, 0], true], + [[26, 3, 0, 1], true], + [[26, 3, 99, 99], true], + [[26, 4, 0, 0], true], + [[27, 0, 0, 0], true], + [[26, 2, 99, 99], false], + [[26, 2, 0, 0], false], + [[26, 1, 0, 0], false], + [[25, 12, 0, 0], false], + [[24, 0, 0, 0], false], + ])('%j → %s', (version, expected) => { + expect(supportsMergeTreeTextIndex(version)).toBe(expected); + }); + + it('returns false when the version is undefined', () => { + expect(supportsMergeTreeTextIndex(undefined)).toBe(false); + }); +}); diff --git a/packages/common-utils/src/__tests__/metadata.test.ts b/packages/common-utils/src/__tests__/metadata.test.ts index bbd40781ef..4abb1a8b4a 100644 --- a/packages/common-utils/src/__tests__/metadata.test.ts +++ b/packages/common-utils/src/__tests__/metadata.test.ts @@ -1,5 +1,10 @@ import { ClickhouseClient } from '../clickhouse/node'; -import { Metadata, MetadataCache, parseKeyPath } from '../core/metadata'; +import { + MapColumnTextIndexes, + Metadata, + MetadataCache, + parseKeyPath, +} from '../core/metadata'; import * as renderChartConfigModule from 
'../core/renderChartConfig'; import { timeFilterExpr } from '../core/renderChartConfig'; import { isBuilderChartConfig } from '../guards'; @@ -44,6 +49,9 @@ const source: TSource = { beforeAll(() => { jest.spyOn(console, 'warn').mockImplementation(() => {}); jest.spyOn(console, 'error').mockImplementation(() => {}); + jest + .spyOn(Metadata.prototype, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map()); }); afterAll(() => { jest.restoreAllMocks(); @@ -875,7 +883,7 @@ describe('Metadata', () => { databaseName: 'otel', tableName: 'generic_logs', column: 'LogAttributes', - key: 'service.name', + keys: ['service.name'], connectionId: 'conn-1', }); @@ -903,7 +911,7 @@ describe('Metadata', () => { databaseName: 'otel', tableName: 'generic_logs', column: 'LogAttributes', - key: 'service.name', + keys: ['service.name'], connectionId: 'conn-1', dateRange, timestampValueExpression: 'EventTime, EventDate', @@ -940,7 +948,7 @@ describe('Metadata', () => { databaseName: 'otel', tableName: 'generic_logs', column: 'LogAttributes', - key: 'service.name', + keys: ['service.name'], connectionId: 'conn-1', timestampValueExpression: 'EventTime, EventDate', }; @@ -960,8 +968,8 @@ describe('Metadata', () => { ], }); - expect(valuesA).toEqual(['morning']); - expect(valuesB).toEqual(['afternoon']); + expect(valuesA.get('service.name')).toEqual(['morning']); + expect(valuesB.get('service.name')).toEqual(['afternoon']); }); }); @@ -1434,6 +1442,728 @@ describe('Metadata', () => { expect(result).toBeNull(); }); }); + + describe('mergeTreeTextIndex attribute lookup', () => { + const buildMetadata = () => { + const realCache = new ( + jest.requireActual('../core/metadata') as any + ).MetadataCache(); + return new Metadata(mockClickhouseClient, realCache); + }; + + const itemsOnlyEntry: MapColumnTextIndexes = { + itemsIndex: { indexName: 'idx_log_attrs_items', separator: '=' }, + }; + const keysAndItemsEntry: MapColumnTextIndexes = { + keysIndex: { indexName: 'idx_log_attrs_key' }, + 
itemsIndex: { indexName: 'idx_log_attrs_items', separator: '=' }, + }; + const keysOnlyEntry: MapColumnTextIndexes = { + keysIndex: { indexName: 'idx_log_attrs_key' }, + }; + + beforeEach(() => { + (mockClickhouseClient.query as jest.Mock).mockReset(); + (timeFilterExpr as jest.Mock).mockClear(); + }); + + it('getMapKeys prefers the keys-only mapKeys(X) text index over the items index', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', keysAndItemsEntry]])); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [{ key: 'service.name' }, { key: 'http.method' }], + }), + }); + + const keys = await md.getMapKeys({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + connectionId: 'conn-1', + }); + + expect(keys).toEqual(['service.name', 'http.method']); + const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + expect(call.query).toContain('mergeTreeTextIndex'); + // keys-only path selects `token AS key` directly, no splitByChar. 
+ expect(call.query).not.toContain('splitByChar'); + expect(Object.values(call.query_params)).toContain('idx_log_attrs_key'); + }); + + it('getMapKeys falls back to the items index when no keys-only index exists', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ key: 'service.name' }] }), + }); + + const keys = await md.getMapKeys({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + connectionId: 'conn-1', + }); + + expect(keys).toEqual(['service.name']); + const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + expect(call.query).toContain('mergeTreeTextIndex'); + expect(call.query).toContain('splitByChar'); + expect(Object.values(call.query_params)).toContain('idx_log_attrs_items'); + }); + + it('getMapKeys falls through to MV when the text index returns empty (no cache poisoning)', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', keysAndItemsEntry]])); + + (mockClickhouseClient.query as jest.Mock) + // keysIndex query — returns empty (e.g. 
fresh table, no parts yet) + .mockResolvedValueOnce({ json: () => Promise.resolve({ data: [] }) }) + // itemsIndex query — also empty + .mockResolvedValueOnce({ json: () => Promise.resolve({ data: [] }) }) + // MV rollup query — has data + .mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ Key: 'service.name' }] }), + }); + + const keys = await md.getMapKeys({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + // All three queries fired (proves cache isn't poisoned by intermediate empty results). + expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 3, + ); + expect(keys).toEqual(['service.name']); + const mvCall = (mockClickhouseClient.query as jest.Mock).mock.calls[2][0]; + expect(Object.values(mvCall.query_params)).toContain( + 'logs_key_rollup_15m', + ); + }); + + it('getMapKeys falls through to the MV path when no text index exists', async () => { + const md = buildMetadata(); + jest.spyOn(md, 'getMapColumnTextIndexes').mockResolvedValue(new Map()); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ Key: 'service.name' }] }), + }); + + const keys = await md.getMapKeys({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + expect(keys).toEqual(['service.name']); + const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + 
expect(Object.values(call.query_params)).toContain('logs_key_rollup_15m'); + expect(call.query).not.toContain('mergeTreeTextIndex'); + }); + + it('getMapValues uses mergeTreeTextIndex when keys are requested and an items index exists', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [ + { key: 'service.name', value: 'api' }, + { key: 'service.name', value: 'worker' }, + ], + }), + }); + + const result = await md.getMapValues({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + keys: ['service.name'], + connectionId: 'conn-1', + }); + + expect(result.get('service.name')).toEqual(['api', 'worker']); + const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + expect(call.query).toContain('mergeTreeTextIndex'); + expect(call.query).toContain('startsWith(token'); + }); + + it('getAllKeyValues uses mergeTreeTextIndex for map keys whose column has an index, MV for the rest', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock) + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [{ key: 'service.name', value: 'api' }], + }), + }) + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [ + { + ColumnIdentifier: 'NativeColumn', + Key: 'ServiceName', + Value: 'clickhouse', + total_count: 1, + }, + ], + }), + }); + + const result = await md.getAllKeyValues({ + databaseName: 'otel', + tableName: 'generic_logs', + keyExpressions: ["LogAttributes['service.name']", 'ServiceName'], + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + 
dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + expect(result).toEqual([ + { key: "LogAttributes['service.name']", value: ['api'] }, + { key: 'ServiceName', value: ['clickhouse'] }, + ]); + + const textIndexCall = (mockClickhouseClient.query as jest.Mock).mock + .calls[0][0]; + expect(textIndexCall.query).toContain('mergeTreeTextIndex'); + // OR-chain of literal `startsWith(token, 'key=')` predicates is the + // only shape KeyCondition can range-rewrite — JOINs against + // `target_keys` would force a full dictionary scan. + expect(textIndexCall.query).toContain('startsWith(token,'); + expect(textIndexCall.query).not.toContain('INNER JOIN'); + // The map key was inserted as a String parameter (`{paramN:String}`) + // alongside its `=` separator, which CH substitutes as a literal at + // parameter-binding time. + expect(Object.values(textIndexCall.query_params)).toContain( + 'service.name=', + ); + + const mvCall = (mockClickhouseClient.query as jest.Mock).mock.calls[1][0]; + expect(Object.values(mvCall.query_params)).toContain( + 'logs_kv_rollup_15m', + ); + expect(Object.values(mvCall.query_params)).toContain('NativeColumn'); + }); + + it('getAllFieldsAndValues excludes text-indexed columns from the MV query', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + // Bypass the keys-discovery composition step so this test stays + // focused on the values + MV exclusion behavior. 
+ jest.spyOn(md, 'getMapKeys').mockResolvedValue(['service.name']); + + (mockClickhouseClient.query as jest.Mock) + // text-index OR-chain values query for LogAttributes + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [{ key: 'service.name', value: 'api' }], + }), + }) + // MV query for natives + unindexed map columns + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [ + { + ColumnIdentifier: 'NativeColumn', + Key: 'ServiceName', + Values: ['clickhouse'], + }, + ], + }), + }); + + const result = await md.getAllFieldsAndValues({ + databaseName: 'otel', + tableName: 'generic_logs', + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + const mvCall = (mockClickhouseClient.query as jest.Mock).mock.calls[1][0]; + expect(mvCall.query).toContain('ColumnIdentifier NOT IN'); + + expect(result).toEqual( + expect.arrayContaining([ + { key: 'ServiceName', value: ['clickhouse'] }, + { key: "LogAttributes['service.name']", value: ['api'] }, + ]), + ); + }); + + it('getMapValues falls through to the main-table scan when only a keys-only index exists', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', keysOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ value: 'fallback' }] }), + }); + + const result = await md.getMapValues({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + keys: ['service.name'], + connectionId: 'conn-1', + }); + + expect(result.get('service.name')).toEqual(['fallback']); + const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + expect(call.query).not.toContain('mergeTreeTextIndex'); + }); + + 
it('getAllFieldsAndValues keeps keys-only columns in the MV query (excludes only items-indexed columns)', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', keysOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [ + { + ColumnIdentifier: 'NativeColumn', + Key: 'ServiceName', + Values: ['clickhouse'], + }, + ], + }), + }); + + await md.getAllFieldsAndValues({ + databaseName: 'otel', + tableName: 'generic_logs', + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + // Only one query (the MV one) — no text-index query for a keys-only column. + expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 1, + ); + const mvCall = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + expect(mvCall.query).not.toContain('ColumnIdentifier NOT IN'); + }); + + it('getMapValues falls through to main-table scan when items index returns empty (no cache poisoning)', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock) + // text-index query — empty (e.g. no parts indexed yet) + .mockResolvedValueOnce({ json: () => Promise.resolve({ data: [] }) }) + // main-table scan — has data + .mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ value: 'scan-fallback' }] }), + }); + + const result = await md.getMapValues({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + keys: ['service.name'], + connectionId: 'conn-1', + }); + + // Both queries fired — proves cache isn't poisoned by the empty text-index result. 
+ expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 2, + ); + expect(result.get('service.name')).toEqual(['scan-fallback']); + const textIndexCall = (mockClickhouseClient.query as jest.Mock).mock + .calls[0][0]; + expect(textIndexCall.query).toContain('mergeTreeTextIndex'); + const scanCall = (mockClickhouseClient.query as jest.Mock).mock + .calls[1][0]; + expect(scanCall.query).not.toContain('mergeTreeTextIndex'); + }); + + it('getAllKeyValues splits across text-index and MV when only some map columns have a text index', async () => { + const md = buildMetadata(); + // LogAttributes has an items text index; ResourceAttributes does not — + // its values must come back from the MV (the user-defined attribute + // rollup row). + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock) + // text-index call for LogAttributes + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [{ key: 'service.name', value: 'api' }], + }), + }) + // MV call for the remaining keys (ResourceAttributes + native) + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [ + { + ColumnIdentifier: 'ResourceAttributes', + Key: 'host.name', + Value: 'host-1', + total_count: 1, + }, + { + ColumnIdentifier: 'NativeColumn', + Key: 'ServiceName', + Value: 'clickhouse', + total_count: 1, + }, + ], + }), + }); + + const result = await md.getAllKeyValues({ + databaseName: 'otel', + tableName: 'generic_logs', + keyExpressions: [ + "LogAttributes['service.name']", + "ResourceAttributes['host.name']", + 'ServiceName', + ], + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + expect(result).toEqual([ + { key: "LogAttributes['service.name']", value: 
['api'] }, + { key: "ResourceAttributes['host.name']", value: ['host-1'] }, + { key: 'ServiceName', value: ['clickhouse'] }, + ]); + + // Both calls were made: text-index for LogAttributes, MV for the rest. + expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 2, + ); + const textIndexCall = (mockClickhouseClient.query as jest.Mock).mock + .calls[0][0]; + expect(textIndexCall.query).toContain('mergeTreeTextIndex'); + + const mvCall = (mockClickhouseClient.query as jest.Mock).mock.calls[1][0]; + expect(Object.values(mvCall.query_params)).toContain( + 'logs_kv_rollup_15m', + ); + // MV query must include the unindexed map column AND the native, but + // NOT the text-indexed column (LogAttributes is omitted from the IN list). + expect(Object.values(mvCall.query_params)).toContain( + 'ResourceAttributes', + ); + expect(Object.values(mvCall.query_params)).toContain('NativeColumn'); + expect(Object.values(mvCall.query_params)).not.toContain('LogAttributes'); + }); + + it('getAllFieldsAndValues composes getMapKeys + literal OR-chain (no INNER JOIN, no token scan)', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + const getMapKeysSpy = jest + .spyOn(md, 'getMapKeys') + .mockResolvedValue(['service.name', 'http.method']); + + (mockClickhouseClient.query as jest.Mock) + // text-index values query + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [ + { key: 'service.name', value: 'api' }, + { key: 'http.method', value: 'GET' }, + ], + }), + }) + // MV query (no rows for this assertion) + .mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [] }), + }); + + await md.getAllFieldsAndValues({ + databaseName: 'otel', + tableName: 'generic_logs', + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new 
Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + // Composition: getMapKeys called per items-indexed column. + expect(getMapKeysSpy).toHaveBeenCalledWith( + expect.objectContaining({ column: 'LogAttributes' }), + ); + + // Values query is the literal OR-chain shape, not a JOIN. + const textIndexCall = (mockClickhouseClient.query as jest.Mock).mock + .calls[0][0]; + expect(textIndexCall.query).toContain('mergeTreeTextIndex'); + expect(textIndexCall.query).toContain('startsWith(token,'); + expect(textIndexCall.query).not.toContain('INNER JOIN'); + expect(textIndexCall.query).not.toContain('target_keys'); + // Both keys appear as literal prefixes (`key=`) in query_params, so + // KeyCondition can range-rewrite each one independently. + expect(Object.values(textIndexCall.query_params)).toContain( + 'service.name=', + ); + expect(Object.values(textIndexCall.query_params)).toContain( + 'http.method=', + ); + }); + + it('getAllFieldsAndValues serves unindexed map columns from MV alongside text-indexed ones', async () => { + const md = buildMetadata(); + // LogAttributes is text-indexed; ResourceAttributes is in the MV only. + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + // Bypass keys discovery so we exercise just the values + MV merge. 
+ jest.spyOn(md, 'getMapKeys').mockResolvedValue(['service.name']); + + (mockClickhouseClient.query as jest.Mock) + // text-index OR-chain values query for LogAttributes + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [{ key: 'service.name', value: 'api' }], + }), + }) + // MV call returning ResourceAttributes + native (LogAttributes excluded) + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [ + { + ColumnIdentifier: 'ResourceAttributes', + Key: 'host.name', + Values: ['host-1', 'host-2'], + }, + { + ColumnIdentifier: 'NativeColumn', + Key: 'ServiceName', + Values: ['clickhouse'], + }, + ], + }), + }); + + const result = await md.getAllFieldsAndValues({ + databaseName: 'otel', + tableName: 'generic_logs', + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + // Both queries fired. + expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 2, + ); + const mvCall = (mockClickhouseClient.query as jest.Mock).mock.calls[1][0]; + // Only LogAttributes is excluded from the MV; ResourceAttributes stays in. + expect(mvCall.query).toContain('ColumnIdentifier NOT IN'); + expect(Object.values(mvCall.query_params)).toContain('LogAttributes'); + expect(Object.values(mvCall.query_params)).not.toContain( + 'ResourceAttributes', + ); + + expect(result).toEqual( + expect.arrayContaining([ + { key: "LogAttributes['service.name']", value: ['api'] }, + { + key: "ResourceAttributes['host.name']", + value: ['host-1', 'host-2'], + }, + { key: 'ServiceName', value: ['clickhouse'] }, + ]), + ); + }); + + it('getMapColumnTextIndexes returns empty Map on ClickHouse < 26.3', async () => { + const md = buildMetadata(); + // Bypass the global beforeAll stub so we exercise the real implementation. 
+ const globalStub = Metadata.prototype + .getMapColumnTextIndexes as jest.Mock; + globalStub.mockRestore(); + try { + jest + .spyOn(md, 'getTableMetadata') + .mockResolvedValue({ engine: 'MergeTree' } as never); + jest + .spyOn(md, 'getServerVersion') + .mockResolvedValue([26, 2, 99, 99] as const); + const getColumnsSpy = jest.spyOn(md, 'getColumns'); + const getSkipIndicesSpy = jest.spyOn(md, 'getSkipIndices'); + + const result = await md.getMapColumnTextIndexes({ + databaseName: 'otel', + tableName: 'generic_logs', + connectionId: 'conn-1', + }); + + expect(result.size).toBe(0); + // Bailed out early — no follow-up column/index queries. + expect(getColumnsSpy).not.toHaveBeenCalled(); + expect(getSkipIndicesSpy).not.toHaveBeenCalled(); + } finally { + jest + .spyOn(Metadata.prototype, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map()); + } + }); + + it.each([ + ['ALIAS column name', 'LogAttributeItems'], + [ + 'resolved arrayMap expression', + "arrayMap((arr) -> concat(arr.1, '=', arr.2), CAST(LogAttributes, 'Array(Tuple(String, String))'))", + ], + ])( + 'getMapColumnTextIndexes detects items index when system.data_skipping_indices.expr is %s', + async (_label, indexExpression) => { + const md = buildMetadata(); + const globalStub = Metadata.prototype + .getMapColumnTextIndexes as jest.Mock; + globalStub.mockRestore(); + try { + jest + .spyOn(md, 'getTableMetadata') + .mockResolvedValue({ engine: 'MergeTree' } as never); + jest + .spyOn(md, 'getServerVersion') + .mockResolvedValue([26, 3, 0, 0] as const); + jest.spyOn(md, 'getColumns').mockResolvedValue([ + { + name: 'LogAttributeItems', + type: 'Array(String)', + default_type: 'ALIAS', + default_expression: + "arrayMap((arr) -> concat(arr.1, '=', arr.2), CAST(LogAttributes, 'Array(Tuple(String, String))'))", + comment: '', + codec_expression: '', + ttl_expression: '', + }, + ] as never); + jest.spyOn(md, 'getSkipIndices').mockResolvedValue([ + { + name: 'idx_log_attr_items', + type: 'text', + typeFull: 
"text(tokenizer = 'array')", + expression: indexExpression, + granularity: 1, + }, + ] as never); + + const result = await md.getMapColumnTextIndexes({ + databaseName: 'otel', + tableName: 'generic_logs', + connectionId: 'conn-1', + }); + + expect(result.get('LogAttributes')?.itemsIndex).toEqual({ + indexName: 'idx_log_attr_items', + separator: '=', + }); + } finally { + jest + .spyOn(Metadata.prototype, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map()); + } + }, + ); + }); }); describe('parseKeyPath', () => { diff --git a/packages/common-utils/src/core/clickhouseVersion.ts b/packages/common-utils/src/core/clickhouseVersion.ts index 0f781a2b10..863ac3285f 100644 --- a/packages/common-utils/src/core/clickhouseVersion.ts +++ b/packages/common-utils/src/core/clickhouseVersion.ts @@ -112,3 +112,20 @@ export function supportsDirectReadMap( if (!branchMin) return false; return compareClickHouseVersion(version, branchMin) >= 0; } + +/** + * First release that shipped the `mergeTreeTextIndex(database, table, index)` + * table function used to introspect text skip indices. + */ +const MERGE_TREE_TEXT_INDEX_MIN: ClickHouseVersion = [26, 3, 0, 0]; + +/** + * Returns true when the connected ClickHouse server supports the + * `mergeTreeTextIndex` table function (>= 26.3). Returns false when the + * version is undefined or older. 
+ */ +export function supportsMergeTreeTextIndex( + version: ClickHouseVersion | undefined, +): boolean { + return isClickHouseVersionAtLeast(version, MERGE_TREE_TEXT_INDEX_MIN); +} diff --git a/packages/common-utils/src/core/kvItems.ts b/packages/common-utils/src/core/kvItems.ts new file mode 100644 index 0000000000..6e02da0c16 --- /dev/null +++ b/packages/common-utils/src/core/kvItems.ts @@ -0,0 +1,239 @@ +/** Describes a KV items column and its concat separator */ +export type KvItemsInfo = { + kvItemsColumn: string; + separator: string; +}; + +/** Map from map column name to its KV items info */ +export type KvItemsLookup = Map; + +/** + * Tokenizes a ClickHouse expression into word-character runs (identifiers, + * numbers), the punctuation `(` `)` `,` `.`, the operators `->` and `::`, and quoted + * strings. Whitespace is skipped; any other character makes it return null. + */ +function tokenizeExpression(expr: string): string[] | null { + const tokens: string[] = []; + let i = 0; + while (i < expr.length) { + // Skip whitespace + if (/\s/.test(expr[i])) { + i++; + continue; + } + // Arrow operator -> + if (expr[i] === '-' && expr[i + 1] === '>') { + tokens.push('->'); + i += 2; + continue; + } + // Cast operator :: + if (expr[i] === ':' && expr[i + 1] === ':') { + tokens.push('::'); + i += 2; + continue; + } + // Single-char tokens + if ('(),.'.includes(expr[i])) { + tokens.push(expr[i]); + i++; + continue; + } + // Quoted string (single or double); a backslash escapes the next character + if (expr[i] === "'" || expr[i] === '"') { + const quote = expr[i]; + let str = ''; + i++; // skip opening quote + while (i < expr.length && expr[i] !== quote) { + if (expr[i] === '\\') { + str += expr[i + 1] ??
''; + i += 2; + } else { + str += expr[i]; + i++; + } + } + i++; // skip closing quote (also reached, unchecked, when the string is unterminated) + tokens.push(`'${str}'`); // normalize to single-quote wrapper + continue; + } + // Identifier or keyword (word chars) + if (/\w/.test(expr[i])) { + let ident = ''; + while (i < expr.length && /\w/.test(expr[i])) { + ident += expr[i]; + i++; + } + tokens.push(ident); + continue; + } + // Unknown character — return null to signal unparseable expression + return null; + } + return tokens; +} + +/** + * Helper: matches the leading `arrayMap(x -> concat(x.1, SEP, x.2),` tokens and returns the + * lambda variable name, the unquoted SEP, and the index of the next unread token (undefined on mismatch). + * Handles both parenthesized `(x) ->` and bare `x ->` lambda parameter forms. + */ +function parseArrayMapConcatPrefix( + tokens: string[], +): { lambdaVar: string; separator: string; pos: number } | undefined { + let pos = 0; + const expect = (expected: string): boolean => { + if (pos >= tokens.length || tokens[pos] !== expected) return false; + pos++; + return true; + }; + const read = (): string | undefined => tokens[pos++]; + + if (!expect('arrayMap') || !expect('(')) return undefined; + + // Lambda param: either (x) -> or x -> + let lambdaVar: string | undefined; + if (tokens[pos] === '(') { + pos++; // skip ( + lambdaVar = read(); + if (!lambdaVar || !expect(')')) return undefined; + } else { + lambdaVar = read(); + if (!lambdaVar) return undefined; + } + if (!expect('->')) return undefined; + + // concat(lambdaVar.1, 'SEP', lambdaVar.2) where SEP is any quoted constant + if (!expect('concat') || !expect('(')) return undefined; + if (!expect(lambdaVar) || !expect('.') || !expect('1') || !expect(',')) + return undefined; + + const sepToken = read(); + if (!sepToken || !sepToken.startsWith("'") || !expect(',')) return undefined; + const separator = sepToken.slice(1, -1); // drop the wrapping quotes added by the tokenizer + + if ( + !expect(lambdaVar) || + !expect('.') || + !expect('2') || + !expect(')') || + !expect(',') // comma that precedes the second arrayMap argument + ) + return undefined; + + return { lambdaVar, separator, pos }; +} + +/** + * Parses a KV items
column's default_expression to extract the source map column name + * and the constant separator string used in the concat. + * Matches the inline-cast form: + * arrayMap((arr) -> concat(arr.1, '=', arr.2), X::Array(Tuple(String, String))) + * Also supports bare lambda param: arrayMap(x -> concat(...), ...) + * Returns { mapColumn, separator } if the expression matches, otherwise undefined. + */ +export function parseKvItemsExpression( + defaultExpression: string, +): { mapColumn: string; separator: string } | undefined { + const tokens = tokenizeExpression(defaultExpression); + if (!tokens) return undefined; + + const prefix = parseArrayMapConcatPrefix(tokens); + if (!prefix) return undefined; + + let pos = prefix.pos; + const expect = (expected: string): boolean => { + if (pos >= tokens.length || tokens[pos] !== expected) return false; + pos++; + return true; + }; + const read = (): string | undefined => tokens[pos++]; + + // X::Array(Tuple(String, String)) — X is taken as-is: any single token is accepted + const mapColumn = read(); + if (!mapColumn) return undefined; + if ( + !expect('::') || + !expect('Array') || + !expect('(') || + !expect('Tuple') || + !expect('(') || + !expect('String') || + !expect(',') || + !expect('String') || + !expect(')') || + !expect(')') || + !expect(')') // closes arrayMap( + ) + return undefined; + + if (pos !== tokens.length) return undefined; // reject trailing tokens + + return { mapColumn, separator: prefix.separator }; +} + +/** + * Parses a KV items column's default_expression using the CAST function form: + * arrayMap((arr) -> concat(arr.1, '=', arr.2), CAST(X, 'Array(Tuple(String, String))')) + * Also supports bare lambda param: arrayMap(x -> concat(...), ...) + * Returns { mapColumn, separator } if the expression matches, otherwise undefined.
+ */ +export function parseKvItemsCastExpression( + defaultExpression: string, +): { mapColumn: string; separator: string } | undefined { + const tokens = tokenizeExpression(defaultExpression); + if (!tokens) return undefined; + + const prefix = parseArrayMapConcatPrefix(tokens); + if (!prefix) return undefined; + + let pos = prefix.pos; + const expect = (expected: string): boolean => { + if (pos >= tokens.length || tokens[pos] !== expected) return false; + pos++; + return true; + }; + const read = (): string | undefined => tokens[pos++]; + + // CAST(X, 'Array(Tuple(String, String))') — the CAST keyword is matched case-sensitively + if (!expect('CAST') || !expect('(')) return undefined; + const mapColumn = read(); + if (!mapColumn || !expect(',')) return undefined; + + // The type argument is a quoted string like 'Array(Tuple(String, String))' + const typeToken = read(); + if (!typeToken || !typeToken.startsWith("'")) return undefined; + const typeStr = typeToken.slice(1, -1); // strip quotes + const normalizedType = typeStr.replace(/\s+/g, ''); + if (normalizedType !== 'Array(Tuple(String,String))') return undefined; // whitespace inside the type string is ignored + + if (!expect(')') || !expect(')')) return undefined; // closes CAST( and then arrayMap( + + if (pos !== tokens.length) return undefined; // no trailing tokens allowed + + return { mapColumn, separator: prefix.separator }; +} + +/** + * Ordered list of strategies tried by callers when parsing a KV items column's + * default_expression. To add support for another shape, append a new function + * with the same signature. + */ +export const KV_ITEMS_STRATEGIES = [ + parseKvItemsExpression, + parseKvItemsCastExpression, +] as const; + +/** + * Tries each parsing strategy in order and returns the first successful match, + * or undefined when no strategy recognises the expression.
+ */ +export function parseKvItemsDefaultExpression( + defaultExpression: string, +): { mapColumn: string; separator: string } | undefined { + for (const strategy of KV_ITEMS_STRATEGIES) { + const parsed = strategy(defaultExpression); + if (parsed) return parsed; /* first strategy that matches wins */ + } + return undefined; /* no strategy recognised the expression */ +} diff --git a/packages/common-utils/src/core/metadata.ts b/packages/common-utils/src/core/metadata.ts index a80feb1fad..ec26abdb99 100644 --- a/packages/common-utils/src/core/metadata.ts +++ b/packages/common-utils/src/core/metadata.ts @@ -21,7 +21,12 @@ import type { } from '@/types'; import { isLogSource, isTraceSource, SourceKind } from '@/types'; -import { ClickHouseVersion, parseClickHouseVersion } from './clickhouseVersion'; +import { + ClickHouseVersion, + parseClickHouseVersion, + supportsMergeTreeTextIndex, +} from './clickhouseVersion'; +import { parseKvItemsDefaultExpression } from './kvItems'; import { optimizeGetKeyValuesCalls, renderStartOfBucketExpr, @@ -30,6 +35,7 @@ import { getAlignedDateRange, getDistributedTableArgs, objectHash, + parseTokenizerFromTextIndex, } from './utils'; // If filters initially are taking too long to load, decrease this number.
@@ -127,6 +133,11 @@ export type SkipIndexMetadata = { granularity: number; }; +export type MapColumnTextIndexes = { + keysIndex?: { indexName: string }; + itemsIndex?: { indexName: string; separator: string }; +}; + export class Metadata { private readonly clickhouseClient: BaseClickhouseClient; private readonly cache: MetadataCache; @@ -341,6 +352,79 @@ export class Metadata { })[0]; } + async getMapColumnTextIndexes({ + databaseName, + tableName, + connectionId, + }: TableConnection): Promise> { + return this.cache.getOrFetch( + `${connectionId}.${databaseName}.${tableName}.mapColumnTextIndexes`, + async () => { + const [tableMeta, version] = await Promise.all([ + this.getTableMetadata({ databaseName, tableName, connectionId }), + this.getServerVersion({ connectionId }), + ]); + // mergeTreeTextIndex table function requires ClickHouse >= 26.3, and + // only reads from a single local MergeTree table, so Distributed tables get an empty result. + if (!supportsMergeTreeTextIndex(version)) return new Map(); + if (tableMeta?.engine === 'Distributed') return new Map(); + + const [columns, indices] = await Promise.all([ + this.getColumns({ databaseName, tableName, connectionId }), + this.getSkipIndices({ databaseName, tableName, connectionId }), + ]); + const norm = (s: string) => s.replace(/\s+/g, '').replace(/`/g, ''); // drop whitespace and backticks so expressions compare textually + const lookup = new Map(); + const patch = (col: string, p: MapColumnTextIndexes) => + lookup.set(col, { ...lookup.get(col), ...p }); // merge: one column may carry both a keys and an items index + + for (const i of indices) { + if (i.type !== 'text') continue; + if (parseTokenizerFromTextIndex(i)?.type !== 'array') continue; + const expr = norm(i.expression); + + const mapKeysMatch = expr.match(/^mapKeys\((.+)\)$/); + if (mapKeysMatch) { + patch(mapKeysMatch[1], { keysIndex: { indexName: i.name } }); // keys-only index built directly on mapKeys(column) + continue; + } + const alias = columns.find( + c => + (c.default_type === 'ALIAS' || + c.default_type === 'MATERIALIZED') && + !!c.default_expression && + (norm(c.name) === expr || norm(c.default_expression) === expr), // index expression may be the column name or its expanded expression + ); + const parsed = + alias &&
parseKvItemsDefaultExpression(alias.default_expression); + if (parsed) + patch(parsed.mapColumn, { + itemsIndex: { indexName: i.name, separator: parsed.separator }, + }); + } + return lookup; + }, + ); + } + + // `part_name IN (...)` predicate restricting a mergeTreeTextIndex scan to + // active data parts. With a dateRange only parts whose system.parts + // [min_time, max_time] interval overlaps the range are kept; without one + // every active part qualifies. NOTE(review): no throwIf() guard is emitted + // here, so a part reporting epoch min_time/max_time would be dropped + // silently instead of raising. Confirm whether a guard was intended. + private partsOverlapFilter( + databaseName: string, + tableName: string, + dateRange?: [Date, Date], + ): ChSql { + const base = chSql`SELECT name FROM system.parts WHERE database = ${{ String: databaseName }} AND table = ${{ String: tableName }} AND active = 1`; + if (!dateRange) return chSql`part_name IN (${base})`; + return chSql`part_name IN (${base} + AND min_time <= fromUnixTimestamp64Milli(${{ Int64: dateRange[1].getTime() }}) + AND max_time >= fromUnixTimestamp64Milli(${{ Int64: dateRange[0].getTime() }}))`; + } + async getMapKeys({ databaseName, tableName, @@ -383,53 +467,104 @@ export class Metadata { return cachedKeys; } + // Prefer mergeTreeTextIndex when a covering text(tokenizer=array) index + // exists. Try the keys-only index (`mapKeys(X)`) first — each token IS a + // key, no string splitting required. Fall through to the items ALIAS + // column index (`XItems`) which carries `key${sep}value` tokens.
+ const indexes = ( + await this.getMapColumnTextIndexes({ + databaseName, + tableName, + connectionId, + }) + ).get(column); + const partsFilter = this.partsOverlapFilter( + databaseName, + tableName, + dateRange, + ); + const runIndexQuery = async (sql: ChSql): Promise => + this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: this.getClickHouseSettings(), + }) + .then(r => r.json<{ key: string }>()) + .then(d => d.data.map(r => r.key).filter(Boolean)); + if (indexes?.keysIndex) { + const sql = chSql` + SELECT token AS key + FROM mergeTreeTextIndex(${{ String: databaseName }}, ${{ String: tableName }}, ${{ String: indexes.keysIndex.indexName }}) + WHERE ${partsFilter} + GROUP BY key HAVING key != '' + LIMIT ${{ Int32: maxKeys }}`; + const keys = await runIndexQuery(sql); + if (keys.length > 0) { /* an empty result falls through to the next source */ + this.cache.set(cacheKey, keys); + return keys; + } + } + if (indexes?.itemsIndex) { + const { indexName, separator } = indexes.itemsIndex; /* separator may be longer than one character, so split by string rather than by char */ + const sql = chSql` + SELECT splitByString(${{ String: separator }}, token)[1] AS key + FROM mergeTreeTextIndex(${{ String: databaseName }}, ${{ String: tableName }}, ${{ String: indexName }}) + WHERE ${partsFilter} + GROUP BY key HAVING key != '' + LIMIT ${{ Int32: maxKeys }}`; + const keys = await runIndexQuery(sql); + if (keys.length > 0) { + this.cache.set(cacheKey, keys); + return keys; + } + } + // Rollup path: query the key rollup table filtered by ColumnIdentifier and date range if (metadataMVs && alignedDateRange) { - const rollupKeys = await this.cache.getOrFetch( - cacheKey, - async () => { - try { - const startExpr = renderStartOfBucketExpr( - metadataMVs.granularity, - chSql`fromUnixTimestamp64Milli(${{ Int64: alignedDateRange[0].getTime() }})`, - ); - const endExpr = renderStartOfBucketExpr( - metadataMVs.granularity, - chSql`fromUnixTimestamp64Milli(${{ Int64: alignedDateRange[1].getTime() }})`, - ); - const timeFilter = chSql`AND Timestamp >=
${startExpr} AND Timestamp <= ${endExpr}`; - const sql = chSql` - SELECT Key - FROM ${tableExpr({ database: databaseName, table: metadataMVs.keyRollupTable })} - WHERE ColumnIdentifier = ${{ String: column }} - ${timeFilter} - GROUP BY Key - ORDER BY sum(count) DESC - LIMIT ${{ Int32: maxKeys }} - `; - - return await this.clickhouseClient - .query<'JSON'>({ - query: sql.sql, - query_params: sql.params, - connectionId, - clickhouse_settings: { - ...this.getClickHouseSettings(), - timeout_overflow_mode: 'break', - max_execution_time: 15, - max_rows_to_read: '0', - }, - }) - .then(res => res.json<{ Key: string }>()) - .then(d => d.data.map(row => row.Key).filter(k => k)); - } catch (e) { - console.warn('getMapKeys rollup query failed', e); - return []; - } - }, - ); + let rollupKeys: string[] = []; + try { + const startExpr = renderStartOfBucketExpr( + metadataMVs.granularity, + chSql`fromUnixTimestamp64Milli(${{ Int64: alignedDateRange[0].getTime() }})`, + ); + const endExpr = renderStartOfBucketExpr( + metadataMVs.granularity, + chSql`fromUnixTimestamp64Milli(${{ Int64: alignedDateRange[1].getTime() }})`, + ); + const timeFilter = chSql`AND Timestamp >= ${startExpr} AND Timestamp <= ${endExpr}`; + const sql = chSql` + SELECT Key + FROM ${tableExpr({ database: databaseName, table: metadataMVs.keyRollupTable })} + WHERE ColumnIdentifier = ${{ String: column }} + ${timeFilter} + GROUP BY Key + ORDER BY sum(count) DESC + LIMIT ${{ Int32: maxKeys }} + `; + rollupKeys = await this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: { + ...this.getClickHouseSettings(), + timeout_overflow_mode: 'break', + max_execution_time: 15, + max_rows_to_read: '0', + }, + }) + .then(res => res.json<{ Key: string }>()) + .then(d => d.data.map(row => row.Key).filter(k => k)); + } catch (e) { + console.warn('getMapKeys rollup query failed', e); + } - if (rollupKeys.length > 0) return rollupKeys; + if (rollupKeys.length > 
0) { + this.cache.set(cacheKey, rollupKeys); + return rollupKeys; + } } // Original path: scan main table @@ -635,31 +770,72 @@ export class Metadata { databaseName, tableName, column, - key, + keys, maxValues = 20, connectionId, dateRange, timestampValueExpression, + signal, }: { databaseName: string; tableName: string; column: string; - key?: string; + keys: string[]; maxValues?: number; dateRange?: [Date, Date]; timestampValueExpression?: string; connectionId: string; - }) { - const dateRangeCacheSuffix = - dateRange && timestampValueExpression - ? `${dateRange[0].getTime()}-${dateRange[1].getTime()}-${timestampValueExpression}` - : ''; - const cacheKey = `${connectionId}.${databaseName}.${tableName}.${column}.${key}.${dateRangeCacheSuffix}.values`; - - const cachedValues = this.cache.get(cacheKey); + signal?: AbortSignal; + }): Promise> { + if (keys.length === 0) return new Map(); - if (cachedValues != null) { - return cachedValues; + const idx = ( + await this.getMapColumnTextIndexes({ + databaseName, + tableName, + connectionId, + }) + ).get(column)?.itemsIndex; + if (idx) { + const partsFilter = this.partsOverlapFilter( + databaseName, + tableName, + dateRange, + ); + const orChain = concatChSql( + ' OR ', + keys.map( + k => chSql`startsWith(token, ${{ String: `${k}${idx.separator}` }})`, + ), + ); + const sql = chSql` + SELECT substring(token, 1, position(token, ${{ String: idx.separator }}) - 1) AS key, + substring(token, position(token, ${{ String: idx.separator }}) + ${{ Int32: idx.separator.length }}) AS value + FROM mergeTreeTextIndex(${{ String: databaseName }}, ${{ String: tableName }}, ${{ String: idx.indexName }}) + WHERE ${partsFilter} + AND (${orChain}) + GROUP BY key, value HAVING value != '' + LIMIT ${{ Int32: maxValues }} BY key`; + const rows = await this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: this.getClickHouseSettings(), + abort_signal: signal, + }) + .then(r 
=> r.json<{ key: string; value: string }>()) + .then(d => d.data); + if (rows.length > 0) { + const result = new Map(); + for (const row of rows) { + if (!row.value) continue; + const arr = result.get(row.key) ?? []; + arr.push(row.value); + result.set(row.key, arr); + } + return result; + } } const timeFilterCondition = @@ -675,55 +851,43 @@ export class Metadata { metadata: this, }) : null; - // `value != ''` stays first so existing behavior is preserved; source filters - // and time filter are appended via AND when provided. const whereConditions: ChSql[] = [ chSql`value != ''`, ...(timeFilterCondition ? [timeFilterCondition] : []), ]; const where = chSql`WHERE ${concatChSql(' AND ', ...whereConditions)}`; - const sql = key - ? chSql` - SELECT DISTINCT ${{ - Identifier: column, - }}[${{ String: key }}] as value - FROM ${tableExpr({ database: databaseName, table: tableName })} - ${where} - LIMIT ${{ - Int32: maxValues, - }} - ` - : chSql` - SELECT DISTINCT ${{ - Identifier: column, - }} as value - FROM ${tableExpr({ database: databaseName, table: tableName })} - ${where} - LIMIT ${{ - Int32: maxValues, - }} - `; + const settings = { + max_rows_to_read: String( + this.getClickHouseSettings().max_rows_to_read ?? + DEFAULT_METADATA_MAX_ROWS_TO_READ, + ), + read_overflow_mode: 'break' as const, + ...this.getClickHouseSettings(), + }; - return this.cache.getOrFetch(cacheKey, async () => { - const values = await this.clickhouseClient - .query<'JSON'>({ - query: sql.sql, - query_params: sql.params, - connectionId, - clickhouse_settings: { - max_rows_to_read: String( - this.getClickHouseSettings().max_rows_to_read ?? 
- DEFAULT_METADATA_MAX_ROWS_TO_READ, - ), - read_overflow_mode: 'break', - ...this.getClickHouseSettings(), - }, - }) - .then(res => res.json<{ value: string }>()) - .then(d => d.data.map(row => row.value)); - return values; - }); + const result = new Map(); + await Promise.all( + keys.map(async k => { + const sql = chSql` + SELECT DISTINCT ${{ Identifier: column }}[${{ String: k }}] AS value + FROM ${tableExpr({ database: databaseName, table: tableName })} + ${where} + LIMIT ${{ Int32: maxValues }}`; + const values = await this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: settings, + abort_signal: signal, + }) + .then(res => res.json<{ value: string }>()) + .then(d => d.data.map(row => row.value).filter(Boolean)); + if (values.length > 0) result.set(k, values); + }), + ); + return result; } async getAllFields({ @@ -1361,6 +1525,121 @@ export class Metadata { }; }); + // Batch-query mergeTreeTextIndex for any map keys whose column has a + // covering text(tokenizer=array) index; remaining keys (native columns, + // unindexed map columns) fall through to the MV/scan path below. + const textIndexResults = new Map(); + const textIndexes = await this.getMapColumnTextIndexes({ + databaseName, + tableName, + connectionId, + }); + const keysByColumn = new Map(); + for (const p of parsed) { + if (!p.mapKey || !textIndexes.get(p.column)?.itemsIndex) continue; + const arr = keysByColumn.get(p.column) ?? 
[]; + arr.push(p.mapKey); + keysByColumn.set(p.column, arr); + } + await Promise.all( + [...keysByColumn.entries()].map(async ([column, mapKeys]) => { + const valuesByKey = await this.getMapValues({ + databaseName, + tableName, + column, + keys: mapKeys, + maxValues: maxValuesPerKey, + connectionId, + dateRange, + timestampValueExpression, + signal, + }); + for (const p of parsed) { + if (p.column !== column || !p.mapKey) continue; + const values = valuesByKey.get(p.mapKey); + if (values?.length) textIndexResults.set(p.keyExpression, values); + } + }), + ); + + const remaining = parsed.filter( + p => !textIndexResults.has(p.keyExpression), + ); + if (remaining.length === 0) { + return keyExpressions.map(k => ({ + key: k, + value: textIndexResults.get(k) ?? [], + })); + } + const mergeResults = (partial: { key: string; value: string[] }[]) => { + const m = new Map(partial.map(r => [r.key, r.value])); + return keyExpressions.map(k => ({ + key: k, + value: textIndexResults.get(k) ?? m.get(k) ?? [], + })); + }; + + // Per-key fallback when neither text-index nor MV served a key. For map + // keys, delegate to getMapValues; for native columns (no mapKey), scan + // the source column directly. + const fetchPerKeyFallback = async ( + p: (typeof parsed)[number], + ): Promise => { + if (p.mapKey) { + const m = await this.getMapValues({ + databaseName, + tableName, + column: p.column, + keys: [p.mapKey], + maxValues: maxValuesPerKey, + connectionId, + dateRange, + timestampValueExpression, + }); + return m.get(p.mapKey) || []; + } + const timeFilterCondition = + dateRange && timestampValueExpression + ? await timeFilterExpr({ + connectionId, + databaseName, + tableName, + dateRange, + dateRangeStartInclusive: true, + dateRangeEndInclusive: true, + timestampValueExpression, + metadata: this, + }) + : null; + const where = chSql`WHERE ${concatChSql( + ' AND ', + chSql`value != ''`, + ...(timeFilterCondition ? 
[timeFilterCondition] : []), + )}`; + const sql = chSql` + SELECT DISTINCT ${{ Identifier: p.column }} AS value + FROM ${tableExpr({ database: databaseName, table: tableName })} + ${where} + LIMIT ${{ Int32: maxValuesPerKey }}`; + return this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: { + max_rows_to_read: String( + this.getClickHouseSettings().max_rows_to_read ?? + DEFAULT_METADATA_MAX_ROWS_TO_READ, + ), + read_overflow_mode: 'break', + ...this.getClickHouseSettings(), + }, + abort_signal: signal, + }) + .then(res => res.json<{ value: string }>()) + .then(d => d.data.map(row => row.value).filter(Boolean)); + }; + // Try rollup table first when available if (metadataMVs && dateRange) { const alignedDateRange = getAlignedDateRange( @@ -1378,7 +1657,7 @@ export class Metadata { ); const timeFilter = chSql`AND Timestamp >= ${startExpr} AND Timestamp <= ${endExpr}`; - const sortedKeyIds = parsed + const sortedKeyIds = remaining .map(p => `${p.rollupColumn}:${p.rollupKey}`) .sort() .join(','); @@ -1386,7 +1665,7 @@ export class Metadata { const tupleParams = concatChSql( ',', - parsed.map( + remaining.map( p => chSql`(${{ String: p.rollupColumn }}, ${{ String: p.rollupKey }})`, ), @@ -1448,44 +1727,30 @@ export class Metadata { } // Build results, falling back to getMapValues for keys with no rollup data - return Promise.all( - parsed.map(async p => { + const partial = await Promise.all( + remaining.map(async p => { const mapKey = `${p.rollupColumn}:${p.rollupKey}`; const values = resultMap.get(mapKey); if (values && values.length > 0) { return { key: p.keyExpression, value: values }; } - const fallback = await this.getMapValues({ - databaseName, - tableName, - column: p.column, - key: p.mapKey, - maxValues: maxValuesPerKey, - connectionId, - dateRange, - timestampValueExpression, - }); - return { key: p.keyExpression, value: fallback }; + return { + key: p.keyExpression, + value: await 
fetchPerKeyFallback(p), + }; }), ); + return mergeResults(partial); } // No rollup available — fall back to main table scan for all keys - return Promise.all( - parsed.map(async p => { - const value = await this.getMapValues({ - databaseName, - tableName, - column: p.column, - key: p.mapKey, - maxValues: maxValuesPerKey, - connectionId, - dateRange, - timestampValueExpression, - }); - return { key: p.keyExpression, value }; - }), + const partial = await Promise.all( + remaining.map(async p => ({ + key: p.keyExpression, + value: await fetchPerKeyFallback(p), + })), ); + return mergeResults(partial); } /** @@ -1528,7 +1793,64 @@ export class Metadata { ); const timeFilter = chSql`Timestamp >= ${startExpr} AND Timestamp <= ${endExpr}`; - const cacheKey = `${connectionId}.${databaseName}.${tableName}.${alignedDateRange[0].getTime()}.${alignedDateRange[1].getTime()}.fieldsAndValues.${maxValuesPerKey}.${maxKeys ?? 'all'}`; + // Map columns with an items text index are served directly from + // mergeTreeTextIndex and excluded from the MV query below to avoid + // double-counting their entries. Map columns that only have a keys-only + // index don't carry values, so they stay in the MV path. + const textIndexes = await this.getMapColumnTextIndexes({ + databaseName, + tableName, + connectionId, + }); + const itemsIndexed = [...textIndexes.entries()].flatMap(([col, info]) => + info.itemsIndex ? [[col, info.itemsIndex] as const] : [], + ); + const textIndexResults: { key: string; value: string[] }[] = []; + if (itemsIndexed.length > 0) { + const cacheKey = `${connectionId}.${databaseName}.${tableName}.${dateRange[0].getTime()}.${dateRange[1].getTime()}.fieldsAndValues.textIndex.${maxValuesPerKey}.${maxKeys ?? 'all'}`; + const indexed = await this.cache.getOrFetch(cacheKey, async () => { + const perColumn = await Promise.all( + itemsIndexed.map(async ([column]) => { + const sampledKeys = await this.getMapKeys({ + databaseName, + tableName, + column, + maxKeys: maxKeys ?? 
DEFAULT_MAX_KEYS, + connectionId, + dateRange, + }); + if (sampledKeys.length === 0) return []; + const valuesByKey = await this.getMapValues({ + databaseName, + tableName, + column, + keys: sampledKeys, + maxValues: maxValuesPerKey, + connectionId, + dateRange, + signal, + }); + return [...valuesByKey.entries()].map(([key, values]) => ({ + key: `${column}['${key}']`, + value: values, + })); + }), + ); + return perColumn.flat(); + }); + textIndexResults.push(...indexed); + } + + const excludedIds = itemsIndexed.map(([col]) => col); + const excludeClause = + excludedIds.length > 0 + ? chSql`AND ColumnIdentifier NOT IN (${concatChSql( + ',', + excludedIds.map(c => chSql`${{ String: c }}`), + )})` + : chSql``; + + const cacheKey = `${connectionId}.${databaseName}.${tableName}.${alignedDateRange[0].getTime()}.${alignedDateRange[1].getTime()}.fieldsAndValues.${maxValuesPerKey}.${maxKeys ?? 'all'}.excl_${excludedIds.sort().join(',')}`; type RollupRow = { ColumnIdentifier: string; @@ -1545,6 +1867,7 @@ export class Metadata { FROM ${tableExpr({ database: databaseName, table: metadataMVs.kvRollupTable })} WHERE Value != '' AND ${timeFilter} + ${excludeClause} GROUP BY ColumnIdentifier, Key ORDER BY ColumnIdentifier = 'NativeColumn' DESC, ColumnIdentifier, Key ${limitClause} @@ -1567,13 +1890,15 @@ export class Metadata { .then(d => d.data); }); - return rows.map(row => { + const mvResults = rows.map(row => { const keyExpr = row.ColumnIdentifier === 'NativeColumn' ? 
row.Key : `${row.ColumnIdentifier}['${row.Key}']`; return { key: keyExpr, value: row.Values }; }); + + return [...mvResults, ...textIndexResults]; } async getKeyValues({ diff --git a/packages/common-utils/src/queryParser.ts b/packages/common-utils/src/queryParser.ts index b1c5537498..50c1d55be3 100644 --- a/packages/common-utils/src/queryParser.ts +++ b/packages/common-utils/src/queryParser.ts @@ -10,6 +10,17 @@ import { JSDataType, } from '@/clickhouse'; import { supportsDirectReadMap } from '@/core/clickhouseVersion'; +import { + KV_ITEMS_STRATEGIES, + KvItemsInfo, + KvItemsLookup, +} from '@/core/kvItems'; + +export type { KvItemsInfo, KvItemsLookup } from '@/core/kvItems'; +export { + parseKvItemsCastExpression, + parseKvItemsExpression, +} from '@/core/kvItems'; import { Metadata, parseKeyPath, @@ -844,228 +855,6 @@ function renderArrayFieldExpression({ ); } -/** Describes a KV items column and its concat separator */ -export type KvItemsInfo = { - kvItemsColumn: string; - separator: string; -}; - -/** Map from map column name to its KV items info */ -export type KvItemsLookup = Map; - -/** - * Tokenizes a ClickHouse expression into meaningful tokens (identifiers, parens, - * commas, arrows, quoted strings, operators). Whitespace is skipped. - * Returns null if the expression contains unrecognized characters. 
- */ -function tokenizeExpression(expr: string): string[] | null { - const tokens: string[] = []; - let i = 0; - while (i < expr.length) { - // Skip whitespace - if (/\s/.test(expr[i])) { - i++; - continue; - } - // Arrow operator -> - if (expr[i] === '-' && expr[i + 1] === '>') { - tokens.push('->'); - i += 2; - continue; - } - // Cast operator :: - if (expr[i] === ':' && expr[i + 1] === ':') { - tokens.push('::'); - i += 2; - continue; - } - // Single-char tokens - if ('(),.'.includes(expr[i])) { - tokens.push(expr[i]); - i++; - continue; - } - // Quoted string (single or double) - if (expr[i] === "'" || expr[i] === '"') { - const quote = expr[i]; - let str = ''; - i++; // skip opening quote - while (i < expr.length && expr[i] !== quote) { - if (expr[i] === '\\') { - str += expr[i + 1] ?? ''; - i += 2; - } else { - str += expr[i]; - i++; - } - } - i++; // skip closing quote - tokens.push(`'${str}'`); // normalize to single-quote wrapper - continue; - } - // Identifier or keyword (word chars) - if (/\w/.test(expr[i])) { - let ident = ''; - while (i < expr.length && /\w/.test(expr[i])) { - ident += expr[i]; - i++; - } - tokens.push(ident); - continue; - } - // Unknown character — return null to signal unparseable expression - return null; - } - return tokens; -} - -/** - * Helper: parses the common arrayMap lambda prefix and concat body, returning the - * lambda variable name, separator, and remaining token position. - * Handles both parenthesized `(x) ->` and bare `x ->` lambda parameter forms. 
- */ -function parseArrayMapConcatPrefix( - tokens: string[], -): { lambdaVar: string; separator: string; pos: number } | undefined { - let pos = 0; - const expect = (expected: string): boolean => { - if (pos >= tokens.length || tokens[pos] !== expected) return false; - pos++; - return true; - }; - const read = (): string | undefined => tokens[pos++]; - - if (!expect('arrayMap') || !expect('(')) return undefined; - - // Lambda param: either (x) -> or x -> - let lambdaVar: string | undefined; - if (tokens[pos] === '(') { - pos++; // skip ( - lambdaVar = read(); - if (!lambdaVar || !expect(')')) return undefined; - } else { - lambdaVar = read(); - if (!lambdaVar) return undefined; - } - if (!expect('->')) return undefined; - - // concat(lambdaVar.1, '', lambdaVar.2) - if (!expect('concat') || !expect('(')) return undefined; - if (!expect(lambdaVar) || !expect('.') || !expect('1') || !expect(',')) - return undefined; - - const sepToken = read(); - if (!sepToken || !sepToken.startsWith("'") || !expect(',')) return undefined; - const separator = sepToken.slice(1, -1); - - if ( - !expect(lambdaVar) || - !expect('.') || - !expect('2') || - !expect(')') || - !expect(',') - ) - return undefined; - - return { lambdaVar, separator, pos }; -} - -/** - * Parses a KV items column's default_expression to extract the source map column name - * and the constant separator string used in the concat. - * Matches the inline-cast form: - * arrayMap((arr) -> concat(arr.1, '=', arr.2), X::Array(Tuple(String, String))) - * Also supports bare lambda param: arrayMap(x -> concat(...), ...) - * Returns { mapColumn, separator } if the expression matches, otherwise undefined. 
- */ -export function parseKvItemsExpression( - defaultExpression: string, -): { mapColumn: string; separator: string } | undefined { - const tokens = tokenizeExpression(defaultExpression); - if (!tokens) return undefined; - - const prefix = parseArrayMapConcatPrefix(tokens); - if (!prefix) return undefined; - - let pos = prefix.pos; - const expect = (expected: string): boolean => { - if (pos >= tokens.length || tokens[pos] !== expected) return false; - pos++; - return true; - }; - const read = (): string | undefined => tokens[pos++]; - - // X::Array(Tuple(String, String)) - const mapColumn = read(); - if (!mapColumn) return undefined; - if ( - !expect('::') || - !expect('Array') || - !expect('(') || - !expect('Tuple') || - !expect('(') || - !expect('String') || - !expect(',') || - !expect('String') || - !expect(')') || - !expect(')') || - !expect(')') - ) - return undefined; - - if (pos !== tokens.length) return undefined; - - return { mapColumn, separator: prefix.separator }; -} - -/** - * Parses a KV items column's default_expression using the CAST function form: - * arrayMap((arr) -> concat(arr.1, '=', arr.2), CAST(X, 'Array(Tuple(String, String))')) - * Also supports bare lambda param: arrayMap(x -> concat(...), ...) - * Returns { mapColumn, separator } if the expression matches, otherwise undefined. 
- */ -export function parseKvItemsCastExpression( - defaultExpression: string, -): { mapColumn: string; separator: string } | undefined { - const tokens = tokenizeExpression(defaultExpression); - if (!tokens) return undefined; - - const prefix = parseArrayMapConcatPrefix(tokens); - if (!prefix) return undefined; - - let pos = prefix.pos; - const expect = (expected: string): boolean => { - if (pos >= tokens.length || tokens[pos] !== expected) return false; - pos++; - return true; - }; - const read = (): string | undefined => tokens[pos++]; - - // CAST(X, 'Array(Tuple(String, String))') - if (!expect('CAST') || !expect('(')) return undefined; - const mapColumn = read(); - if (!mapColumn || !expect(',')) return undefined; - - // The type argument is a quoted string like 'Array(Tuple(String, String))' - const typeToken = read(); - if (!typeToken || !typeToken.startsWith("'")) return undefined; - const typeStr = typeToken.slice(1, -1); // strip quotes - const normalizedType = typeStr.replace(/\s+/g, ''); - if (normalizedType !== 'Array(Tuple(String,String))') return undefined; - - if (!expect(')') || !expect(')')) return undefined; - - if (pos !== tokens.length) return undefined; - - return { mapColumn, separator: prefix.separator }; -} - -// To add another known KV items parsing strategy, simply define another function with the same signature and add the strategy to this array -const KV_ITEMS_STRATEGIES = [ - parseKvItemsExpression, - parseKvItemsCastExpression, -] as const; - export class CustomSchemaSQLSerializerV2 extends SQLSerializer { private metadata: Metadata; private tableName: string; From ae2beebb932db71cebdd8456aa51edc255482555 Mon Sep 17 00:00:00 2001 From: Aaron Knudtson <87577305+knudtty@users.noreply.github.com> Date: Fri, 29 May 2026 17:02:47 -0400 Subject: [PATCH 05/10] fix: cleanup --- packages/common-utils/src/core/metadata.ts | 175 +++++++++++++++------ 1 file changed, 123 insertions(+), 52 deletions(-) diff --git 
a/packages/common-utils/src/core/metadata.ts b/packages/common-utils/src/core/metadata.ts index f26a248c28..f5446113ed 100644 --- a/packages/common-utils/src/core/metadata.ts +++ b/packages/common-utils/src/core/metadata.ts @@ -21,7 +21,6 @@ import type { } from '@/types'; import { isLogSource, isTraceSource, SourceKind } from '@/types'; -import { ClickHouseVersion, parseClickHouseVersion } from './clickhouseVersion'; import { ClickHouseVersion, parseClickHouseVersion, @@ -484,8 +483,14 @@ export class Metadata { tableName, dateRange, ); - const runIndexQuery = async (sql: ChSql): Promise => - this.clickhouseClient + if (indexes?.keysIndex) { + const sql = chSql` + SELECT token AS key + FROM mergeTreeTextIndex(${{ String: databaseName }}, ${{ String: tableName }}, ${{ String: indexes.keysIndex.indexName }}) + WHERE ${partsFilter} + GROUP BY key HAVING key != '' + LIMIT ${{ Int32: maxKeys }}`; + const keys = await this.clickhouseClient .query<'JSON'>({ query: sql.sql, query_params: sql.params, @@ -494,28 +499,27 @@ export class Metadata { }) .then(r => r.json<{ key: string }>()) .then(d => d.data.map(r => r.key).filter(Boolean)); - if (indexes?.keysIndex) { - const sql = chSql` - SELECT token AS key - FROM mergeTreeTextIndex(${{ String: databaseName }}, ${{ String: tableName }}, ${{ String: indexes.keysIndex.indexName }}) - WHERE ${partsFilter} - GROUP BY key HAVING key != '' - LIMIT ${{ Int32: maxKeys }}`; - const keys = await runIndexQuery(sql); if (keys.length > 0) { this.cache.set(cacheKey, keys); return keys; } } if (indexes?.itemsIndex) { - const { indexName, separator } = indexes.itemsIndex; const sql = chSql` - SELECT splitByChar(${{ String: separator }}, token)[1] AS key - FROM mergeTreeTextIndex(${{ String: databaseName }}, ${{ String: tableName }}, ${{ String: indexName }}) + SELECT splitByChar(${{ String: indexes.itemsIndex.separator }}, token)[1] AS key + FROM mergeTreeTextIndex(${{ String: databaseName }}, ${{ String: tableName }}, ${{ String: 
indexes.itemsIndex.indexName }}) WHERE ${partsFilter} GROUP BY key HAVING key != '' LIMIT ${{ Int32: maxKeys }}`; - const keys = await runIndexQuery(sql); + const keys = await this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: this.getClickHouseSettings(), + }) + .then(r => r.json<{ key: string }>()) + .then(d => d.data.map(r => r.key).filter(Boolean)); if (keys.length > 0) { this.cache.set(cacheKey, keys); return keys; @@ -774,6 +778,7 @@ export class Metadata { keys, maxValues = 20, connectionId, + metadataMVs, dateRange, timestampValueExpression, signal, @@ -783,6 +788,7 @@ export class Metadata { column: string; keys: string[]; maxValues?: number; + metadataMVs?: MetadataMaterializedViews; dateRange?: [Date, Date]; timestampValueExpression?: string; connectionId: string; @@ -798,44 +804,108 @@ export class Metadata { }) ).get(column)?.itemsIndex; if (idx) { - const partsFilter = this.partsOverlapFilter( - databaseName, - tableName, - dateRange, - ); - const orChain = concatChSql( - ' OR ', - keys.map( - k => chSql`startsWith(token, ${{ String: `${k}${idx.separator}` }})`, - ), - ); - const sql = chSql` - SELECT substring(token, 1, position(token, ${{ String: idx.separator }}) - 1) AS key, - substring(token, position(token, ${{ String: idx.separator }}) + ${{ Int32: idx.separator.length }}) AS value - FROM mergeTreeTextIndex(${{ String: databaseName }}, ${{ String: tableName }}, ${{ String: idx.indexName }}) - WHERE ${partsFilter} - AND (${orChain}) - GROUP BY key, value HAVING value != '' - LIMIT ${{ Int32: maxValues }} BY key`; - const rows = await this.clickhouseClient - .query<'JSON'>({ - query: sql.sql, - query_params: sql.params, - connectionId, - clickhouse_settings: this.getClickHouseSettings(), - abort_signal: signal, - }) - .then(r => r.json<{ key: string; value: string }>()) - .then(d => d.data); - if (rows.length > 0) { - const result = new Map(); - for (const row of rows) { - 
if (!row.value) continue; - const arr = result.get(row.key) ?? []; - arr.push(row.value); - result.set(row.key, arr); + try { + const partsFilter = this.partsOverlapFilter( + databaseName, + tableName, + dateRange, + ); + const orChain = concatChSql( + ' OR ', + keys.map( + k => + chSql`startsWith(token, ${{ String: `${k}${idx.separator}` }})`, + ), + ); + const sql = chSql` + SELECT substring(token, 1, position(token, ${{ String: idx.separator }}) - 1) AS key, + substring(token, position(token, ${{ String: idx.separator }}) + ${{ Int32: idx.separator.length }}) AS value + FROM mergeTreeTextIndex(${{ String: databaseName }}, ${{ String: tableName }}, ${{ String: idx.indexName }}) + WHERE ${partsFilter} + AND (${orChain}) + GROUP BY key, value HAVING value != '' + LIMIT ${{ Int32: maxValues }} BY key`; + const rows = await this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: this.getClickHouseSettings(), + abort_signal: signal, + }) + .then(r => r.json<{ key: string; value: string }>()) + .then(d => d.data); + if (rows.length > 0) { + const result = new Map(); + for (const row of rows) { + if (!row.value) continue; + const arr = result.get(row.key) ?? 
[]; + arr.push(row.value); + result.set(row.key, arr); + } + return result; + } + } catch (e) { + console.warn('getMapValues text-index query failed', e); + } + } + + if (metadataMVs && dateRange) { + try { + const alignedDateRange = getAlignedDateRange( + dateRange, + metadataMVs.granularity, + ); + const startExpr = renderStartOfBucketExpr( + metadataMVs.granularity, + chSql`fromUnixTimestamp64Milli(${{ Int64: alignedDateRange[0].getTime() }})`, + ); + const endExpr = renderStartOfBucketExpr( + metadataMVs.granularity, + chSql`fromUnixTimestamp64Milli(${{ Int64: alignedDateRange[1].getTime() }})`, + ); + const keyList = concatChSql( + ',', + keys.map(k => chSql`${{ String: k }}`), + ); + const sql = chSql` + SELECT Key AS key, Value AS value + FROM ${tableExpr({ database: databaseName, table: metadataMVs.kvRollupTable })} + WHERE ColumnIdentifier = ${{ String: column }} + AND Key IN (${keyList}) + AND Value != '' + AND Timestamp >= ${startExpr} AND Timestamp <= ${endExpr} + GROUP BY Key, Value + ORDER BY sum(count) DESC + LIMIT ${{ Int32: maxValues }} BY Key + `; + const rows = await this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: { + ...this.getClickHouseSettings(), + timeout_overflow_mode: 'break', + max_execution_time: 15, + max_rows_to_read: '0', + }, + abort_signal: signal, + }) + .then(r => r.json<{ key: string; value: string }>()) + .then(d => d.data); + if (rows.length > 0) { + const result = new Map(); + for (const row of rows) { + if (!row.value) continue; + const arr = result.get(row.key) ?? 
[]; + arr.push(row.value); + result.set(row.key, arr); + } + return result; } - return result; + } catch (e) { + console.warn('getMapValues MV rollup query failed', e); } } @@ -1828,6 +1898,7 @@ export class Metadata { keys: sampledKeys, maxValues: maxValuesPerKey, connectionId, + metadataMVs, dateRange, signal, }); From 5dc9015105d23562412c4e1b248171acf35c949e Mon Sep 17 00:00:00 2001 From: Aaron Knudtson <87577305+knudtty@users.noreply.github.com> Date: Fri, 29 May 2026 19:02:29 -0400 Subject: [PATCH 06/10] fix tests --- .../src/__tests__/metadata.test.ts | 116 ++++++++++++++++++ packages/common-utils/src/core/metadata.ts | 5 +- 2 files changed, 118 insertions(+), 3 deletions(-) diff --git a/packages/common-utils/src/__tests__/metadata.test.ts b/packages/common-utils/src/__tests__/metadata.test.ts index 4abb1a8b4a..9c52bd5925 100644 --- a/packages/common-utils/src/__tests__/metadata.test.ts +++ b/packages/common-utils/src/__tests__/metadata.test.ts @@ -1622,6 +1622,93 @@ describe('Metadata', () => { expect(call.query).toContain('startsWith(token'); }); + it('getMapValues falls through text-index -> MV -> main-table scan', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock) + .mockResolvedValueOnce({ json: () => Promise.resolve({ data: [] }) }) + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [{ key: 'service.name', value: 'api' }], + }), + }); + + const result = await md.getMapValues({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + keys: ['service.name'], + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + expect((mockClickhouseClient.query as 
jest.Mock).mock.calls.length).toBe( + 2, + ); + const textIndexCall = (mockClickhouseClient.query as jest.Mock).mock + .calls[0][0]; + expect(textIndexCall.query).toContain('mergeTreeTextIndex'); + const mvCall = (mockClickhouseClient.query as jest.Mock).mock.calls[1][0]; + expect(Object.values(mvCall.query_params)).toContain( + 'logs_kv_rollup_15m', + ); + expect(result.get('service.name')).toEqual(['api']); + }); + + it('getMapValues falls through to main-table scan when text-index and MV both empty', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock) + .mockResolvedValueOnce({ json: () => Promise.resolve({ data: [] }) }) + .mockResolvedValueOnce({ json: () => Promise.resolve({ data: [] }) }) + .mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ value: 'api' }] }), + }); + + const result = await md.getMapValues({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + keys: ['service.name'], + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + timestampValueExpression: 'Timestamp', + }); + + expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 3, + ); + const mainScanCall = (mockClickhouseClient.query as jest.Mock).mock + .calls[2][0]; + expect(mainScanCall.query).not.toContain('mergeTreeTextIndex'); + expect(mainScanCall.query).not.toContain('logs_kv_rollup_15m'); + expect(mainScanCall.query).toContain('SELECT DISTINCT'); + expect(result.get('service.name')).toEqual(['api']); + }); + it('getAllKeyValues uses mergeTreeTextIndex for map keys whose column has an index, MV for the rest', async () => { const md = buildMetadata(); jest @@ -2104,6 +2191,35 @@ 
describe('Metadata', () => { } }); + it('getMapColumnTextIndexes returns empty Map when base table does not exist', async () => { + const md = buildMetadata(); + const globalStub = Metadata.prototype + .getMapColumnTextIndexes as jest.Mock; + globalStub.mockRestore(); + try { + jest.spyOn(md, 'getTableMetadata').mockResolvedValue(undefined); + jest + .spyOn(md, 'getServerVersion') + .mockResolvedValue([26, 3, 0, 0] as const); + const getColumnsSpy = jest.spyOn(md, 'getColumns'); + const getSkipIndicesSpy = jest.spyOn(md, 'getSkipIndices'); + + const result = await md.getMapColumnTextIndexes({ + databaseName: 'default', + tableName: 'unused_base_table', + connectionId: 'conn-1', + }); + + expect(result.size).toBe(0); + expect(getColumnsSpy).not.toHaveBeenCalled(); + expect(getSkipIndicesSpy).not.toHaveBeenCalled(); + } finally { + jest + .spyOn(Metadata.prototype, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map()); + } + }); + it.each([ ['ALIAS column name', 'LogAttributeItems'], [ diff --git a/packages/common-utils/src/core/metadata.ts b/packages/common-utils/src/core/metadata.ts index f5446113ed..bc437a0b19 100644 --- a/packages/common-utils/src/core/metadata.ts +++ b/packages/common-utils/src/core/metadata.ts @@ -364,10 +364,9 @@ export class Metadata { this.getTableMetadata({ databaseName, tableName, connectionId }), this.getServerVersion({ connectionId }), ]); - // mergeTreeTextIndex table function requires ClickHouse >= 26.3, and - // only reads from a single local MergeTree table. 
if (!supportsMergeTreeTextIndex(version)) return new Map(); - if (tableMeta?.engine === 'Distributed') return new Map(); + if (!tableMeta) return new Map(); + if (tableMeta.engine === 'Distributed') return new Map(); const [columns, indices] = await Promise.all([ this.getColumns({ databaseName, tableName, connectionId }), From 36244d07632c677d209a766fe9aafdc432aefb9e Mon Sep 17 00:00:00 2001 From: Aaron Knudtson <87577305+knudtty@users.noreply.github.com> Date: Mon, 1 Jun 2026 12:54:01 -0400 Subject: [PATCH 07/10] fix: date range in cache key if no metadataMVs --- .../src/__tests__/metadata.test.ts | 43 +++++++++++++++++++ packages/common-utils/src/core/metadata.ts | 14 +++--- 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/packages/common-utils/src/__tests__/metadata.test.ts b/packages/common-utils/src/__tests__/metadata.test.ts index 9c52bd5925..e7dafc10f6 100644 --- a/packages/common-utils/src/__tests__/metadata.test.ts +++ b/packages/common-utils/src/__tests__/metadata.test.ts @@ -1592,6 +1592,49 @@ describe('Metadata', () => { expect(call.query).not.toContain('mergeTreeTextIndex'); }); + it('getMapKeys caches text-index results distinctly by raw dateRange when no timestampValueExpression or metadataMVs is provided', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', keysAndItemsEntry]])); + + (mockClickhouseClient.query as jest.Mock) + .mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ key: 'first.range.key' }] }), + }) + .mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ key: 'second.range.key' }] }), + }); + + const baseArgs = { + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + connectionId: 'conn-1', + }; + + const keysA = await md.getMapKeys({ + ...baseArgs, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + const keysB = await md.getMapKeys({ + 
...baseArgs, + dateRange: [ + new Date('2026-05-11T18:00:00Z'), + new Date('2026-05-11T19:00:00Z'), + ], + }); + + expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 2, + ); + expect(keysA).toEqual(['first.range.key']); + expect(keysB).toEqual(['second.range.key']); + }); + it('getMapValues uses mergeTreeTextIndex when keys are requested and an items index exists', async () => { const md = buildMetadata(); jest diff --git a/packages/common-utils/src/core/metadata.ts b/packages/common-utils/src/core/metadata.ts index bc437a0b19..49378014d5 100644 --- a/packages/common-utils/src/core/metadata.ts +++ b/packages/common-utils/src/core/metadata.ts @@ -445,21 +445,19 @@ export class Metadata { dateRange?: [Date, Date]; timestampValueExpression?: string; }) { - // Align date range to rollup granularity for consistent cache keys const alignedDateRange = metadataMVs && dateRange ? getAlignedDateRange(dateRange, metadataMVs.granularity) : undefined; - const dateRangeCacheSuffix = - dateRange && timestampValueExpression - ? `${dateRange[0].getTime()}-${dateRange[1].getTime()}-${timestampValueExpression}` - : ''; + const dateRangeCacheSuffix = dateRange + ? `${dateRange[0].getTime()}-${dateRange[1].getTime()}${ + timestampValueExpression ? `-${timestampValueExpression}` : '' + }` + : ''; const cacheKey = metricName ? `${connectionId}.${databaseName}.${tableName}.${column}.${metricName}.${dateRangeCacheSuffix}.keys` - : metadataMVs && alignedDateRange - ? 
`${connectionId}.${databaseName}.${tableName}.${column}.${alignedDateRange[0].getTime()}.${alignedDateRange[1].getTime()}.keys` - : `${connectionId}.${databaseName}.${tableName}.${column}.${dateRangeCacheSuffix}.keys`; + : `${connectionId}.${databaseName}.${tableName}.${column}.${dateRangeCacheSuffix}.keys`; const cachedKeys = this.cache.get(cacheKey); if (cachedKeys != null) { From 9f8ee5058e3551c9f9084edc3619f6bfe73a82ec Mon Sep 17 00:00:00 2001 From: Aaron Knudtson <87577305+knudtty@users.noreply.github.com> Date: Mon, 1 Jun 2026 16:47:30 -0400 Subject: [PATCH 08/10] fix: use query date range + timestamp to get part names from table directly --- .../src/__tests__/metadata.test.ts | 59 +++++++++++++++++++ packages/common-utils/src/core/metadata.ts | 57 +++++++++++------- 2 files changed, 96 insertions(+), 20 deletions(-) diff --git a/packages/common-utils/src/__tests__/metadata.test.ts b/packages/common-utils/src/__tests__/metadata.test.ts index e7dafc10f6..519d03adce 100644 --- a/packages/common-utils/src/__tests__/metadata.test.ts +++ b/packages/common-utils/src/__tests__/metadata.test.ts @@ -1467,6 +1467,65 @@ describe('Metadata', () => { (timeFilterExpr as jest.Mock).mockClear(); }); + it('getMapKeys scopes mergeTreeTextIndex via SELECT _part subquery when dateRange and timestampValueExpression are provided', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', keysAndItemsEntry]])); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ key: 'service.name' }] }), + }); + + await md.getMapKeys({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + connectionId: 'conn-1', + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + timestampValueExpression: 'TimestampTime', + }); + + const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + 
expect(call.query).toContain('mergeTreeTextIndex'); + expect(call.query).toContain('part_name IN'); + expect(call.query).toContain('SELECT _part'); + expect(call.query).toContain('GROUP BY _part'); + expect(call.query).not.toContain('system.parts'); + expect(call.query).not.toContain('min_time'); + expect(call.query).not.toContain('max_time'); + }); + + it('getMapKeys emits no part-narrowing predicate when timestampValueExpression is absent', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', keysAndItemsEntry]])); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ key: 'service.name' }] }), + }); + + await md.getMapKeys({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + connectionId: 'conn-1', + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + expect(call.query).toContain('mergeTreeTextIndex'); + expect(call.query).not.toContain('part_name'); + expect(call.query).not.toContain('SELECT _part'); + }); + it('getMapKeys prefers the keys-only mapKeys(X) text index over the items index', async () => { const md = buildMetadata(); jest diff --git a/packages/common-utils/src/core/metadata.ts b/packages/common-utils/src/core/metadata.ts index 49378014d5..e0691aba05 100644 --- a/packages/common-utils/src/core/metadata.ts +++ b/packages/common-utils/src/core/metadata.ts @@ -406,22 +406,35 @@ export class Metadata { ); } - // `part_name IN (...)` predicate restricting a mergeTreeTextIndex scan to - // active data parts overlapping the date range. 
Fails loudly via throwIf() - // if any active part has min_time at the epoch sentinel — that means the - // table is not time-partitioned (or partitioning is broken) and we can't - // prove which parts overlap, so silently including them would risk - // returning data outside the requested range. - private partsOverlapFilter( - databaseName: string, - tableName: string, - dateRange?: [Date, Date], - ): ChSql { - const base = chSql`SELECT name FROM system.parts WHERE database = ${{ String: databaseName }} AND table = ${{ String: tableName }} AND active = 1`; - if (!dateRange) return chSql`part_name IN (${base})`; - return chSql`part_name IN (${base} - AND min_time <= fromUnixTimestamp64Milli(${{ Int64: dateRange[1].getTime() }}) - AND max_time >= fromUnixTimestamp64Milli(${{ Int64: dateRange[0].getTime() }}))`; + private async partsOverlapFilter({ + databaseName, + tableName, + connectionId, + dateRange, + timestampValueExpression, + }: { + databaseName: string; + tableName: string; + connectionId: string; + dateRange?: [Date, Date]; + timestampValueExpression?: string; + }): Promise { + if (!dateRange || !timestampValueExpression) return chSql`1`; + const timeFilter = await timeFilterExpr({ + connectionId, + databaseName, + tableName, + dateRange, + dateRangeStartInclusive: true, + dateRangeEndInclusive: true, + timestampValueExpression, + metadata: this, + }); + return chSql`part_name IN ( + SELECT DISTINCT _part + FROM ${tableExpr({ database: databaseName, table: tableName })} + WHERE ${timeFilter} + )`; } async getMapKeys({ @@ -475,11 +488,13 @@ export class Metadata { connectionId, }) ).get(column); - const partsFilter = this.partsOverlapFilter( + const partsFilter = await this.partsOverlapFilter({ databaseName, tableName, + connectionId, dateRange, - ); + timestampValueExpression, + }); if (indexes?.keysIndex) { const sql = chSql` SELECT token AS key @@ -802,11 +817,13 @@ export class Metadata { ).get(column)?.itemsIndex; if (idx) { try { - const partsFilter = 
this.partsOverlapFilter( + const partsFilter = await this.partsOverlapFilter({ databaseName, tableName, + connectionId, dateRange, - ); + timestampValueExpression, + }); const orChain = concatChSql( ' OR ', keys.map( From c5a96e2513aded82bbfd3426447ba3d083f93789 Mon Sep 17 00:00:00 2001 From: Aaron Knudtson <87577305+knudtty@users.noreply.github.com> Date: Mon, 1 Jun 2026 17:07:11 -0400 Subject: [PATCH 09/10] fix tests --- packages/common-utils/src/__tests__/metadata.test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/common-utils/src/__tests__/metadata.test.ts b/packages/common-utils/src/__tests__/metadata.test.ts index 519d03adce..cf7ec37952 100644 --- a/packages/common-utils/src/__tests__/metadata.test.ts +++ b/packages/common-utils/src/__tests__/metadata.test.ts @@ -1492,8 +1492,7 @@ describe('Metadata', () => { const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; expect(call.query).toContain('mergeTreeTextIndex'); expect(call.query).toContain('part_name IN'); - expect(call.query).toContain('SELECT _part'); - expect(call.query).toContain('GROUP BY _part'); + expect(call.query).toContain('SELECT DISTINCT _part'); expect(call.query).not.toContain('system.parts'); expect(call.query).not.toContain('min_time'); expect(call.query).not.toContain('max_time'); From f39aa83a9131308fc9b7399cd3b312d5ea584532 Mon Sep 17 00:00:00 2001 From: Aaron Knudtson <87577305+knudtty@users.noreply.github.com> Date: Mon, 8 Jun 2026 12:02:33 -0400 Subject: [PATCH 10/10] fix: deduplicate rollups by not using MV --- .../schema/seed/00006_otel_logs_rollups.sql | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/docker/otel-collector/schema/seed/00006_otel_logs_rollups.sql b/docker/otel-collector/schema/seed/00006_otel_logs_rollups.sql index dfb448ce22..0240b0bf1b 100644 --- a/docker/otel-collector/schema/seed/00006_otel_logs_rollups.sql +++ b/docker/otel-collector/schema/seed/00006_otel_logs_rollups.sql @@ -45,30 +45,6 
@@ GROUP BY ColumnIdentifier, Key, Timestamp; -- Single MV: CTE with UNION ALL across all columns, then aggregate CREATE MATERIALIZED VIEW IF NOT EXISTS ${DATABASE}.otel_logs_attr_kv_rollup_15m_mv TO ${DATABASE}.otel_logs_kv_rollup_15m AS WITH elements AS ( - SELECT - 'ResourceAttributes' AS ColumnIdentifier, - toStartOfFifteenMinutes(Timestamp) AS Timestamp, - replaceRegexpAll(entry.1, '\\[\\d+\\]', '[*]') AS Key, - CAST(entry.2 AS String) AS Value - FROM ${DATABASE}.otel_logs - ARRAY JOIN ResourceAttributes AS entry - UNION ALL - SELECT - 'LogAttributes' AS ColumnIdentifier, - toStartOfFifteenMinutes(Timestamp) AS Timestamp, - replaceRegexpAll(entry.1, '\\[\\d+\\]', '[*]') AS Key, - CAST(entry.2 AS String) AS Value - FROM ${DATABASE}.otel_logs - ARRAY JOIN LogAttributes AS entry - UNION ALL - SELECT - 'ScopeAttributes' AS ColumnIdentifier, - toStartOfFifteenMinutes(Timestamp) AS Timestamp, - replaceRegexpAll(entry.1, '\\[\\d+\\]', '[*]') AS Key, - CAST(entry.2 AS String) AS Value - FROM ${DATABASE}.otel_logs - ARRAY JOIN ScopeAttributes AS entry - UNION ALL SELECT 'NativeColumn' AS ColumnIdentifier, toStartOfFifteenMinutes(Timestamp) AS Timestamp,