Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ mod primitive_filter;
mod result;
mod static_filter;
mod strategy;
mod transform;

use static_filter::StaticFilter;
use strategy::instantiate_static_filter;
Expand Down
190 changes: 99 additions & 91 deletions datafusion/physical-expr/src/expressions/in_list/primitive_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,113 +29,106 @@ use std::hash::{Hash, Hasher};
use super::result::build_in_list_result;
use super::static_filter::{StaticFilter, handle_dictionary};

/// Bitmap filter for O(1) set membership via single bit test.
/// Storage for the bits used by [`BitmapFilter`].
///
/// `UInt8` has only 256 possible values, so the filter stores membership in a
/// 256-bit bitmap instead of using a hash table.
pub(super) struct UInt8BitmapFilter {
null_count: usize,
bits: [u64; 4],
/// `BitmapFilter` represents an `IN` list with one bit for each possible
/// value, so membership checks become direct bit tests. This trait lets the
/// same filter code use different storage sizes for different integer widths.
pub(super) trait BitmapStorage: Send + Sync {
/// Creates storage with every bit cleared.
fn new_zeroed() -> Self;
/// Turns on the bit at position `index`.
///
/// NOTE(review): the implementations visible in this file index a `u64`
/// word array with `index / 64`, so `index` is expected to be smaller than
/// the number of bits the storage holds.
fn set_bit(&mut self, index: usize);
/// Returns `true` when the bit at position `index` is turned on.
fn get_bit(&self, index: usize) -> bool;
}

impl UInt8BitmapFilter {
pub(super) fn try_new(in_array: &ArrayRef) -> Result<Self> {
let prim_array = in_array.as_primitive_opt::<UInt8Type>().ok_or_else(|| {
exec_datafusion_err!("UInt8BitmapFilter: expected UInt8 array")
})?;
let mut bits = [0u64; 4];
let mut set_bit = |v: u8| {
let index = usize::from(v);
bits[index / 64] |= 1u64 << (index % 64);
};

let values = prim_array.values();
match prim_array.nulls() {
None => {
for &v in values {
set_bit(v);
}
}
Some(nulls) => {
for i in
BitIndexIterator::new(nulls.validity(), nulls.offset(), nulls.len())
{
set_bit(values[i]);
}
}
}
Ok(Self {
null_count: prim_array.null_count(),
bits,
})
// `UInt8` has 256 possible values, 0 through 255. One bit per value takes
// 256 bits, which fits in four `u64` words.
impl BitmapStorage for [u64; 4] {
#[inline]
fn new_zeroed() -> Self {
[0u64; 4]
}
#[inline]
fn set_bit(&mut self, index: usize) {
self[index / 64] |= 1u64 << (index % 64);
}

#[inline(always)]
fn check(&self, needle: u8) -> bool {
let index = needle as usize;
(self.bits[index / 64] >> (index % 64)) & 1 != 0
fn get_bit(&self, index: usize) -> bool {
(self[index / 64] >> (index % 64)) & 1 != 0
}
}

impl StaticFilter for UInt8BitmapFilter {
fn null_count(&self) -> usize {
self.null_count
// `UInt16` has 65,536 possible values. One bit per value takes 65,536 bits,
// which is 1,024 `u64` words, or 8 KiB. Box the array so the filter stores a
// pointer instead of carrying an 8 KiB array inline.
impl BitmapStorage for Box<[u64; 1024]> {
#[inline]
fn new_zeroed() -> Self {
Box::new([0u64; 1024])
}

fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
handle_dictionary!(self, v, negated);
let v = v.as_primitive_opt::<UInt8Type>().ok_or_else(|| {
exec_datafusion_err!("UInt8BitmapFilter: expected UInt8 array")
})?;
let input_values = v.values();
Ok(build_in_list_result(
v.len(),
v.nulls(),
self.null_count > 0,
negated,
#[inline(always)]
|i| {
// SAFETY: `build_in_list_result` invokes this closure for
// indices in `0..v.len()`, which matches `input_values.len()`.
let needle = unsafe { *input_values.get_unchecked(i) };
self.check(needle)
},
))
#[inline]
fn set_bit(&mut self, index: usize) {
self[index / 64] |= 1u64 << (index % 64);
}
#[inline(always)]
fn get_bit(&self, index: usize) -> bool {
(self[index / 64] >> (index % 64)) & 1 != 0
}
}

/// Bitmap filter for O(1) `UInt16` set membership via single bit test.
/// Arrow primitive types supported by [`BitmapFilter`].
///
/// Arrow already defines the Rust value type as `T::Native`. This trait only
/// supplies the bitmap storage size for the two integer domains that are small
/// enough to represent with one bit per possible value.
pub(super) trait BitmapFilterType:
ArrowPrimitiveType + Send + Sync + 'static
{
/// Bitmap storage used by the filter for this Arrow type; it must provide
/// one addressable bit for every possible `Self::Native` value.
type Storage: BitmapStorage;
}

/// `UInt8` has 256 possible values, so four `u64` words cover the full domain.
impl BitmapFilterType for UInt8Type {
// 4 words x 64 bits = 256 bits: one bit for each value in 0..=255.
type Storage = [u64; 4];
}

/// `UInt16` has 65,536 possible values, so 1,024 `u64` words cover the full
/// domain.
impl BitmapFilterType for UInt16Type {
// 1,024 words x 64 bits = 65,536 bits (8 KiB). The array is boxed so the
// filter holds a pointer rather than the whole bitmap inline.
type Storage = Box<[u64; 1024]>;
}

/// `IN` filter backed by one bit per possible value.
///
/// `UInt16` has 65,536 possible values, so the filter stores membership in an
/// 8 KiB heap-allocated bitmap instead of using a hash table.
pub(super) struct UInt16BitmapFilter {
/// Building the filter scans the non-null values in the IN-list and turns on
/// the bit selected by each value. Evaluating input values checks the same bit
/// position. Null handling and `NOT IN` inversion are handled by
/// `build_in_list_result`.
pub(super) struct BitmapFilter<T: BitmapFilterType> {
null_count: usize,
bits: Box<[u64; 1024]>,
bits: T::Storage,
}

impl UInt16BitmapFilter {
impl<T> BitmapFilter<T>
where
T: BitmapFilterType,
{
pub(super) fn try_new(in_array: &ArrayRef) -> Result<Self> {
let prim_array = in_array.as_primitive_opt::<UInt16Type>().ok_or_else(|| {
exec_datafusion_err!("UInt16BitmapFilter: expected UInt16 array")
let prim_array = in_array.as_primitive_opt::<T>().ok_or_else(|| {
exec_datafusion_err!("BitmapFilter: expected {} array", T::DATA_TYPE)
})?;
let mut bits = Box::new([0u64; 1024]);
let mut set_bit = |v: u16| {
let index = usize::from(v);
bits[index / 64] |= 1u64 << (index % 64);
};

let mut bits = T::Storage::new_zeroed();
let values = prim_array.values();
match prim_array.nulls() {
None => {
for &v in values {
set_bit(v);
bits.set_bit(v.as_usize());
}
}
Some(nulls) => {
for i in
BitIndexIterator::new(nulls.validity(), nulls.offset(), nulls.len())
{
set_bit(values[i]);
bits.set_bit(values[i].as_usize());
}
}
}
Expand All @@ -146,21 +139,39 @@ impl UInt16BitmapFilter {
}

#[inline(always)]
fn check(&self, needle: u16) -> bool {
let index = needle as usize;
(self.bits[index / 64] >> (index % 64)) & 1 != 0
fn check(&self, needle: T::Native) -> bool {
self.bits.get_bit(needle.as_usize())
}

/// Evaluates the filter against a borrowed slice of native values.
///
/// This is the zero-copy entry point for callers that already hold the raw
/// values (for example after reinterpreting one integer type as another), so
/// no Arrow array has to be built first.
///
/// * `values` - the needles to test, one per output row.
/// * `nulls` - optional validity buffer for `values`.
/// * `negated` - `true` to evaluate `NOT IN` instead of `IN`.
///
/// Null handling and negation are delegated to `build_in_list_result`; this
/// method only supplies the per-row bit test.
#[inline]
pub(super) fn contains_slice(
    &self,
    values: &[T::Native],
    nulls: Option<&NullBuffer>,
    negated: bool,
) -> BooleanArray {
    let row_count = values.len();
    let list_has_nulls = self.null_count > 0;
    let is_member = |row: usize| {
        // SAFETY: `build_in_list_result` invokes this closure for
        // indices in `0..values.len()`.
        let probe = unsafe { *values.get_unchecked(row) };
        self.check(probe)
    };
    build_in_list_result(row_count, nulls, list_has_nulls, negated, is_member)
}
}

impl StaticFilter for UInt16BitmapFilter {
impl<T> StaticFilter for BitmapFilter<T>
where
T: BitmapFilterType,
{
fn null_count(&self) -> usize {
self.null_count
}

fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
handle_dictionary!(self, v, negated);
let v = v.as_primitive_opt::<UInt16Type>().ok_or_else(|| {
exec_datafusion_err!("UInt16BitmapFilter: expected UInt16 array")
let v = v.as_primitive_opt::<T>().ok_or_else(|| {
exec_datafusion_err!("BitmapFilter: expected {} array", T::DATA_TYPE)
})?;
let input_values = v.values();
Ok(build_in_list_result(
Expand Down Expand Up @@ -364,9 +375,6 @@ macro_rules! primitive_static_filter {
};
}

// Generate specialized filters for all integer primitive types
primitive_static_filter!(Int8StaticFilter, Int8Type);
primitive_static_filter!(Int16StaticFilter, Int16Type);
primitive_static_filter!(Int32StaticFilter, Int32Type);
primitive_static_filter!(Int64StaticFilter, Int64Type);
primitive_static_filter!(UInt32StaticFilter, UInt32Type);
Expand Down Expand Up @@ -406,7 +414,7 @@ mod tests {
#[test]
fn bitmap_filter_u8_handles_nulls() -> Result<()> {
let haystack: ArrayRef = Arc::new(UInt8Array::from(vec![Some(1), None, Some(3)]));
let filter = UInt8BitmapFilter::try_new(&haystack)?;
let filter = BitmapFilter::<UInt8Type>::try_new(&haystack)?;
let needles = UInt8Array::from(vec![Some(1), Some(2), None, Some(3)]);

assert_contains(&filter, &needles, vec![Some(true), None, None, Some(true)])?;
Expand All @@ -421,7 +429,7 @@ mod tests {
#[test]
fn bitmap_filter_u8_handles_dictionary_needles() -> Result<()> {
let haystack: ArrayRef = Arc::new(UInt8Array::from(vec![Some(1), None, Some(3)]));
let filter = UInt8BitmapFilter::try_new(&haystack)?;
let filter = BitmapFilter::<UInt8Type>::try_new(&haystack)?;

let keys = Int8Array::from(vec![Some(0), Some(1), None, Some(2)]);
let values = Arc::new(UInt8Array::from(vec![Some(1), Some(2), Some(3)]));
Expand All @@ -438,7 +446,7 @@ mod tests {
Some(1024),
Some(u16::MAX),
]));
let filter = UInt16BitmapFilter::try_new(&haystack)?;
let filter = BitmapFilter::<UInt16Type>::try_new(&haystack)?;
let needles =
UInt16Array::from(vec![Some(0), Some(1), Some(1024), Some(u16::MAX), None]);

Expand Down
10 changes: 4 additions & 6 deletions datafusion/physical-expr/src/expressions/in_list/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, UInt8Type, UInt16Type};
use datafusion_common::Result;

use super::array_static_filter::ArrayStaticFilter;
use super::primitive_filter::*;
use super::static_filter::StaticFilter;
use super::transform::make_bitmap_filter;

pub(super) fn instantiate_static_filter(
in_array: ArrayRef,
Expand All @@ -37,13 +38,10 @@ pub(super) fn instantiate_static_filter(
_ => in_array,
};
match in_array.data_type() {
// Integer primitive types
DataType::Int8 => Ok(Arc::new(Int8StaticFilter::try_new(&in_array)?)),
DataType::Int16 => Ok(Arc::new(Int16StaticFilter::try_new(&in_array)?)),
DataType::Int8 | DataType::UInt8 => make_bitmap_filter::<UInt8Type>(&in_array),
DataType::Int16 | DataType::UInt16 => make_bitmap_filter::<UInt16Type>(&in_array),
DataType::Int32 => Ok(Arc::new(Int32StaticFilter::try_new(&in_array)?)),
DataType::Int64 => Ok(Arc::new(Int64StaticFilter::try_new(&in_array)?)),
DataType::UInt8 => Ok(Arc::new(UInt8BitmapFilter::try_new(&in_array)?)),
DataType::UInt16 => Ok(Arc::new(UInt16BitmapFilter::try_new(&in_array)?)),
DataType::UInt32 => Ok(Arc::new(UInt32StaticFilter::try_new(&in_array)?)),
DataType::UInt64 => Ok(Arc::new(UInt64StaticFilter::try_new(&in_array)?)),
// Float primitive types (use ordered wrappers for Hash/Eq)
Expand Down
Loading
Loading