Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion bulker/bulkerlib/implementations/sql/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (
// stages 2 and 3 read a much smaller, pre-deduped relation. Each stage
// emits its own WarehouseState so timing for dedup/update/insert is
// visible in the run report.
sfDedupStatement = `CREATE OR REPLACE TEMPORARY TABLE {{.NamespaceFrom}}{{.DedupTable}} AS SELECT {{.Columns}} FROM (SELECT {{.Columns}}, ROW_NUMBER() OVER (PARTITION BY {{.PrimaryKeyColumns}}{{.Discriminator}}) rn FROM {{.NamespaceFrom}}{{.TableFrom}}) QUALIFY rn = MAX(rn) OVER (PARTITION BY {{.PrimaryKeyColumns}})`
sfDedupStatement = `CREATE OR REPLACE TEMPORARY TABLE {{.NamespaceFrom}}{{.DedupTable}} AS SELECT {{.Columns}} FROM (SELECT {{.Columns}}, ROW_NUMBER() OVER (PARTITION BY {{.PrimaryKeyColumns}}{{.Discriminator}}) rn FROM {{.NamespaceFrom}}{{.TableFrom}}) QUALIFY rn = MAX(rn) OVER (PARTITION BY {{.PrimaryKeyColumns}}){{if .DedupOrderBy}} ORDER BY {{.DedupOrderBy}}{{end}}`
sfMergeUpdateStatement = `UPDATE {{.Namespace}}{{.TableTo}} T SET {{.UpdateSet}} FROM {{.NamespaceFrom}}{{.DedupTable}} S WHERE {{.JoinConditions}}`
sfMergeInsertStatement = `INSERT INTO {{.Namespace}}{{.TableTo}} ({{.Columns}}) SELECT {{.SourceColumns}} FROM {{.NamespaceFrom}}{{.DedupTable}} S WHERE NOT EXISTS (SELECT 1 FROM {{.Namespace}}{{.TableTo}} T WHERE {{.JoinConditions}})`
sfDropDedupStatement = `DROP TABLE IF EXISTS {{.NamespaceFrom}}{{.DedupTable}}`
Expand Down Expand Up @@ -562,12 +562,25 @@ func (s *Snowflake) copyOrMergeSplit(ctx context.Context, targetTable *Table, so
})
pkColumns := utils.ArrayMap(targetTable.GetPKFields(), s.quotedColumnName)

// Pre-sort the dedup CTAS by the target's timestamp column so the
// later INSERT writes new T micro-partitions whose TO_DATE(ts) min/max
// are tight along T's CLUSTER BY (TO_DATE(timestamp)) key. This
// minimises the work auto-clustering has to do to re-partition new
// data. UPDATE rewrites preserve clustering on their own, so this is
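// strictly an INSERT-side optimisation. When the target has no timestamp
// column, dedupOrderBy stays empty and the dedup template omits the
// ORDER BY clause entirely, so there is neither benefit nor harm.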
var dedupOrderBy string
if targetTable.TimestampColumn != "" {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible runtime regression: DedupOrderBy is set from targetTable.TimestampColumn whenever the target table defines one, without checking the source, but the dedup CTAS selects only the sourceTable columns of the current batch. If a batch doesn't carry that timestamp field (while the destination metadata still has it), the generated ORDER BY references a column that is missing from the source relation, and the dedup statement fails with an invalid-identifier error. Should we gate this on sourceTable.Columns.Get(targetTable.TimestampColumn) before setting DedupOrderBy?

dedupOrderBy = s.quotedColumnName(targetTable.TimestampColumn)
}

payload := QueryPayload{
Namespace: s.namespacePrefix(targetTable.Namespace),
NamespaceFrom: s.namespacePrefix(sourceTable.Namespace),
TableTo: s.quotedTableName(targetTable.Name),
TableFrom: s.quotedTableName(sourceTable.Name),
DedupTable: s.quotedTableName(sourceTable.Name + "_DEDUP"),
DedupOrderBy: dedupOrderBy,
Columns: strings.Join(columnNames, ","),
PrimaryKeyName: targetTable.PrimaryKeyName,
PrimaryKeyColumns: strings.Join(pkColumns, ","),
Expand Down
1 change: 1 addition & 0 deletions bulker/bulkerlib/implementations/sql/sql_adapter_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ type QueryPayload struct {
TableTo string
TableFrom string
DedupTable string
DedupOrderBy string
JoinConditions string
SourceColumns string
}
Expand Down
Loading