Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions rust/sedona-query-planner/src/ensure_loaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ use sedona_common::sedona_internal_err;
use sedona_expr::scalar_udf::SedonaScalarUDF;
use sedona_schema::datatypes::SedonaType;

use crate::restore_metadata::RESTORE_METADATA_NAME;

/// `SedonaScalarUDF` metadata key marking a UDF whose kernels read raster
/// pixel bytes. Duplicated from `sedona_raster_functions` (the owner),
/// which this crate can't depend on — keep the literal in sync with
Expand Down Expand Up @@ -213,11 +215,22 @@ fn wrap_for_loading(arg: Expr, ensure_loaded_udf: &Arc<ScalarUDF>) -> Expr {
})
}

/// True if `expr` is already wrapped with `rs_ensureloaded`, either
/// directly or through an alias (e.g. `rs_ensureloaded(rast) AS loaded`).
/// True if `expr` is already wrapped with `rs_ensureloaded`, looking
/// through the wrappers that sit between this rule's fixpoint passes:
///
/// - an alias, e.g. `rs_ensureloaded(rast) AS loaded`;
/// - the `sd_restore_metadata(...)` wrapper that [`WrapAsyncUdfRule`]
/// stamps onto the async `rs_ensureloaded` call. That rule runs between
/// our passes, so on the next pass the argument is no longer a bare
/// `rs_ensureloaded(...)` but `sd_restore_metadata(rs_ensureloaded(...))`.
/// Without seeing through it the guard would re-inject another wrapper
/// every pass, producing a tower of unresolved async calls.
fn is_loaded_wrap(expr: &Expr) -> bool {
match expr {
Expr::ScalarFunction(sf) => sf.func.name() == "rs_ensureloaded",
Expr::ScalarFunction(sf) if sf.func.name() == "rs_ensureloaded" => true,
Expr::ScalarFunction(sf) if sf.func.name() == RESTORE_METADATA_NAME => {
sf.args.first().is_some_and(is_loaded_wrap)
}
Expr::Alias(alias) => is_loaded_wrap(&alias.expr),
_ => false,
}
Expand Down Expand Up @@ -402,6 +415,39 @@ mod tests {
);
}

#[test]
fn idempotent_through_restore_metadata_wrapper() {
    // Cross-rule fixpoint scenario. Once a first pass has wrapped the
    // argument, WrapAsyncUdfRule re-stamps it as
    // `sd_restore_metadata(rs_ensureloaded(rast))`, aliased back to the
    // original name. A subsequent pass has to treat that shape as
    // already-loaded; if it injected a fresh wrapper instead, the async calls
    // would stack up, only the innermost would be extracted, and the
    // remaining ones would be invoked synchronously and fail at runtime.
    use crate::restore_metadata::restore_metadata_udf;
    use std::collections::HashMap;

    let schema = raster_schema_named("rast");
    let udf = fake_ensure_loaded_udf();

    // Hand-build the argument in the shape described above:
    // restore-metadata around ensure-loaded, then the alias on top.
    let restore_udf = restore_metadata_udf(HashMap::new());
    let already_loaded = wrap_for_loading(col("rast"), &udf);
    let restored = Expr::ScalarFunction(ScalarFunction {
        func: restore_udf,
        args: vec![already_loaded],
    });
    let aliased_arg = restored.alias("rs_ensureloaded(rast)");

    // A bytes-needing UDF that consumes the pre-wrapped argument.
    let consumer = Expr::ScalarFunction(ScalarFunction {
        func: needs_bytes_udf("rs_mock"),
        args: vec![aliased_arg],
    });

    let out = rewrite(consumer, &schema, &udf);
    let ensure_loaded_calls = count_ensure_loaded(&out);
    assert_eq!(
        ensure_loaded_calls,
        1,
        "arg already wrapped as sd_restore_metadata(rs_ensureloaded(..)) must not be re-wrapped: {out:?}"
    );
}

#[test]
fn registers_before_cse_with_wrap_async_between() {
use crate::optimizer::register_ensure_loaded_optimizer;
Expand Down
Loading