diff --git a/packages/server/api/src/app/flows/flow/flow.service.ts b/packages/server/api/src/app/flows/flow/flow.service.ts index 87584b58d6..88c373034d 100644 --- a/packages/server/api/src/app/flows/flow/flow.service.ts +++ b/packages/server/api/src/app/flows/flow/flow.service.ts @@ -145,6 +145,11 @@ export const flowService = { columnName: 'fv.updated', columnType: 'timestamp with time zone', }, + customPaginationSecondaryColumn: { + columnPath: 'id', + columnName: 'flow.id', + columnType: 'string', + }, }); const queryWhere: Record = { diff --git a/packages/server/api/src/app/helper/pagination/build-paginator.ts b/packages/server/api/src/app/helper/pagination/build-paginator.ts index 5a7c364ac1..da8d9fe7f0 100644 --- a/packages/server/api/src/app/helper/pagination/build-paginator.ts +++ b/packages/server/api/src/app/helper/pagination/build-paginator.ts @@ -8,16 +8,28 @@ export type PagingQuery = { order?: Order | 'ASC' | 'DESC'; }; +type CustomPaginationColumnOptions = { + columnPath: string; + columnName: string; + columnType?: string; +}; + +// Secondary custom pagination is only valid when primary custom pagination is configured. +type CustomPaginationColumns = + | { + customPaginationColumn?: never; + customPaginationSecondaryColumn?: never; + } + | { + customPaginationColumn: CustomPaginationColumnOptions; + customPaginationSecondaryColumn?: CustomPaginationColumnOptions; + }; + export type PaginationOptions = { entity: EntitySchema; alias?: string; query?: PagingQuery; - customPaginationColumn?: { - columnPath: string; - columnName: string; - columnType?: string; - }; -}; +} & CustomPaginationColumns; export function buildPaginator( options: PaginationOptions, @@ -32,6 +44,15 @@ export function buildPaginator( paginator.setAlias(alias); + if ( + options.customPaginationSecondaryColumn && + !options.customPaginationColumn + ) { + throw new Error( + 'customPaginationSecondaryColumn requires customPaginationColumn', + ); + } + if (options.customPaginationColumn) { paginator.setPaginationColumn( options.customPaginationColumn.columnPath, @@ -40,6 +61,14 @@ export function buildPaginator( ); } + if (options.customPaginationSecondaryColumn) { + paginator.setPaginationSecondaryColumn( + options.customPaginationSecondaryColumn.columnPath, + options.customPaginationSecondaryColumn.columnName, + options.customPaginationSecondaryColumn.columnType, + ); + } + if (query.afterCursor) { paginator.setAfterCursor(query.afterCursor); } diff --git a/packages/server/api/src/app/helper/pagination/paginator.ts b/packages/server/api/src/app/helper/pagination/paginator.ts index 2f43fc285a..3e2999b738 100644 --- a/packages/server/api/src/app/helper/pagination/paginator.ts +++ b/packages/server/api/src/app/helper/pagination/paginator.ts @@ -31,8 +31,16 @@ export type PagingResult = { cursor: CursorResult; }; +type CursorContext = { + primaryColumnName: string; + primaryParamName: string; + secondaryColumnName: string | null; + secondaryParamName: string | null; +}; + const PAGINATION_KEY = 'created'; const CUSTOM_PAGINATION_KEY = 'custom_pagination'; +const CUSTOM_PAGINATION_SECONDARY_KEY = 'custom_pagination_tie_breaker'; const DEFAULT_TIMESTAMP_TYPE = 'timestamp with time zone'; export default class Paginator { @@ -56,6 +64,12 @@ export default class Paginator { private paginationColumnType: string | null = null; + private paginationSecondaryColumnPath: string | null = null; + + private paginationSecondaryColumnName: string | null = null; + + private paginationSecondaryColumnType: string | null = null; + public constructor(private readonly entity: EntitySchema) {} public setPaginationColumn( @@ -72,6 +86,16 @@ export default class Paginator { this.alias = alias; } + public setPaginationSecondaryColumn( + columnPath: string, + columnName: string, + columnType = 'string', + ): void { + this.paginationSecondaryColumnPath = columnPath; + this.paginationSecondaryColumnName = columnName; + this.paginationSecondaryColumnType = columnType; + } + public setAfterCursor(cursor: string): void { this.afterCursor = cursor; } @@ -169,30 +193,28 @@ export default class Paginator { where: WhereExpressionBuilder, cursors: CursorParam, ): void { - const dbType = system.get(AppSystemProp.DB_TYPE); + const dbType = this.getSupportedDbType(); const operator = this.getOperator(); - let queryString: string; - - const isCustomColumn = - this.paginationColumnName && cursors[CUSTOM_PAGINATION_KEY]; - const columnName = isCustomColumn - ? this.paginationColumnName - : `${this.alias}.${PAGINATION_KEY}`; - const paramName = isCustomColumn ? CUSTOM_PAGINATION_KEY : PAGINATION_KEY; - - if (dbType === DatabaseType.SQLITE3) { - queryString = `${columnName} ${operator} :${paramName}`; - } else if (dbType === DatabaseType.POSTGRES) { - if (this.hasBeforeCursor() && !this.hasAfterCursor()) { - queryString = `${columnName} ${operator} (:${paramName}::timestamp + INTERVAL '1 millisecond')`; - } else { - queryString = `${columnName} ${operator} :${paramName}::timestamp`; - } - } else { - throw new Error('Unsupported database type'); + const context = this.resolveCursorContext(cursors); + + if (context.secondaryColumnName && context.secondaryParamName) { + this.applyCompositeCursorFilter( + where, + cursors, + dbType, + operator, + context, + ); + return; } - where.orWhere(queryString, cursors); + this.applySingleColumnCursorFilter( + where, + cursors, + dbType, + operator, + context, + ); } private getOperator(): string { @@ -222,6 +244,10 @@ export default class Paginator { orderByCondition[`${this.alias}.${PAGINATION_KEY}`] = order; } + if (this.paginationColumnName && this.paginationSecondaryColumnName) { + orderByCondition[this.paginationSecondaryColumnName] = order; + } + return orderByCondition; } @@ -257,9 +283,31 @@ export default class Paginator { this.paginationColumnType || DEFAULT_TIMESTAMP_TYPE, value, ); - const payload = `${CUSTOM_PAGINATION_KEY}:${encodedValue}`; + const payload = [`${CUSTOM_PAGINATION_KEY}:${encodedValue}`]; - return btoa(payload); + if ( + this.paginationSecondaryColumnPath && + this.paginationSecondaryColumnName + ) { + const secondaryValue = getValueByPath( + entity, + this.paginationSecondaryColumnPath, + ); + if (secondaryValue === null || secondaryValue === undefined) { + throw new Error( + `Pagination secondary column not found at path: ${this.paginationSecondaryColumnPath}`, + ); + } + const encodedSecondaryValue = encodeByType( + this.paginationSecondaryColumnType || 'string', + secondaryValue, + ); + payload.push( + `${CUSTOM_PAGINATION_SECONDARY_KEY}:${encodedSecondaryValue}`, + ); + } + + return btoa(payload.join(',')); } private decode(cursor: string): CursorParam { @@ -279,6 +327,9 @@ export default class Paginator { if (key === CUSTOM_PAGINATION_KEY) { return this.paginationColumnType || DEFAULT_TIMESTAMP_TYPE; } + if (key === CUSTOM_PAGINATION_SECONDARY_KEY) { + return this.paginationSecondaryColumnType || 'string'; + } const col = this.entity.options.columns[key]; if (col === undefined) { @@ -291,6 +342,162 @@ export default class Paginator { return order === Order.ASC ? Order.DESC : Order.ASC; } + private buildComparisonClause({ + dbType, + columnName, + paramName, + operator, + }: { + dbType: DatabaseType; + columnName: string; + paramName: string; + operator: string; + }): string { + if (dbType === DatabaseType.SQLITE3) { + return `${columnName} ${operator} :${paramName}`; + } + + if (dbType === DatabaseType.POSTGRES) { + const type = this.getEntityPropertyType(paramName); + if (this.isTimestampType(type)) { + if (operator === '<') { + return `${columnName} < :${paramName}::timestamptz`; + } + if (operator === '>') { + return `${columnName} >= (:${paramName}::timestamptz + INTERVAL '1 millisecond')`; + } + if (operator === '=') { + return `(${columnName} >= :${paramName}::timestamptz AND ${columnName} < (:${paramName}::timestamptz + INTERVAL '1 millisecond'))`; + } + return `${columnName} ${operator} :${paramName}::timestamptz`; + } + return `${columnName} ${operator} :${paramName}`; + } + + throw new Error('Unsupported database type'); + } + + private isTimestampType(type: string): boolean { + return ( + type === 'timestamp with time zone' || + type === 'datetime' || + type === 'date' + ); + } + + private getSupportedDbType(): DatabaseType { + const dbType = system.get(AppSystemProp.DB_TYPE); + if (dbType === DatabaseType.SQLITE3 || dbType === DatabaseType.POSTGRES) { + return dbType; + } + throw new Error('Unsupported database type'); + } + + private resolveCursorContext(cursors: CursorParam): CursorContext { + const customPaginationColumnName = this.paginationColumnName; + const hasCustomPaginationCursor = + customPaginationColumnName !== null && + cursors[CUSTOM_PAGINATION_KEY] !== undefined; + + const primaryColumnName = + hasCustomPaginationCursor && customPaginationColumnName + ? customPaginationColumnName + : `${this.alias}.${PAGINATION_KEY}`; + const primaryParamName = hasCustomPaginationCursor + ? CUSTOM_PAGINATION_KEY + : PAGINATION_KEY; + + const hasCustomSecondaryCursor = + this.paginationSecondaryColumnName !== null && + cursors[CUSTOM_PAGINATION_SECONDARY_KEY] !== undefined; + + if (hasCustomPaginationCursor && hasCustomSecondaryCursor) { + return { + primaryColumnName, + primaryParamName, + secondaryColumnName: this.paginationSecondaryColumnName, + secondaryParamName: CUSTOM_PAGINATION_SECONDARY_KEY, + }; + } + + return { + primaryColumnName, + primaryParamName, + secondaryColumnName: null, + secondaryParamName: null, + }; + } + + private applySingleColumnCursorFilter( + where: WhereExpressionBuilder, + cursors: CursorParam, + dbType: DatabaseType, + operator: string, + context: CursorContext, + ): void { + where.orWhere( + this.buildComparisonClause({ + dbType, + columnName: context.primaryColumnName, + paramName: context.primaryParamName, + operator, + }), + cursors, + ); + } + + private applyCompositeCursorFilter( + where: WhereExpressionBuilder, + cursors: CursorParam, + dbType: DatabaseType, + operator: string, + context: CursorContext, + ): void { + const { + primaryColumnName, + primaryParamName, + secondaryColumnName, + secondaryParamName, + } = context; + if (!secondaryColumnName || !secondaryParamName) { + throw new Error('Pagination secondary context is not configured'); + } + + where.orWhere( + this.buildComparisonClause({ + dbType, + columnName: primaryColumnName, + paramName: primaryParamName, + operator, + }), + cursors, + ); + + // Lexicographic cursor compare: primary equals, then compare secondary key. + where.orWhere( + new Brackets((nestedWhere) => { + nestedWhere.where( + this.buildComparisonClause({ + dbType, + columnName: primaryColumnName, + paramName: primaryParamName, + operator: '=', + }), + cursors, + ); + nestedWhere.andWhere( + this.buildComparisonClause({ + dbType, + columnName: secondaryColumnName, + paramName: secondaryParamName, + operator, + }), + cursors, + ); + }), + ); + } + private toPagingResult(entities: Entity[]): PagingResult { return { data: entities, diff --git a/packages/server/api/test/integration/helper/pagination/paginator.integration.test.ts b/packages/server/api/test/integration/helper/pagination/paginator.integration.test.ts index bd292934a7..eea727759e 100644 --- a/packages/server/api/test/integration/helper/pagination/paginator.integration.test.ts +++ b/packages/server/api/test/integration/helper/pagination/paginator.integration.test.ts @@ -102,6 +102,10 @@ const FOUR_RUNS_TEST_DATA = [ describe('Paginator Integration Tests', () => { let dataSource: DataSource; + const buildFlowRunsQuery = (projectId = 'proj1') => + dataSource + .createQueryBuilder(TestFlowRunEntity, 'fr') + .where('fr.projectId = :projectId', { projectId }); beforeAll(async () => { dataSource = new DataSource({ @@ -174,9 +178,7 @@ describe('Paginator Integration Tests', () => { paginator.setOrder(Order.DESC); paginator.setLimit(2); - const query = dataSource - .createQueryBuilder(TestFlowRunEntity, 'fr') - .where('fr.projectId = :projectId', { projectId: 'proj1' }); + const query = buildFlowRunsQuery(); const result = await paginator.paginate(query); @@ -229,9 +231,7 @@ describe('Paginator Integration Tests', () => { paginator.setOrder(Order.DESC); paginator.setLimit(2); - let query = dataSource - .createQueryBuilder(TestFlowRunEntity, 'fr') - .where('fr.projectId = :projectId', { projectId: 'proj1' }); + let query = buildFlowRunsQuery(); const firstPage = await paginator.paginate(query); @@ -241,9 +241,7 @@ describe('Paginator Integration Tests', () => { paginator2.setLimit(2); paginator2.setAfterCursor(firstPage.cursor.afterCursor!); - query = dataSource - .createQueryBuilder(TestFlowRunEntity, 'fr') - .where('fr.projectId = :projectId', { projectId: 'proj1' }); + query = buildFlowRunsQuery(); const secondPage = await paginator2.paginate(query); @@ -253,9 +251,7 @@ describe('Paginator Integration Tests', () => { paginator3.setLimit(2); paginator3.setBeforeCursor(secondPage.cursor.beforeCursor!); - query = dataSource - .createQueryBuilder(TestFlowRunEntity, 'fr') - .where('fr.projectId = :projectId', { projectId: 'proj1' }); + query = buildFlowRunsQuery(); const backwardPage = await paginator3.paginate(query); @@ -298,9 +294,7 @@ describe('Paginator Integration Tests', () => { paginator.setPaginationColumn('created', 'fr.created', 'datetime'); - const query = dataSource - .createQueryBuilder(TestFlowRunEntity, 'fr') - .where('fr.projectId = :projectId', { projectId: 'proj1' }); + const query = buildFlowRunsQuery(); const result = await paginator.paginate(query); @@ -310,6 +304,101 @@ describe('Paginator Integration Tests', () => { }); }); + describe('Composite Pagination with Secondary Column', () => { + const buildProjectQuery = () => buildFlowRunsQuery(); + + test('should paginate same-updated rows without skipping (limit 10, >20 rows)', async () => { + const sharedCreated = '2025-01-01 08:51:00.123'; + const totalRows = 25; + const rows = Array.from({ length: totalRows }, (_, index) => ({ + id: `run-${String(index + 1).padStart(3, '0')}`, + created: sharedCreated, + projectId: 'proj1', + status: 'SUCCEEDED', + })); + + for (const row of rows) { + await dataSource + .createQueryBuilder() + .insert() + .into('test_flow_runs') + .values(row) + .execute(); + } + + const expectedSortedIds = rows + .map((row) => row.id) + .sort((a, b) => b.localeCompare(a)); + + const page1Paginator = new Paginator(TestFlowRunEntity); + page1Paginator.setAlias('fr'); + page1Paginator.setOrder(Order.DESC); + page1Paginator.setLimit(10); + page1Paginator.setPaginationColumn('created', 'fr.created', 'datetime'); + page1Paginator.setPaginationSecondaryColumn('id', 'fr.id', 'string'); + const page1 = await page1Paginator.paginate(buildProjectQuery()); + + const page2Paginator = new Paginator(TestFlowRunEntity); + page2Paginator.setAlias('fr'); + page2Paginator.setOrder(Order.DESC); + page2Paginator.setLimit(10); + page2Paginator.setPaginationColumn('created', 'fr.created', 'datetime'); + page2Paginator.setPaginationSecondaryColumn('id', 'fr.id', 'string'); + page2Paginator.setAfterCursor(page1.cursor.afterCursor!); + const page2 = await page2Paginator.paginate(buildProjectQuery()); + + const combinedIds = [...page1.data, ...page2.data].map((row) => row.id); + + expect(page1.data).toHaveLength(10); + expect(page2.data).toHaveLength(10); + expect(combinedIds).toEqual(expectedSortedIds.slice(0, 20)); + }); + + test('should not duplicate rows across consecutive pages', async () => { + const sharedCreated = '2025-01-01 08:51:00.456'; + const rows = Array.from({ length: 15 }, (_, index) => ({ + id: `row-${String(index + 1).padStart(3, '0')}`, + created: sharedCreated, + projectId: 'proj1', + status: 'RUNNING', + })); + + for (const row of rows) { + await dataSource + .createQueryBuilder() + .insert() + .into('test_flow_runs') + .values(row) + .execute(); + } + + const page1Paginator = new Paginator(TestFlowRunEntity); + page1Paginator.setAlias('fr'); + page1Paginator.setOrder(Order.DESC); + page1Paginator.setLimit(10); + page1Paginator.setPaginationColumn('created', 'fr.created', 'datetime'); + page1Paginator.setPaginationSecondaryColumn('id', 'fr.id', 'string'); + const page1 = await page1Paginator.paginate(buildProjectQuery()); + + const page2Paginator = new Paginator(TestFlowRunEntity); + page2Paginator.setAlias('fr'); + page2Paginator.setOrder(Order.DESC); + page2Paginator.setLimit(10); + page2Paginator.setPaginationColumn('created', 'fr.created', 'datetime'); + page2Paginator.setPaginationSecondaryColumn('id', 'fr.id', 'string'); + page2Paginator.setAfterCursor(page1.cursor.afterCursor!); + const page2 = await page2Paginator.paginate(buildProjectQuery()); + + const page1Ids = page1.data.map((row) => row.id); + const page2Ids = new Set(page2.data.map((row) => row.id)); + const duplicateIds = page1Ids.filter((id) => page2Ids.has(id)); + + expect(page1.data).toHaveLength(10); + expect(page2.data).toHaveLength(5); + expect(duplicateIds).toHaveLength(0); + }); + }); + describe('Edge Cases', () => { describe('refetch when backward result is shorter than limit', () => { test.each([3, 4])( @@ -326,10 +415,7 @@ describe('Paginator Integration Tests', () => { .execute(); } - const queryBase = () => - dataSource - .createQueryBuilder(TestFlowRunEntity, 'fr') - .where('fr.projectId = :projectId', { projectId: 'proj1' }); + const queryBase = () => buildFlowRunsQuery(); const paginator1 = new Paginator(TestFlowRunEntity); paginator1.setAlias('fr'); @@ -376,10 +462,7 @@ describe('Paginator Integration Tests', () => { .execute(); } - const queryBase = () => - dataSource - .createQueryBuilder(TestFlowRunEntity, 'fr') - .where('fr.projectId = :projectId', { projectId: 'proj1' }); + const queryBase = () => buildFlowRunsQuery(); const paginator1 = new Paginator(TestFlowRunEntity); paginator1.setAlias('fr'); @@ -415,9 +498,7 @@ describe('Paginator Integration Tests', () => { paginator.setAlias('fr'); paginator.setLimit(10); - const query = dataSource - .createQueryBuilder(TestFlowRunEntity, 'fr') - .where('fr.projectId = :projectId', { projectId: 'nonexistent' }); + const query = buildFlowRunsQuery('nonexistent'); const result = await paginator.paginate(query); @@ -443,9 +524,7 @@ describe('Paginator Integration Tests', () => { paginator.setAlias('fr'); paginator.setLimit(10); - const query = dataSource - .createQueryBuilder(TestFlowRunEntity, 'fr') - .where('fr.projectId = :projectId', { projectId: 'proj1' }); + const query = buildFlowRunsQuery(); const result = await paginator.paginate(query); @@ -491,9 +570,7 @@ describe('Paginator Integration Tests', () => { paginator.setOrder(Order.DESC); paginator.setLimit(3); - const query = dataSource - .createQueryBuilder(TestFlowRunEntity, 'fr') - .where('fr.projectId = :projectId', { projectId: 'proj1' }); + const query = buildFlowRunsQuery(); const result = await paginator.paginate(query); @@ -546,9 +623,7 @@ describe('Paginator Integration Tests', () => { paginator1.setOrder(Order.DESC); paginator1.setLimit(2); - let query = dataSource - .createQueryBuilder(TestFlowRunEntity, 'fr') - .where('fr.projectId = :projectId', { projectId: 'proj1' }); + let query = buildFlowRunsQuery(); const page1 = await paginator1.paginate(query); @@ -558,9 +633,7 @@ describe('Paginator Integration Tests', () => { paginator2.setLimit(2); paginator2.setAfterCursor(page1.cursor.afterCursor!); - query = dataSource - .createQueryBuilder(TestFlowRunEntity, 'fr') - .where('fr.projectId = :projectId', { projectId: 'proj1' }); + query = buildFlowRunsQuery(); const page2 = await paginator2.paginate(query); @@ -570,9 +643,7 @@ describe('Paginator Integration Tests', () => { paginator3.setLimit(2); paginator3.setBeforeCursor(page2.cursor.beforeCursor!); - query = dataSource - .createQueryBuilder(TestFlowRunEntity, 'fr') - .where('fr.projectId = :projectId', { projectId: 'proj1' }); + query = buildFlowRunsQuery(); const backPage = await paginator3.paginate(query);