diff --git a/packages/common-utils/src/__tests__/clickhouseVersion.test.ts b/packages/common-utils/src/__tests__/clickhouseVersion.test.ts index 3232cfdec6..3abe4f41bf 100644 --- a/packages/common-utils/src/__tests__/clickhouseVersion.test.ts +++ b/packages/common-utils/src/__tests__/clickhouseVersion.test.ts @@ -3,6 +3,7 @@ import { isClickHouseVersionAtLeast, parseClickHouseVersion, supportsDirectReadMap, + supportsMergeTreeTextIndex, } from '@/core/clickhouseVersion'; type ClickHouseVersionTuple = readonly [number, number, number, number]; @@ -174,3 +175,24 @@ describe('supportsDirectReadMap', () => { expect(supportsDirectReadMap(undefined)).toBe(false); }); }); + +describe('supportsMergeTreeTextIndex', () => { + it.each([ + [[26, 3, 0, 0], true], + [[26, 3, 0, 1], true], + [[26, 3, 99, 99], true], + [[26, 4, 0, 0], true], + [[27, 0, 0, 0], true], + [[26, 2, 99, 99], false], + [[26, 2, 0, 0], false], + [[26, 1, 0, 0], false], + [[25, 12, 0, 0], false], + [[24, 0, 0, 0], false], + ])('%j → %s', (version, expected) => { + expect(supportsMergeTreeTextIndex(version)).toBe(expected); + }); + + it('returns false when the version is undefined', () => { + expect(supportsMergeTreeTextIndex(undefined)).toBe(false); + }); +}); diff --git a/packages/common-utils/src/__tests__/metadata.test.ts b/packages/common-utils/src/__tests__/metadata.test.ts index bbd40781ef..cf7ec37952 100644 --- a/packages/common-utils/src/__tests__/metadata.test.ts +++ b/packages/common-utils/src/__tests__/metadata.test.ts @@ -1,5 +1,10 @@ import { ClickhouseClient } from '../clickhouse/node'; -import { Metadata, MetadataCache, parseKeyPath } from '../core/metadata'; +import { + MapColumnTextIndexes, + Metadata, + MetadataCache, + parseKeyPath, +} from '../core/metadata'; import * as renderChartConfigModule from '../core/renderChartConfig'; import { timeFilterExpr } from '../core/renderChartConfig'; import { isBuilderChartConfig } from '../guards'; @@ -44,6 +49,9 @@ const source: TSource = { beforeAll(() => { jest.spyOn(console, 'warn').mockImplementation(() => {}); jest.spyOn(console, 'error').mockImplementation(() => {}); + jest + .spyOn(Metadata.prototype, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map()); }); afterAll(() => { jest.restoreAllMocks(); @@ -875,7 +883,7 @@ describe('Metadata', () => { databaseName: 'otel', tableName: 'generic_logs', column: 'LogAttributes', - key: 'service.name', + keys: ['service.name'], connectionId: 'conn-1', }); @@ -903,7 +911,7 @@ describe('Metadata', () => { databaseName: 'otel', tableName: 'generic_logs', column: 'LogAttributes', - key: 'service.name', + keys: ['service.name'], connectionId: 'conn-1', dateRange, timestampValueExpression: 'EventTime, EventDate', @@ -940,7 +948,7 @@ describe('Metadata', () => { databaseName: 'otel', tableName: 'generic_logs', column: 'LogAttributes', - key: 'service.name', + keys: ['service.name'], connectionId: 'conn-1', timestampValueExpression: 'EventTime, EventDate', }; @@ -960,8 +968,8 @@ describe('Metadata', () => { ], }); - expect(valuesA).toEqual(['morning']); - expect(valuesB).toEqual(['afternoon']); + expect(valuesA.get('service.name')).toEqual(['morning']); + expect(valuesB.get('service.name')).toEqual(['afternoon']); }); }); @@ -1434,6 +1442,945 @@ describe('Metadata', () => { expect(result).toBeNull(); }); }); + + describe('mergeTreeTextIndex attribute lookup', () => { + const buildMetadata = () => { + const realCache = new ( + jest.requireActual('../core/metadata') as any + ).MetadataCache(); + return new Metadata(mockClickhouseClient, realCache); + }; + + const itemsOnlyEntry: MapColumnTextIndexes = { + itemsIndex: { indexName: 'idx_log_attrs_items', separator: '=' }, + }; + const keysAndItemsEntry: MapColumnTextIndexes = { + keysIndex: { indexName: 'idx_log_attrs_key' }, + itemsIndex: { indexName: 'idx_log_attrs_items', separator: '=' }, + }; + const keysOnlyEntry: MapColumnTextIndexes = { + keysIndex: { indexName: 'idx_log_attrs_key' }, + }; + + beforeEach(() => { + (mockClickhouseClient.query as jest.Mock).mockReset(); + (timeFilterExpr as jest.Mock).mockClear(); + }); + + it('getMapKeys scopes mergeTreeTextIndex via SELECT _part subquery when dateRange and timestampValueExpression are provided', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', keysAndItemsEntry]])); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ key: 'service.name' }] }), + }); + + await md.getMapKeys({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + connectionId: 'conn-1', + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + timestampValueExpression: 'TimestampTime', + }); + + const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + expect(call.query).toContain('mergeTreeTextIndex'); + expect(call.query).toContain('part_name IN'); + expect(call.query).toContain('SELECT DISTINCT _part'); + expect(call.query).not.toContain('system.parts'); + expect(call.query).not.toContain('min_time'); + expect(call.query).not.toContain('max_time'); + }); + + it('getMapKeys emits no part-narrowing predicate when timestampValueExpression is absent', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', keysAndItemsEntry]])); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ key: 'service.name' }] }), + }); + + await md.getMapKeys({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + connectionId: 'conn-1', + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + expect(call.query).toContain('mergeTreeTextIndex'); + expect(call.query).not.toContain('part_name'); + expect(call.query).not.toContain('SELECT _part'); + }); + + it('getMapKeys prefers the keys-only mapKeys(X) text index over the items index', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', keysAndItemsEntry]])); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [{ key: 'service.name' }, { key: 'http.method' }], + }), + }); + + const keys = await md.getMapKeys({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + connectionId: 'conn-1', + }); + + expect(keys).toEqual(['service.name', 'http.method']); + const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + expect(call.query).toContain('mergeTreeTextIndex'); + // keys-only path selects `token AS key` directly, no splitByChar. + expect(call.query).not.toContain('splitByChar'); + expect(Object.values(call.query_params)).toContain('idx_log_attrs_key'); + }); + + it('getMapKeys falls back to the items index when no keys-only index exists', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ key: 'service.name' }] }), + }); + + const keys = await md.getMapKeys({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + connectionId: 'conn-1', + }); + + expect(keys).toEqual(['service.name']); + const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + expect(call.query).toContain('mergeTreeTextIndex'); + expect(call.query).toContain('splitByChar'); + expect(Object.values(call.query_params)).toContain('idx_log_attrs_items'); + }); + + it('getMapKeys falls through to MV when the text index returns empty (no cache poisoning)', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', keysAndItemsEntry]])); + + (mockClickhouseClient.query as jest.Mock) + // keysIndex query — returns empty (e.g. fresh table, no parts yet) + .mockResolvedValueOnce({ json: () => Promise.resolve({ data: [] }) }) + // itemsIndex query — also empty + .mockResolvedValueOnce({ json: () => Promise.resolve({ data: [] }) }) + // MV rollup query — has data + .mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ Key: 'service.name' }] }), + }); + + const keys = await md.getMapKeys({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + // All three queries fired (proves cache isn't poisoned by intermediate empty results). + expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 3, + ); + expect(keys).toEqual(['service.name']); + const mvCall = (mockClickhouseClient.query as jest.Mock).mock.calls[2][0]; + expect(Object.values(mvCall.query_params)).toContain( + 'logs_key_rollup_15m', + ); + }); + + it('getMapKeys falls through to the MV path when no text index exists', async () => { + const md = buildMetadata(); + jest.spyOn(md, 'getMapColumnTextIndexes').mockResolvedValue(new Map()); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ Key: 'service.name' }] }), + }); + + const keys = await md.getMapKeys({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + expect(keys).toEqual(['service.name']); + const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + expect(Object.values(call.query_params)).toContain('logs_key_rollup_15m'); + expect(call.query).not.toContain('mergeTreeTextIndex'); + }); + + it('getMapKeys caches text-index results distinctly by raw dateRange when no timestampValueExpression or metadataMVs is provided', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', keysAndItemsEntry]])); + + (mockClickhouseClient.query as jest.Mock) + .mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ key: 'first.range.key' }] }), + }) + .mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ key: 'second.range.key' }] }), + }); + + const baseArgs = { + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + connectionId: 'conn-1', + }; + + const keysA = await md.getMapKeys({ + ...baseArgs, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + const keysB = await md.getMapKeys({ + ...baseArgs, + dateRange: [ + new Date('2026-05-11T18:00:00Z'), + new Date('2026-05-11T19:00:00Z'), + ], + }); + + expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 2, + ); + expect(keysA).toEqual(['first.range.key']); + expect(keysB).toEqual(['second.range.key']); + }); + + it('getMapValues uses mergeTreeTextIndex when keys are requested and an items index exists', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [ + { key: 'service.name', value: 'api' }, + { key: 'service.name', value: 'worker' }, + ], + }), + }); + + const result = await md.getMapValues({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + keys: ['service.name'], + connectionId: 'conn-1', + }); + + expect(result.get('service.name')).toEqual(['api', 'worker']); + const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + expect(call.query).toContain('mergeTreeTextIndex'); + expect(call.query).toContain('startsWith(token'); + }); + + it('getMapValues falls through text-index -> MV -> main-table scan', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock) + .mockResolvedValueOnce({ json: () => Promise.resolve({ data: [] }) }) + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [{ key: 'service.name', value: 'api' }], + }), + }); + + const result = await md.getMapValues({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + keys: ['service.name'], + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 2, + ); + const textIndexCall = (mockClickhouseClient.query as jest.Mock).mock + .calls[0][0]; + expect(textIndexCall.query).toContain('mergeTreeTextIndex'); + const mvCall = (mockClickhouseClient.query as jest.Mock).mock.calls[1][0]; + expect(Object.values(mvCall.query_params)).toContain( + 'logs_kv_rollup_15m', + ); + expect(result.get('service.name')).toEqual(['api']); + }); + + it('getMapValues falls through to main-table scan when text-index and MV both empty', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock) + .mockResolvedValueOnce({ json: () => Promise.resolve({ data: [] }) }) + .mockResolvedValueOnce({ json: () => Promise.resolve({ data: [] }) }) + .mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ value: 'api' }] }), + }); + + const result = await md.getMapValues({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + keys: ['service.name'], + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + timestampValueExpression: 'Timestamp', + }); + + expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 3, + ); + const mainScanCall = (mockClickhouseClient.query as jest.Mock).mock + .calls[2][0]; + expect(mainScanCall.query).not.toContain('mergeTreeTextIndex'); + expect(mainScanCall.query).not.toContain('logs_kv_rollup_15m'); + expect(mainScanCall.query).toContain('SELECT DISTINCT'); + expect(result.get('service.name')).toEqual(['api']); + }); + + it('getAllKeyValues uses mergeTreeTextIndex for map keys whose column has an index, MV for the rest', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock) + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [{ key: 'service.name', value: 'api' }], + }), + }) + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [ + { + ColumnIdentifier: 'NativeColumn', + Key: 'ServiceName', + Value: 'clickhouse', + total_count: 1, + }, + ], + }), + }); + + const result = await md.getAllKeyValues({ + databaseName: 'otel', + tableName: 'generic_logs', + keyExpressions: ["LogAttributes['service.name']", 'ServiceName'], + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + expect(result).toEqual([ + { key: "LogAttributes['service.name']", value: ['api'] }, + { key: 'ServiceName', value: ['clickhouse'] }, + ]); + + const textIndexCall = (mockClickhouseClient.query as jest.Mock).mock + .calls[0][0]; + expect(textIndexCall.query).toContain('mergeTreeTextIndex'); + // OR-chain of literal `startsWith(token, 'key=')` predicates is the + // only shape KeyCondition can range-rewrite — JOINs against + // `target_keys` would force a full dictionary scan. + expect(textIndexCall.query).toContain('startsWith(token,'); + expect(textIndexCall.query).not.toContain('INNER JOIN'); + // The map key was inserted as a String parameter (`{paramN:String}`) + // alongside its `=` separator, which CH substitutes as a literal at + // parameter-binding time. + expect(Object.values(textIndexCall.query_params)).toContain( + 'service.name=', + ); + + const mvCall = (mockClickhouseClient.query as jest.Mock).mock.calls[1][0]; + expect(Object.values(mvCall.query_params)).toContain( + 'logs_kv_rollup_15m', + ); + expect(Object.values(mvCall.query_params)).toContain('NativeColumn'); + }); + + it('getAllFieldsAndValues excludes text-indexed columns from the MV query', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + // Bypass the keys-discovery composition step so this test stays + // focused on the values + MV exclusion behavior. + jest.spyOn(md, 'getMapKeys').mockResolvedValue(['service.name']); + + (mockClickhouseClient.query as jest.Mock) + // text-index OR-chain values query for LogAttributes + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [{ key: 'service.name', value: 'api' }], + }), + }) + // MV query for natives + unindexed map columns + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [ + { + ColumnIdentifier: 'NativeColumn', + Key: 'ServiceName', + Values: ['clickhouse'], + }, + ], + }), + }); + + const result = await md.getAllFieldsAndValues({ + databaseName: 'otel', + tableName: 'generic_logs', + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + const mvCall = (mockClickhouseClient.query as jest.Mock).mock.calls[1][0]; + expect(mvCall.query).toContain('ColumnIdentifier NOT IN'); + + expect(result).toEqual( + expect.arrayContaining([ + { key: 'ServiceName', value: ['clickhouse'] }, + { key: "LogAttributes['service.name']", value: ['api'] }, + ]), + ); + }); + + it('getMapValues falls through to the main-table scan when only a keys-only index exists', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', keysOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ value: 'fallback' }] }), + }); + + const result = await md.getMapValues({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + keys: ['service.name'], + connectionId: 'conn-1', + }); + + expect(result.get('service.name')).toEqual(['fallback']); + const call = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + expect(call.query).not.toContain('mergeTreeTextIndex'); + }); + + it('getAllFieldsAndValues keeps keys-only columns in the MV query (excludes only items-indexed columns)', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', keysOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock).mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [ + { + ColumnIdentifier: 'NativeColumn', + Key: 'ServiceName', + Values: ['clickhouse'], + }, + ], + }), + }); + + await md.getAllFieldsAndValues({ + databaseName: 'otel', + tableName: 'generic_logs', + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + // Only one query (the MV one) — no text-index query for a keys-only column. + expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 1, + ); + const mvCall = (mockClickhouseClient.query as jest.Mock).mock.calls[0][0]; + expect(mvCall.query).not.toContain('ColumnIdentifier NOT IN'); + }); + + it('getMapValues falls through to main-table scan when items index returns empty (no cache poisoning)', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock) + // text-index query — empty (e.g. no parts indexed yet) + .mockResolvedValueOnce({ json: () => Promise.resolve({ data: [] }) }) + // main-table scan — has data + .mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [{ value: 'scan-fallback' }] }), + }); + + const result = await md.getMapValues({ + databaseName: 'otel', + tableName: 'generic_logs', + column: 'LogAttributes', + keys: ['service.name'], + connectionId: 'conn-1', + }); + + // Both queries fired — proves cache isn't poisoned by the empty text-index result. + expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 2, + ); + expect(result.get('service.name')).toEqual(['scan-fallback']); + const textIndexCall = (mockClickhouseClient.query as jest.Mock).mock + .calls[0][0]; + expect(textIndexCall.query).toContain('mergeTreeTextIndex'); + const scanCall = (mockClickhouseClient.query as jest.Mock).mock + .calls[1][0]; + expect(scanCall.query).not.toContain('mergeTreeTextIndex'); + }); + + it('getAllKeyValues splits across text-index and MV when only some map columns have a text index', async () => { + const md = buildMetadata(); + // LogAttributes has an items text index; ResourceAttributes does not — + // its values must come back from the MV (the user-defined attribute + // rollup row). + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + + (mockClickhouseClient.query as jest.Mock) + // text-index call for LogAttributes + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [{ key: 'service.name', value: 'api' }], + }), + }) + // MV call for the remaining keys (ResourceAttributes + native) + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [ + { + ColumnIdentifier: 'ResourceAttributes', + Key: 'host.name', + Value: 'host-1', + total_count: 1, + }, + { + ColumnIdentifier: 'NativeColumn', + Key: 'ServiceName', + Value: 'clickhouse', + total_count: 1, + }, + ], + }), + }); + + const result = await md.getAllKeyValues({ + databaseName: 'otel', + tableName: 'generic_logs', + keyExpressions: [ + "LogAttributes['service.name']", + "ResourceAttributes['host.name']", + 'ServiceName', + ], + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + expect(result).toEqual([ + { key: "LogAttributes['service.name']", value: ['api'] }, + { key: "ResourceAttributes['host.name']", value: ['host-1'] }, + { key: 'ServiceName', value: ['clickhouse'] }, + ]); + + // Both calls were made: text-index for LogAttributes, MV for the rest. + expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 2, + ); + const textIndexCall = (mockClickhouseClient.query as jest.Mock).mock + .calls[0][0]; + expect(textIndexCall.query).toContain('mergeTreeTextIndex'); + + const mvCall = (mockClickhouseClient.query as jest.Mock).mock.calls[1][0]; + expect(Object.values(mvCall.query_params)).toContain( + 'logs_kv_rollup_15m', + ); + // MV query must include the unindexed map column AND the native, but + // NOT the text-indexed column (LogAttributes is omitted from the IN list). + expect(Object.values(mvCall.query_params)).toContain( + 'ResourceAttributes', + ); + expect(Object.values(mvCall.query_params)).toContain('NativeColumn'); + expect(Object.values(mvCall.query_params)).not.toContain('LogAttributes'); + }); + + it('getAllFieldsAndValues composes getMapKeys + literal OR-chain (no INNER JOIN, no token scan)', async () => { + const md = buildMetadata(); + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + const getMapKeysSpy = jest + .spyOn(md, 'getMapKeys') + .mockResolvedValue(['service.name', 'http.method']); + + (mockClickhouseClient.query as jest.Mock) + // text-index values query + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [ + { key: 'service.name', value: 'api' }, + { key: 'http.method', value: 'GET' }, + ], + }), + }) + // MV query (no rows for this assertion) + .mockResolvedValueOnce({ + json: () => Promise.resolve({ data: [] }), + }); + + await md.getAllFieldsAndValues({ + databaseName: 'otel', + tableName: 'generic_logs', + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + // Composition: getMapKeys called per items-indexed column. + expect(getMapKeysSpy).toHaveBeenCalledWith( + expect.objectContaining({ column: 'LogAttributes' }), + ); + + // Values query is the literal OR-chain shape, not a JOIN. + const textIndexCall = (mockClickhouseClient.query as jest.Mock).mock + .calls[0][0]; + expect(textIndexCall.query).toContain('mergeTreeTextIndex'); + expect(textIndexCall.query).toContain('startsWith(token,'); + expect(textIndexCall.query).not.toContain('INNER JOIN'); + expect(textIndexCall.query).not.toContain('target_keys'); + // Both keys appear as literal prefixes (`key=`) in query_params, so + // KeyCondition can range-rewrite each one independently. + expect(Object.values(textIndexCall.query_params)).toContain( + 'service.name=', + ); + expect(Object.values(textIndexCall.query_params)).toContain( + 'http.method=', + ); + }); + + it('getAllFieldsAndValues serves unindexed map columns from MV alongside text-indexed ones', async () => { + const md = buildMetadata(); + // LogAttributes is text-indexed; ResourceAttributes is in the MV only. + jest + .spyOn(md, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map([['LogAttributes', itemsOnlyEntry]])); + // Bypass keys discovery so we exercise just the values + MV merge. + jest.spyOn(md, 'getMapKeys').mockResolvedValue(['service.name']); + + (mockClickhouseClient.query as jest.Mock) + // text-index OR-chain values query for LogAttributes + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [{ key: 'service.name', value: 'api' }], + }), + }) + // MV call returning ResourceAttributes + native (LogAttributes excluded) + .mockResolvedValueOnce({ + json: () => + Promise.resolve({ + data: [ + { + ColumnIdentifier: 'ResourceAttributes', + Key: 'host.name', + Values: ['host-1', 'host-2'], + }, + { + ColumnIdentifier: 'NativeColumn', + Key: 'ServiceName', + Values: ['clickhouse'], + }, + ], + }), + }); + + const result = await md.getAllFieldsAndValues({ + databaseName: 'otel', + tableName: 'generic_logs', + connectionId: 'conn-1', + metadataMVs: { + keyRollupTable: 'logs_key_rollup_15m', + kvRollupTable: 'logs_kv_rollup_15m', + granularity: '15 minute', + }, + dateRange: [ + new Date('2026-05-11T16:00:00Z'), + new Date('2026-05-11T17:00:00Z'), + ], + }); + + // Both queries fired. + expect((mockClickhouseClient.query as jest.Mock).mock.calls.length).toBe( + 2, + ); + const mvCall = (mockClickhouseClient.query as jest.Mock).mock.calls[1][0]; + // Only LogAttributes is excluded from the MV; ResourceAttributes stays in. + expect(mvCall.query).toContain('ColumnIdentifier NOT IN'); + expect(Object.values(mvCall.query_params)).toContain('LogAttributes'); + expect(Object.values(mvCall.query_params)).not.toContain( + 'ResourceAttributes', + ); + + expect(result).toEqual( + expect.arrayContaining([ + { key: "LogAttributes['service.name']", value: ['api'] }, + { + key: "ResourceAttributes['host.name']", + value: ['host-1', 'host-2'], + }, + { key: 'ServiceName', value: ['clickhouse'] }, + ]), + ); + }); + + it('getMapColumnTextIndexes returns empty Map on ClickHouse < 26.3', async () => { + const md = buildMetadata(); + // Bypass the global beforeAll stub so we exercise the real implementation. + const globalStub = Metadata.prototype + .getMapColumnTextIndexes as jest.Mock; + globalStub.mockRestore(); + try { + jest + .spyOn(md, 'getTableMetadata') + .mockResolvedValue({ engine: 'MergeTree' } as never); + jest + .spyOn(md, 'getServerVersion') + .mockResolvedValue([26, 2, 99, 99] as const); + const getColumnsSpy = jest.spyOn(md, 'getColumns'); + const getSkipIndicesSpy = jest.spyOn(md, 'getSkipIndices'); + + const result = await md.getMapColumnTextIndexes({ + databaseName: 'otel', + tableName: 'generic_logs', + connectionId: 'conn-1', + }); + + expect(result.size).toBe(0); + // Bailed out early — no follow-up column/index queries. + expect(getColumnsSpy).not.toHaveBeenCalled(); + expect(getSkipIndicesSpy).not.toHaveBeenCalled(); + } finally { + jest + .spyOn(Metadata.prototype, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map()); + } + }); + + it('getMapColumnTextIndexes returns empty Map when base table does not exist', async () => { + const md = buildMetadata(); + const globalStub = Metadata.prototype + .getMapColumnTextIndexes as jest.Mock; + globalStub.mockRestore(); + try { + jest.spyOn(md, 'getTableMetadata').mockResolvedValue(undefined); + jest + .spyOn(md, 'getServerVersion') + .mockResolvedValue([26, 3, 0, 0] as const); + const getColumnsSpy = jest.spyOn(md, 'getColumns'); + const getSkipIndicesSpy = jest.spyOn(md, 'getSkipIndices'); + + const result = await md.getMapColumnTextIndexes({ + databaseName: 'default', + tableName: 'unused_base_table', + connectionId: 'conn-1', + }); + + expect(result.size).toBe(0); + expect(getColumnsSpy).not.toHaveBeenCalled(); + expect(getSkipIndicesSpy).not.toHaveBeenCalled(); + } finally { + jest + .spyOn(Metadata.prototype, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map()); + } + }); + + it.each([ + ['ALIAS column name', 'LogAttributeItems'], + [ + 'resolved arrayMap expression', + "arrayMap((arr) -> concat(arr.1, '=', arr.2), CAST(LogAttributes, 'Array(Tuple(String, String))'))", + ], + ])( + 'getMapColumnTextIndexes detects items index when system.data_skipping_indices.expr is %s', + async (_label, indexExpression) => { + const md = buildMetadata(); + const globalStub = Metadata.prototype + .getMapColumnTextIndexes as jest.Mock; + globalStub.mockRestore(); + try { + jest + .spyOn(md, 'getTableMetadata') + .mockResolvedValue({ engine: 'MergeTree' } as never); + jest + .spyOn(md, 'getServerVersion') + .mockResolvedValue([26, 3, 0, 0] as const); + jest.spyOn(md, 'getColumns').mockResolvedValue([ + { + name: 'LogAttributeItems', + type: 'Array(String)', + default_type: 'ALIAS', + default_expression: + "arrayMap((arr) -> concat(arr.1, '=', arr.2), CAST(LogAttributes, 'Array(Tuple(String, String))'))", + comment: '', + codec_expression: '', + ttl_expression: '', + }, + ] as never); + jest.spyOn(md, 'getSkipIndices').mockResolvedValue([ + { + name: 'idx_log_attr_items', + type: 'text', + typeFull: "text(tokenizer = 'array')", + expression: indexExpression, + granularity: 1, + }, + ] as never); + + const result = await md.getMapColumnTextIndexes({ + databaseName: 'otel', + tableName: 'generic_logs', + connectionId: 'conn-1', + }); + + expect(result.get('LogAttributes')?.itemsIndex).toEqual({ + indexName: 'idx_log_attr_items', + separator: '=', + }); + } finally { + jest + .spyOn(Metadata.prototype, 'getMapColumnTextIndexes') + .mockResolvedValue(new Map()); + } + }, + ); + }); }); describe('parseKeyPath', () => { diff --git a/packages/common-utils/src/core/clickhouseVersion.ts b/packages/common-utils/src/core/clickhouseVersion.ts index 0f781a2b10..863ac3285f 100644 --- a/packages/common-utils/src/core/clickhouseVersion.ts +++ b/packages/common-utils/src/core/clickhouseVersion.ts @@ -112,3 +112,20 @@ export function supportsDirectReadMap( if (!branchMin) return false; return compareClickHouseVersion(version, branchMin) >= 0; } + +/** + * First release that shipped the `mergeTreeTextIndex(database, table, index)` + * table function used to introspect text skip indices. + */ +const MERGE_TREE_TEXT_INDEX_MIN: ClickHouseVersion = [26, 3, 0, 0]; + +/** + * Returns true when the connected ClickHouse server supports the + * `mergeTreeTextIndex` table function (>= 26.3). Returns false when the + * version is undefined or older. + */ +export function supportsMergeTreeTextIndex( + version: ClickHouseVersion | undefined, +): boolean { + return isClickHouseVersionAtLeast(version, MERGE_TREE_TEXT_INDEX_MIN); +} diff --git a/packages/common-utils/src/core/kvItems.ts b/packages/common-utils/src/core/kvItems.ts new file mode 100644 index 0000000000..6e02da0c16 --- /dev/null +++ b/packages/common-utils/src/core/kvItems.ts @@ -0,0 +1,239 @@ +/** Describes a KV items column and its concat separator */ +export type KvItemsInfo = { + kvItemsColumn: string; + separator: string; +}; + +/** Map from map column name to its KV items info */ +export type KvItemsLookup = Map; + +/** + * Tokenizes a ClickHouse expression into meaningful tokens (identifiers, parens, + * commas, arrows, quoted strings, operators). Whitespace is skipped. + * Returns null if the expression contains unrecognized characters. + */ +function tokenizeExpression(expr: string): string[] | null { + const tokens: string[] = []; + let i = 0; + while (i < expr.length) { + // Skip whitespace + if (/\s/.test(expr[i])) { + i++; + continue; + } + // Arrow operator -> + if (expr[i] === '-' && expr[i + 1] === '>') { + tokens.push('->'); + i += 2; + continue; + } + // Cast operator :: + if (expr[i] === ':' && expr[i + 1] === ':') { + tokens.push('::'); + i += 2; + continue; + } + // Single-char tokens + if ('(),.'.includes(expr[i])) { + tokens.push(expr[i]); + i++; + continue; + } + // Quoted string (single or double) + if (expr[i] === "'" || expr[i] === '"') { + const quote = expr[i]; + let str = ''; + i++; // skip opening quote + while (i < expr.length && expr[i] !== quote) { + if (expr[i] === '\\') { + str += expr[i + 1] ?? ''; + i += 2; + } else { + str += expr[i]; + i++; + } + } + i++; // skip closing quote + tokens.push(`'${str}'`); // normalize to single-quote wrapper + continue; + } + // Identifier or keyword (word chars) + if (/\w/.test(expr[i])) { + let ident = ''; + while (i < expr.length && /\w/.test(expr[i])) { + ident += expr[i]; + i++; + } + tokens.push(ident); + continue; + } + // Unknown character — return null to signal unparseable expression + return null; + } + return tokens; +} + +/** + * Helper: parses the common arrayMap lambda prefix and concat body, returning the + * lambda variable name, separator, and remaining token position. + * Handles both parenthesized `(x) ->` and bare `x ->` lambda parameter forms. + */ +function parseArrayMapConcatPrefix( + tokens: string[], +): { lambdaVar: string; separator: string; pos: number } | undefined { + let pos = 0; + const expect = (expected: string): boolean => { + if (pos >= tokens.length || tokens[pos] !== expected) return false; + pos++; + return true; + }; + const read = (): string | undefined => tokens[pos++]; + + if (!expect('arrayMap') || !expect('(')) return undefined; + + // Lambda param: either (x) -> or x -> + let lambdaVar: string | undefined; + if (tokens[pos] === '(') { + pos++; // skip ( + lambdaVar = read(); + if (!lambdaVar || !expect(')')) return undefined; + } else { + lambdaVar = read(); + if (!lambdaVar) return undefined; + } + if (!expect('->')) return undefined; + + // concat(lambdaVar.1, '', lambdaVar.2) + if (!expect('concat') || !expect('(')) return undefined; + if (!expect(lambdaVar) || !expect('.') || !expect('1') || !expect(',')) + return undefined; + + const sepToken = read(); + if (!sepToken || !sepToken.startsWith("'") || !expect(',')) return undefined; + const separator = sepToken.slice(1, -1); + + if ( + !expect(lambdaVar) || + !expect('.') || + !expect('2') || + !expect(')') || + !expect(',') + ) + return undefined; + + return { lambdaVar, separator, pos }; +} + +/** + * Parses a KV items column's default_expression to extract the source map column name + * and the constant separator string used in the concat. + * Matches the inline-cast form: + * arrayMap((arr) -> concat(arr.1, '=', arr.2), X::Array(Tuple(String, String))) + * Also supports bare lambda param: arrayMap(x -> concat(...), ...) + * Returns { mapColumn, separator } if the expression matches, otherwise undefined. + */ +export function parseKvItemsExpression( + defaultExpression: string, +): { mapColumn: string; separator: string } | undefined { + const tokens = tokenizeExpression(defaultExpression); + if (!tokens) return undefined; + + const prefix = parseArrayMapConcatPrefix(tokens); + if (!prefix) return undefined; + + let pos = prefix.pos; + const expect = (expected: string): boolean => { + if (pos >= tokens.length || tokens[pos] !== expected) return false; + pos++; + return true; + }; + const read = (): string | undefined => tokens[pos++]; + + // X::Array(Tuple(String, String)) + const mapColumn = read(); + if (!mapColumn) return undefined; + if ( + !expect('::') || + !expect('Array') || + !expect('(') || + !expect('Tuple') || + !expect('(') || + !expect('String') || + !expect(',') || + !expect('String') || + !expect(')') || + !expect(')') || + !expect(')') + ) + return undefined; + + if (pos !== tokens.length) return undefined; + + return { mapColumn, separator: prefix.separator }; +} + +/** + * Parses a KV items column's default_expression using the CAST function form: + * arrayMap((arr) -> concat(arr.1, '=', arr.2), CAST(X, 'Array(Tuple(String, String))')) + * Also supports bare lambda param: arrayMap(x -> concat(...), ...) + * Returns { mapColumn, separator } if the expression matches, otherwise undefined. + */ +export function parseKvItemsCastExpression( + defaultExpression: string, +): { mapColumn: string; separator: string } | undefined { + const tokens = tokenizeExpression(defaultExpression); + if (!tokens) return undefined; + + const prefix = parseArrayMapConcatPrefix(tokens); + if (!prefix) return undefined; + + let pos = prefix.pos; + const expect = (expected: string): boolean => { + if (pos >= tokens.length || tokens[pos] !== expected) return false; + pos++; + return true; + }; + const read = (): string | undefined => tokens[pos++]; + + // CAST(X, 'Array(Tuple(String, String))') + if (!expect('CAST') || !expect('(')) return undefined; + const mapColumn = read(); + if (!mapColumn || !expect(',')) return undefined; + + // The type argument is a quoted string like 'Array(Tuple(String, String))' + const typeToken = read(); + if (!typeToken || !typeToken.startsWith("'")) return undefined; + const typeStr = typeToken.slice(1, -1); // strip quotes + const normalizedType = typeStr.replace(/\s+/g, ''); + if (normalizedType !== 'Array(Tuple(String,String))') return undefined; + + if (!expect(')') || !expect(')')) return undefined; + + if (pos !== tokens.length) return undefined; + + return { mapColumn, separator: prefix.separator }; +} + +/** + * Ordered list of strategies tried by callers when parsing a KV items column's + * default_expression. To add support for another shape, append a new function + * with the same signature. + */ +export const KV_ITEMS_STRATEGIES = [ + parseKvItemsExpression, + parseKvItemsCastExpression, +] as const; + +/** + * Tries each parsing strategy in order and returns the first successful match, + * or undefined when no strategy recognises the expression. + */ +export function parseKvItemsDefaultExpression( + defaultExpression: string, +): { mapColumn: string; separator: string } | undefined { + for (const strategy of KV_ITEMS_STRATEGIES) { + const parsed = strategy(defaultExpression); + if (parsed) return parsed; + } + return undefined; +} diff --git a/packages/common-utils/src/core/metadata.ts b/packages/common-utils/src/core/metadata.ts index a80feb1fad..e0691aba05 100644 --- a/packages/common-utils/src/core/metadata.ts +++ b/packages/common-utils/src/core/metadata.ts @@ -21,7 +21,12 @@ import type { } from '@/types'; import { isLogSource, isTraceSource, SourceKind } from '@/types'; -import { ClickHouseVersion, parseClickHouseVersion } from './clickhouseVersion'; +import { + ClickHouseVersion, + parseClickHouseVersion, + supportsMergeTreeTextIndex, +} from './clickhouseVersion'; +import { parseKvItemsDefaultExpression } from './kvItems'; import { optimizeGetKeyValuesCalls, renderStartOfBucketExpr, @@ -30,6 +35,7 @@ import { getAlignedDateRange, getDistributedTableArgs, objectHash, + parseTokenizerFromTextIndex, } from './utils'; // If filters initially are taking too long to load, decrease this number. @@ -127,6 +133,11 @@ export type SkipIndexMetadata = { granularity: number; }; +export type MapColumnTextIndexes = { + keysIndex?: { indexName: string }; + itemsIndex?: { indexName: string; separator: string }; +}; + export class Metadata { private readonly clickhouseClient: BaseClickhouseClient; private readonly cache: MetadataCache; @@ -341,6 +352,91 @@ export class Metadata { })[0]; } + async getMapColumnTextIndexes({ + databaseName, + tableName, + connectionId, + }: TableConnection): Promise> { + return this.cache.getOrFetch( + `${connectionId}.${databaseName}.${tableName}.mapColumnTextIndexes`, + async () => { + const [tableMeta, version] = await Promise.all([ + this.getTableMetadata({ databaseName, tableName, connectionId }), + this.getServerVersion({ connectionId }), + ]); + if (!supportsMergeTreeTextIndex(version)) return new Map(); + if (!tableMeta) return new Map(); + if (tableMeta.engine === 'Distributed') return new Map(); + + const [columns, indices] = await Promise.all([ + this.getColumns({ databaseName, tableName, connectionId }), + this.getSkipIndices({ databaseName, tableName, connectionId }), + ]); + const norm = (s: string) => s.replace(/\s+/g, '').replace(/`/g, ''); + const lookup = new Map(); + const patch = (col: string, p: MapColumnTextIndexes) => + lookup.set(col, { ...lookup.get(col), ...p }); + + for (const i of indices) { + if (i.type !== 'text') continue; + if (parseTokenizerFromTextIndex(i)?.type !== 'array') continue; + const expr = norm(i.expression); + + const mapKeysMatch = expr.match(/^mapKeys\((.+)\)$/); + if (mapKeysMatch) { + patch(mapKeysMatch[1], { keysIndex: { indexName: i.name } }); + continue; + } + const alias = columns.find( + c => + (c.default_type === 'ALIAS' || + c.default_type === 'MATERIALIZED') && + !!c.default_expression && + (norm(c.name) === expr || norm(c.default_expression) === expr), + ); + const parsed = + alias && parseKvItemsDefaultExpression(alias.default_expression); + if (parsed) + patch(parsed.mapColumn, { + itemsIndex: { indexName: i.name, separator: parsed.separator }, + }); + } + return lookup; + }, + ); + } + + private async partsOverlapFilter({ + databaseName, + tableName, + connectionId, + dateRange, + timestampValueExpression, + }: { + databaseName: string; + tableName: string; + connectionId: string; + dateRange?: [Date, Date]; + timestampValueExpression?: string; + }): Promise { + if (!dateRange || !timestampValueExpression) return chSql`1`; + const timeFilter = await timeFilterExpr({ + connectionId, + databaseName, + tableName, + dateRange, + dateRangeStartInclusive: true, + dateRangeEndInclusive: true, + timestampValueExpression, + metadata: this, + }); + return chSql`part_name IN ( + SELECT DISTINCT _part + FROM ${tableExpr({ database: databaseName, table: tableName })} + WHERE ${timeFilter} + )`; + } + async getMapKeys({ databaseName, tableName, @@ -362,74 +458,130 @@ export class Metadata { dateRange?: [Date, Date]; timestampValueExpression?: string; }) { - // Align date range to rollup granularity for consistent cache keys const alignedDateRange = metadataMVs && dateRange ? getAlignedDateRange(dateRange, metadataMVs.granularity) : undefined; - const dateRangeCacheSuffix = - dateRange && timestampValueExpression - ? `${dateRange[0].getTime()}-${dateRange[1].getTime()}-${timestampValueExpression}` - : ''; + const dateRangeCacheSuffix = dateRange + ? `${dateRange[0].getTime()}-${dateRange[1].getTime()}${ + timestampValueExpression ? `-${timestampValueExpression}` : '' + }` + : ''; const cacheKey = metricName ? `${connectionId}.${databaseName}.${tableName}.${column}.${metricName}.${dateRangeCacheSuffix}.keys` - : metadataMVs && alignedDateRange - ? `${connectionId}.${databaseName}.${tableName}.${column}.${alignedDateRange[0].getTime()}.${alignedDateRange[1].getTime()}.keys` - : `${connectionId}.${databaseName}.${tableName}.${column}.${dateRangeCacheSuffix}.keys`; + : `${connectionId}.${databaseName}.${tableName}.${column}.${dateRangeCacheSuffix}.keys`; const cachedKeys = this.cache.get(cacheKey); if (cachedKeys != null) { return cachedKeys; } + // Prefer mergeTreeTextIndex when a covering text(tokenizer=array) index + // exists. Try the keys-only index (`mapKeys(X)`) first — each token IS a + // key, no string splitting required. Fall through to the items ALIAS + // column index (`XItems`) which carries `key${sep}value` tokens. + const indexes = ( + await this.getMapColumnTextIndexes({ + databaseName, + tableName, + connectionId, + }) + ).get(column); + const partsFilter = await this.partsOverlapFilter({ + databaseName, + tableName, + connectionId, + dateRange, + timestampValueExpression, + }); + if (indexes?.keysIndex) { + const sql = chSql` + SELECT token AS key + FROM mergeTreeTextIndex(${{ String: databaseName }}, ${{ String: tableName }}, ${{ String: indexes.keysIndex.indexName }}) + WHERE ${partsFilter} + GROUP BY key HAVING key != '' + LIMIT ${{ Int32: maxKeys }}`; + const keys = await this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: this.getClickHouseSettings(), + }) + .then(r => r.json<{ key: string }>()) + .then(d => d.data.map(r => r.key).filter(Boolean)); + if (keys.length > 0) { + this.cache.set(cacheKey, keys); + return keys; + } + } + if (indexes?.itemsIndex) { + const sql = chSql` + SELECT splitByChar(${{ String: indexes.itemsIndex.separator }}, token)[1] AS key + FROM mergeTreeTextIndex(${{ String: databaseName }}, ${{ String: tableName }}, ${{ String: indexes.itemsIndex.indexName }}) + WHERE ${partsFilter} + GROUP BY key HAVING key != '' + LIMIT ${{ Int32: maxKeys }}`; + const keys = await this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: this.getClickHouseSettings(), + }) + .then(r => r.json<{ key: string }>()) + .then(d => d.data.map(r => r.key).filter(Boolean)); + if (keys.length > 0) { + this.cache.set(cacheKey, keys); + return keys; + } + } + // Rollup path: query the key rollup table filtered by ColumnIdentifier and date range if (metadataMVs && alignedDateRange) { - const rollupKeys = await this.cache.getOrFetch( - cacheKey, - async () => { - try { - const startExpr = renderStartOfBucketExpr( - metadataMVs.granularity, - chSql`fromUnixTimestamp64Milli(${{ Int64: alignedDateRange[0].getTime() }})`, - ); - const endExpr = renderStartOfBucketExpr( - metadataMVs.granularity, - chSql`fromUnixTimestamp64Milli(${{ Int64: alignedDateRange[1].getTime() }})`, - ); - const timeFilter = chSql`AND Timestamp >= ${startExpr} AND Timestamp <= ${endExpr}`; - const sql = chSql` - SELECT Key - FROM ${tableExpr({ database: databaseName, table: metadataMVs.keyRollupTable })} - WHERE ColumnIdentifier = ${{ String: column }} - ${timeFilter} - GROUP BY Key - ORDER BY sum(count) DESC - LIMIT ${{ Int32: maxKeys }} - `; - - return await this.clickhouseClient - .query<'JSON'>({ - query: sql.sql, - query_params: sql.params, - connectionId, - clickhouse_settings: { - ...this.getClickHouseSettings(), - timeout_overflow_mode: 'break', - max_execution_time: 15, - max_rows_to_read: '0', - }, - }) - .then(res => res.json<{ Key: string }>()) - .then(d => d.data.map(row => row.Key).filter(k => k)); - } catch (e) { - console.warn('getMapKeys rollup query failed', e); - return []; - } - }, - ); + let rollupKeys: string[] = []; + try { + const startExpr = renderStartOfBucketExpr( + metadataMVs.granularity, + chSql`fromUnixTimestamp64Milli(${{ Int64: alignedDateRange[0].getTime() }})`, + ); + const endExpr = renderStartOfBucketExpr( + metadataMVs.granularity, + chSql`fromUnixTimestamp64Milli(${{ Int64: alignedDateRange[1].getTime() }})`, + ); + const timeFilter = chSql`AND Timestamp >= ${startExpr} AND Timestamp <= ${endExpr}`; + const sql = chSql` + SELECT Key + FROM ${tableExpr({ database: databaseName, table: metadataMVs.keyRollupTable })} + WHERE ColumnIdentifier = ${{ String: column }} + ${timeFilter} + GROUP BY Key + ORDER BY sum(count) DESC + LIMIT ${{ Int32: maxKeys }} + `; + rollupKeys = await this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: { + ...this.getClickHouseSettings(), + timeout_overflow_mode: 'break', + max_execution_time: 15, + max_rows_to_read: '0', + }, + }) + .then(res => res.json<{ Key: string }>()) + .then(d => d.data.map(row => row.Key).filter(k => k)); + } catch (e) { + console.warn('getMapKeys rollup query failed', e); + } - if (rollupKeys.length > 0) return rollupKeys; + if (rollupKeys.length > 0) { + this.cache.set(cacheKey, rollupKeys); + return rollupKeys; + } } // Original path: scan main table @@ -635,31 +787,140 @@ export class Metadata { databaseName, tableName, column, - key, + keys, maxValues = 20, connectionId, + metadataMVs, dateRange, timestampValueExpression, + signal, }: { databaseName: string; tableName: string; column: string; - key?: string; + keys: string[]; maxValues?: number; + metadataMVs?: MetadataMaterializedViews; dateRange?: [Date, Date]; timestampValueExpression?: string; connectionId: string; - }) { - const dateRangeCacheSuffix = - dateRange && timestampValueExpression - ? `${dateRange[0].getTime()}-${dateRange[1].getTime()}-${timestampValueExpression}` - : ''; - const cacheKey = `${connectionId}.${databaseName}.${tableName}.${column}.${key}.${dateRangeCacheSuffix}.values`; + signal?: AbortSignal; + }): Promise> { + if (keys.length === 0) return new Map(); - const cachedValues = this.cache.get(cacheKey); + const idx = ( + await this.getMapColumnTextIndexes({ + databaseName, + tableName, + connectionId, + }) + ).get(column)?.itemsIndex; + if (idx) { + try { + const partsFilter = await this.partsOverlapFilter({ + databaseName, + tableName, + connectionId, + dateRange, + timestampValueExpression, + }); + const orChain = concatChSql( + ' OR ', + keys.map( + k => + chSql`startsWith(token, ${{ String: `${k}${idx.separator}` }})`, + ), + ); + const sql = chSql` + SELECT substring(token, 1, position(token, ${{ String: idx.separator }}) - 1) AS key, + substring(token, position(token, ${{ String: idx.separator }}) + ${{ Int32: idx.separator.length }}) AS value + FROM mergeTreeTextIndex(${{ String: databaseName }}, ${{ String: tableName }}, ${{ String: idx.indexName }}) + WHERE ${partsFilter} + AND (${orChain}) + GROUP BY key, value HAVING value != '' + LIMIT ${{ Int32: maxValues }} BY key`; + const rows = await this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: this.getClickHouseSettings(), + abort_signal: signal, + }) + .then(r => r.json<{ key: string; value: string }>()) + .then(d => d.data); + if (rows.length > 0) { + const result = new Map(); + for (const row of rows) { + if (!row.value) continue; + const arr = result.get(row.key) ?? []; + arr.push(row.value); + result.set(row.key, arr); + } + return result; + } + } catch (e) { + console.warn('getMapValues text-index query failed', e); + } + } - if (cachedValues != null) { - return cachedValues; + if (metadataMVs && dateRange) { + try { + const alignedDateRange = getAlignedDateRange( + dateRange, + metadataMVs.granularity, + ); + const startExpr = renderStartOfBucketExpr( + metadataMVs.granularity, + chSql`fromUnixTimestamp64Milli(${{ Int64: alignedDateRange[0].getTime() }})`, + ); + const endExpr = renderStartOfBucketExpr( + metadataMVs.granularity, + chSql`fromUnixTimestamp64Milli(${{ Int64: alignedDateRange[1].getTime() }})`, + ); + const keyList = concatChSql( + ',', + keys.map(k => chSql`${{ String: k }}`), + ); + const sql = chSql` + SELECT Key AS key, Value AS value + FROM ${tableExpr({ database: databaseName, table: metadataMVs.kvRollupTable })} + WHERE ColumnIdentifier = ${{ String: column }} + AND Key IN (${keyList}) + AND Value != '' + AND Timestamp >= ${startExpr} AND Timestamp <= ${endExpr} + GROUP BY Key, Value + ORDER BY sum(count) DESC + LIMIT ${{ Int32: maxValues }} BY Key + `; + const rows = await this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: { + ...this.getClickHouseSettings(), + timeout_overflow_mode: 'break', + max_execution_time: 15, + max_rows_to_read: '0', + }, + abort_signal: signal, + }) + .then(r => r.json<{ key: string; value: string }>()) + .then(d => d.data); + if (rows.length > 0) { + const result = new Map(); + for (const row of rows) { + if (!row.value) continue; + const arr = result.get(row.key) ?? []; + arr.push(row.value); + result.set(row.key, arr); + } + return result; + } + } catch (e) { + console.warn('getMapValues MV rollup query failed', e); + } } const timeFilterCondition = @@ -675,55 +936,43 @@ export class Metadata { metadata: this, }) : null; - // `value != ''` stays first so existing behavior is preserved; source filters - // and time filter are appended via AND when provided. const whereConditions: ChSql[] = [ chSql`value != ''`, ...(timeFilterCondition ? [timeFilterCondition] : []), ]; const where = chSql`WHERE ${concatChSql(' AND ', ...whereConditions)}`; - const sql = key - ? chSql` - SELECT DISTINCT ${{ - Identifier: column, - }}[${{ String: key }}] as value - FROM ${tableExpr({ database: databaseName, table: tableName })} - ${where} - LIMIT ${{ - Int32: maxValues, - }} - ` - : chSql` - SELECT DISTINCT ${{ - Identifier: column, - }} as value - FROM ${tableExpr({ database: databaseName, table: tableName })} - ${where} - LIMIT ${{ - Int32: maxValues, - }} - `; + const settings = { + max_rows_to_read: String( + this.getClickHouseSettings().max_rows_to_read ?? + DEFAULT_METADATA_MAX_ROWS_TO_READ, + ), + read_overflow_mode: 'break' as const, + ...this.getClickHouseSettings(), + }; - return this.cache.getOrFetch(cacheKey, async () => { - const values = await this.clickhouseClient - .query<'JSON'>({ - query: sql.sql, - query_params: sql.params, - connectionId, - clickhouse_settings: { - max_rows_to_read: String( - this.getClickHouseSettings().max_rows_to_read ?? - DEFAULT_METADATA_MAX_ROWS_TO_READ, - ), - read_overflow_mode: 'break', - ...this.getClickHouseSettings(), - }, - }) - .then(res => res.json<{ value: string }>()) - .then(d => d.data.map(row => row.value)); - return values; - }); + const result = new Map(); + await Promise.all( + keys.map(async k => { + const sql = chSql` + SELECT DISTINCT ${{ Identifier: column }}[${{ String: k }}] AS value + FROM ${tableExpr({ database: databaseName, table: tableName })} + ${where} + LIMIT ${{ Int32: maxValues }}`; + const values = await this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: settings, + abort_signal: signal, + }) + .then(res => res.json<{ value: string }>()) + .then(d => d.data.map(row => row.value).filter(Boolean)); + if (values.length > 0) result.set(k, values); + }), + ); + return result; } async getAllFields({ @@ -1361,6 +1610,121 @@ export class Metadata { }; }); + // Batch-query mergeTreeTextIndex for any map keys whose column has a + // covering text(tokenizer=array) index; remaining keys (native columns, + // unindexed map columns) fall through to the MV/scan path below. + const textIndexResults = new Map(); + const textIndexes = await this.getMapColumnTextIndexes({ + databaseName, + tableName, + connectionId, + }); + const keysByColumn = new Map(); + for (const p of parsed) { + if (!p.mapKey || !textIndexes.get(p.column)?.itemsIndex) continue; + const arr = keysByColumn.get(p.column) ?? []; + arr.push(p.mapKey); + keysByColumn.set(p.column, arr); + } + await Promise.all( + [...keysByColumn.entries()].map(async ([column, mapKeys]) => { + const valuesByKey = await this.getMapValues({ + databaseName, + tableName, + column, + keys: mapKeys, + maxValues: maxValuesPerKey, + connectionId, + dateRange, + timestampValueExpression, + signal, + }); + for (const p of parsed) { + if (p.column !== column || !p.mapKey) continue; + const values = valuesByKey.get(p.mapKey); + if (values?.length) textIndexResults.set(p.keyExpression, values); + } + }), + ); + + const remaining = parsed.filter( + p => !textIndexResults.has(p.keyExpression), + ); + if (remaining.length === 0) { + return keyExpressions.map(k => ({ + key: k, + value: textIndexResults.get(k) ?? [], + })); + } + const mergeResults = (partial: { key: string; value: string[] }[]) => { + const m = new Map(partial.map(r => [r.key, r.value])); + return keyExpressions.map(k => ({ + key: k, + value: textIndexResults.get(k) ?? m.get(k) ?? [], + })); + }; + + // Per-key fallback when neither text-index nor MV served a key. For map + // keys, delegate to getMapValues; for native columns (no mapKey), scan + // the source column directly. + const fetchPerKeyFallback = async ( + p: (typeof parsed)[number], + ): Promise => { + if (p.mapKey) { + const m = await this.getMapValues({ + databaseName, + tableName, + column: p.column, + keys: [p.mapKey], + maxValues: maxValuesPerKey, + connectionId, + dateRange, + timestampValueExpression, + }); + return m.get(p.mapKey) || []; + } + const timeFilterCondition = + dateRange && timestampValueExpression + ? await timeFilterExpr({ + connectionId, + databaseName, + tableName, + dateRange, + dateRangeStartInclusive: true, + dateRangeEndInclusive: true, + timestampValueExpression, + metadata: this, + }) + : null; + const where = chSql`WHERE ${concatChSql( + ' AND ', + chSql`value != ''`, + ...(timeFilterCondition ? [timeFilterCondition] : []), + )}`; + const sql = chSql` + SELECT DISTINCT ${{ Identifier: p.column }} AS value + FROM ${tableExpr({ database: databaseName, table: tableName })} + ${where} + LIMIT ${{ Int32: maxValuesPerKey }}`; + return this.clickhouseClient + .query<'JSON'>({ + query: sql.sql, + query_params: sql.params, + connectionId, + clickhouse_settings: { + max_rows_to_read: String( + this.getClickHouseSettings().max_rows_to_read ?? + DEFAULT_METADATA_MAX_ROWS_TO_READ, + ), + read_overflow_mode: 'break', + ...this.getClickHouseSettings(), + }, + abort_signal: signal, + }) + .then(res => res.json<{ value: string }>()) + .then(d => d.data.map(row => row.value).filter(Boolean)); + }; + // Try rollup table first when available if (metadataMVs && dateRange) { const alignedDateRange = getAlignedDateRange( @@ -1378,7 +1742,7 @@ export class Metadata { ); const timeFilter = chSql`AND Timestamp >= ${startExpr} AND Timestamp <= ${endExpr}`; - const sortedKeyIds = parsed + const sortedKeyIds = remaining .map(p => `${p.rollupColumn}:${p.rollupKey}`) .sort() .join(','); @@ -1386,7 +1750,7 @@ export class Metadata { const tupleParams = concatChSql( ',', - parsed.map( + remaining.map( p => chSql`(${{ String: p.rollupColumn }}, ${{ String: p.rollupKey }})`, ), @@ -1448,44 +1812,30 @@ export class Metadata { } // Build results, falling back to getMapValues for keys with no rollup data - return Promise.all( - parsed.map(async p => { + const partial = await Promise.all( + remaining.map(async p => { const mapKey = `${p.rollupColumn}:${p.rollupKey}`; const values = resultMap.get(mapKey); if (values && values.length > 0) { return { key: p.keyExpression, value: values }; } - const fallback = await this.getMapValues({ - databaseName, - tableName, - column: p.column, - key: p.mapKey, - maxValues: maxValuesPerKey, - connectionId, - dateRange, - timestampValueExpression, - }); - return { key: p.keyExpression, value: fallback }; + return { + key: p.keyExpression, + value: await fetchPerKeyFallback(p), + }; }), ); + return mergeResults(partial); } // No rollup available — fall back to main table scan for all keys - return Promise.all( - parsed.map(async p => { - const value = await this.getMapValues({ - databaseName, - tableName, - column: p.column, - key: p.mapKey, - maxValues: maxValuesPerKey, - connectionId, - dateRange, - timestampValueExpression, - }); - return { key: p.keyExpression, value }; - }), + const partial = await Promise.all( + remaining.map(async p => ({ + key: p.keyExpression, + value: await fetchPerKeyFallback(p), + })), ); + return mergeResults(partial); } /** @@ -1528,7 +1878,65 @@ export class Metadata { ); const timeFilter = chSql`Timestamp >= ${startExpr} AND Timestamp <= ${endExpr}`; - const cacheKey = `${connectionId}.${databaseName}.${tableName}.${alignedDateRange[0].getTime()}.${alignedDateRange[1].getTime()}.fieldsAndValues.${maxValuesPerKey}.${maxKeys ?? 'all'}`; + // Map columns with an items text index are served directly from + // mergeTreeTextIndex and excluded from the MV query below to avoid + // double-counting their entries. Map columns that only have a keys-only + // index don't carry values, so they stay in the MV path. + const textIndexes = await this.getMapColumnTextIndexes({ + databaseName, + tableName, + connectionId, + }); + const itemsIndexed = [...textIndexes.entries()].flatMap(([col, info]) => + info.itemsIndex ? [[col, info.itemsIndex] as const] : [], + ); + const textIndexResults: { key: string; value: string[] }[] = []; + if (itemsIndexed.length > 0) { + const cacheKey = `${connectionId}.${databaseName}.${tableName}.${dateRange[0].getTime()}.${dateRange[1].getTime()}.fieldsAndValues.textIndex.${maxValuesPerKey}.${maxKeys ?? 'all'}`; + const indexed = await this.cache.getOrFetch(cacheKey, async () => { + const perColumn = await Promise.all( + itemsIndexed.map(async ([column]) => { + const sampledKeys = await this.getMapKeys({ + databaseName, + tableName, + column, + maxKeys: maxKeys ?? DEFAULT_MAX_KEYS, + connectionId, + dateRange, + }); + if (sampledKeys.length === 0) return []; + const valuesByKey = await this.getMapValues({ + databaseName, + tableName, + column, + keys: sampledKeys, + maxValues: maxValuesPerKey, + connectionId, + metadataMVs, + dateRange, + signal, + }); + return [...valuesByKey.entries()].map(([key, values]) => ({ + key: `${column}['${key}']`, + value: values, + })); + }), + ); + return perColumn.flat(); + }); + textIndexResults.push(...indexed); + } + + const excludedIds = itemsIndexed.map(([col]) => col); + const excludeClause = + excludedIds.length > 0 + ? chSql`AND ColumnIdentifier NOT IN (${concatChSql( + ',', + excludedIds.map(c => chSql`${{ String: c }}`), + )})` + : chSql``; + + const cacheKey = `${connectionId}.${databaseName}.${tableName}.${alignedDateRange[0].getTime()}.${alignedDateRange[1].getTime()}.fieldsAndValues.${maxValuesPerKey}.${maxKeys ?? 'all'}.excl_${excludedIds.sort().join(',')}`; type RollupRow = { ColumnIdentifier: string; @@ -1545,6 +1953,7 @@ export class Metadata { FROM ${tableExpr({ database: databaseName, table: metadataMVs.kvRollupTable })} WHERE Value != '' AND ${timeFilter} + ${excludeClause} GROUP BY ColumnIdentifier, Key ORDER BY ColumnIdentifier = 'NativeColumn' DESC, ColumnIdentifier, Key ${limitClause} @@ -1567,13 +1976,15 @@ export class Metadata { .then(d => d.data); }); - return rows.map(row => { + const mvResults = rows.map(row => { const keyExpr = row.ColumnIdentifier === 'NativeColumn' ? row.Key : `${row.ColumnIdentifier}['${row.Key}']`; return { key: keyExpr, value: row.Values }; }); + + return [...mvResults, ...textIndexResults]; } async getKeyValues({ diff --git a/packages/common-utils/src/queryParser.ts b/packages/common-utils/src/queryParser.ts index fb195d016b..9d04d6c18f 100644 --- a/packages/common-utils/src/queryParser.ts +++ b/packages/common-utils/src/queryParser.ts @@ -10,6 +10,17 @@ import { JSDataType, } from '@/clickhouse'; import { supportsDirectReadMap } from '@/core/clickhouseVersion'; +import { + KV_ITEMS_STRATEGIES, + KvItemsInfo, + KvItemsLookup, +} from '@/core/kvItems'; + +export type { KvItemsInfo, KvItemsLookup } from '@/core/kvItems'; +export { + parseKvItemsCastExpression, + parseKvItemsExpression, +} from '@/core/kvItems'; import { Metadata, parseKeyPath, @@ -857,228 +868,6 @@ function renderArrayFieldExpression({ ); } -/** Describes a KV items column and its concat separator */ -export type KvItemsInfo = { - kvItemsColumn: string; - separator: string; -}; - -/** Map from map column name to its KV items info */ -export type KvItemsLookup = Map; - -/** - * Tokenizes a ClickHouse expression into meaningful tokens (identifiers, parens, - * commas, arrows, quoted strings, operators). Whitespace is skipped. - * Returns null if the expression contains unrecognized characters. - */ -function tokenizeExpression(expr: string): string[] | null { - const tokens: string[] = []; - let i = 0; - while (i < expr.length) { - // Skip whitespace - if (/\s/.test(expr[i])) { - i++; - continue; - } - // Arrow operator -> - if (expr[i] === '-' && expr[i + 1] === '>') { - tokens.push('->'); - i += 2; - continue; - } - // Cast operator :: - if (expr[i] === ':' && expr[i + 1] === ':') { - tokens.push('::'); - i += 2; - continue; - } - // Single-char tokens - if ('(),.'.includes(expr[i])) { - tokens.push(expr[i]); - i++; - continue; - } - // Quoted string (single or double) - if (expr[i] === "'" || expr[i] === '"') { - const quote = expr[i]; - let str = ''; - i++; // skip opening quote - while (i < expr.length && expr[i] !== quote) { - if (expr[i] === '\\') { - str += expr[i + 1] ?? ''; - i += 2; - } else { - str += expr[i]; - i++; - } - } - i++; // skip closing quote - tokens.push(`'${str}'`); // normalize to single-quote wrapper - continue; - } - // Identifier or keyword (word chars) - if (/\w/.test(expr[i])) { - let ident = ''; - while (i < expr.length && /\w/.test(expr[i])) { - ident += expr[i]; - i++; - } - tokens.push(ident); - continue; - } - // Unknown character — return null to signal unparseable expression - return null; - } - return tokens; -} - -/** - * Helper: parses the common arrayMap lambda prefix and concat body, returning the - * lambda variable name, separator, and remaining token position. - * Handles both parenthesized `(x) ->` and bare `x ->` lambda parameter forms. - */ -function parseArrayMapConcatPrefix( - tokens: string[], -): { lambdaVar: string; separator: string; pos: number } | undefined { - let pos = 0; - const expect = (expected: string): boolean => { - if (pos >= tokens.length || tokens[pos] !== expected) return false; - pos++; - return true; - }; - const read = (): string | undefined => tokens[pos++]; - - if (!expect('arrayMap') || !expect('(')) return undefined; - - // Lambda param: either (x) -> or x -> - let lambdaVar: string | undefined; - if (tokens[pos] === '(') { - pos++; // skip ( - lambdaVar = read(); - if (!lambdaVar || !expect(')')) return undefined; - } else { - lambdaVar = read(); - if (!lambdaVar) return undefined; - } - if (!expect('->')) return undefined; - - // concat(lambdaVar.1, '', lambdaVar.2) - if (!expect('concat') || !expect('(')) return undefined; - if (!expect(lambdaVar) || !expect('.') || !expect('1') || !expect(',')) - return undefined; - - const sepToken = read(); - if (!sepToken || !sepToken.startsWith("'") || !expect(',')) return undefined; - const separator = sepToken.slice(1, -1); - - if ( - !expect(lambdaVar) || - !expect('.') || - !expect('2') || - !expect(')') || - !expect(',') - ) - return undefined; - - return { lambdaVar, separator, pos }; -} - -/** - * Parses a KV items column's default_expression to extract the source map column name - * and the constant separator string used in the concat. - * Matches the inline-cast form: - * arrayMap((arr) -> concat(arr.1, '=', arr.2), X::Array(Tuple(String, String))) - * Also supports bare lambda param: arrayMap(x -> concat(...), ...) - * Returns { mapColumn, separator } if the expression matches, otherwise undefined. - */ -export function parseKvItemsExpression( - defaultExpression: string, -): { mapColumn: string; separator: string } | undefined { - const tokens = tokenizeExpression(defaultExpression); - if (!tokens) return undefined; - - const prefix = parseArrayMapConcatPrefix(tokens); - if (!prefix) return undefined; - - let pos = prefix.pos; - const expect = (expected: string): boolean => { - if (pos >= tokens.length || tokens[pos] !== expected) return false; - pos++; - return true; - }; - const read = (): string | undefined => tokens[pos++]; - - // X::Array(Tuple(String, String)) - const mapColumn = read(); - if (!mapColumn) return undefined; - if ( - !expect('::') || - !expect('Array') || - !expect('(') || - !expect('Tuple') || - !expect('(') || - !expect('String') || - !expect(',') || - !expect('String') || - !expect(')') || - !expect(')') || - !expect(')') - ) - return undefined; - - if (pos !== tokens.length) return undefined; - - return { mapColumn, separator: prefix.separator }; -} - -/** - * Parses a KV items column's default_expression using the CAST function form: - * arrayMap((arr) -> concat(arr.1, '=', arr.2), CAST(X, 'Array(Tuple(String, String))')) - * Also supports bare lambda param: arrayMap(x -> concat(...), ...) - * Returns { mapColumn, separator } if the expression matches, otherwise undefined. - */ -export function parseKvItemsCastExpression( - defaultExpression: string, -): { mapColumn: string; separator: string } | undefined { - const tokens = tokenizeExpression(defaultExpression); - if (!tokens) return undefined; - - const prefix = parseArrayMapConcatPrefix(tokens); - if (!prefix) return undefined; - - let pos = prefix.pos; - const expect = (expected: string): boolean => { - if (pos >= tokens.length || tokens[pos] !== expected) return false; - pos++; - return true; - }; - const read = (): string | undefined => tokens[pos++]; - - // CAST(X, 'Array(Tuple(String, String))') - if (!expect('CAST') || !expect('(')) return undefined; - const mapColumn = read(); - if (!mapColumn || !expect(',')) return undefined; - - // The type argument is a quoted string like 'Array(Tuple(String, String))' - const typeToken = read(); - if (!typeToken || !typeToken.startsWith("'")) return undefined; - const typeStr = typeToken.slice(1, -1); // strip quotes - const normalizedType = typeStr.replace(/\s+/g, ''); - if (normalizedType !== 'Array(Tuple(String,String))') return undefined; - - if (!expect(')') || !expect(')')) return undefined; - - if (pos !== tokens.length) return undefined; - - return { mapColumn, separator: prefix.separator }; -} - -// To add another known KV items parsing strategy, simply define another function with the same signature and add the strategy to this array -const KV_ITEMS_STRATEGIES = [ - parseKvItemsExpression, - parseKvItemsCastExpression, -] as const; - export class CustomSchemaSQLSerializerV2 extends SQLSerializer { private metadata: Metadata; private tableName: string;