4 changes: 2 additions & 2 deletions bindings/python/src/lookup.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

-use crate::table::{internal_row_to_dict, python_to_sparse_generic_row};
+use crate::table::{internal_row_to_dict, python_to_dense_generic_row};
 use crate::*;
 use pyo3_async_runtimes::tokio::future_into_py;
 use std::sync::Arc;
@@ -53,7 +53,7 @@ impl Lookuper {
         pk: &Bound<'_, PyAny>,
     ) -> PyResult<Bound<'py, PyAny>> {
         let pk_indices = self.table_info.get_schema().primary_key_indexes();
-        let generic_row = python_to_sparse_generic_row(pk, &self.table_info, &pk_indices)?;
+        let generic_row = python_to_dense_generic_row(pk, &self.table_info, &pk_indices)?;
         let inner = self.inner.clone();
         let table_info = self.table_info.clone();

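Review note: with this change, the primary-key row handed to the core lookup is dense, i.e. it holds exactly one datum per primary-key column, in primary-key order, rather than a full-width row padded with nulls. From the Python side that looks roughly like the sketch below; the `get_table`/`new_lookuper`/`lookup` names are assumptions for illustration, not confirmed binding API.

```python
import fluss  # module name as used in the test files

async def lookup_by_composite_pk(connection):
    # Hypothetical method names; only the shape of the key is the point here.
    table = await connection.get_table(fluss.TablePath("fluss", "py_test_composite_pk"))
    lookuper = await table.new_lookuper()
    # Dense key: just the PK values ("region", "user_id"), in PK order --
    # no None placeholder for the non-key "score" column.
    return await lookuper.lookup(("eu", 42))
```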
52 changes: 45 additions & 7 deletions bindings/python/src/table.rs
@@ -783,6 +783,7 @@ fn process_sequence(
     target_indices: &[usize],
     fields: &[fcore::metadata::DataField],
     datums: &mut [fcore::row::Datum<'static>],
+    sparse: bool,
 ) -> PyResult<()> {
     if seq.len()? != target_indices.len() {
         return Err(FlussError::new_err(format!(
@@ -794,7 +795,8 @@
     for (i, &col_idx) in target_indices.iter().enumerate() {
         let field = &fields[col_idx];
         let value = seq.get_item(i)?;
-        datums[col_idx] = python_value_to_datum(&value, field.data_type())
+        let dest = if sparse { col_idx } else { i };
+        datums[dest] = python_value_to_datum(&value, field.data_type())
             .map_err(|e| FlussError::new_err(format!("Field '{}': {}", field.name(), e)))?;
     }
     Ok(())
@@ -806,12 +808,37 @@ pub fn python_to_sparse_generic_row(
     row: &Bound<PyAny>,
     table_info: &fcore::metadata::TableInfo,
     target_indices: &[usize],
 ) -> PyResult<fcore::row::GenericRow<'static>> {
+    python_to_generic_row_inner(row, table_info, target_indices, true)
+}
+
+/// Build a dense GenericRow with exactly `target_indices.len()` fields,
+/// containing only the target column values in order.
+pub fn python_to_dense_generic_row(
+    row: &Bound<PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+    target_indices: &[usize],
+) -> PyResult<fcore::row::GenericRow<'static>> {
+    python_to_generic_row_inner(row, table_info, target_indices, false)
+}
+
+/// Build a GenericRow from user input. When `sparse` is true, the row is full width and padded with nulls.
+fn python_to_generic_row_inner(
+    row: &Bound<PyAny>,
+    table_info: &fcore::metadata::TableInfo,
+    target_indices: &[usize],
+    sparse: bool,
+) -> PyResult<fcore::row::GenericRow<'static>> {
     let row_type = table_info.row_type();
     let fields = row_type.fields();
     let target_names: Vec<&str> = target_indices.iter().map(|&i| fields[i].name()).collect();

-    let mut datums: Vec<fcore::row::Datum<'static>> = vec![fcore::row::Datum::Null; fields.len()];
+    let num_fields = if sparse {
+        fields.len()
+    } else {
+        target_indices.len()
+    };
+    let mut datums: Vec<fcore::row::Datum<'static>> = vec![fcore::row::Datum::Null; num_fields];

     let row_input: RowInput = row.extract().map_err(|_| {
         let type_name = row
@@ -849,19 +876,30 @@ pub fn python_to_sparse_generic_row(
                 let value = dict
                     .get_item(name)?
                     .ok_or_else(|| FlussError::new_err(format!("Missing field: {name}")))?;
-                datums[col_idx] = python_value_to_datum(&value, field.data_type())
+                let dest = if sparse { col_idx } else { i };
+                datums[dest] = python_value_to_datum(&value, field.data_type())
                     .map_err(|e| FlussError::new_err(format!("Field '{name}': {e}")))?;
             }
         }

         RowInput::List(list) => {
-            let seq = list.as_sequence();
-            process_sequence(seq, target_indices, fields, &mut datums)?;
+            process_sequence(
+                list.as_sequence(),
+                target_indices,
+                fields,
+                &mut datums,
+                sparse,
+            )?;
         }

         RowInput::Tuple(tuple) => {
-            let seq = tuple.as_sequence();
-            process_sequence(seq, target_indices, fields, &mut datums)?;
+            process_sequence(
+                tuple.as_sequence(),
+                target_indices,
+                fields,
+                &mut datums,
+                sparse,
+            )?;
         }
     }

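To make the sparse/dense split concrete, here is a pure-Python analogue of the placement logic above (a sketch for illustration only; the real implementation is the Rust code in this diff):

```python
def place_values(values, target_indices, num_columns, sparse):
    """Mirror of the Rust placement logic: sparse rows are full table width
    and padded with None; dense rows hold only the target values, in order."""
    if len(values) != len(target_indices):
        raise ValueError(f"expected {len(target_indices)} values, got {len(values)}")
    datums = [None] * (num_columns if sparse else len(target_indices))
    for i, col_idx in enumerate(target_indices):
        dest = col_idx if sparse else i  # the `dest` switch from the diff
        datums[dest] = values[i]
    return datums

# Schema (region, score, user_id) with PK columns at indices [0, 2]:
assert place_values(["eu", 42], [0, 2], 3, sparse=True) == ["eu", None, 42]
assert place_values(["eu", 42], [0, 2], 3, sparse=False) == ["eu", 42]
```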
4 changes: 3 additions & 1 deletion bindings/python/test/test_kv_table.py
@@ -101,12 +101,14 @@ async def test_composite_primary_keys(connection, admin):
     table_path = fluss.TablePath("fluss", "py_test_composite_pk")
     await admin.drop_table(table_path, ignore_if_not_exists=True)

+    # PK columns intentionally interleaved with non-PK column to verify
+    # that lookup correctly handles non-contiguous primary key indices.
     schema = fluss.Schema(
         pa.schema(
             [
                 pa.field("region", pa.string()),
-                pa.field("user_id", pa.int32()),
                 pa.field("score", pa.int64()),
+                pa.field("user_id", pa.int32()),
             ]
         ),
         primary_keys=["region", "user_id"],
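The column reordering above is what makes this a real regression test: with `score` sitting between the two key columns, the primary-key indexes are no longer a contiguous prefix of the schema, so dense and sparse key rows genuinely differ. A quick illustration of the resulting indices:

```python
columns = ["region", "score", "user_id"]  # field order from the test schema
primary_keys = ["region", "user_id"]
pk_indices = [columns.index(name) for name in primary_keys]
assert pk_indices == [0, 2]  # non-contiguous: "score" sits at index 1, between the keys
```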
3 changes: 3 additions & 0 deletions bindings/python/test/test_log_table.py
@@ -179,6 +179,9 @@ async def test_list_offsets(connection, admin):
     )
     assert ts_before[0] == 0

+    # Intentional sleep to avoid race condition FlussError(code=38) The timestamp is invalid
+    await asyncio.sleep(1)
+
     # Timestamp after append should resolve to offset 3
     ts_after = await admin.list_offsets(
         table_path,
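The one-second sleep sidesteps a race in which the server rejects the list_offsets timestamp as invalid (FlussError code 38). If the fixed delay ever proves flaky, a hedged alternative is a small retry helper; the error-message match below is an assumption for illustration, not confirmed Fluss behavior:

```python
import asyncio

async def retry_on_invalid_timestamp(op, attempts=5, delay=0.5):
    """Retry an async operation while the server reports an invalid-timestamp
    error; a hypothetical alternative to the fixed one-second sleep."""
    last_exc = None
    for _ in range(attempts):
        try:
            return await op()  # op is a zero-arg coroutine factory
        except Exception as exc:  # the test observes fluss.FlussError(code=38)
            if "timestamp is invalid" not in str(exc).lower():
                raise
            last_exc = exc
            await asyncio.sleep(delay)
    raise last_exc
```

In the test this would wrap the `admin.list_offsets` call, e.g. passing `lambda: admin.list_offsets(table_path, ...)` with the same arguments the test already uses.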