What feature are you trying to implement?
Add support for SQL MERGE INTO (UPSERT) operations in the iceberg-datafusion integration. This enables atomic row-level updates and inserts based on join conditions, essential for CDC pipelines, incremental updates, and data synchronization. I already have a PoC branch.
See the existing insert_into support for reference.
Spark-style SPJ (Storage-Partitioned Join) is the key optimization I want to introduce. DataFusion does not yet support MERGE INTO SQL parsing or logical planning, so I am also contributing MERGE INTO support to DataFusion: apache/datafusion#20746.
SQL Example:
MERGE INTO target_table t
USING source_table s
ON t.id = s.id
WHEN MATCHED THEN
UPDATE SET t.value = s.value
WHEN NOT MATCHED THEN
INSERT (id, value) VALUES (s.id, s.value)
Query Execution Plan (CoW Mode)
Baseline Plan (Unpartitioned Table)
┌─────────────────────────────┐
│   IcebergMergeCommitExec    │  Commits via RowDelta transaction
│  (add + remove data files)  │  Outputs: record count
└──────────────┬──────────────┘
               │
┌──────────────▼──────────────┐
│   CoalescePartitionsExec    │  Merges all partitions into
│                             │  single stream for atomic commit
└──────────────┬──────────────┘
               │
┌──────────────▼──────────────┐
│    IcebergMergeWriteExec    │  Writes merged rows to new Parquet
│                             │  files via TaskWriter; tracks
│                             │  _file values as deleted_files
└──────────────┬──────────────┘
               │
┌──────────────▼──────────────┐
│      IcebergMergeExec       │  FULL OUTER JOIN (HashJoinExec)
│                             │  Classifies rows:
│                             │    MATCHED → apply UPDATE exprs
│                             │    NOT MATCHED → apply INSERT exprs
└──────┬──────────────┬───────┘
       │              │
┌──────▼──────┐ ┌─────▼───────┐
│ IcebergTable│ │ Source Plan │
│    Scan     │ │ (any exec)  │
│  (target,   │ │             │
│ with _file) │ │             │
└─────────────┘ └─────────────┘
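To make the row classification and `_file` tracking in the baseline plan concrete, here is a minimal in-memory sketch. It is not the PoC code: `TargetRow`, `SourceRow`, `MergeOutput`, and `merge_cow` are hypothetical names, a plain hash lookup stands in for the FULL OUTER JOIN, and source ids are assumed to be unique. It also assumes that unmatched target rows living in a rewritten file are carried over unchanged, which is the usual copy-on-write behavior but is not spelled out above.

```rust
use std::collections::{BTreeSet, HashMap};

/// Target row as produced by the scan, tagged with its `_file` value.
#[derive(Debug, Clone, PartialEq)]
pub struct TargetRow {
    pub id: i64,
    pub value: String,
    pub file: String,
}

/// Row from the USING side of the MERGE.
#[derive(Debug, Clone, PartialEq)]
pub struct SourceRow {
    pub id: i64,
    pub value: String,
}

/// Rows to write into new data files, plus the existing files to remove.
#[derive(Debug, Default, PartialEq)]
pub struct MergeOutput {
    pub rows_to_write: Vec<(i64, String)>,
    pub deleted_files: BTreeSet<String>,
}

/// Hypothetical copy-on-write merge for `ON t.id = s.id`.
/// Assumes each source id appears at most once.
pub fn merge_cow(target: &[TargetRow], source: &[SourceRow]) -> MergeOutput {
    // Build side: source rows keyed by the join column.
    let by_id: HashMap<i64, &SourceRow> = source.iter().map(|s| (s.id, s)).collect();

    // Any file containing at least one MATCHED row has to be rewritten.
    let deleted_files: BTreeSet<String> = target
        .iter()
        .filter(|t| by_id.contains_key(&t.id))
        .map(|t| t.file.clone())
        .collect();

    let mut rows_to_write = Vec::new();
    let mut matched_ids = BTreeSet::new();

    for t in target {
        if !deleted_files.contains(&t.file) {
            continue; // file is untouched, nothing to rewrite
        }
        match by_id.get(&t.id) {
            // MATCHED: apply the UPDATE expression (value comes from source).
            Some(s) => {
                matched_ids.insert(t.id);
                rows_to_write.push((t.id, s.value.clone()));
            }
            // Target-only row in a rewritten file: carry over unchanged (assumption).
            None => rows_to_write.push((t.id, t.value.clone())),
        }
    }

    // NOT MATCHED: source rows without a target match become inserts.
    for s in source {
        if !matched_ids.contains(&s.id) {
            rows_to_write.push((s.id, s.value.clone()));
        }
    }

    MergeOutput { rows_to_write, deleted_files }
}
```

In this sketch, `rows_to_write` corresponds to what the write node would hand to the file writer, and `deleted_files` to the set of data files the commit node would remove.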
SPJ-Optimized Plan (Partitioned Table)
When all partition columns appear in the join keys and use hash-compatible
transforms (Identity or Bucket), the optimizer wraps both sides with
repartitioning to eliminate cross-partition shuffles:
┌─────────────────────────────┐
│   IcebergMergeCommitExec    │
└──────────────┬──────────────┘
               │
┌──────────────▼──────────────┐
│   CoalescePartitionsExec    │
└──────────────┬──────────────┘
               │
┌──────────────▼──────────────┐
│    IcebergMergeWriteExec    │
└──────────────┬──────────────┘
               │
┌──────────────▼──────────────┐
│      IcebergMergeExec       │
└──────┬───────────────┬──────┘
       │               │
┌──────▼──────┐ ┌──────▼──────┐
│ Repartition │ │ Repartition │
│Exec (Hash on│ │Exec (Hash on│
│ _partition) │ │ _partition) │
└──────┬──────┘ └──────┬──────┘
       │               │
┌──────▼──────┐ ┌──────▼──────┐
│ Projection  │ │ Projection  │
│ Exec (adds  │ │ Exec (adds  │
│ _partition) │ │ _partition) │
└──────┬──────┘ └──────┬──────┘
       │               │
┌──────▼──────┐ ┌──────▼──────┐
│ IcebergTable│ │ Source Plan │
│    Scan     │ │ (any exec)  │
│  (target,   │ │             │
│ with _file) │ │             │
└─────────────┘ └─────────────┘
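The eligibility rule for the SPJ plan (every partition column appears in the join keys and uses an Identity or Bucket transform) can be sketched as a small predicate. This is only an illustration: `Transform`, `PartitionField`, `is_hash_compatible`, and `spj_eligible` are hypothetical, simplified stand-ins, not the iceberg-rust types, and treating an unpartitioned table as ineligible is an assumption based on the baseline plan above.

```rust
use std::collections::HashSet;

/// Simplified, hypothetical stand-in for an Iceberg partition transform.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Transform {
    Identity,
    Bucket(u32),
    Truncate(u32),
    Day,
}

/// One field of a partition spec: the source column and its transform.
#[derive(Debug, Clone)]
pub struct PartitionField {
    pub source_column: String,
    pub transform: Transform,
}

/// Only Identity and Bucket are treated as hash-compatible here.
pub fn is_hash_compatible(t: Transform) -> bool {
    matches!(t, Transform::Identity | Transform::Bucket(_))
}

/// Returns true when the SPJ rewrite may apply: the table is partitioned,
/// every partition column is a join key, and every transform is hash-compatible.
pub fn spj_eligible(spec: &[PartitionField], join_keys: &[&str]) -> bool {
    if spec.is_empty() {
        return false; // unpartitioned table: fall back to the baseline plan
    }
    let keys: HashSet<&str> = join_keys.iter().copied().collect();
    spec.iter()
        .all(|f| keys.contains(f.source_column.as_str()) && is_hash_compatible(f.transform))
}
```

For the SQL example above, a target table partitioned by a bucket transform on `id` would pass this check with join key `id`, while a table partitioned by day on a timestamp column would not.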
The following tasks are already completed on the PoC branch. I will raise formal PRs one at a time, since PRs from a fork repo cannot be stacked.
Willingness to contribute
I would be willing to contribute to this feature with guidance from the Iceberg Rust community