Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/includes-aggregates.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/db': patch
---

fix: support aggregates (e.g. count) in child/includes subqueries with per-parent scoping
89 changes: 60 additions & 29 deletions packages/db/src/query/compiler/group-by.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export function processGroupBy(
havingClauses?: Array<Having>,
selectClause?: Select,
fnHavingClauses?: Array<(row: any) => any>,
mainSource?: string,
): NamespacedAndKeyedStream {
// Handle empty GROUP BY (single-group aggregation)
if (groupByClause.length === 0) {
Expand Down Expand Up @@ -110,8 +111,15 @@ export function processGroupBy(
}
}

// Use a constant key for single group
const keyExtractor = () => ({ __singleGroup: true })
// Use a constant key for single group.
// When mainSource is set (includes mode), include __correlationKey so that
// rows from different parents aggregate separately.
const keyExtractor = mainSource
? ([, row]: [string, NamespacedRow]) => ({
__singleGroup: true,
__correlationKey: (row as any)?.[mainSource]?.__correlationKey,
})
: () => ({ __singleGroup: true })

// Apply the groupBy operator with single group
pipeline = pipeline.pipe(
Expand Down Expand Up @@ -139,14 +147,24 @@ export function processGroupBy(
)
}

// Use a single key for the result and update $selected
return [
`single_group`,
{
...aggregatedRow,
$selected: finalResults,
},
] as [unknown, Record<string, any>]
// Use a single key for the result and update $selected.
// When in includes mode, restore the namespaced source structure with
// __correlationKey so output extraction can route results per-parent.
const correlationKey = mainSource
? (aggregatedRow as any).__correlationKey
: undefined
const resultKey =
correlationKey !== undefined
? `single_group_${serializeValue(correlationKey)}`
: `single_group`
const resultRow: Record<string, any> = {
...aggregatedRow,
$selected: finalResults,
}
if (mainSource && correlationKey !== undefined) {
resultRow[mainSource] = { __correlationKey: correlationKey }
}
return [resultKey, resultRow] as [unknown, Record<string, any>]
}),
)

Expand Down Expand Up @@ -196,7 +214,9 @@ export function processGroupBy(
compileExpression(e),
)

// Create a key extractor function using simple __key_X format
// Create a key extractor function using simple __key_X format.
// When mainSource is set (includes mode), include __correlationKey so that
// rows from different parents with the same group key aggregate separately.
const keyExtractor = ([, row]: [
string,
NamespacedRow & { $selected?: any },
Expand All @@ -214,6 +234,10 @@ export function processGroupBy(
key[`__key_${i}`] = value
}

if (mainSource) {
key.__correlationKey = (row as any)?.[mainSource]?.__correlationKey
}

return key
}

Expand Down Expand Up @@ -278,25 +302,32 @@ export function processGroupBy(
}
}

// Generate a simple key for the live collection using group values
let finalKey: unknown
if (groupByClause.length === 1) {
finalKey = aggregatedRow[`__key_0`]
} else {
const keyParts: Array<unknown> = []
for (let i = 0; i < groupByClause.length; i++) {
keyParts.push(aggregatedRow[`__key_${i}`])
}
finalKey = serializeValue(keyParts)
// Generate a simple key for the live collection using group values.
// When in includes mode, include the correlation key so that groups
// from different parents don't collide.
const correlationKey = mainSource
? (aggregatedRow as any).__correlationKey
: undefined
const keyParts: Array<unknown> = []
for (let i = 0; i < groupByClause.length; i++) {
keyParts.push(aggregatedRow[`__key_${i}`])
}

return [
finalKey,
{
...aggregatedRow,
$selected: finalResults,
},
] as [unknown, Record<string, any>]
if (correlationKey !== undefined) {
keyParts.push(correlationKey)
}
const finalKey =
keyParts.length === 1 ? keyParts[0] : serializeValue(keyParts)

// When in includes mode, restore the namespaced source structure with
// __correlationKey so output extraction can route results per-parent.
const resultRow: Record<string, any> = {
...aggregatedRow,
$selected: finalResults,
}
if (mainSource && correlationKey !== undefined) {
resultRow[mainSource] = { __correlationKey: correlationKey }
}
return [finalKey, resultRow] as [unknown, Record<string, any>]
}),
)

Expand Down
7 changes: 6 additions & 1 deletion packages/db/src/query/compiler/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -529,14 +529,18 @@ export function compileQuery(
)
}

// Process the GROUP BY clause if it exists
// Process the GROUP BY clause if it exists.
// When in includes mode (parentKeyStream), pass mainSource so that groupBy
// preserves __correlationKey for per-parent aggregation.
const groupByMainSource = parentKeyStream ? mainSource : undefined
if (query.groupBy && query.groupBy.length > 0) {
pipeline = processGroupBy(
pipeline,
query.groupBy,
query.having,
query.select,
query.fnHaving,
groupByMainSource,
)
} else if (query.select) {
// Check if SELECT contains aggregates but no GROUP BY (implicit single-group aggregation)
Expand All @@ -551,6 +555,7 @@ export function compileQuery(
query.having,
query.select,
query.fnHaving,
groupByMainSource,
)
}
}
Expand Down
Loading
Loading