diff --git a/Cargo.lock b/Cargo.lock index a9aef94f1..b263df67c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6142,8 +6142,11 @@ dependencies = [ "arrow-schema", "async-trait", "criterion", + "datafusion", "datafusion-common", + "datafusion-common-runtime", "datafusion-expr", + "futures", "lru 0.18.0", "sedona-common", "sedona-expr", diff --git a/c/sedona-gdal/src/dyn_load.rs b/c/sedona-gdal/src/dyn_load.rs index 9b8612ada..f3ceb3d1c 100644 --- a/c/sedona-gdal/src/dyn_load.rs +++ b/c/sedona-gdal/src/dyn_load.rs @@ -157,6 +157,9 @@ fn load_all_symbols(lib: &Library, api: &mut SedonaGdalApi) -> Result<(), GdalIn load_fn!(lib, api, VSIFileFromMemBuffer); load_fn!(lib, api, VSIFCloseL); load_fn!(lib, api, VSIUnlink); + load_fn!(lib, api, VSIOpenDir); + load_fn!(lib, api, VSIGetNextDirEntry); + load_fn!(lib, api, VSICloseDir); load_fn!(lib, api, VSIGetMemFileBuffer); load_fn!(lib, api, VSIFree); load_fn!(lib, api, VSIMalloc); diff --git a/c/sedona-gdal/src/gdal.rs b/c/sedona-gdal/src/gdal.rs index 481f9d273..3547c20b4 100644 --- a/c/sedona-gdal/src/gdal.rs +++ b/c/sedona-gdal/src/gdal.rs @@ -171,6 +171,17 @@ impl Gdal { vsi::get_vsi_mem_file_bytes_owned(self.api, file_name) } + /// Open a VSI directory for iteration. + /// See also [`vsi::open_dir`]. + pub fn open_vsi_dir( + &self, + path: &str, + recurse_depth: i32, + options: Option<&crate::cpl::CslStringList>, + ) -> Result { + crate::vsi::open_dir(self.api, path, recurse_depth, options) + } + // -- Raster operations --------------------------------------------------- /// Create a bare in-memory MEM dataset with GDAL-owned bands. diff --git a/c/sedona-gdal/src/gdal_dyn_bindgen.rs b/c/sedona-gdal/src/gdal_dyn_bindgen.rs index 1a63f4cd5..87a83be0e 100644 --- a/c/sedona-gdal/src/gdal_dyn_bindgen.rs +++ b/c/sedona-gdal/src/gdal_dyn_bindgen.rs @@ -43,6 +43,24 @@ pub type OGRLayerH = *mut c_void; pub type OGRFeatureH = *mut c_void; pub type OGRFieldDefnH = *mut c_void; pub type VSILFILE = *mut c_void; +pub type VSIDIR = c_void; + +#[repr(C)] +pub struct VSIDIREntry { + pub pszName: *const c_char, + pub nMode: c_int, + pub bModeKnown: c_int, + pub nSize: vsi_l_offset, + pub bSizeKnown: c_int, + pub nMTime: GIntBig, + pub bMTimeKnown: c_int, +} + +pub type vsi_l_offset = u64; +pub type GIntBig = i64; + +pub const VSI_S_IFMT: c_int = 0o170000; +pub const VSI_S_IFREG: c_int = 0o100000; // --- Enum types --- @@ -459,6 +477,15 @@ pub(crate) struct SedonaGdalApi { >, pub VSIFCloseL: Option c_int>, pub VSIUnlink: Option c_int>, + pub VSIOpenDir: Option< + unsafe extern "C" fn( + pszPath: *const c_char, + nRecurseDepth: c_int, + papszOptions: *const *const c_char, + ) -> *mut VSIDIR, + >, + pub VSIGetNextDirEntry: Option *const VSIDIREntry>, + pub VSICloseDir: Option, pub VSIGetMemFileBuffer: Option< unsafe extern "C" fn( pszFilename: *const c_char, diff --git a/c/sedona-gdal/src/vsi.rs b/c/sedona-gdal/src/vsi.rs index f1684eebd..9a51682f3 100644 --- a/c/sedona-gdal/src/vsi.rs +++ b/c/sedona-gdal/src/vsi.rs @@ -301,3 +301,77 @@ mod tests { .unwrap(); } } + +pub struct VsiDirEntry { + pub name: String, + pub mode: Option, + pub size: Option, + pub mtime: Option, +} + +pub struct VsiDir { + api: &'static crate::gdal_api::GdalApi, + handle: *mut crate::gdal_dyn_bindgen::VSIDIR, +} + +impl VsiDir { + pub fn next_entry(&mut self) -> Option { + let entry = unsafe { (self.api.inner.VSIGetNextDirEntry?)(self.handle) }; + if entry.is_null() { + return None; + } + let entry = unsafe { &*entry }; + + let name = if entry.pszName.is_null() { + String::new() + } else { + unsafe { std::ffi::CStr::from_ptr(entry.pszName) } + .to_string_lossy() + .into_owned() + }; + + Some(VsiDirEntry { + name, + mode: (entry.bModeKnown != 0).then_some(entry.nMode), + size: (entry.bSizeKnown != 0).then_some(entry.nSize), + mtime: (entry.bMTimeKnown != 0).then_some(entry.nMTime), + }) + } +} + +impl Iterator for VsiDir { + type Item = VsiDirEntry; + + fn next(&mut self) -> Option { + self.next_entry() + } +} + +impl Drop for VsiDir { + fn drop(&mut self) { + if !self.handle.is_null() { + if let Some(close) = self.api.inner.VSICloseDir { + unsafe { close(self.handle) }; + } + self.handle = std::ptr::null_mut(); + } + } +} + +pub fn open_dir( + api: &'static crate::gdal_api::GdalApi, + path: &str, + recurse_depth: i32, + options: Option<&crate::cpl::CslStringList>, +) -> crate::errors::Result { + let c_path = std::ffi::CString::new(path)?; + let options_ptr: *const *const std::os::raw::c_char = options + .map(|opts| opts.as_ptr() as *const *const std::os::raw::c_char) + .unwrap_or(std::ptr::null()); + let handle = + unsafe { (api.inner.VSIOpenDir.unwrap())(c_path.as_ptr(), recurse_depth, options_ptr) }; + if handle.is_null() { + return Err(api.last_null_pointer_err("VSIOpenDir")); + } + Ok(VsiDir { api, handle }) +} diff --git a/docs/reference/sql/rs_geotiff_tiles.qmd b/docs/reference/sql/rs_geotiff_tiles.qmd new file mode 100644 index 000000000..4a419b936 --- /dev/null +++ b/docs/reference/sql/rs_geotiff_tiles.qmd @@ -0,0 +1,51 @@ +--- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +title: rs_geotiff_tiles +description: Reads a GeoTIFF file or directory of GeoTIFF files as tile rows. +kernels: + - returns: table + args: + - name: path + type: string + - returns: table + args: + - name: path + type: string + - name: recursive + type: bool +--- + +## Description + +`rs_geotiff_tiles()` reads a GeoTIFF file, or a directory of GeoTIFF files, and +returns one row per internal GDAL block. Each row contains the source `path`, +the zero-based tile indices `x` and `y`, and an out-db `rast` value pointing +back to the source file. + +## Examples + +```sql +SELECT path, x, y +FROM rs_geotiff_tiles('../../../submodules/sedona-testing/data/raster/test4.tiff'); +``` + +```sql +SELECT RS_MetaData(rast) +FROM rs_geotiff_tiles('../../../submodules/sedona-testing/data/raster/test4.tiff'); +``` diff --git a/rust/sedona-raster-gdal/Cargo.toml b/rust/sedona-raster-gdal/Cargo.toml index c014d6761..c85bbc751 100644 --- a/rust/sedona-raster-gdal/Cargo.toml +++ b/rust/sedona-raster-gdal/Cargo.toml @@ -35,8 +35,11 @@ arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-schema = { workspace = true } async-trait = { workspace = true } +datafusion = { workspace = true, default_features = false } datafusion-common = { workspace = true } +datafusion-common-runtime = { workspace = true } datafusion-expr = { workspace = true } +futures = { workspace = true } lru = { workspace = true } sedona-common = { workspace = true } sedona-expr = { workspace = true } @@ -63,3 +66,8 @@ path = "benches/rs_frompath.rs" harness = false name = "rs_metadata" path = "benches/rs_metadata.rs" + +[[bench]] +harness = false +name = "rs_geotiff_tiles" +path = "benches/rs_geotiff_tiles.rs" diff --git a/rust/sedona-raster-gdal/benches/rs_geotiff_tiles.rs b/rust/sedona-raster-gdal/benches/rs_geotiff_tiles.rs new file mode 100644 index 000000000..b7e2c0b4d --- /dev/null +++ b/rust/sedona-raster-gdal/benches/rs_geotiff_tiles.rs @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for rs_geotiff_tiles. + +use std::hint::black_box; +use std::sync::Arc; + +use arrow_schema::SchemaRef; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use datafusion::catalog::TableProvider; +use sedona_gdal::driver::DriverManager; +use sedona_gdal::global::with_global_gdal_api; +use sedona_gdal::raster::types::Buffer; +use tempfile::TempDir; + +fn write_test_geotiff(dir: &TempDir, name: &str) -> String { + let path = dir.path().join(name); + let path_str = path.to_string_lossy().to_string(); + with_global_gdal_api(|api| { + let driver = DriverManager::get_driver_by_name(api, "GTiff").unwrap(); + let dataset = driver + .create_with_band_type::(&path_str, 10, 10, 1) + .unwrap(); + dataset + .set_geo_transform(&[0.0, 1.0, 0.0, 10.0, 0.0, -1.0]) + .unwrap(); + let band = dataset.rasterband(1).unwrap(); + let mut buffer = Buffer::new((10, 10), (0..100u8).collect::>()); + band.write((0, 0), (10, 10), &mut buffer).unwrap(); + }) + .unwrap(); + path_str +} + +fn provider_schema() -> SchemaRef { + sedona_raster_gdal::rs_geotiff_tiles::GeoTiffTilesProvider::try_new("/tmp".to_string(), false) + .unwrap() + .schema() +} + +fn bench_rs_geotiff_tiles(c: &mut Criterion) { + let tmp = TempDir::new().unwrap(); + let path = write_test_geotiff(&tmp, "bench.tiff"); + let schema = Arc::new(provider_schema()); + let mut group = c.benchmark_group("rs_geotiff_tiles"); + group.throughput(Throughput::Elements(1)); + group.bench_with_input( + BenchmarkId::new("fixture", "test4.tiff"), + &path, + |b, input| { + b.iter(|| { + black_box( + sedona_raster_gdal::rs_geotiff_tiles::build_batch_for_file( + input, + (*schema).clone(), + ) + .unwrap(), + ) + }) + }, + ); + group.finish(); +} + +criterion_group!(benches, bench_rs_geotiff_tiles); +criterion_main!(benches); diff --git a/rust/sedona-raster-gdal/src/lib.rs b/rust/sedona-raster-gdal/src/lib.rs index 6682d41f6..d1e0e3ef7 100644 --- a/rust/sedona-raster-gdal/src/lib.rs +++ b/rust/sedona-raster-gdal/src/lib.rs @@ -34,6 +34,7 @@ mod gdal_dataset_provider; mod raster_loader; mod rs_frompath; +pub mod rs_geotiff_tiles; mod rs_metadata; mod source_uri; mod utils; @@ -45,6 +46,7 @@ pub use gdal_common::{ }; pub use raster_loader::{GdalLoader, GDAL_FORMAT}; pub use rs_frompath::rs_frompath_udf; +pub use rs_geotiff_tiles::rs_geotiff_tiles_udtf; pub use rs_metadata::rs_metadata_udf; pub use utils::{ append_as_indb_raster, append_as_outdb_raster, append_nd_from_dataset, dataset_to_indb_raster, diff --git a/rust/sedona-raster-gdal/src/rs_geotiff_tiles.rs b/rust/sedona-raster-gdal/src/rs_geotiff_tiles.rs new file mode 100644 index 000000000..594e02620 --- /dev/null +++ b/rust/sedona-raster-gdal/src/rs_geotiff_tiles.rs @@ -0,0 +1,634 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! rs_geotiff_tiles UDTF + +use std::any::Any; +use std::path::Path; +use std::sync::Arc; + +use arrow_array::{builder::StringBuilder, builder::UInt32Builder, ArrayRef, RecordBatch}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use async_trait::async_trait; +use datafusion::catalog::TableFunctionImpl; +use datafusion::execution::context::TaskContext; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::expressions::Column; +use datafusion::physical_plan::projection::ProjectionExec; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, +}; +use datafusion::{ + common::{plan_err, Result}, + datasource::TableType, + physical_expr::EquivalenceProperties, + physical_plan::PlanProperties, + prelude::Expr, +}; +use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, ScalarValue}; +use datafusion_common_runtime::SpawnedTask; +use futures::{StreamExt, TryStreamExt}; +use sedona_gdal::gdal_dyn_bindgen::{VSI_S_IFMT, VSI_S_IFREG}; +use sedona_gdal::spatial_ref::SpatialRef; +use sedona_raster::builder::RasterBuilder; +use sedona_raster::traits::{BandMetadata, RasterMetadata}; +use sedona_schema::raster::StorageType; + +use crate::gdal_common::{ + convert_gdal_err, gdal_to_band_data_type, nodata_f64_to_bytes, normalize_outdb_source_path, + open_gdal_dataset, with_gdal, +}; + +pub fn rs_geotiff_tiles_udtf() -> Arc { + Arc::new(RsGeoTiffTilesFunction {}) +} + +#[derive(Debug)] +pub struct RsGeoTiffTilesFunction {} + +impl TableFunctionImpl for RsGeoTiffTilesFunction { + fn call(&self, exprs: &[Expr]) -> Result> { + if exprs.is_empty() || exprs.len() > 2 { + return plan_err!( + "rs_geotiff_tiles() expected 1 or 2 arguments (path[, recursive]) but got {}", + exprs.len() + ); + } + + let dir = match &exprs[0] { + Expr::Literal(ScalarValue::Utf8(Some(s)), _) => s.clone(), + Expr::Literal(ScalarValue::Utf8View(Some(s)), _) => s.to_string(), + Expr::Literal(ScalarValue::LargeUtf8(Some(s)), _) => s.clone(), + other => { + return plan_err!("rs_geotiff_tiles() expected literal string path but got {other}") + } + }; + + let recursive = if exprs.len() == 2 { + match &exprs[1] { + Expr::Literal(ScalarValue::Boolean(Some(v)), _) => *v, + other => { + return plan_err!( + "rs_geotiff_tiles() expected literal boolean recursive but got {other}" + ) + } + } + } else { + false + }; + + Ok(Arc::new(GeoTiffTilesProvider::try_new(dir, recursive)?)) + } +} + +#[derive(Debug)] +pub struct GeoTiffTilesProvider { + dir: String, + recursive: bool, + schema: SchemaRef, +} + +impl GeoTiffTilesProvider { + pub fn try_new(dir: String, recursive: bool) -> Result { + let rast_field = sedona_schema::datatypes::RASTER + .to_storage_field("rast", false) + .map_err(|e| exec_datafusion_err!("{e}"))?; + let schema = Schema::new(vec![ + Field::new("path", DataType::Utf8, false), + Field::new("x", DataType::UInt32, false), + Field::new("y", DataType::UInt32, false), + rast_field, + ]); + + Ok(Self { + dir, + recursive, + schema: Arc::new(schema), + }) + } +} + +#[async_trait] +impl datafusion::catalog::TableProvider for GeoTiffTilesProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::View + } + + async fn scan( + &self, + _state: &dyn datafusion::catalog::Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + let exec = Arc::new(GeoTiffTilesExec::new( + self.dir.clone(), + self.recursive, + self.schema.clone(), + )); + + if let Some(projection) = projection { + let schema = self.schema(); + let exprs: Vec<_> = projection + .iter() + .map(|index| -> (Arc, String) { + let name = schema.field(*index).name(); + (Arc::new(Column::new(name, *index)), name.clone()) + }) + .collect(); + Ok(Arc::new(ProjectionExec::try_new(exprs, exec)?)) + } else { + Ok(exec) + } + } +} + +#[derive(Debug)] +struct GeoTiffTilesExec { + dir: String, + recursive: bool, + schema: SchemaRef, + properties: PlanProperties, +} + +impl GeoTiffTilesExec { + fn new(dir: String, recursive: bool, schema: SchemaRef) -> Self { + let properties = PlanProperties::new( + EquivalenceProperties::new(schema.clone()), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ); + + Self { + dir, + recursive, + schema, + properties, + } + } +} + +impl DisplayAs for GeoTiffTilesExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "GeoTiffTilesExec: path='{}', recursive={}", + self.dir, self.recursive + ) + } +} + +impl ExecutionPlan for GeoTiffTilesExec { + fn name(&self) -> &str { + "GeoTiffTilesExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn properties(&self) -> &PlanProperties { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + Vec::new() + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + let schema_worker = self.schema.clone(); + let schema_empty = self.schema.clone(); + let schema_adapter = self.schema.clone(); + let dir = self.dir.clone(); + let recursive = self.recursive; + + let paths = list_geotiffs(&dir, recursive)?; + + let stream = futures::stream::iter(paths) + .map(move |path| { + let schema = schema_worker.clone(); + SpawnedTask::spawn_blocking(move || build_batch_for_file(path, schema)) + }) + .buffered(4) + .map(move |res| match res { + Ok(Ok(Some(batch))) => Ok(batch), + Ok(Ok(None)) => Ok(RecordBatch::new_empty(schema_empty.clone())), + Ok(Err(e)) => Err(e), + Err(e) => Err(exec_datafusion_err!("Task failed: {e}")), + }) + .try_filter(|batch| futures::future::ready(batch.num_rows() > 0)); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + schema_adapter, + Box::pin(stream), + ))) + } +} + +pub fn build_batch_for_file( + path: impl AsRef, + schema: SchemaRef, +) -> Result> { + let path_str = path.as_ref().to_string_lossy().to_string(); + with_gdal(|gdal| { + let ds = open_gdal_dataset(gdal, &path_str, None) + .map_err(|e| exec_datafusion_err!("Failed to open GeoTIFF {path_str}: {e}"))?; + let (width, height) = ds.raster_size(); + + let band_count = ds.raster_count(); + if band_count == 0 { + return Ok(None); + } + + let band1 = ds + .rasterband(1) + .map_err(|e| exec_datafusion_err!("Failed to get band 1 for {path_str}: {e}"))?; + let (block_x, block_y) = band1.block_size(); + let block_x = block_x.max(1) as u32; + let block_y = block_y.max(1) as u32; + + let tiles_x = div_ceil_u32(width as u32, block_x); + let tiles_y = div_ceil_u32(height as u32, block_y); + + let geotransform = ds + .geo_transform() + .map_err(|e| exec_datafusion_err!("Failed to get geotransform for {path_str}: {e}"))?; + + let base_metadata = RasterMetadata { + width: width as i64, + height: height as i64, + upperleft_x: geotransform[0], + upperleft_y: geotransform[3], + scale_x: geotransform[1], + scale_y: geotransform[5], + skew_x: geotransform[2], + skew_y: geotransform[4], + }; + + let crs = ds + .spatial_ref() + .ok() + .and_then(|sr: SpatialRef| sr.to_projjson().ok()); + + let total_tiles = (tiles_x * tiles_y) as usize; + let mut path_builder = + StringBuilder::with_capacity(total_tiles, total_tiles * path_str.len()); + let mut x_builder = UInt32Builder::with_capacity(total_tiles); + let mut y_builder = UInt32Builder::with_capacity(total_tiles); + let mut rast_builder = RasterBuilder::new(total_tiles); + + for tile_y in 0..tiles_y { + for tile_x in 0..tiles_x { + let px = tile_x * block_x; + let py = tile_y * block_y; + + let tw = (width as u32).saturating_sub(px).min(block_x); + let th = (height as u32).saturating_sub(py).min(block_y); + if tw == 0 || th == 0 { + continue; + } + + let tile_ulx = base_metadata.upperleft_x + + (px as f64) * base_metadata.scale_x + + (py as f64) * base_metadata.skew_x; + let tile_uly = base_metadata.upperleft_y + + (px as f64) * base_metadata.skew_y + + (py as f64) * base_metadata.scale_y; + + let tile_metadata = RasterMetadata { + width: tw as i64, + height: th as i64, + upperleft_x: tile_ulx, + upperleft_y: tile_uly, + scale_x: base_metadata.scale_x, + scale_y: base_metadata.scale_y, + skew_x: base_metadata.skew_x, + skew_y: base_metadata.skew_y, + }; + + path_builder.append_value(&path_str); + x_builder.append_value(tile_x); + y_builder.append_value(tile_y); + + rast_builder + .start_raster(&tile_metadata, crs.as_deref()) + .map_err(|e| { + exec_datafusion_err!( + "Failed to start raster for {path_str} tile ({tile_x},{tile_y}): {e}" + ) + })?; + + for band_idx in 1..=band_count { + let band = ds.rasterband(band_idx).map_err(|e| { + exec_datafusion_err!("Failed to get band {band_idx} for {path_str}: {e}") + })?; + + let gdal_type = band.band_type(); + let band_data_type = gdal_to_band_data_type(gdal_type).map_err(|_| { + exec_datafusion_err!( + "Unsupported band data type {gdal_type:?} for {path_str} band {band_idx}" + ) + })?; + + let nodata_bytes = band + .no_data_value() + .map(|v| nodata_f64_to_bytes(v, &band_data_type)); + + let band_metadata = BandMetadata { + nodata_value: nodata_bytes, + storage_type: StorageType::OutDbRef, + datatype: band_data_type, + outdb_url: Some(path_str.clone()), + outdb_band_id: Some(band_idx as u32), + }; + + rast_builder.start_band(band_metadata).map_err(|e| { + exec_datafusion_err!("Failed to start band {band_idx} for {path_str}: {e}") + })?; + + rast_builder.band_data_writer().append_value([]); + + rast_builder.finish_band().map_err(|e| { + exec_datafusion_err!("Failed to finish band {band_idx} for {path_str}: {e}") + })?; + } + + rast_builder.finish_raster().map_err(|e| { + exec_datafusion_err!( + "Failed to finish raster for {path_str} tile ({tile_x},{tile_y}): {e}" + ) + })?; + } + } + + let rast_array: ArrayRef = Arc::new( + rast_builder + .finish() + .map_err(|e| exec_datafusion_err!("Failed to build rasters: {e}"))?, + ); + let path_array: ArrayRef = Arc::new(path_builder.finish()); + let x_array: ArrayRef = Arc::new(x_builder.finish()); + let y_array: ArrayRef = Arc::new(y_builder.finish()); + + let batch = RecordBatch::try_new(schema, vec![path_array, x_array, y_array, rast_array]) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + Ok(Some(batch)) + }) +} + +fn list_geotiffs(path: &str, recursive: bool) -> Result> { + let normalized_path = normalize_outdb_source_path(path); + + if with_gdal(|gdal| Ok(open_gdal_dataset(gdal, &normalized_path, None).is_ok()))? { + if !is_geotiff_path_str(&normalized_path) { + return exec_err!("rs_geotiff_tiles(): path is not a GeoTIFF file: {path}"); + } + return Ok(vec![normalized_path]); + } + + let recurse_depth = if recursive { -1 } else { 0 }; + let list_result = with_gdal(|gdal| { + let separator = directory_separator_for_path(&normalized_path); + let mut dir = gdal + .open_vsi_dir(&normalized_path, recurse_depth, None) + .map_err(convert_gdal_err)?; + + let mut out = Vec::new(); + for entry in &mut dir { + let Some(mode) = entry.mode else { + continue; + }; + + if (mode & VSI_S_IFMT) != VSI_S_IFREG { + continue; + } + + let child_path = join_vsi_path(&normalized_path, separator, &entry.name); + if is_geotiff_path_str(&child_path) { + out.push(child_path); + } + } + + out.sort(); + Ok(out) + }); + + match list_result { + Ok(paths) => Ok(paths), + Err(_) if is_geotiff_path_str(&normalized_path) => { + exec_err!("rs_geotiff_tiles(): failed to open GeoTIFF file: {path}") + } + Err(_) => exec_err!("rs_geotiff_tiles(): path is not a GeoTIFF file or directory: {path}"), + } +} + +fn directory_separator_for_path(path: &str) -> &'static str { + if path.starts_with("/vsi") || path.contains("://") { + "/" + } else if path.contains('\\') { + "\\" + } else { + "/" + } +} + +fn join_vsi_path(base: &str, separator: &str, child_name: &str) -> String { + if base.ends_with(separator) { + format!("{base}{child_name}") + } else { + format!("{base}{separator}{child_name}") + } +} + +fn is_geotiff_path_str(path: &str) -> bool { + let path_without_fragment = path.split('#').next().unwrap_or(path); + let path_without_query = path_without_fragment + .split('?') + .next() + .unwrap_or(path_without_fragment); + let file_name = path_without_query + .rsplit(['/', '\\']) + .next() + .unwrap_or(path_without_query); + match file_name.rsplit_once('.') { + Some((_, ext)) => ext.eq_ignore_ascii_case("tif") || ext.eq_ignore_ascii_case("tiff"), + None => false, + } +} + +fn div_ceil_u32(n: u32, d: u32) -> u32 { + if d == 0 { + return 0; + } + n.div_ceil(d) +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::catalog::TableProvider; + use datafusion::prelude::SessionContext; + use sedona_gdal::raster::types::Buffer; + use std::path::PathBuf; + use tempfile::tempdir; + + fn write_test_geotiff(base: &Path, name: &str) -> PathBuf { + let path = base.join(name); + let path_str = path.to_string_lossy().to_string(); + with_gdal(|gdal| { + let driver = gdal.get_driver_by_name("GTiff").unwrap(); + let dataset = driver + .create_with_band_type::(&path_str, 10, 10, 1) + .unwrap(); + dataset + .set_geo_transform(&[0.0, 1.0, 0.0, 10.0, 0.0, -1.0]) + .unwrap(); + let band = dataset.rasterband(1).unwrap(); + let mut buffer = Buffer::new((10, 10), (0..100u8).collect::>()); + band.write((0, 0), (10, 10), &mut buffer).unwrap(); + Ok::<_, datafusion_common::DataFusionError>(()) + }) + .unwrap(); + path + } + + #[tokio::test] + async fn udtf_registration_smoke() { + let ctx = SessionContext::new(); + ctx.register_udtf("rs_geotiff_tiles", rs_geotiff_tiles_udtf()); + } + + #[test] + fn list_geotiffs_non_recursive() { + let tmp = tempdir().unwrap(); + let base = tmp.path(); + let file_path = write_test_geotiff(base, "a.tif"); + + let files = list_geotiffs(file_path.to_str().unwrap(), false).unwrap(); + assert_eq!(files.len(), 1); + assert!(files[0].ends_with("a.tif")); + } + + #[test] + fn list_geotiffs_file_input_returns_single() { + let tmp = tempdir().unwrap(); + let base = tmp.path(); + let file_path = write_test_geotiff(base, "single.tiff"); + + let files = list_geotiffs(file_path.to_str().unwrap(), true).unwrap(); + assert_eq!(files.len(), 1); + assert_eq!(files[0], file_path.to_string_lossy().to_string()); + } + + #[test] + fn list_geotiffs_file_input_non_tiff_errors() { + let tmp = tempdir().unwrap(); + let base = tmp.path(); + let file_path = base.join("single.txt"); + std::fs::write(&file_path, b"not a real tiff").unwrap(); + + let err = list_geotiffs(file_path.to_str().unwrap(), false).unwrap_err(); + assert!(err + .to_string() + .contains("rs_geotiff_tiles(): path is not a GeoTIFF file")); + } + + #[test] + fn helper_join_vsi_path_and_extension_filtering() { + assert_eq!( + join_vsi_path("/vsis3/bucket/prefix", "/", "x.tif"), + "/vsis3/bucket/prefix/x.tif" + ); + assert_eq!( + join_vsi_path("/vsis3/bucket/prefix/", "/", "x.tif"), + "/vsis3/bucket/prefix/x.tif" + ); + + assert!(is_geotiff_path_str("/tmp/a.tif")); + assert!(is_geotiff_path_str("/tmp/a.TIFF")); + assert!(is_geotiff_path_str("https://host/data.tif?token=abc#f")); + assert!(!is_geotiff_path_str("/tmp/a.txt")); + assert!(!is_geotiff_path_str("/tmp/a")); + + assert_eq!(directory_separator_for_path("/vsis3/bucket/prefix"), "/"); + assert_eq!(directory_separator_for_path("https://host/data.tif"), "/"); + assert_eq!(directory_separator_for_path(r"C:\data\dir"), r"\"); + assert_eq!(directory_separator_for_path("/tmp/data"), "/"); + } + + #[tokio::test] + async fn provider_builds_rows_for_test_raster() { + let tmp = tempdir().unwrap(); + let base = tmp.path(); + + let dst = write_test_geotiff(base, "test4.tiff"); + + let provider = GeoTiffTilesProvider::try_new(base.to_string_lossy().to_string(), false) + .expect("provider created"); + + let batch = build_batch_for_file(dst, provider.schema()) + .expect("build success") + .expect("batch present"); + + assert_eq!(batch.schema().fields().len(), 4); + assert_eq!(batch.num_columns(), 4); + assert!(batch.num_rows() >= 1); + } + + #[test] + fn rast_field_has_raster_metadata() { + let provider = GeoTiffTilesProvider::try_new("/tmp".to_string(), false).unwrap(); + let schema = provider.schema(); + let rast_field = schema.field_with_name("rast").unwrap(); + let sedona_type = sedona_schema::datatypes::SedonaType::from_storage_field(rast_field) + .expect("sedona type"); + assert_eq!(sedona_type, sedona_schema::datatypes::RASTER); + assert_eq!( + rast_field + .metadata() + .get("ARROW:extension:name") + .map(|s| s.as_str()), + Some("sedona.raster") + ); + } +} diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs index 74edde0de..bfbcfc67e 100644 --- a/rust/sedona/src/context.rs +++ b/rust/sedona/src/context.rs @@ -290,6 +290,10 @@ impl SedonaContext { ); out.register_function_set(sedona_raster_gdal::register::default_function_set()); + out.ctx.register_udtf( + "rs_geotiff_tiles", + sedona_raster_gdal::rs_geotiff_tiles_udtf(), + ); // Always register default function set out.register_function_set(sedona_functions::register::default_function_set());