From 713d9d45f655dd36ae833dd482d51a673c3a59ea Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Mon, 16 Feb 2026 22:02:40 +0000
Subject: [PATCH 1/2] Fix issue where interleaving a non-PK column with PK columns causes a lookup panic

---
 bindings/python/src/lookup.rs |  4 +--
 bindings/python/src/table.rs  | 52 ++++++++++++++++++++++++++++++-----
 2 files changed, 47 insertions(+), 9 deletions(-)

diff --git a/bindings/python/src/lookup.rs b/bindings/python/src/lookup.rs
index 718f8e50..e2edbf41 100644
--- a/bindings/python/src/lookup.rs
+++ b/bindings/python/src/lookup.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::table::{internal_row_to_dict, python_to_sparse_generic_row};
+use crate::table::{internal_row_to_dict, python_to_dense_generic_row};
 use crate::*;
 use pyo3_async_runtimes::tokio::future_into_py;
 use std::sync::Arc;
@@ -53,7 +53,7 @@ impl Lookuper {
         pk: &Bound<'_, PyAny>,
     ) -> PyResult<Bound<'py, PyAny>> {
         let pk_indices = self.table_info.get_schema().primary_key_indexes();
-        let generic_row = python_to_sparse_generic_row(pk, &self.table_info, &pk_indices)?;
+        let generic_row = python_to_dense_generic_row(pk, &self.table_info, &pk_indices)?;
 
         let inner = self.inner.clone();
         let table_info = self.table_info.clone();
diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 80852152..c3ea248e 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -783,6 +783,7 @@ fn process_sequence(
     target_indices: &[usize],
     fields: &[fcore::metadata::DataField],
     datums: &mut [fcore::row::Datum<'static>],
+    sparse: bool,
 ) -> PyResult<()> {
     if seq.len()? != target_indices.len() {
         return Err(FlussError::new_err(format!(
@@ -794,7 +795,8 @@ fn process_sequence(
     for (i, &col_idx) in target_indices.iter().enumerate() {
         let field = &fields[col_idx];
         let value = seq.get_item(i)?;
-        datums[col_idx] = python_value_to_datum(&value, field.data_type())
+        let dest = if sparse { col_idx } else { i };
+        datums[dest] = python_value_to_datum(&value, field.data_type())
             .map_err(|e| FlussError::new_err(format!("Field '{}': {}", field.name(), e)))?;
     }
     Ok(())
@@ -806,12 +808,37 @@ pub fn python_to_sparse_generic_row(
     row: &Bound<'_, PyAny>,
     table_info: &fcore::metadata::TableInfo,
     target_indices: &[usize],
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    python_to_generic_row_inner(row, table_info, target_indices, true)
+}
+
+/// Build a dense GenericRow with exactly `target_indices.len()` fields,
+/// containing only the target column values in order.
+pub fn python_to_dense_generic_row(
+    row: &Bound<'_, PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+    target_indices: &[usize],
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    python_to_generic_row_inner(row, table_info, target_indices, false)
+}
+
+/// Build a GenericRow from user input. When `sparse` is true, the row is full width and padded with nulls; otherwise it holds only the target columns, in order.
+fn python_to_generic_row_inner(
+    row: &Bound<'_, PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+    target_indices: &[usize],
+    sparse: bool,
 ) -> PyResult<fcore::row::GenericRow<'static>> {
     let row_type = table_info.row_type();
     let fields = row_type.fields();
     let target_names: Vec<&str> = target_indices.iter().map(|&i| fields[i].name()).collect();
 
-    let mut datums: Vec<fcore::row::Datum<'static>> = vec![fcore::row::Datum::Null; fields.len()];
+    let num_fields = if sparse {
+        fields.len()
+    } else {
+        target_indices.len()
+    };
+    let mut datums: Vec<fcore::row::Datum<'static>> = vec![fcore::row::Datum::Null; num_fields];
 
     let row_input: RowInput = row.extract().map_err(|_| {
         let type_name = row
@@ -849,19 +876,30 @@ pub fn python_to_sparse_generic_row(
                 let value = dict
                     .get_item(name)?
                    .ok_or_else(|| FlussError::new_err(format!("Missing field: {name}")))?;
-                datums[col_idx] = python_value_to_datum(&value, field.data_type())
+                let dest = if sparse { col_idx } else { i };
+                datums[dest] = python_value_to_datum(&value, field.data_type())
                     .map_err(|e| FlussError::new_err(format!("Field '{name}': {e}")))?;
             }
         }
         RowInput::List(list) => {
-            let seq = list.as_sequence();
-            process_sequence(seq, target_indices, fields, &mut datums)?;
+            process_sequence(
+                list.as_sequence(),
+                target_indices,
+                fields,
+                &mut datums,
+                sparse,
+            )?;
         }
         RowInput::Tuple(tuple) => {
-            let seq = tuple.as_sequence();
-            process_sequence(seq, target_indices, fields, &mut datums)?;
+            process_sequence(
+                tuple.as_sequence(),
+                target_indices,
+                fields,
+                &mut datums,
+                sparse,
+            )?;
         }
     }

From 3661b0ebc1517ad1a47336c74a38ed5692fc1dd0 Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Tue, 17 Feb 2026 12:15:33 +0000
Subject: [PATCH 2/2] Add integration tests

---
 bindings/python/test/test_kv_table.py  | 4 +++-
 bindings/python/test/test_log_table.py | 3 +++
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/bindings/python/test/test_kv_table.py b/bindings/python/test/test_kv_table.py
index 98b0cee9..36aa3e46 100644
--- a/bindings/python/test/test_kv_table.py
+++ b/bindings/python/test/test_kv_table.py
@@ -101,12 +101,14 @@ async def test_composite_primary_keys(connection, admin):
     table_path = fluss.TablePath("fluss", "py_test_composite_pk")
     await admin.drop_table(table_path, ignore_if_not_exists=True)
 
+    # PK columns intentionally interleaved with a non-PK column to verify
+    # that lookup correctly handles non-contiguous primary key indices.
     schema = fluss.Schema(
         pa.schema(
             [
                 pa.field("region", pa.string()),
-                pa.field("user_id", pa.int32()),
                 pa.field("score", pa.int64()),
+                pa.field("user_id", pa.int32()),
             ]
         ),
         primary_keys=["region", "user_id"],
diff --git a/bindings/python/test/test_log_table.py b/bindings/python/test/test_log_table.py
index 3219f03c..09586aa8 100644
--- a/bindings/python/test/test_log_table.py
+++ b/bindings/python/test/test_log_table.py
@@ -179,6 +179,9 @@ async def test_list_offsets(connection, admin):
     )
     assert ts_before[0] == 0
 
+    # Intentional sleep to avoid a race where list_offsets fails with FlussError(code=38): "The timestamp is invalid"
+    await asyncio.sleep(1)
+
     # Timestamp after append should resolve to offset 3
     ts_after = await admin.list_offsets(
         table_path,
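
A note on the mechanics behind patch 1, for reviewers. The sketch below is a minimal, self-contained illustration of why a sparse key row breaks lookups once primary-key columns are non-contiguous. It is not the bindings' actual code: the three-column schema, the Option<&str> stand-ins for datums, and the mis-read shown in the middle are illustrative assumptions.

// Sketch only: mirrors the sparse-vs-dense destination-index logic from
// patch 1, with Option<&str> standing in for fcore::row::Datum values.
fn main() {
    // Columns: [region, score, user_id]; primary key = (region, user_id),
    // so the PK column indices [0, 2] are non-contiguous.
    let pk_indices = [0usize, 2];
    let key = ["eu", "42"];

    // Old path (python_to_sparse_generic_row for lookups): full-width row,
    // value i written at its column index, nulls in the non-PK slots.
    let mut sparse: Vec<Option<&str>> = vec![None; 3];
    for (i, &col_idx) in pk_indices.iter().enumerate() {
        sparse[col_idx] = Some(key[i]);
    }
    assert_eq!(sparse, [Some("eu"), None, Some("42")]);

    // A consumer that expects the key packed at positions 0..key_len, as a
    // dense key encoder would, reads a null where user_id should be:
    let misread = sparse[..pk_indices.len()].to_vec();
    assert_eq!(misread, [Some("eu"), None]); // plausibly the panic's trigger

    // New path (python_to_dense_generic_row): key-width row, value i at
    // position i, regardless of where the PK columns sit in the schema.
    let dense: Vec<Option<&str>> = key.iter().map(|&v| Some(v)).collect();
    assert_eq!(dense, [Some("eu"), Some("42")]);
}

Since the dict, list, and tuple input branches all reduce to this one choice of destination index, threading a single `sparse` flag through `python_to_generic_row_inner` and `process_sequence` avoids duplicating the row-building code for the dense case.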