From 8a95b9d1a335b0f5ea4ca35281bfee4d6386d534 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 13 Feb 2026 11:13:16 +0800 Subject: [PATCH] feat: add support of iceberg key encoder --- .../src/row/encode/iceberg_key_encoder.rs | 311 ++++++++++++++++++ crates/fluss/src/row/encode/mod.rs | 8 +- 2 files changed, 316 insertions(+), 3 deletions(-) create mode 100644 crates/fluss/src/row/encode/iceberg_key_encoder.rs diff --git a/crates/fluss/src/row/encode/iceberg_key_encoder.rs b/crates/fluss/src/row/encode/iceberg_key_encoder.rs new file mode 100644 index 00000000..99bc0501 --- /dev/null +++ b/crates/fluss/src/row/encode/iceberg_key_encoder.rs @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg key encoding. +//! +//! This encoder follows Fluss Java's `IcebergKeyEncoder` / `IcebergBinaryRowWriter`: +//! - **Exactly one** key field is supported for Iceberg format. +//! - INT / DATE are encoded as **8-byte little-endian long** (int promoted to long). +//! - TIME is encoded as **microseconds since midnight** in 8-byte little-endian long. +//! - TIMESTAMP (without time zone) is encoded as **microseconds since epoch** in 8-byte +//! little-endian long: `millis * 1000 + (nanos_of_millis / 1000)`. +//! - BIGINT / FLOAT / DOUBLE are encoded as little-endian primitives. +//! - DECIMAL is encoded as **unscaled value bytes** (two's complement big-endian) with **no** +//! length prefix. +//! - STRING / CHAR are encoded as UTF-8 bytes with **no** length prefix. +//! - BYTES / BINARY are encoded as raw bytes with **no** length prefix. + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::{DataType, RowType}; +use crate::row::encode::KeyEncoder; +use crate::row::field_getter::FieldGetter; +use crate::row::{Datum, InternalRow}; +use bytes::Bytes; + +#[allow(dead_code)] +pub struct IcebergKeyEncoder { + field_getter: FieldGetter, + field_type: DataType, +} + +impl IcebergKeyEncoder { + pub fn create_key_encoder(row_type: &RowType, keys: &[String]) -> Result { + if keys.len() != 1 { + return Err(IllegalArgument { + message: format!( + "Key fields must have exactly one field for iceberg format, but got: {keys:?}" + ), + }); + } + + let key = &keys[0]; + let key_index = row_type + .get_field_index(key) + .ok_or_else(|| IllegalArgument { + message: format!("Field {key:?} not found in input row type {row_type:?}"), + })?; + + let field_type = row_type + .fields() + .get(key_index) + .ok_or_else(|| IllegalArgument { + message: format!("Invalid key field index {key_index} for row type {row_type:?}"), + })? + .data_type() + .clone(); + + // Fail fast on unsupported types to match Java behavior. + Self::validate_supported_type(&field_type)?; + + Ok(IcebergKeyEncoder { + field_getter: FieldGetter::create(&field_type, key_index), + field_type, + }) + } + + fn validate_supported_type(field_type: &DataType) -> Result<()> { + match field_type { + DataType::Int(_) + | DataType::BigInt(_) + | DataType::Float(_) + | DataType::Double(_) + | DataType::Date(_) + | DataType::Time(_) + | DataType::Timestamp(_) + | DataType::Decimal(_) + | DataType::String(_) + | DataType::Char(_) + | DataType::Binary(_) + | DataType::Bytes(_) => Ok(()), + + DataType::Array(_) => Err(IllegalArgument { + message: + "Array types cannot be used as bucket keys. Bucket keys must be scalar types." + .to_string(), + }), + DataType::Map(_) => Err(IllegalArgument { + message: + "Map types cannot be used as bucket keys. Bucket keys must be scalar types." + .to_string(), + }), + other => Err(IllegalArgument { + message: format!("Unsupported type for Iceberg key encoder: {other}"), + }), + } + } +} + +#[allow(dead_code)] +impl KeyEncoder for IcebergKeyEncoder { + fn encode_key(&mut self, row: &dyn InternalRow) -> Result { + let value = self.field_getter.get_field(row); + if value.is_null() { + return Err(IllegalArgument { + message: "Cannot encode Iceberg key with null value".to_string(), + }); + } + + let bytes: Vec = match (&self.field_type, value) { + (DataType::Int(_), Datum::Int32(v)) => (v as i64).to_le_bytes().to_vec(), + (DataType::Date(_), Datum::Date(v)) => (v.get_inner() as i64).to_le_bytes().to_vec(), + + (DataType::Time(_), Datum::Time(v)) => { + let micros = v.get_inner() as i64 * 1000; + micros.to_le_bytes().to_vec() + } + + (DataType::BigInt(_), Datum::Int64(v)) => v.to_le_bytes().to_vec(), + (DataType::Float(_), Datum::Float32(v)) => v.0.to_le_bytes().to_vec(), + (DataType::Double(_), Datum::Float64(v)) => v.0.to_le_bytes().to_vec(), + + (DataType::Timestamp(_), Datum::TimestampNtz(ts)) => { + let micros = + ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000); + micros.to_le_bytes().to_vec() + } + + (DataType::Decimal(_), Datum::Decimal(d)) => d.to_unscaled_bytes(), + (DataType::String(_), Datum::String(s)) => s.as_bytes().to_vec(), + (DataType::Char(_), Datum::String(s)) => s.as_bytes().to_vec(), + (DataType::Binary(_), Datum::Blob(b)) => b.as_ref().to_vec(), + (DataType::Bytes(_), Datum::Blob(b)) => b.as_ref().to_vec(), + + // FieldGetter uses Datum::String for CHAR, Datum::Blob for BINARY/BYTES. + (expected_type, actual) => { + return Err(IllegalArgument { + message: format!( + "IcebergKeyEncoder type mismatch: expected {expected_type}, got {actual:?}" + ), + }); + } + }; + + Ok(Bytes::from(bytes)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::DataTypes; + use crate::row::datum::{Date, Time, TimestampNtz}; + use crate::row::{Decimal, GenericRow}; + use bigdecimal::BigDecimal; + use std::str::FromStr; + + #[test] + fn test_single_key_field_requirement() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["id", "name"], + ); + + // ok with single key + let _ = IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + // error with multiple keys + let err = IcebergKeyEncoder::create_key_encoder( + &row_type, + &["id".to_string(), "name".to_string()], + ) + .err() + .unwrap(); + assert!(matches!(err, crate::error::Error::IllegalArgument { .. })); + assert!( + err.to_string() + .contains("Key fields must have exactly one field for iceberg format") + ); + } + + #[test] + fn test_integer_encoding() { + let row_type = RowType::with_data_types_and_field_names(vec![DataTypes::int()], vec!["id"]); + let row = GenericRow::from_data(vec![Datum::from(42i32)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), (42i64).to_le_bytes().as_slice()); + } + + #[test] + fn test_long_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::bigint()], vec!["id"]); + let v = 1234567890123456789i64; + let row = GenericRow::from_data(vec![Datum::from(v)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), v.to_le_bytes().as_slice()); + } + + #[test] + fn test_string_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::string()], vec!["name"]); + let s = "Hello Iceberg, Fluss this side!"; + let row = GenericRow::from_data(vec![Datum::from(s)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["name".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), s.as_bytes()); + } + + #[test] + fn test_decimal_encoding() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::decimal(10, 2)], + vec!["amount"], + ); + + let bd = BigDecimal::from_str("123.45").unwrap(); + let decimal = Decimal::from_big_decimal(bd.clone(), 10, 2).unwrap(); + let row = GenericRow::from_data(vec![Datum::Decimal(decimal.clone())]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["amount".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), decimal.to_unscaled_bytes().as_slice()); + } + + #[test] + fn test_timestamp_encoding() { + let row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::timestamp_with_precision(6)], + vec!["event_time"], + ); + + let millis = 1698235273182i64; + let nanos = 123_456i32; + let ts = TimestampNtz::from_millis_nanos(millis, nanos).unwrap(); + let micros = millis * 1000 + (nanos as i64 / 1000); + + let row = GenericRow::from_data(vec![Datum::TimestampNtz(ts)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["event_time".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), micros.to_le_bytes().as_slice()); + } + + #[test] + fn test_date_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::date()], vec!["date"]); + + let days = 19655i32; // 2023-10-25 + let row = GenericRow::from_data(vec![Datum::Date(Date::new(days))]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["date".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), (days as i64).to_le_bytes().as_slice()); + } + + #[test] + fn test_time_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::time()], vec!["time"]); + + let millis_since_midnight = 34_200_000i32; + let micros = millis_since_midnight as i64 * 1000; + let row = GenericRow::from_data(vec![Datum::Time(Time::new(millis_since_midnight))]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["time".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), micros.to_le_bytes().as_slice()); + } + + #[test] + fn test_binary_encoding() { + let row_type = + RowType::with_data_types_and_field_names(vec![DataTypes::bytes()], vec!["data"]); + + let bytes = b"Hello i only understand binary data".as_slice(); + let row = GenericRow::from_data(vec![Datum::from(bytes)]); + let mut encoder = + IcebergKeyEncoder::create_key_encoder(&row_type, &["data".to_string()]).unwrap(); + + let encoded = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded.as_ref(), bytes); + } +} diff --git a/crates/fluss/src/row/encode/mod.rs b/crates/fluss/src/row/encode/mod.rs index 1ce7aef7..227724e0 100644 --- a/crates/fluss/src/row/encode/mod.rs +++ b/crates/fluss/src/row/encode/mod.rs @@ -17,11 +17,13 @@ mod compacted_key_encoder; mod compacted_row_encoder; +mod iceberg_key_encoder; use crate::error::{Error, Result}; use crate::metadata::{DataLakeFormat, KvFormat, RowType}; use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder; use crate::row::encode::compacted_row_encoder::CompactedRowEncoder; +use crate::row::encode::iceberg_key_encoder::IcebergKeyEncoder; use crate::row::{Datum, InternalRow}; use bytes::Bytes; @@ -54,9 +56,9 @@ impl KeyEncoderFactory { Some(DataLakeFormat::Lance) => Ok(Box::new(CompactedKeyEncoder::create_key_encoder( row_type, key_fields, )?)), - Some(DataLakeFormat::Iceberg) => Err(Error::UnsupportedOperation { - message: "KeyEncoder for Iceberg format is not yet implemented".to_string(), - }), + Some(DataLakeFormat::Iceberg) => Ok(Box::new(IcebergKeyEncoder::create_key_encoder( + row_type, key_fields, + )?)), None => Ok(Box::new(CompactedKeyEncoder::create_key_encoder( row_type, key_fields, )?)),