From 51b63a1fe0ba0d7bc1d2e20ab8c6b42242b4f36e Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Fri, 2 Jan 2026 23:49:03 -0800 Subject: [PATCH 01/16] feat: add s3 write support --- README.md | 20 +++ spatialbench-cli/Cargo.toml | 3 +- spatialbench-cli/src/generate.rs | 83 ++++++++++++ spatialbench-cli/src/main.rs | 37 +++++ spatialbench-cli/src/output_plan.rs | 38 ++++-- spatialbench-cli/src/parquet.rs | 92 +++++++++++++ spatialbench-cli/src/plan.rs | 10 ++ spatialbench-cli/src/runner.rs | 22 ++- spatialbench-cli/src/s3_writer.rs | 201 ++++++++++++++++++++++++++++ 9 files changed, 491 insertions(+), 15 deletions(-) create mode 100644 spatialbench-cli/src/s3_writer.rs diff --git a/README.md b/README.md index 9020cf3..dcedb75 100644 --- a/README.md +++ b/README.md @@ -147,6 +147,26 @@ spatialbench-cli --scale-factor 1 --mb-per-file 256 --output-dir sf1-parquet spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir sf10-parquet ``` +#### Generate Data Directly to S3 + +You can generate data directly to Amazon S3 or S3-compatible storage by providing an S3 URI as the output directory: + +```bash +# Set AWS credentials +export AWS_ACCESS_KEY_ID="your-access-key" +export AWS_SECRET_ACCESS_KEY="your-secret-key" +export AWS_REGION="us-west-2" # Must match your bucket's region + +# Generate to S3 +spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir s3://my-bucket/spatialbench/sf10 + +# For S3-compatible services (MinIO, etc.) +export AWS_ENDPOINT="http://localhost:9000" +spatialbench-cli --scale-factor 1 --output-dir s3://my-bucket/data +``` + +The S3 writer uses streaming multipart upload, buffering data in 32MB chunks before uploading parts. This ensures memory-efficient generation even for large datasets. All output formats (Parquet, CSV, TBL) are supported, and the generated files are byte-for-byte identical to local generation. + #### Custom Spider Configuration You can override these defaults at runtime by passing a YAML file via the `--config` flag: diff --git a/spatialbench-cli/Cargo.toml b/spatialbench-cli/Cargo.toml index a3c8e87..34fb346 100644 --- a/spatialbench-cli/Cargo.toml +++ b/spatialbench-cli/Cargo.toml @@ -24,10 +24,11 @@ serde = { version = "1.0.219", features = ["derive"] } anyhow = "1.0.99" serde_yaml = "0.9.33" datafusion = "50.2" -object_store = { version = "0.12.4", features = ["http"] } +object_store = { version = "0.12.4", features = ["http", "aws"] } arrow-array = "56" arrow-schema = "56" url = "2.5.7" +bytes = "1.10.1" [dev-dependencies] assert_cmd = "2.0" diff --git a/spatialbench-cli/src/generate.rs b/spatialbench-cli/src/generate.rs index bbe3cef..2d4cba3 100644 --- a/spatialbench-cli/src/generate.rs +++ b/spatialbench-cli/src/generate.rs @@ -36,6 +36,15 @@ pub trait Sink: Send { fn flush(self) -> Result<(), io::Error>; } +/// Async version of Sink for writers that need async finalization (like S3Writer) +pub trait AsyncSink: Send { + /// Write all data from the buffer to the sink + fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error>; + + /// Complete and flush any remaining data from the sink (async) + fn async_flush(self) -> impl std::future::Future> + Send; +} + /// Generates data in parallel from a series of [`Source`] and writes to a [`Sink`] /// /// Each [`Source`] is a data generator that generates data directly into an in @@ -135,6 +144,80 @@ where writer_task.await.expect("writer task panicked") } +/// Generates data in parallel from a series of [`Source`] and writes to an [`AsyncSink`] +/// +/// This is similar to generate_in_chunks but handles async finalization for S3Writer +pub async fn generate_in_chunks_async( + mut sink: S, + sources: I, + num_threads: usize, +) -> Result<(), io::Error> +where + G: Source + 'static, + I: Iterator, + S: AsyncSink + 'static, +{ + let recycler = BufferRecycler::new(); + let mut sources = sources.peekable(); + + debug!("Using {num_threads} threads (async sink)"); + + let (tx, mut rx) = tokio::sync::mpsc::channel(num_threads); + + // write the header + let Some(first) = sources.peek() else { + return Ok(()); // no sources + }; + let header = first.header(Vec::new()); + tx.send(header) + .await + .expect("tx just created, it should not be closed"); + + let sources_and_recyclers = sources.map(|generator| (generator, recycler.clone())); + + let mut stream = futures::stream::iter(sources_and_recyclers) + .map(async |(source, recycler)| { + let buffer = recycler.new_buffer(1024 * 1024 * 8); + let mut join_set = JoinSet::new(); + join_set.spawn(async move { source.create(buffer) }); + join_set + .join_next() + .await + .expect("had one item") + .expect("join_next join is infallible unless task panics") + }) + .buffered(num_threads) + .map(async |buffer| { + if let Err(e) = tx.send(buffer).await { + debug!("Error sending buffer to writer: {e}"); + } + }); + + let captured_recycler = recycler.clone(); + let writer_task = tokio::task::spawn(async move { + while let Some(buffer) = rx.recv().await { + sink.sink(&buffer)?; + captured_recycler.return_buffer(buffer); + } + // No more input, flush the sink asynchronously + sink.async_flush().await + }); + + // drive the stream to completion + while let Some(write_task) = stream.next().await { + if writer_task.is_finished() { + debug!("writer task is done early, stopping writer"); + break; + } + write_task.await; + } + drop(stream); + drop(tx); + + debug!("waiting for writer task to complete"); + writer_task.await.expect("writer task panicked") +} + /// A simple buffer recycler to avoid allocating new buffers for each part /// /// Clones share the same underlying recycler, so it is not thread safe diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs index e1b5c4c..8994652 100644 --- a/spatialbench-cli/src/main.rs +++ b/spatialbench-cli/src/main.rs @@ -11,6 +11,7 @@ mod output_plan; mod parquet; mod plan; mod runner; +mod s3_writer; mod spatial_config_file; mod statistics; mod tbl; @@ -384,6 +385,13 @@ impl IntoSize for BufWriter { } } +impl IntoSize for s3_writer::S3Writer { + fn into_size(self) -> Result { + // Return the buffer size before finishing + Ok(self.buffer_size()) + } +} + /// Wrapper around a buffer writer that counts the number of buffers and bytes written struct WriterSink { statistics: WriteStatistics, @@ -410,3 +418,32 @@ impl Sink for WriterSink { self.inner.flush() } } + +/// Async wrapper for S3Writer to handle async finalization +pub struct AsyncWriterSink { + statistics: WriteStatistics, + inner: s3_writer::S3Writer, +} + +impl AsyncWriterSink { + pub fn new(inner: s3_writer::S3Writer) -> Self { + Self { + inner, + statistics: WriteStatistics::new("buffers"), + } + } +} + +impl generate::AsyncSink for AsyncWriterSink { + fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error> { + self.statistics.increment_chunks(1); + self.statistics.increment_bytes(buffer.len()); + self.inner.write_all(buffer) + } + + async fn async_flush(mut self) -> Result<(), io::Error> { + self.inner.flush()?; + self.inner.finish().await?; + Ok(()) + } +} diff --git a/spatialbench-cli/src/output_plan.rs b/spatialbench-cli/src/output_plan.rs index 75e441b..5e8a969 100644 --- a/spatialbench-cli/src/output_plan.rs +++ b/spatialbench-cli/src/output_plan.rs @@ -18,6 +18,8 @@ pub enum OutputLocation { File(PathBuf), /// Output to stdout Stdout, + /// Output to S3 + S3(String), } impl Display for OutputLocation { @@ -31,6 +33,10 @@ impl Display for OutputLocation { write!(f, "{}", file.to_string_lossy()) } OutputLocation::Stdout => write!(f, "Stdout"), + OutputLocation::S3(uri) => { + // Display the S3 URI + write!(f, "{}", uri) + } } } } @@ -265,17 +271,31 @@ impl OutputPlanGenerator { OutputFormat::Parquet => "parquet", }; - let mut output_path = self.output_dir.clone(); - if let Some(part) = part { - // If a partition is specified, create a subdirectory for it - output_path.push(table.to_string()); - self.ensure_directory_exists(&output_path)?; - output_path.push(format!("{table}.{part}.{extension}")); + // Check if output_dir is an S3 URI + let output_dir_str = self.output_dir.to_string_lossy(); + if output_dir_str.starts_with("s3://") { + // Handle S3 path + let base_uri = output_dir_str.trim_end_matches('/'); + let s3_uri = if let Some(part) = part { + format!("{base_uri}/{table}/{table}.{part}.{extension}") + } else { + format!("{base_uri}/{table}.{extension}") + }; + Ok(OutputLocation::S3(s3_uri)) } else { - // No partition specified, output to a single file - output_path.push(format!("{table}.{extension}")); + // Handle local filesystem path + let mut output_path = self.output_dir.clone(); + if let Some(part) = part { + // If a partition is specified, create a subdirectory for it + output_path.push(table.to_string()); + self.ensure_directory_exists(&output_path)?; + output_path.push(format!("{table}.{part}.{extension}")); + } else { + // No partition specified, output to a single file + output_path.push(format!("{table}.{extension}")); + } + Ok(OutputLocation::File(output_path)) } - Ok(OutputLocation::File(output_path)) } } diff --git a/spatialbench-cli/src/parquet.rs b/spatialbench-cli/src/parquet.rs index 53b60f4..d9f7222 100644 --- a/spatialbench-cli/src/parquet.rs +++ b/spatialbench-cli/src/parquet.rs @@ -1,5 +1,6 @@ //! Parquet output format +use crate::s3_writer::S3Writer; use crate::statistics::WriteStatistics; use arrow::datatypes::SchemaRef; use futures::StreamExt; @@ -124,6 +125,97 @@ where Ok(()) } +/// Converts a set of RecordBatchIterators into a Parquet file for S3Writer +/// +/// This is a specialized version that handles S3Writer's async finalization +pub async fn generate_parquet_s3( + writer: S3Writer, + iter_iter: I, + num_threads: usize, + parquet_compression: Compression, +) -> Result<(), io::Error> +where + I: Iterator + 'static, +{ + debug!( + "Generating Parquet for S3 with {num_threads} threads, using {parquet_compression} compression" + ); + let mut iter_iter = iter_iter.peekable(); + + // get schema from the first iterator + let Some(first_iter) = iter_iter.peek() else { + return Ok(()); + }; + let schema = Arc::clone(first_iter.schema()); + + // Compute the parquet schema + let writer_properties = WriterProperties::builder() + .set_compression(parquet_compression) + .build(); + let writer_properties = Arc::new(writer_properties); + let parquet_schema = Arc::new( + ArrowSchemaConverter::new() + .with_coerce_types(writer_properties.coerce_types()) + .convert(&schema) + .unwrap(), + ); + + // create a stream that computes the data for each row group + let mut row_group_stream = futures::stream::iter(iter_iter) + .map(async |iter| { + let parquet_schema = Arc::clone(&parquet_schema); + let writer_properties = Arc::clone(&writer_properties); + let schema = Arc::clone(&schema); + tokio::task::spawn(async move { + encode_row_group(parquet_schema, writer_properties, schema, iter) + }) + .await + .expect("Inner task panicked") + }) + .buffered(num_threads); + + let root_schema = parquet_schema.root_schema_ptr(); + let writer_properties_captured = Arc::clone(&writer_properties); + let (tx, mut rx): ( + Sender>, + Receiver>, + ) = tokio::sync::mpsc::channel(num_threads); + + let writer_task = tokio::task::spawn_blocking(move || { + let mut statistics = WriteStatistics::new("row groups"); + let mut writer = + SerializedFileWriter::new(writer, root_schema, writer_properties_captured).unwrap(); + + while let Some(chunks) = rx.blocking_recv() { + let mut row_group_writer = writer.next_row_group().unwrap(); + for chunk in chunks { + chunk.append_to_row_group(&mut row_group_writer).unwrap(); + } + row_group_writer.close().unwrap(); + statistics.increment_chunks(1); + } + // Return the S3Writer for async upload + let s3_writer = writer.into_inner()?; + Ok((s3_writer, statistics)) as Result<(S3Writer, WriteStatistics), io::Error> + }); + + // Drive the input stream + while let Some(chunks) = row_group_stream.next().await { + if let Err(e) = tx.send(chunks).await { + debug!("Error sending chunks to writer: {e}"); + break; + } + } + drop(tx); + + // Wait for writer task and upload to S3 + let (s3_writer, mut statistics) = writer_task.await??; + let size = s3_writer.finish().await?; + statistics.increment_bytes(size); + + Ok(()) +} + /// Creates the data for a particular row group /// /// Note at the moment it does not use multiple tasks/threads but it could diff --git a/spatialbench-cli/src/plan.rs b/spatialbench-cli/src/plan.rs index fceace9..1a38c1d 100644 --- a/spatialbench-cli/src/plan.rs +++ b/spatialbench-cli/src/plan.rs @@ -60,6 +60,16 @@ pub struct GenerationPlan { pub const DEFAULT_PARQUET_ROW_GROUP_BYTES: i64 = 128 * 1024 * 1024; +/// Buffer size for Parquet writing (32MB) +/// +/// This buffer size is used for: +/// - Local file writing with BufWriter +/// - S3 multipart upload parts +/// +/// The 32MB size provides good performance and is well above the AWS S3 +/// minimum part size requirement of 5MB for multipart uploads. +pub const PARQUET_BUFFER_SIZE: usize = 32 * 1024 * 1024; + impl GenerationPlan { /// Returns a GenerationPlan number of parts to generate /// diff --git a/spatialbench-cli/src/runner.rs b/spatialbench-cli/src/runner.rs index 882d1cf..8ab881c 100644 --- a/spatialbench-cli/src/runner.rs +++ b/spatialbench-cli/src/runner.rs @@ -1,11 +1,12 @@ //! [`PlanRunner`] for running [`OutputPlan`]s. use crate::csv::*; -use crate::generate::{generate_in_chunks, Source}; +use crate::generate::{generate_in_chunks, generate_in_chunks_async, Source}; use crate::output_plan::{OutputLocation, OutputPlan}; -use crate::parquet::generate_parquet; +use crate::parquet::{generate_parquet, generate_parquet_s3}; +use crate::s3_writer::S3Writer; use crate::tbl::*; -use crate::{OutputFormat, Table, WriterSink}; +use crate::{AsyncWriterSink, OutputFormat, Table, WriterSink}; use log::{debug, info}; use spatialbench::generators::{ BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator, @@ -201,6 +202,12 @@ where })?; Ok(()) } + OutputLocation::S3(uri) => { + info!("Writing to S3: {}", uri); + let s3_writer = S3Writer::new(uri)?; + let sink = AsyncWriterSink::new(s3_writer); + generate_in_chunks_async(sink, sources, num_threads).await + } } } @@ -211,7 +218,7 @@ where { match plan.output_location() { OutputLocation::Stdout => { - let writer = BufWriter::with_capacity(32 * 1024 * 1024, io::stdout()); // 32MB buffer + let writer = BufWriter::with_capacity(crate::plan::PARQUET_BUFFER_SIZE, io::stdout()); generate_parquet(writer, sources, num_threads, plan.parquet_compression()).await } OutputLocation::File(path) => { @@ -225,7 +232,7 @@ where let file = std::fs::File::create(&temp_path).map_err(|err| { io::Error::other(format!("Failed to create {temp_path:?}: {err}")) })?; - let writer = BufWriter::with_capacity(32 * 1024 * 1024, file); // 32MB buffer + let writer = BufWriter::with_capacity(crate::plan::PARQUET_BUFFER_SIZE, file); generate_parquet(writer, sources, num_threads, plan.parquet_compression()).await?; // rename the temp file to the final path std::fs::rename(&temp_path, path).map_err(|e| { @@ -235,6 +242,11 @@ where })?; Ok(()) } + OutputLocation::S3(uri) => { + info!("Writing parquet to S3: {}", uri); + let s3_writer = S3Writer::new(uri)?; + generate_parquet_s3(s3_writer, sources, num_threads, plan.parquet_compression()).await + } } } diff --git a/spatialbench-cli/src/s3_writer.rs b/spatialbench-cli/src/s3_writer.rs new file mode 100644 index 0000000..9d8ea40 --- /dev/null +++ b/spatialbench-cli/src/s3_writer.rs @@ -0,0 +1,201 @@ +//! S3 writer support for writing generated data directly to S3 + +use crate::plan::PARQUET_BUFFER_SIZE; +use bytes::Bytes; +use log::{debug, info}; +use object_store::aws::AmazonS3Builder; +use object_store::path::Path as ObjectPath; +use object_store::ObjectStore; +use std::io::{self, Write}; +use std::sync::Arc; +use url::Url; + +/// Minimum part size enforced by AWS S3 for multipart uploads (except last part) +const S3_MIN_PART_SIZE: usize = 5 * 1024 * 1024; // 5MB + +/// A writer that buffers data parts in memory and uploads to S3 when finished +/// +/// This implementation avoids nested runtime issues by deferring all async +/// operations to the finish() method. Parts are accumulated in memory during +/// write() calls and uploaded in a batch during finish(). +pub struct S3Writer { + /// The S3 client + client: Arc, + /// The path in S3 to write to + path: ObjectPath, + /// Current buffer for accumulating data + buffer: Vec, + /// Completed parts ready for upload (each is at least MIN_PART_SIZE) + parts: Vec, + /// Total bytes written + total_bytes: usize, +} + +impl S3Writer { + /// Create a new S3 writer for the given S3 URI + /// + /// The URI should be in the format: s3://bucket/path/to/object + /// + /// Authentication is handled through AWS environment variables: + /// - AWS_ACCESS_KEY_ID + /// - AWS_SECRET_ACCESS_KEY + /// - AWS_REGION (optional, defaults to us-east-1) + /// - AWS_SESSION_TOKEN (optional, for temporary credentials) + /// - AWS_ENDPOINT (optional, for S3-compatible services) + pub fn new(uri: &str) -> Result { + let url = Url::parse(uri).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("Invalid S3 URI: {}", e), + ) + })?; + + if url.scheme() != "s3" { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("Expected s3:// URI, got: {}", url.scheme()), + )); + } + + let bucket = url.host_str().ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "S3 URI missing bucket name") + })?; + + let path = url.path().trim_start_matches('/'); + + debug!( + "Creating S3 streaming writer for bucket: {}, path: {}", + bucket, path + ); + + // Build the S3 client using environment variables + let mut builder = AmazonS3Builder::new().with_bucket_name(bucket); + + // Try to get credentials from environment variables + if let Ok(access_key) = std::env::var("AWS_ACCESS_KEY_ID") { + builder = builder.with_access_key_id(access_key); + } + + if let Ok(secret_key) = std::env::var("AWS_SECRET_ACCESS_KEY") { + builder = builder.with_secret_access_key(secret_key); + } + + if let Ok(region) = std::env::var("AWS_REGION") { + builder = builder.with_region(region); + } + + if let Ok(session_token) = std::env::var("AWS_SESSION_TOKEN") { + builder = builder.with_token(session_token); + } + + if let Ok(endpoint) = std::env::var("AWS_ENDPOINT") { + builder = builder.with_endpoint(endpoint); + } + + let client = builder + .build() + .map_err(|e| io::Error::other(format!("Failed to create S3 client: {}", e)))?; + + info!( + "S3 streaming writer created successfully for bucket: {}", + bucket + ); + + Ok(Self { + client: Arc::new(client), + path: ObjectPath::from(path), + buffer: Vec::with_capacity(S3_MIN_PART_SIZE), + parts: Vec::new(), + total_bytes: 0, + }) + } + + /// Complete the upload by sending all buffered data to S3 + /// + /// This method performs all async operations, uploading parts and completing + /// the multipart upload. It must be called from an async context. + pub async fn finish(mut self) -> Result { + debug!("Completing S3 upload: {} bytes total", self.total_bytes); + + // Add any remaining buffer data as the final part + if !self.buffer.is_empty() { + self.parts + .push(Bytes::from(std::mem::take(&mut self.buffer))); + } + + // Handle small files with simple PUT + if self.parts.len() == 1 && self.parts[0].len() < S3_MIN_PART_SIZE { + debug!( + "Using simple PUT for small file: {} bytes", + self.total_bytes + ); + let data = self.parts.into_iter().next().unwrap(); + self.client + .put(&self.path, data.into()) + .await + .map_err(|e| io::Error::other(format!("Failed to upload to S3: {}", e)))?; + info!("Successfully uploaded {} bytes to S3", self.total_bytes); + return Ok(self.total_bytes); + } + + // Use multipart upload for larger files + debug!("Starting multipart upload for {} parts", self.parts.len()); + let mut upload = + self.client.put_multipart(&self.path).await.map_err(|e| { + io::Error::other(format!("Failed to start multipart upload: {}", e)) + })?; + + // Upload all parts + for (i, part_data) in self.parts.into_iter().enumerate() { + debug!("Uploading part {} ({} bytes)", i + 1, part_data.len()); + upload + .put_part(part_data.into()) + .await + .map_err(|e| io::Error::other(format!("Failed to upload part {}: {}", i + 1, e)))?; + } + + // Complete the multipart upload + upload + .complete() + .await + .map_err(|e| io::Error::other(format!("Failed to complete multipart upload: {}", e)))?; + + info!( + "Successfully uploaded {} bytes to S3 using multipart upload", + self.total_bytes + ); + Ok(self.total_bytes) + } + + /// Get the total bytes written so far + pub fn total_bytes(&self) -> usize { + self.total_bytes + } + + /// Get the buffer size (for compatibility) + pub fn buffer_size(&self) -> usize { + self.total_bytes + } +} + +impl Write for S3Writer { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.total_bytes += buf.len(); + self.buffer.extend_from_slice(buf); + + // When buffer reaches our target part size (32MB), save it as a completed part + // No async operations here - we just move data to the parts vec + if self.buffer.len() >= PARQUET_BUFFER_SIZE { + let part_data = + std::mem::replace(&mut self.buffer, Vec::with_capacity(PARQUET_BUFFER_SIZE)); + self.parts.push(Bytes::from(part_data)); + } + + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + // No-op: all data will be uploaded in finish() + Ok(()) + } +} From 13ae2bdf8b63a5d807429ee3d7ed2d9b0d6f0f3c Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Wed, 11 Feb 2026 14:03:33 -0800 Subject: [PATCH 02/16] fix: add license header to s3_writer.rs and fix typo in plan.rs --- spatialbench-cli/src/plan.rs | 2 +- spatialbench-cli/src/s3_writer.rs | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/spatialbench-cli/src/plan.rs b/spatialbench-cli/src/plan.rs index 2ac0f3d..f0d8c5e 100644 --- a/spatialbench-cli/src/plan.rs +++ b/spatialbench-cli/src/plan.rs @@ -217,7 +217,7 @@ impl GenerationPlan { }) } - /// Return the number of part(ititions) this plan will generate + /// Return the number of part(ition)s this plan will generate pub fn chunk_count(&self) -> usize { self.part_list.clone().count() } diff --git a/spatialbench-cli/src/s3_writer.rs b/spatialbench-cli/src/s3_writer.rs index 9d8ea40..55f8dfa 100644 --- a/spatialbench-cli/src/s3_writer.rs +++ b/spatialbench-cli/src/s3_writer.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + //! S3 writer support for writing generated data directly to S3 use crate::plan::PARQUET_BUFFER_SIZE; From aa2c0c5e2f94c1a9d1f81acf6644964189472a85 Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Wed, 11 Feb 2026 14:03:56 -0800 Subject: [PATCH 03/16] fix: skip local dir creation for S3 output paths --- spatialbench-cli/src/main.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs index c10e0b8..021e643 100644 --- a/spatialbench-cli/src/main.rs +++ b/spatialbench-cli/src/main.rs @@ -253,8 +253,9 @@ impl Cli { debug!("Logging configured from environment variables"); } - // Create output directory if it doesn't exist and we are not writing to stdout. - if !self.stdout { + // Create output directory if it doesn't exist and we are not writing to stdout + // or to S3 (where local directories are meaningless). + if !self.stdout && !self.output_dir.to_string_lossy().starts_with("s3://") { fs::create_dir_all(&self.output_dir)?; } From 85e571c2367b4eeba1a8d5c97ea8f6a379ab1c6a Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Wed, 11 Feb 2026 14:06:28 -0800 Subject: [PATCH 04/16] refactor: use from_env() and share S3 client across files --- spatialbench-cli/src/output_plan.rs | 45 ++++++++-- spatialbench-cli/src/runner.rs | 9 +- spatialbench-cli/src/s3_writer.rs | 133 +++++++++++++--------------- 3 files changed, 105 insertions(+), 82 deletions(-) diff --git a/spatialbench-cli/src/output_plan.rs b/spatialbench-cli/src/output_plan.rs index f3bf771..ac60014 100644 --- a/spatialbench-cli/src/output_plan.rs +++ b/spatialbench-cli/src/output_plan.rs @@ -20,23 +20,33 @@ //! * [`OutputPlanGenerator`]: plans the output files to be generated use crate::plan::GenerationPlan; +use crate::s3_writer::{build_s3_client, parse_s3_uri}; use crate::{OutputFormat, Table}; use log::debug; +use object_store::ObjectStore; use parquet::basic::Compression; use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::io; use std::path::PathBuf; +use std::sync::Arc; /// Where a partition will be output -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub enum OutputLocation { /// Output to a file File(PathBuf), /// Output to stdout Stdout, - /// Output to S3 - S3(String), + /// Output to S3 with a shared client + S3 { + /// The full S3 URI for this object (e.g. `s3://bucket/path/to/file.parquet`) + uri: String, + /// The object path within the bucket (e.g. `path/to/file.parquet`) + path: String, + /// Shared S3 client for the bucket + client: Arc, + }, } impl Display for OutputLocation { @@ -50,16 +60,13 @@ impl Display for OutputLocation { write!(f, "{}", file.to_string_lossy()) } OutputLocation::Stdout => write!(f, "Stdout"), - OutputLocation::S3(uri) => { - // Display the S3 URI - write!(f, "{}", uri) - } + OutputLocation::S3 { uri, .. } => write!(f, "{}", uri), } } } /// Describes an output partition (file) that will be generated -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub struct OutputPlan { /// The table table: Table, @@ -157,6 +164,8 @@ pub struct OutputPlanGenerator { /// Output directories that have been created so far /// (used to avoid creating the same directory multiple times) created_directories: HashSet, + /// Shared S3 client, lazily created on first S3 output location + s3_client: Option>, } impl OutputPlanGenerator { @@ -177,6 +186,7 @@ impl OutputPlanGenerator { output_dir, output_plans: Vec::new(), created_directories: HashSet::new(), + s3_client: None, } } @@ -298,7 +308,24 @@ impl OutputPlanGenerator { } else { format!("{base_uri}/{table}.{extension}") }; - Ok(OutputLocation::S3(s3_uri)) + + // Lazily build the S3 client on first use, then reuse it + let client = if let Some(ref client) = self.s3_client { + Arc::clone(client) + } else { + let (bucket, _) = parse_s3_uri(&s3_uri)?; + let client = build_s3_client(&bucket)?; + self.s3_client = Some(Arc::clone(&client)); + client + }; + + let (_, path) = parse_s3_uri(&s3_uri)?; + + Ok(OutputLocation::S3 { + uri: s3_uri, + path, + client, + }) } else { // Handle local filesystem path let mut output_path = self.output_dir.clone(); diff --git a/spatialbench-cli/src/runner.rs b/spatialbench-cli/src/runner.rs index f175e1a..64e5747 100644 --- a/spatialbench-cli/src/runner.rs +++ b/spatialbench-cli/src/runner.rs @@ -33,6 +33,7 @@ use spatialbench_arrow::{ }; use std::io; use std::io::BufWriter; +use std::sync::Arc; use tokio::task::{JoinError, JoinSet}; /// Runs multiple [`OutputPlan`]s in parallel, managing the number of threads @@ -219,9 +220,9 @@ where })?; Ok(()) } - OutputLocation::S3(uri) => { + OutputLocation::S3 { uri, path, client } => { info!("Writing to S3: {}", uri); - let s3_writer = S3Writer::new(uri)?; + let s3_writer = S3Writer::with_client(Arc::clone(client), path); let sink = AsyncWriterSink::new(s3_writer); generate_in_chunks_async(sink, sources, num_threads).await } @@ -259,9 +260,9 @@ where })?; Ok(()) } - OutputLocation::S3(uri) => { + OutputLocation::S3 { uri, path, client } => { info!("Writing parquet to S3: {}", uri); - let s3_writer = S3Writer::new(uri)?; + let s3_writer = S3Writer::with_client(Arc::clone(client), path); generate_parquet_s3(s3_writer, sources, num_threads, plan.parquet_compression()).await } } diff --git a/spatialbench-cli/src/s3_writer.rs b/spatialbench-cli/src/s3_writer.rs index 55f8dfa..836e51a 100644 --- a/spatialbench-cli/src/s3_writer.rs +++ b/spatialbench-cli/src/s3_writer.rs @@ -30,6 +30,51 @@ use url::Url; /// Minimum part size enforced by AWS S3 for multipart uploads (except last part) const S3_MIN_PART_SIZE: usize = 5 * 1024 * 1024; // 5MB +/// Parse an S3 URI into its (bucket, path) components. +/// +/// The URI should be in the format: `s3://bucket/path/to/object` +pub fn parse_s3_uri(uri: &str) -> Result<(String, String), io::Error> { + let url = Url::parse(uri).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("Invalid S3 URI: {}", e), + ) + })?; + + if url.scheme() != "s3" { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("Expected s3:// URI, got: {}", url.scheme()), + )); + } + + let bucket = url + .host_str() + .ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "S3 URI missing bucket name") + })? + .to_string(); + + let path = url.path().trim_start_matches('/').to_string(); + + Ok((bucket, path)) +} + +/// Build an S3 [`ObjectStore`] client for the given bucket using environment variables. +/// +/// Uses [`AmazonS3Builder::from_env`] which reads all standard AWS environment +/// variables including `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, +/// `AWS_DEFAULT_REGION`, `AWS_REGION`, `AWS_SESSION_TOKEN`, `AWS_ENDPOINT`, etc. +pub fn build_s3_client(bucket: &str) -> Result, io::Error> { + debug!("Building S3 client for bucket: {}", bucket); + let client = AmazonS3Builder::from_env() + .with_bucket_name(bucket) + .build() + .map_err(|e| io::Error::other(format!("Failed to create S3 client: {}", e)))?; + info!("S3 client created successfully for bucket: {}", bucket); + Ok(Arc::new(client)) +} + /// A writer that buffers data parts in memory and uploads to S3 when finished /// /// This implementation avoids nested runtime issues by deferring all async @@ -49,82 +94,32 @@ pub struct S3Writer { } impl S3Writer { - /// Create a new S3 writer for the given S3 URI + /// Create a new S3 writer for the given S3 URI, building a fresh client. /// - /// The URI should be in the format: s3://bucket/path/to/object + /// Prefer [`S3Writer::with_client`] when writing multiple files to reuse + /// the same client. /// - /// Authentication is handled through AWS environment variables: - /// - AWS_ACCESS_KEY_ID - /// - AWS_SECRET_ACCESS_KEY - /// - AWS_REGION (optional, defaults to us-east-1) - /// - AWS_SESSION_TOKEN (optional, for temporary credentials) - /// - AWS_ENDPOINT (optional, for S3-compatible services) + /// Authentication is handled through standard AWS environment variables + /// via [`AmazonS3Builder::from_env`]. pub fn new(uri: &str) -> Result { - let url = Url::parse(uri).map_err(|e| { - io::Error::new( - io::ErrorKind::InvalidInput, - format!("Invalid S3 URI: {}", e), - ) - })?; - - if url.scheme() != "s3" { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - format!("Expected s3:// URI, got: {}", url.scheme()), - )); - } - - let bucket = url.host_str().ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidInput, "S3 URI missing bucket name") - })?; - - let path = url.path().trim_start_matches('/'); - - debug!( - "Creating S3 streaming writer for bucket: {}, path: {}", - bucket, path - ); - - // Build the S3 client using environment variables - let mut builder = AmazonS3Builder::new().with_bucket_name(bucket); - - // Try to get credentials from environment variables - if let Ok(access_key) = std::env::var("AWS_ACCESS_KEY_ID") { - builder = builder.with_access_key_id(access_key); - } - - if let Ok(secret_key) = std::env::var("AWS_SECRET_ACCESS_KEY") { - builder = builder.with_secret_access_key(secret_key); - } - - if let Ok(region) = std::env::var("AWS_REGION") { - builder = builder.with_region(region); - } - - if let Ok(session_token) = std::env::var("AWS_SESSION_TOKEN") { - builder = builder.with_token(session_token); - } - - if let Ok(endpoint) = std::env::var("AWS_ENDPOINT") { - builder = builder.with_endpoint(endpoint); - } - - let client = builder - .build() - .map_err(|e| io::Error::other(format!("Failed to create S3 client: {}", e)))?; - - info!( - "S3 streaming writer created successfully for bucket: {}", - bucket - ); + let (bucket, path) = parse_s3_uri(uri)?; + let client = build_s3_client(&bucket)?; + Ok(Self::with_client(client, &path)) + } - Ok(Self { - client: Arc::new(client), + /// Create a new S3 writer using an existing [`ObjectStore`] client. + /// + /// This avoids creating a new client per file, which is important when + /// generating many partitioned files. + pub fn with_client(client: Arc, path: &str) -> Self { + debug!("Creating S3 writer for path: {}", path); + Self { + client, path: ObjectPath::from(path), - buffer: Vec::with_capacity(S3_MIN_PART_SIZE), + buffer: Vec::with_capacity(PARQUET_BUFFER_SIZE), parts: Vec::new(), total_bytes: 0, - }) + } } /// Complete the upload by sending all buffered data to S3 From 08ec40f215637c50b5ebf9293b030e5c661dc2d6 Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Wed, 11 Feb 2026 14:20:46 -0800 Subject: [PATCH 05/16] refactor: unify S3 and local write paths --- spatialbench-cli/src/generate.rs | 83 ---------------------------- spatialbench-cli/src/main.rs | 35 ++---------- spatialbench-cli/src/parquet.rs | 92 ------------------------------- spatialbench-cli/src/runner.rs | 12 ++-- spatialbench-cli/src/s3_writer.rs | 7 ++- 5 files changed, 14 insertions(+), 215 deletions(-) diff --git a/spatialbench-cli/src/generate.rs b/spatialbench-cli/src/generate.rs index 77e1d9f..90cba4b 100644 --- a/spatialbench-cli/src/generate.rs +++ b/spatialbench-cli/src/generate.rs @@ -53,15 +53,6 @@ pub trait Sink: Send { fn flush(self) -> Result<(), io::Error>; } -/// Async version of Sink for writers that need async finalization (like S3Writer) -pub trait AsyncSink: Send { - /// Write all data from the buffer to the sink - fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error>; - - /// Complete and flush any remaining data from the sink (async) - fn async_flush(self) -> impl std::future::Future> + Send; -} - /// Generates data in parallel from a series of [`Source`] and writes to a [`Sink`] /// /// Each [`Source`] is a data generator that generates data directly into an in @@ -161,80 +152,6 @@ where writer_task.await.expect("writer task panicked") } -/// Generates data in parallel from a series of [`Source`] and writes to an [`AsyncSink`] -/// -/// This is similar to generate_in_chunks but handles async finalization for S3Writer -pub async fn generate_in_chunks_async( - mut sink: S, - sources: I, - num_threads: usize, -) -> Result<(), io::Error> -where - G: Source + 'static, - I: Iterator, - S: AsyncSink + 'static, -{ - let recycler = BufferRecycler::new(); - let mut sources = sources.peekable(); - - debug!("Using {num_threads} threads (async sink)"); - - let (tx, mut rx) = tokio::sync::mpsc::channel(num_threads); - - // write the header - let Some(first) = sources.peek() else { - return Ok(()); // no sources - }; - let header = first.header(Vec::new()); - tx.send(header) - .await - .expect("tx just created, it should not be closed"); - - let sources_and_recyclers = sources.map(|generator| (generator, recycler.clone())); - - let mut stream = futures::stream::iter(sources_and_recyclers) - .map(async |(source, recycler)| { - let buffer = recycler.new_buffer(1024 * 1024 * 8); - let mut join_set = JoinSet::new(); - join_set.spawn(async move { source.create(buffer) }); - join_set - .join_next() - .await - .expect("had one item") - .expect("join_next join is infallible unless task panics") - }) - .buffered(num_threads) - .map(async |buffer| { - if let Err(e) = tx.send(buffer).await { - debug!("Error sending buffer to writer: {e}"); - } - }); - - let captured_recycler = recycler.clone(); - let writer_task = tokio::task::spawn(async move { - while let Some(buffer) = rx.recv().await { - sink.sink(&buffer)?; - captured_recycler.return_buffer(buffer); - } - // No more input, flush the sink asynchronously - sink.async_flush().await - }); - - // drive the stream to completion - while let Some(write_task) = stream.next().await { - if writer_task.is_finished() { - debug!("writer task is done early, stopping writer"); - break; - } - write_task.await; - } - drop(stream); - drop(tx); - - debug!("waiting for writer task to complete"); - writer_task.await.expect("writer task panicked") -} - /// A simple buffer recycler to avoid allocating new buffers for each part /// /// Clones share the same underlying recycler, so it is not thread safe diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs index 021e643..736e4cc 100644 --- a/spatialbench-cli/src/main.rs +++ b/spatialbench-cli/src/main.rs @@ -405,8 +405,10 @@ impl IntoSize for BufWriter { impl IntoSize for s3_writer::S3Writer { fn into_size(self) -> Result { - // Return the buffer size before finishing - Ok(self.buffer_size()) + // Complete the S3 upload. This runs inside spawn_blocking, so we can + // use the tokio runtime handle to drive the async finish(). + let handle = tokio::runtime::Handle::current(); + handle.block_on(self.finish()) } } @@ -436,32 +438,3 @@ impl Sink for WriterSink { self.inner.flush() } } - -/// Async wrapper for S3Writer to handle async finalization -pub struct AsyncWriterSink { - statistics: WriteStatistics, - inner: s3_writer::S3Writer, -} - -impl AsyncWriterSink { - pub fn new(inner: s3_writer::S3Writer) -> Self { - Self { - inner, - statistics: WriteStatistics::new("buffers"), - } - } -} - -impl generate::AsyncSink for AsyncWriterSink { - fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error> { - self.statistics.increment_chunks(1); - self.statistics.increment_bytes(buffer.len()); - self.inner.write_all(buffer) - } - - async fn async_flush(mut self) -> Result<(), io::Error> { - self.inner.flush()?; - self.inner.finish().await?; - Ok(()) - } -} diff --git a/spatialbench-cli/src/parquet.rs b/spatialbench-cli/src/parquet.rs index 458ba41..45ffcbd 100644 --- a/spatialbench-cli/src/parquet.rs +++ b/spatialbench-cli/src/parquet.rs @@ -17,7 +17,6 @@ //! Parquet output format -use crate::s3_writer::S3Writer; use crate::statistics::WriteStatistics; use arrow::datatypes::SchemaRef; use futures::StreamExt; @@ -142,97 +141,6 @@ where Ok(()) } -/// Converts a set of RecordBatchIterators into a Parquet file for S3Writer -/// -/// This is a specialized version that handles S3Writer's async finalization -pub async fn generate_parquet_s3( - writer: S3Writer, - iter_iter: I, - num_threads: usize, - parquet_compression: Compression, -) -> Result<(), io::Error> -where - I: Iterator + 'static, -{ - debug!( - "Generating Parquet for S3 with {num_threads} threads, using {parquet_compression} compression" - ); - let mut iter_iter = iter_iter.peekable(); - - // get schema from the first iterator - let Some(first_iter) = iter_iter.peek() else { - return Ok(()); - }; - let schema = Arc::clone(first_iter.schema()); - - // Compute the parquet schema - let writer_properties = WriterProperties::builder() - .set_compression(parquet_compression) - .build(); - let writer_properties = Arc::new(writer_properties); - let parquet_schema = Arc::new( - ArrowSchemaConverter::new() - .with_coerce_types(writer_properties.coerce_types()) - .convert(&schema) - .unwrap(), - ); - - // create a stream that computes the data for each row group - let mut row_group_stream = futures::stream::iter(iter_iter) - .map(async |iter| { - let parquet_schema = Arc::clone(&parquet_schema); - let writer_properties = Arc::clone(&writer_properties); - let schema = Arc::clone(&schema); - tokio::task::spawn(async move { - encode_row_group(parquet_schema, writer_properties, schema, iter) - }) - .await - .expect("Inner task panicked") - }) - .buffered(num_threads); - - let root_schema = parquet_schema.root_schema_ptr(); - let writer_properties_captured = Arc::clone(&writer_properties); - let (tx, mut rx): ( - Sender>, - Receiver>, - ) = tokio::sync::mpsc::channel(num_threads); - - let writer_task = tokio::task::spawn_blocking(move || { - let mut statistics = WriteStatistics::new("row groups"); - let mut writer = - SerializedFileWriter::new(writer, root_schema, writer_properties_captured).unwrap(); - - while let Some(chunks) = rx.blocking_recv() { - let mut row_group_writer = writer.next_row_group().unwrap(); - for chunk in chunks { - chunk.append_to_row_group(&mut row_group_writer).unwrap(); - } - row_group_writer.close().unwrap(); - statistics.increment_chunks(1); - } - // Return the S3Writer for async upload - let s3_writer = writer.into_inner()?; - Ok((s3_writer, statistics)) as Result<(S3Writer, WriteStatistics), io::Error> - }); - - // Drive the input stream - while let Some(chunks) = row_group_stream.next().await { - if let Err(e) = tx.send(chunks).await { - debug!("Error sending chunks to writer: {e}"); - break; - } - } - drop(tx); - - // Wait for writer task and upload to S3 - let (s3_writer, mut statistics) = writer_task.await??; - let size = s3_writer.finish().await?; - statistics.increment_bytes(size); - - Ok(()) -} - /// Creates the data for a particular row group /// /// Note at the moment it does not use multiple tasks/threads but it could diff --git a/spatialbench-cli/src/runner.rs b/spatialbench-cli/src/runner.rs index 64e5747..9d49c7e 100644 --- a/spatialbench-cli/src/runner.rs +++ b/spatialbench-cli/src/runner.rs @@ -18,12 +18,12 @@ //! [`PlanRunner`] for running [`OutputPlan`]s. use crate::csv::*; -use crate::generate::{generate_in_chunks, generate_in_chunks_async, Source}; +use crate::generate::{generate_in_chunks, Source}; use crate::output_plan::{OutputLocation, OutputPlan}; -use crate::parquet::{generate_parquet, generate_parquet_s3}; +use crate::parquet::generate_parquet; use crate::s3_writer::S3Writer; use crate::tbl::*; -use crate::{AsyncWriterSink, OutputFormat, Table, WriterSink}; +use crate::{OutputFormat, Table, WriterSink}; use log::{debug, info}; use spatialbench::generators::{ BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator, @@ -223,8 +223,8 @@ where OutputLocation::S3 { uri, path, client } => { info!("Writing to S3: {}", uri); let s3_writer = S3Writer::with_client(Arc::clone(client), path); - let sink = AsyncWriterSink::new(s3_writer); - generate_in_chunks_async(sink, sources, num_threads).await + let sink = WriterSink::new(s3_writer); + generate_in_chunks(sink, sources, num_threads).await } } } @@ -263,7 +263,7 @@ where OutputLocation::S3 { uri, path, client } => { info!("Writing parquet to S3: {}", uri); let s3_writer = S3Writer::with_client(Arc::clone(client), path); - generate_parquet_s3(s3_writer, sources, num_threads, plan.parquet_compression()).await + generate_parquet(s3_writer, sources, num_threads, plan.parquet_compression()).await } } } diff --git a/spatialbench-cli/src/s3_writer.rs b/spatialbench-cli/src/s3_writer.rs index 836e51a..98b28e2 100644 --- a/spatialbench-cli/src/s3_writer.rs +++ b/spatialbench-cli/src/s3_writer.rs @@ -50,9 +50,7 @@ pub fn parse_s3_uri(uri: &str) -> Result<(String, String), io::Error> { let bucket = url .host_str() - .ok_or_else(|| { - io::Error::new(io::ErrorKind::InvalidInput, "S3 URI missing bucket name") - })? + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "S3 URI missing bucket name"))? .to_string(); let path = url.path().trim_start_matches('/').to_string(); @@ -101,6 +99,7 @@ impl S3Writer { /// /// Authentication is handled through standard AWS environment variables /// via [`AmazonS3Builder::from_env`]. + #[allow(dead_code)] // used by zone module in a later commit pub fn new(uri: &str) -> Result { let (bucket, path) = parse_s3_uri(uri)?; let client = build_s3_client(&bucket)?; @@ -180,11 +179,13 @@ impl S3Writer { } /// Get the total bytes written so far + #[allow(dead_code)] // used by zone module in a later commit pub fn total_bytes(&self) -> usize { self.total_bytes } /// Get the buffer size (for compatibility) + #[allow(dead_code)] // used by zone module in a later commit pub fn buffer_size(&self) -> usize { self.total_bytes } From 2f99044ab8b27c9f33d22f3096ac069f5f7906b9 Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Wed, 11 Feb 2026 14:28:12 -0800 Subject: [PATCH 06/16] feat: stream multipart uploads instead of buffering in memory --- spatialbench-cli/src/s3_writer.rs | 304 +++++++++++++++++++++++------- 1 file changed, 240 insertions(+), 64 deletions(-) diff --git a/spatialbench-cli/src/s3_writer.rs b/spatialbench-cli/src/s3_writer.rs index 98b28e2..f9983c2 100644 --- a/spatialbench-cli/src/s3_writer.rs +++ b/spatialbench-cli/src/s3_writer.rs @@ -15,7 +15,14 @@ // specific language governing permissions and limitations // under the License. -//! S3 writer support for writing generated data directly to S3 +//! S3 writer that streams multipart uploads instead of buffering in memory. +//! +//! Data is buffered in 32 MB chunks (matching [`PARQUET_BUFFER_SIZE`]). When a +//! chunk is full it is sent through an [`mpsc`] channel to a background tokio +//! task that uploads it immediately via [`MultipartUpload::put_part`]. This +//! keeps peak memory usage roughly constant regardless of total file size. +//! +//! [`mpsc`]: tokio::sync::mpsc use crate::plan::PARQUET_BUFFER_SIZE; use bytes::Bytes; @@ -25,11 +32,10 @@ use object_store::path::Path as ObjectPath; use object_store::ObjectStore; use std::io::{self, Write}; use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::sync::oneshot; use url::Url; -/// Minimum part size enforced by AWS S3 for multipart uploads (except last part) -const S3_MIN_PART_SIZE: usize = 5 * 1024 * 1024; // 5MB - /// Parse an S3 URI into its (bucket, path) components. /// /// The URI should be in the format: `s3://bucket/path/to/object` @@ -73,22 +79,53 @@ pub fn build_s3_client(bucket: &str) -> Result, io::Error> Ok(Arc::new(client)) } -/// A writer that buffers data parts in memory and uploads to S3 when finished +/// Message sent from the writer thread to the background upload task. +enum UploadMessage { + /// A completed part ready for upload. + Part(Bytes), + /// All parts have been sent; the upload should be completed. + Finish, +} + +/// A writer that streams data to S3 via multipart upload. /// -/// This implementation avoids nested runtime issues by deferring all async -/// operations to the finish() method. Parts are accumulated in memory during -/// write() calls and uploaded in a batch during finish(). +/// Internally, a background tokio task is spawned that starts the multipart +/// upload eagerly and uploads each part as it arrives through a channel. +/// The [`Write`] implementation buffers data in 32 MB chunks and sends +/// completed chunks to the background task via [`mpsc::Sender::blocking_send`] +/// (safe because all callers run inside [`tokio::task::spawn_blocking`]). +/// +/// On [`finish`](S3Writer::finish), any remaining buffered data is sent as the +/// final part, the channel is closed, and we wait for the background task to +/// call `complete()` on the multipart upload. If any part upload fails, the +/// multipart upload is aborted to avoid orphaned uploads accruing S3 storage +/// costs. +/// +/// For small files (< 5 MB total) a simple PUT is used instead of multipart. pub struct S3Writer { - /// The S3 client + /// The S3 client (kept for the small-file PUT fallback) client: Arc, - /// The path in S3 to write to + /// The object path in S3 path: ObjectPath, - /// Current buffer for accumulating data + /// Current buffer for accumulating data before sending as a part buffer: Vec, - /// Completed parts ready for upload (each is at least MIN_PART_SIZE) - parts: Vec, - /// Total bytes written + /// Total bytes written through [`Write::write`] total_bytes: usize, + /// Channel to send parts to the background upload task. + /// + /// Set to `None` after the first part is sent (at which point the + /// background task is spawned and this is replaced by `upload_tx`). + /// Before any parts are sent this is `None` and parts accumulate in + /// `pending_parts` for the small-file optimization. + upload_tx: Option>, + /// Receives the final result (total bytes) from the background upload task. + result_rx: Option>>, + /// Parts accumulated before we decide whether to use simple PUT or + /// multipart upload. Once we exceed [`S3_MIN_PART_SIZE`] total, we switch + /// to the streaming multipart path. + pending_parts: Vec, + /// Whether the streaming multipart upload has been started. + multipart_started: bool, } impl S3Writer { @@ -116,66 +153,115 @@ impl S3Writer { client, path: ObjectPath::from(path), buffer: Vec::with_capacity(PARQUET_BUFFER_SIZE), - parts: Vec::new(), total_bytes: 0, + upload_tx: None, + result_rx: None, + pending_parts: Vec::new(), + multipart_started: false, + } + } + + /// Start the background multipart upload task, draining any pending parts. + /// + /// This is called lazily when we accumulate enough data to exceed the + /// simple-PUT threshold. From this point on, every completed buffer is + /// sent directly to the background task for immediate upload. + fn start_multipart_upload(&mut self) { + debug_assert!(!self.multipart_started, "multipart upload already started"); + self.multipart_started = true; + + // Channel capacity of 2: one part being uploaded, one buffered and ready. + // This keeps memory bounded while allowing overlap between buffering and + // uploading. + let (tx, rx) = mpsc::channel::(2); + let (result_tx, result_rx) = oneshot::channel(); + + let client = Arc::clone(&self.client); + let path = self.path.clone(); + let pending = std::mem::take(&mut self.pending_parts); + + tokio::spawn(async move { + let result = run_multipart_upload(client, path, pending, rx).await; + // Ignore send error — the receiver may have been dropped if the + // writer was abandoned. + let _ = result_tx.send(result); + }); + + self.upload_tx = Some(tx); + self.result_rx = Some(result_rx); + } + + /// Send a completed buffer chunk to the background upload task. + fn send_part(&mut self, part: Bytes) -> io::Result<()> { + if let Some(tx) = &self.upload_tx { + tx.blocking_send(UploadMessage::Part(part)) + .map_err(|_| io::Error::other("Background upload task terminated unexpectedly"))?; } + Ok(()) } - /// Complete the upload by sending all buffered data to S3 + /// Complete the upload by sending any remaining data and waiting for the + /// background task to finish. /// - /// This method performs all async operations, uploading parts and completing - /// the multipart upload. It must be called from an async context. + /// For small files (total data < [`S3_MIN_PART_SIZE`] and fits in a single + /// part), a simple PUT is used instead of multipart upload. + /// + /// This method must be called from an async context (it is typically called + /// via [`block_on`](tokio::runtime::Handle::block_on) from inside + /// [`spawn_blocking`](tokio::task::spawn_blocking)). pub async fn finish(mut self) -> Result { - debug!("Completing S3 upload: {} bytes total", self.total_bytes); + let total = self.total_bytes; + debug!("Completing S3 upload: {} bytes total", total); - // Add any remaining buffer data as the final part + // Flush any remaining buffer data if !self.buffer.is_empty() { - self.parts - .push(Bytes::from(std::mem::take(&mut self.buffer))); + let remaining = Bytes::from(std::mem::take(&mut self.buffer)); + + if self.multipart_started { + // Send as the last part + if let Some(tx) = &self.upload_tx { + tx.send(UploadMessage::Part(remaining)).await.map_err(|_| { + io::Error::other("Background upload task terminated unexpectedly") + })?; + } + } else { + self.pending_parts.push(remaining); + } } - // Handle small files with simple PUT - if self.parts.len() == 1 && self.parts[0].len() < S3_MIN_PART_SIZE { - debug!( - "Using simple PUT for small file: {} bytes", - self.total_bytes - ); - let data = self.parts.into_iter().next().unwrap(); + if self.multipart_started { + // Signal the background task that we are done + if let Some(tx) = self.upload_tx.take() { + let _ = tx.send(UploadMessage::Finish).await; + } + // Wait for the background task result + if let Some(rx) = self.result_rx.take() { + rx.await.map_err(|_| { + io::Error::other("Upload task dropped without sending result") + })??; + } + } else { + // Small file path — use a simple PUT + let data: Vec = self + .pending_parts + .into_iter() + .flat_map(|b| b.to_vec()) + .collect(); + + if data.is_empty() { + debug!("No data to upload"); + return Ok(0); + } + + debug!("Using simple PUT for small file: {} bytes", data.len()); self.client - .put(&self.path, data.into()) + .put(&self.path, Bytes::from(data).into()) .await .map_err(|e| io::Error::other(format!("Failed to upload to S3: {}", e)))?; - info!("Successfully uploaded {} bytes to S3", self.total_bytes); - return Ok(self.total_bytes); - } - - // Use multipart upload for larger files - debug!("Starting multipart upload for {} parts", self.parts.len()); - let mut upload = - self.client.put_multipart(&self.path).await.map_err(|e| { - io::Error::other(format!("Failed to start multipart upload: {}", e)) - })?; - - // Upload all parts - for (i, part_data) in self.parts.into_iter().enumerate() { - debug!("Uploading part {} ({} bytes)", i + 1, part_data.len()); - upload - .put_part(part_data.into()) - .await - .map_err(|e| io::Error::other(format!("Failed to upload part {}: {}", i + 1, e)))?; } - // Complete the multipart upload - upload - .complete() - .await - .map_err(|e| io::Error::other(format!("Failed to complete multipart upload: {}", e)))?; - - info!( - "Successfully uploaded {} bytes to S3 using multipart upload", - self.total_bytes - ); - Ok(self.total_bytes) + info!("Successfully uploaded {} bytes to S3", total); + Ok(total) } /// Get the total bytes written so far @@ -191,17 +277,107 @@ impl S3Writer { } } +/// Background task that runs the multipart upload. +/// +/// Starts the upload, drains any pre-accumulated pending parts, then +/// continuously receives new parts from the channel and uploads them. On +/// any upload error the multipart upload is aborted to avoid orphaned +/// uploads accruing S3 storage costs. +async fn run_multipart_upload( + client: Arc, + path: ObjectPath, + pending_parts: Vec, + mut rx: mpsc::Receiver, +) -> Result<(), io::Error> { + debug!("Starting multipart upload for {:?}", path); + let mut upload = client + .put_multipart(&path) + .await + .map_err(|e| io::Error::other(format!("Failed to start multipart upload: {}", e)))?; + + let mut part_number: usize = 0; + + // Upload any parts that were accumulated before the task started + for part_data in pending_parts { + part_number += 1; + debug!( + "Uploading pending part {} ({} bytes)", + part_number, + part_data.len() + ); + if let Err(e) = upload.put_part(part_data.into()).await { + debug!("Part upload failed, aborting multipart upload"); + let _ = upload.abort().await; + return Err(io::Error::other(format!( + "Failed to upload part {}: {}", + part_number, e + ))); + } + } + + // Receive and upload parts from the channel + while let Some(msg) = rx.recv().await { + match msg { + UploadMessage::Part(part_data) => { + part_number += 1; + debug!("Uploading part {} ({} bytes)", part_number, part_data.len()); + if let Err(e) = upload.put_part(part_data.into()).await { + debug!("Part upload failed, aborting multipart upload"); + let _ = upload.abort().await; + return Err(io::Error::other(format!( + "Failed to upload part {}: {}", + part_number, e + ))); + } + } + UploadMessage::Finish => { + break; + } + } + } + + // Complete the multipart upload + debug!("Completing multipart upload ({} parts)", part_number); + if let Err(e) = upload.complete().await { + debug!("Multipart complete failed, aborting"); + // complete() consumes the upload, so we can't abort here — the upload + // will be cleaned up by S3's lifecycle rules for incomplete uploads. + return Err(io::Error::other(format!( + "Failed to complete multipart upload: {}", + e + ))); + } + + debug!( + "Multipart upload completed successfully ({} parts)", + part_number + ); + Ok(()) +} + impl Write for S3Writer { fn write(&mut self, buf: &[u8]) -> io::Result { self.total_bytes += buf.len(); self.buffer.extend_from_slice(buf); - // When buffer reaches our target part size (32MB), save it as a completed part - // No async operations here - we just move data to the parts vec + // When buffer reaches our target part size (32MB), send it as a part if self.buffer.len() >= PARQUET_BUFFER_SIZE { - let part_data = - std::mem::replace(&mut self.buffer, Vec::with_capacity(PARQUET_BUFFER_SIZE)); - self.parts.push(Bytes::from(part_data)); + let part_data = Bytes::from(std::mem::replace( + &mut self.buffer, + Vec::with_capacity(PARQUET_BUFFER_SIZE), + )); + + if self.multipart_started { + // Stream directly to the background upload task + self.send_part(part_data)?; + } else { + // Accumulate until we know whether this will be a small file + self.pending_parts.push(part_data); + + // We now have at least 32MB, which exceeds the 5MB simple PUT + // threshold — switch to streaming multipart upload + self.start_multipart_upload(); + } } Ok(buf.len()) From f6c859f468cf2e2d64171295957a63100aad5e48 Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Wed, 11 Feb 2026 14:35:51 -0800 Subject: [PATCH 07/16] feat: add S3 write support to zone table generation --- spatialbench-cli/src/s3_writer.rs | 1 - spatialbench-cli/src/zone/config.rs | 18 +++++++++++ spatialbench-cli/src/zone/mod.rs | 4 +-- spatialbench-cli/src/zone/writer.rs | 46 ++++++++++++++++++++++++++++- 4 files changed, 65 insertions(+), 4 deletions(-) diff --git a/spatialbench-cli/src/s3_writer.rs b/spatialbench-cli/src/s3_writer.rs index f9983c2..67e50e2 100644 --- a/spatialbench-cli/src/s3_writer.rs +++ b/spatialbench-cli/src/s3_writer.rs @@ -136,7 +136,6 @@ impl S3Writer { /// /// Authentication is handled through standard AWS environment variables /// via [`AmazonS3Builder::from_env`]. - #[allow(dead_code)] // used by zone module in a later commit pub fn new(uri: &str) -> Result { let (bucket, path) = parse_s3_uri(uri)?; let client = build_s3_client(&bucket)?; diff --git a/spatialbench-cli/src/zone/config.rs b/spatialbench-cli/src/zone/config.rs index 9594c8b..3a30f98 100644 --- a/spatialbench-cli/src/zone/config.rs +++ b/spatialbench-cli/src/zone/config.rs @@ -77,4 +77,22 @@ impl ZoneDfArgs { self.output_dir.join("zone.parquet") } } + + /// Whether the output directory is an S3 URI (starts with `s3://`) + pub fn is_s3(&self) -> bool { + self.output_dir.to_string_lossy().starts_with("s3://") + } + + /// Compute the S3 object key for this zone output. + /// + /// Returns the full S3 URI (e.g. `s3://bucket/prefix/zone.parquet`). + pub fn output_s3_uri(&self) -> String { + let base = self.output_dir.to_string_lossy(); + let base = base.trim_end_matches('/'); + if self.parts.unwrap_or(1) > 1 { + format!("{}/zone/zone.{}.parquet", base, self.part.unwrap_or(1)) + } else { + format!("{}/zone.parquet", base) + } + } } diff --git a/spatialbench-cli/src/zone/mod.rs b/spatialbench-cli/src/zone/mod.rs index 4071454..727add1 100644 --- a/spatialbench-cli/src/zone/mod.rs +++ b/spatialbench-cli/src/zone/mod.rs @@ -59,7 +59,7 @@ pub async fn generate_zone_parquet_single(args: ZoneDfArgs) -> Result<()> { let batches = df.collect().await?; let writer = ParquetWriter::new(&args, &stats, schema); - writer.write(&batches)?; + writer.write(&batches).await?; Ok(()) } @@ -106,7 +106,7 @@ pub async fn generate_zone_parquet_multi(args: ZoneDfArgs) -> Result<()> { ); let writer = ParquetWriter::new(&part_args, &stats, schema.clone()); - writer.write(&partitioned_batches)?; + writer.write(&partitioned_batches).await?; } Ok(()) diff --git a/spatialbench-cli/src/zone/writer.rs b/spatialbench-cli/src/zone/writer.rs index b22671a..5afa038 100644 --- a/spatialbench-cli/src/zone/writer.rs +++ b/spatialbench-cli/src/zone/writer.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::s3_writer::S3Writer; use anyhow::Result; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; @@ -52,7 +53,16 @@ impl ParquetWriter { } } - pub fn write(&self, batches: &[RecordBatch]) -> Result<()> { + pub async fn write(&self, batches: &[RecordBatch]) -> Result<()> { + if self.args.is_s3() { + self.write_s3(batches).await + } else { + self.write_local(batches) + } + } + + /// Write batches to a local file using a temp-file + rename pattern. + fn write_local(&self, batches: &[RecordBatch]) -> Result<()> { // Create parent directory of output file (handles both zone/ subdirectory and base dir) let parent_dir = self .output_path @@ -108,4 +118,38 @@ impl ParquetWriter { Ok(()) } + + /// Write batches to S3 using [`S3Writer`]. + /// + /// S3 writes are atomic (via multipart upload `complete()`), so no + /// temp-file or rename is needed. + async fn write_s3(&self, batches: &[RecordBatch]) -> Result<()> { + let uri = self.args.output_s3_uri(); + info!("Writing zone parquet to S3: {}", uri); + + let t0 = Instant::now(); + let s3_writer = S3Writer::new(&uri)?; + let mut writer = ArrowWriter::try_new( + s3_writer, + Arc::clone(&self.schema), + Some(self.props.clone()), + )?; + + for batch in batches { + writer.write(batch)?; + } + + let s3_writer = writer.into_inner()?; + let size = s3_writer.finish().await?; + + let duration = t0.elapsed(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + info!( + "Zone -> {} (part {:?}/{:?}). write={:?}, total_rows={}, bytes={}", + uri, self.args.part, self.args.parts, duration, total_rows, size + ); + + Ok(()) + } } From 354b22cd347706f43718d959e952df43294ad4a6 Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Wed, 11 Feb 2026 17:23:02 -0800 Subject: [PATCH 08/16] test: add unit tests for S3 writer and URI parsing --- spatialbench-cli/src/s3_writer.rs | 147 ++++++++++++++++++++++++++++ spatialbench-cli/src/zone/config.rs | 69 +++++++++++++ 2 files changed, 216 insertions(+) diff --git a/spatialbench-cli/src/s3_writer.rs b/spatialbench-cli/src/s3_writer.rs index 67e50e2..f2a2d4b 100644 --- a/spatialbench-cli/src/s3_writer.rs +++ b/spatialbench-cli/src/s3_writer.rs @@ -387,3 +387,150 @@ impl Write for S3Writer { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use object_store::memory::InMemory; + + // ---- parse_s3_uri tests ---- + + #[test] + fn parse_s3_uri_valid() { + let (bucket, path) = parse_s3_uri("s3://my-bucket/path/to/file.parquet").unwrap(); + assert_eq!(bucket, "my-bucket"); + assert_eq!(path, "path/to/file.parquet"); + } + + #[test] + fn parse_s3_uri_nested_path() { + let (bucket, path) = parse_s3_uri("s3://bucket/a/b/c/d/file.parquet").unwrap(); + assert_eq!(bucket, "bucket"); + assert_eq!(path, "a/b/c/d/file.parquet"); + } + + #[test] + fn parse_s3_uri_no_path() { + let (bucket, path) = parse_s3_uri("s3://bucket").unwrap(); + assert_eq!(bucket, "bucket"); + assert_eq!(path, ""); + } + + #[test] + fn parse_s3_uri_trailing_slash() { + let (bucket, path) = parse_s3_uri("s3://bucket/prefix/").unwrap(); + assert_eq!(bucket, "bucket"); + assert_eq!(path, "prefix/"); + } + + #[test] + fn parse_s3_uri_wrong_scheme() { + let err = parse_s3_uri("https://bucket/path").unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + assert!(err.to_string().contains("Expected s3://")); + } + + #[test] + fn parse_s3_uri_invalid_uri() { + let err = parse_s3_uri("not a uri at all").unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + assert!(err.to_string().contains("Invalid S3 URI")); + } + + // ---- S3Writer tests using InMemory object store ---- + + #[tokio::test] + async fn write_small_file() { + let store = Arc::new(InMemory::new()); + let mut writer = S3Writer::with_client(store.clone(), "output/test.parquet"); + + let data = b"hello world"; + writer.write_all(data).unwrap(); + + let total = writer.finish().await.unwrap(); + assert_eq!(total, data.len()); + + // Verify the data arrived in the store + let result = store + .get(&ObjectPath::from("output/test.parquet")) + .await + .unwrap(); + let stored = result.bytes().await.unwrap(); + assert_eq!(stored.as_ref(), data); + } + + #[tokio::test] + async fn write_empty_file() { + let store = Arc::new(InMemory::new()); + let writer = S3Writer::with_client(store.clone(), "output/empty.parquet"); + + let total = writer.finish().await.unwrap(); + assert_eq!(total, 0); + + // Nothing should be written to the store + let result = store.get(&ObjectPath::from("output/empty.parquet")).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn write_large_file_triggers_multipart() { + let store = Arc::new(InMemory::new()); + let mut writer = S3Writer::with_client(store.clone(), "output/large.parquet"); + + // Write more than PARQUET_BUFFER_SIZE (32MB) to trigger multipart + let chunk = vec![0xABu8; 1024 * 1024]; // 1MB chunks + let num_chunks = 34; // 34MB total > 32MB threshold + for _ in 0..num_chunks { + writer.write_all(&chunk).unwrap(); + } + + let total = writer.finish().await.unwrap(); + assert_eq!(total, num_chunks * chunk.len()); + + // Verify the data arrived in the store and is correct size + let result = store + .get(&ObjectPath::from("output/large.parquet")) + .await + .unwrap(); + let stored = result.bytes().await.unwrap(); + assert_eq!(stored.len(), num_chunks * chunk.len()); + // Verify all bytes are correct + assert!(stored.iter().all(|&b| b == 0xAB)); + } + + #[tokio::test] + async fn write_multiple_small_writes() { + let store = Arc::new(InMemory::new()); + let mut writer = S3Writer::with_client(store.clone(), "output/multi.parquet"); + + // Simulate many small writes (like a Parquet encoder would produce) + for i in 0u8..100 { + writer.write_all(&[i]).unwrap(); + } + + let total = writer.finish().await.unwrap(); + assert_eq!(total, 100); + + let result = store + .get(&ObjectPath::from("output/multi.parquet")) + .await + .unwrap(); + let stored = result.bytes().await.unwrap(); + let expected: Vec = (0u8..100).collect(); + assert_eq!(stored.as_ref(), expected.as_slice()); + } + + #[tokio::test] + async fn total_bytes_tracks_writes() { + let store = Arc::new(InMemory::new()); + let mut writer = S3Writer::with_client(store, "output/track.parquet"); + + assert_eq!(writer.total_bytes(), 0); + + writer.write_all(&[1, 2, 3]).unwrap(); + assert_eq!(writer.total_bytes(), 3); + + writer.write_all(&[4, 5]).unwrap(); + assert_eq!(writer.total_bytes(), 5); + } +} diff --git a/spatialbench-cli/src/zone/config.rs b/spatialbench-cli/src/zone/config.rs index 3a30f98..a6d28bb 100644 --- a/spatialbench-cli/src/zone/config.rs +++ b/spatialbench-cli/src/zone/config.rs @@ -96,3 +96,72 @@ impl ZoneDfArgs { } } } + +#[cfg(test)] +mod tests { + use super::*; + + fn default_args(output_dir: &str) -> ZoneDfArgs { + ZoneDfArgs::new( + 1.0, + PathBuf::from(output_dir), + None, + None, + None, + 128 * 1024 * 1024, + ParquetCompression::ZSTD(Default::default()), + ) + } + + #[test] + fn is_s3_with_s3_uri() { + let args = default_args("s3://my-bucket/output"); + assert!(args.is_s3()); + } + + #[test] + fn is_s3_with_local_path() { + let args = default_args("/tmp/output"); + assert!(!args.is_s3()); + } + + #[test] + fn is_s3_with_relative_path() { + let args = default_args("./output"); + assert!(!args.is_s3()); + } + + #[test] + fn output_s3_uri_single_file() { + let args = default_args("s3://bucket/prefix"); + assert_eq!(args.output_s3_uri(), "s3://bucket/prefix/zone.parquet"); + } + + #[test] + fn output_s3_uri_single_file_trailing_slash() { + let args = default_args("s3://bucket/prefix/"); + assert_eq!(args.output_s3_uri(), "s3://bucket/prefix/zone.parquet"); + } + + #[test] + fn output_s3_uri_with_partitions() { + let mut args = default_args("s3://bucket/prefix"); + args.parts = Some(10); + args.part = Some(3); + assert_eq!( + args.output_s3_uri(), + "s3://bucket/prefix/zone/zone.3.parquet" + ); + } + + #[test] + fn output_s3_uri_partition_defaults_to_part_1() { + let mut args = default_args("s3://bucket/prefix"); + args.parts = Some(5); + // part not set — should default to 1 + assert_eq!( + args.output_s3_uri(), + "s3://bucket/prefix/zone/zone.1.parquet" + ); + } +} From a203e7ef20627258e89260fcedd0ea9f356032f1 Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Wed, 11 Feb 2026 17:53:31 -0800 Subject: [PATCH 09/16] fix: finalize S3 uploads in async context to prevent deadlock --- spatialbench-cli/src/main.rs | 18 +++++++----------- spatialbench-cli/src/parquet.rs | 31 ++++++++++++++++++++++--------- 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs index 736e4cc..705706c 100644 --- a/spatialbench-cli/src/main.rs +++ b/spatialbench-cli/src/main.rs @@ -388,27 +388,23 @@ impl Cli { } } -impl IntoSize for BufWriter { - fn into_size(self) -> Result { - // we can't get the size of stdout, so just return 0 +impl AsyncFinalize for BufWriter { + async fn finalize(self) -> Result { Ok(0) } } -impl IntoSize for BufWriter { - fn into_size(self) -> Result { +impl AsyncFinalize for BufWriter { + async fn finalize(self) -> Result { let file = self.into_inner()?; let metadata = file.metadata()?; Ok(metadata.len() as usize) } } -impl IntoSize for s3_writer::S3Writer { - fn into_size(self) -> Result { - // Complete the S3 upload. This runs inside spawn_blocking, so we can - // use the tokio runtime handle to drive the async finish(). - let handle = tokio::runtime::Handle::current(); - handle.block_on(self.finish()) +impl AsyncFinalize for s3_writer::S3Writer { + async fn finalize(self) -> Result { + self.finish().await } } diff --git a/spatialbench-cli/src/parquet.rs b/spatialbench-cli/src/parquet.rs index 45ffcbd..fa9d84f 100644 --- a/spatialbench-cli/src/parquet.rs +++ b/spatialbench-cli/src/parquet.rs @@ -33,9 +33,17 @@ use std::io::Write; use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; -pub trait IntoSize { - /// Convert the object into a size - fn into_size(self) -> Result; +/// Finalize a writer after all Parquet data has been written. +/// +/// This is called from the async context (outside `spawn_blocking`) so +/// that implementations like [`S3Writer`](crate::s3_writer::S3Writer) can +/// `.await` their upload without competing with the tokio runtime for +/// threads — avoiding deadlocks under concurrent plans. +/// +/// For local files and stdout the implementation is trivially synchronous. +pub trait AsyncFinalize: Write + Send + 'static { + /// Finalize the writer and return the total bytes written. + fn finalize(self) -> impl std::future::Future> + Send; } /// Converts a set of RecordBatchIterators into a Parquet file @@ -44,7 +52,7 @@ pub trait IntoSize { /// /// Note the input is an iterator of [`RecordBatchIterator`]; The batches /// produced by each iterator is encoded as its own row group. -pub async fn generate_parquet( +pub async fn generate_parquet( writer: W, iter_iter: I, num_threads: usize, @@ -119,9 +127,8 @@ where row_group_writer.close().unwrap(); statistics.increment_chunks(1); } - let size = writer.into_inner()?.into_size()?; - statistics.increment_bytes(size); - Ok(()) as Result<(), io::Error> + let inner = writer.into_inner()?; + Ok((inner, statistics)) as Result<(W, WriteStatistics), io::Error> }); // now, drive the input stream and send results to the writer task @@ -135,8 +142,14 @@ where // signal the writer task that we are done drop(tx); - // Wait for the writer task to finish - writer_task.await??; + // Wait for the blocking writer task to return the underlying writer + let (inner, mut statistics) = writer_task.await??; + + // Finalize in the async context so S3 uploads can .await without + // competing for tokio runtime threads (prevents deadlock under + // concurrent plans). + let size = inner.finalize().await?; + statistics.increment_bytes(size); Ok(()) } From 9b30c30bd4ca9c87fd4c4871591291ba49171797 Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Wed, 11 Feb 2026 21:56:48 -0800 Subject: [PATCH 10/16] fix: resolve broken doc links in s3_writer.rs --- spatialbench-cli/src/s3_writer.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spatialbench-cli/src/s3_writer.rs b/spatialbench-cli/src/s3_writer.rs index f2a2d4b..bbc3232 100644 --- a/spatialbench-cli/src/s3_writer.rs +++ b/spatialbench-cli/src/s3_writer.rs @@ -19,7 +19,7 @@ //! //! Data is buffered in 32 MB chunks (matching [`PARQUET_BUFFER_SIZE`]). When a //! chunk is full it is sent through an [`mpsc`] channel to a background tokio -//! task that uploads it immediately via [`MultipartUpload::put_part`]. This +//! task that uploads it immediately via `MultipartUpload::put_part`. This //! keeps peak memory usage roughly constant regardless of total file size. //! //! [`mpsc`]: tokio::sync::mpsc @@ -121,7 +121,7 @@ pub struct S3Writer { /// Receives the final result (total bytes) from the background upload task. result_rx: Option>>, /// Parts accumulated before we decide whether to use simple PUT or - /// multipart upload. Once we exceed [`S3_MIN_PART_SIZE`] total, we switch + /// multipart upload. Once we exceed [`PARQUET_BUFFER_SIZE`] total, we switch /// to the streaming multipart path. pending_parts: Vec, /// Whether the streaming multipart upload has been started. @@ -202,7 +202,7 @@ impl S3Writer { /// Complete the upload by sending any remaining data and waiting for the /// background task to finish. /// - /// For small files (total data < [`S3_MIN_PART_SIZE`] and fits in a single + /// For small files (total data < [`PARQUET_BUFFER_SIZE`] and fits in a single /// part), a simple PUT is used instead of multipart upload. /// /// This method must be called from an async context (it is typically called From 012154ff8da5d9de91982b9287194f1f1b8745ab Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Thu, 12 Feb 2026 08:10:41 -0800 Subject: [PATCH 11/16] docs: add S3 output documentation to quickstart and datasets pages --- docs/datasets-generators.md | 8 ++++++++ docs/quickstart.md | 20 ++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/docs/datasets-generators.md b/docs/datasets-generators.md index 5392ebe..c007f1b 100644 --- a/docs/datasets-generators.md +++ b/docs/datasets-generators.md @@ -106,6 +106,14 @@ You can generate the tables for Scale Factor 1 with the following command: spatialbench-cli -s 1 --format=parquet --output-dir sf1-parquet ``` +You can also generate data directly to Amazon S3 by providing an S3 URI: + +``` +spatialbench-cli -s 1 --format=parquet --output-dir s3://my-bucket/sf1-parquet +``` + +See the [Quickstart](quickstart.md#generate-data-directly-to-s3) for details on configuring AWS credentials. + Here are the contents of the `sf1-parquet` directory: * `building.parquet` diff --git a/docs/quickstart.md b/docs/quickstart.md index a4f75bb..f6dea28 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -84,6 +84,26 @@ spatialbench-cli --scale-factor 10 --mb-per-file 512 spatialbench-cli --scale-factor 1 --output-dir data/sf1 ``` +### Generate Data Directly to S3 + +You can generate data directly to Amazon S3 or S3-compatible storage by providing an S3 URI as the output directory: + +```shell +# Set AWS credentials +export AWS_ACCESS_KEY_ID="your-access-key" +export AWS_SECRET_ACCESS_KEY="your-secret-key" +export AWS_REGION="us-west-2" # Must match your bucket's region + +# Generate to S3 +spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir s3://my-bucket/spatialbench/sf10 + +# For S3-compatible services (MinIO, etc.) +export AWS_ENDPOINT="http://localhost:9000" +spatialbench-cli --scale-factor 1 --output-dir s3://my-bucket/data +``` + +The S3 writer uses streaming multipart upload, buffering data in 32 MB chunks before uploading parts. All standard AWS environment variables are supported, including `AWS_SESSION_TOKEN` for temporary credentials. + ## Configuring Spatial Distributions SpatialBench uses a spatial data generator to generate synthetic points and polygons using realistic spatial distributions. From 00c533b7c2a402f63a05d26ada5fd63f2e4b4adc Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Thu, 12 Feb 2026 08:46:18 -0800 Subject: [PATCH 12/16] fix: finalize S3 uploads for CSV/TBL writes --- spatialbench-cli/src/generate.rs | 14 ++++++++------ spatialbench-cli/src/main.rs | 10 ++++++++-- spatialbench-cli/src/runner.rs | 7 +++++-- spatialbench-cli/src/s3_writer.rs | 32 +++++++++++++++++++++++++++++++ 4 files changed, 53 insertions(+), 10 deletions(-) diff --git a/spatialbench-cli/src/generate.rs b/spatialbench-cli/src/generate.rs index 90cba4b..3215175 100644 --- a/spatialbench-cli/src/generate.rs +++ b/spatialbench-cli/src/generate.rs @@ -45,12 +45,13 @@ pub trait Source: Send { /// Something that can write the contents of a buffer somewhere /// /// For example, this is implemented for a file writer. -pub trait Sink: Send { +pub trait Sink: Send + Sized { /// Write all data from the buffer to the sink fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error>; - /// Complete and flush any remaining data from the sink - fn flush(self) -> Result<(), io::Error>; + /// Complete and flush any remaining data from the sink, returning it + /// so the caller can perform additional finalization (e.g. async S3 upload). + fn flush(self) -> Result; } /// Generates data in parallel from a series of [`Source`] and writes to a [`Sink`] @@ -69,7 +70,7 @@ pub async fn generate_in_chunks( mut sink: S, sources: I, num_threads: usize, -) -> Result<(), io::Error> +) -> Result where G: Source + 'static, I: Iterator, @@ -86,7 +87,7 @@ where // write the header let Some(first) = sources.peek() else { - return Ok(()); // no sources + return Ok(sink); // no sources }; let header = first.header(Vec::new()); tx.send(header) @@ -131,7 +132,8 @@ where sink.sink(&buffer)?; captured_recycler.return_buffer(buffer); } - // No more input, flush the sink and return + // No more input, flush the sink and return it so the caller can + // perform additional finalization (e.g. async S3 upload). sink.flush() }); diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs index 705706c..431720b 100644 --- a/spatialbench-cli/src/main.rs +++ b/spatialbench-cli/src/main.rs @@ -421,6 +421,11 @@ impl WriterSink { statistics: WriteStatistics::new("buffers"), } } + + /// Consume the sink and return the inner writer for further finalization. + fn into_inner(self) -> W { + self.inner + } } impl Sink for WriterSink { @@ -430,7 +435,8 @@ impl Sink for WriterSink { self.inner.write_all(buffer) } - fn flush(mut self) -> Result<(), io::Error> { - self.inner.flush() + fn flush(mut self) -> Result { + self.inner.flush()?; + Ok(self) } } diff --git a/spatialbench-cli/src/runner.rs b/spatialbench-cli/src/runner.rs index 9d49c7e..c014ea1 100644 --- a/spatialbench-cli/src/runner.rs +++ b/spatialbench-cli/src/runner.rs @@ -197,7 +197,8 @@ where match plan.output_location() { OutputLocation::Stdout => { let sink = WriterSink::new(io::stdout()); - generate_in_chunks(sink, sources, num_threads).await + generate_in_chunks(sink, sources, num_threads).await?; + Ok(()) } OutputLocation::File(path) => { // if the output already exists, skip running @@ -224,7 +225,9 @@ where info!("Writing to S3: {}", uri); let s3_writer = S3Writer::with_client(Arc::clone(client), path); let sink = WriterSink::new(s3_writer); - generate_in_chunks(sink, sources, num_threads).await + let sink = generate_in_chunks(sink, sources, num_threads).await?; + sink.into_inner().finish().await?; + Ok(()) } } } diff --git a/spatialbench-cli/src/s3_writer.rs b/spatialbench-cli/src/s3_writer.rs index bbc3232..4475d90 100644 --- a/spatialbench-cli/src/s3_writer.rs +++ b/spatialbench-cli/src/s3_writer.rs @@ -533,4 +533,36 @@ mod tests { writer.write_all(&[4, 5]).unwrap(); assert_eq!(writer.total_bytes(), 5); } + + /// Verify that `std::io::Write::flush()` does NOT upload data to S3. + /// Data is only uploaded when `finish()` is called. This test guards + /// against the bug where CSV/TBL writes were silently lost because + /// the `WriterSink` called `flush()` (a no-op) but never `finish()`. + #[tokio::test] + async fn flush_does_not_upload_without_finish() { + let store = Arc::new(InMemory::new()); + let mut writer = S3Writer::with_client(store.clone(), "output/flush_test.csv"); + + let data = b"col1,col2\nfoo,bar\n"; + writer.write_all(data).unwrap(); + writer.flush().unwrap(); + + // Data should NOT be in the store yet — flush is a no-op + let result = store.get(&ObjectPath::from("output/flush_test.csv")).await; + assert!( + result.is_err(), + "data should not be uploaded before finish()" + ); + + // Now call finish — data should appear + let total = writer.finish().await.unwrap(); + assert_eq!(total, data.len()); + + let result = store + .get(&ObjectPath::from("output/flush_test.csv")) + .await + .unwrap(); + let stored = result.bytes().await.unwrap(); + assert_eq!(stored.as_ref(), data); + } } From a9471750e95f4279644651d153adc602d6afa908 Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Thu, 12 Feb 2026 12:01:08 -0800 Subject: [PATCH 13/16] fix: propagate errors instead of panicking on S3 upload failures --- spatialbench-cli/src/parquet.rs | 14 ++++++++------ spatialbench-cli/src/s3_writer.rs | 19 +++++++++++++++++-- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/spatialbench-cli/src/parquet.rs b/spatialbench-cli/src/parquet.rs index fa9d84f..d721eec 100644 --- a/spatialbench-cli/src/parquet.rs +++ b/spatialbench-cli/src/parquet.rs @@ -113,21 +113,23 @@ where ) = tokio::sync::mpsc::channel(num_threads); let writer_task = tokio::task::spawn_blocking(move || { // Create parquet writer - let mut writer = - SerializedFileWriter::new(writer, root_schema, writer_properties_captured).unwrap(); + let mut writer = SerializedFileWriter::new(writer, root_schema, writer_properties_captured) + .map_err(io::Error::from)?; while let Some(chunks) = rx.blocking_recv() { // Start row group - let mut row_group_writer = writer.next_row_group().unwrap(); + let mut row_group_writer = writer.next_row_group().map_err(io::Error::from)?; // Slap the chunks into the row group for chunk in chunks { - chunk.append_to_row_group(&mut row_group_writer).unwrap(); + chunk + .append_to_row_group(&mut row_group_writer) + .map_err(io::Error::from)?; } - row_group_writer.close().unwrap(); + row_group_writer.close().map_err(io::Error::from)?; statistics.increment_chunks(1); } - let inner = writer.into_inner()?; + let inner = writer.into_inner().map_err(io::Error::from)?; Ok((inner, statistics)) as Result<(W, WriteStatistics), io::Error> }); diff --git a/spatialbench-cli/src/s3_writer.rs b/spatialbench-cli/src/s3_writer.rs index 4475d90..55bdb21 100644 --- a/spatialbench-cli/src/s3_writer.rs +++ b/spatialbench-cli/src/s3_writer.rs @@ -191,10 +191,25 @@ impl S3Writer { } /// Send a completed buffer chunk to the background upload task. + /// + /// If the channel is closed (because the background task failed), this + /// attempts to retrieve the real error from `result_rx` so the caller + /// sees the underlying S3 error rather than a generic "channel closed" + /// message. fn send_part(&mut self, part: Bytes) -> io::Result<()> { if let Some(tx) = &self.upload_tx { - tx.blocking_send(UploadMessage::Part(part)) - .map_err(|_| io::Error::other("Background upload task terminated unexpectedly"))?; + if tx.blocking_send(UploadMessage::Part(part)).is_err() { + // The background task has terminated — try to retrieve the + // real error it reported before falling back to a generic msg. + if let Some(rx) = &mut self.result_rx { + if let Ok(Err(e)) = rx.try_recv() { + return Err(io::Error::other(format!("S3 upload failed: {e}"))); + } + } + return Err(io::Error::other( + "Background upload task terminated unexpectedly", + )); + } } Ok(()) } From 1091d38c5c73dd41e2fa484d02abb702060f15e7 Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Thu, 12 Feb 2026 12:02:26 -0800 Subject: [PATCH 14/16] test: add multipart upload tests for large files and spawn_blocking --- spatialbench-cli/src/s3_writer.rs | 72 +++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/spatialbench-cli/src/s3_writer.rs b/spatialbench-cli/src/s3_writer.rs index 55bdb21..3a25dfa 100644 --- a/spatialbench-cli/src/s3_writer.rs +++ b/spatialbench-cli/src/s3_writer.rs @@ -580,4 +580,76 @@ mod tests { let stored = result.bytes().await.unwrap(); assert_eq!(stored.as_ref(), data); } + + /// Simulate the `--mb-per-file 256` scenario: a large file with multiple + /// multipart parts streamed through the channel after the initial pending + /// parts are drained. This exercises the `send_part` → channel → background + /// task path with several parts (like 6 × 32 MB for a ~185 MB file). + /// + /// Writes are done from `spawn_blocking` to match the real Parquet write + /// path — `blocking_send` requires a non-async context. + #[tokio::test] + async fn write_many_parts_triggers_streaming_multipart() { + let store = Arc::new(InMemory::new()); + let writer = S3Writer::with_client(store.clone(), "output/many_parts.parquet"); + + // Write 192 MB from a blocking task. The first 32 MB goes to + // pending_parts, then start_multipart_upload is called, and the + // remaining 5 parts are streamed through the channel. + let writer = tokio::task::spawn_blocking(move || { + let mut writer = writer; + let chunk = vec![0xCDu8; 1024 * 1024]; // 1 MB + let total_mb = 192; + for _ in 0..total_mb { + writer.write_all(&chunk).unwrap(); + } + writer + }) + .await + .unwrap(); + + let total_mb = 192; + let total = writer.finish().await.unwrap(); + assert_eq!(total, total_mb * 1024 * 1024); + + let result = store + .get(&ObjectPath::from("output/many_parts.parquet")) + .await + .unwrap(); + let stored = result.bytes().await.unwrap(); + assert_eq!(stored.len(), total_mb * 1024 * 1024); + assert!(stored.iter().all(|&b| b == 0xCD)); + } + + /// Write from inside `spawn_blocking` to match the real Parquet write + /// path, where `S3Writer::write()` is called from a blocking thread and + /// `finish()` is awaited after the blocking task returns. + #[tokio::test] + async fn write_from_spawn_blocking() { + let store = Arc::new(InMemory::new()); + let writer = S3Writer::with_client(store.clone(), "output/blocking.parquet"); + + // Write 96 MB (3 × 32 MB parts) from a blocking task + let writer = tokio::task::spawn_blocking(move || { + let mut writer = writer; + let chunk = vec![0xEFu8; 1024 * 1024]; // 1 MB + for _ in 0..96 { + writer.write_all(&chunk).unwrap(); + } + writer + }) + .await + .unwrap(); + + let total = writer.finish().await.unwrap(); + assert_eq!(total, 96 * 1024 * 1024); + + let result = store + .get(&ObjectPath::from("output/blocking.parquet")) + .await + .unwrap(); + let stored = result.bytes().await.unwrap(); + assert_eq!(stored.len(), 96 * 1024 * 1024); + assert!(stored.iter().all(|&b| b == 0xEF)); + } } From 126ad26f83efc96aac9ed8c59a548357d39a6dbe Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Thu, 12 Feb 2026 13:54:52 -0800 Subject: [PATCH 15/16] test: add S3 integration tests using MinIO (ignored by default) --- spatialbench-cli/tests/s3_integration.rs | 177 +++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 spatialbench-cli/tests/s3_integration.rs diff --git a/spatialbench-cli/tests/s3_integration.rs b/spatialbench-cli/tests/s3_integration.rs new file mode 100644 index 0000000..418139f --- /dev/null +++ b/spatialbench-cli/tests/s3_integration.rs @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for S3 output using MinIO. +//! +//! These tests are `#[ignore]`d by default and only run in CI where a MinIO +//! service container is available. They verify end-to-end S3 write support +//! by running the CLI binary against a real S3-compatible object store. +//! +//! Required environment variables (set by CI): +//! - `AWS_ACCESS_KEY_ID` +//! - `AWS_SECRET_ACCESS_KEY` +//! - `AWS_ENDPOINT` — MinIO endpoint (e.g. `http://localhost:9000`) +//! - `AWS_REGION` +//! - `AWS_ALLOW_HTTP=true` +//! - `S3_TEST_BUCKET` — bucket name to write to (must already exist) + +use assert_cmd::Command; +use object_store::aws::AmazonS3Builder; +use object_store::ObjectStore; +use std::sync::Arc; + +/// Build an S3 client pointing at the MinIO instance, using the same env +/// vars that `spatialbench-cli` uses internally. +fn minio_client(bucket: &str) -> Arc { + Arc::new( + AmazonS3Builder::from_env() + .with_bucket_name(bucket) + .build() + .expect("Failed to build MinIO client from env"), + ) +} + +/// Return the test bucket name from the environment. +fn test_bucket() -> String { + std::env::var("S3_TEST_BUCKET").expect("S3_TEST_BUCKET not set") +} + +/// List all object keys under the given prefix. +async fn list_keys(client: &dyn ObjectStore, prefix: &str) -> Vec { + use futures::TryStreamExt; + let prefix = object_store::path::Path::from(prefix); + client + .list(Some(&prefix)) + .try_collect::>() + .await + .expect("Failed to list objects") + .into_iter() + .map(|meta| meta.location.to_string()) + .collect() +} + +/// "Is it plugged in" check: generate Parquet output to S3 and verify the +/// files land in the bucket. +#[tokio::test] +#[ignore] +async fn s3_parquet_output() { + let bucket = test_bucket(); + let prefix = "s3-integration-test/parquet"; + let output_dir = format!("s3://{bucket}/{prefix}/"); + + Command::cargo_bin("spatialbench-cli") + .expect("Binary not found") + .args([ + "--scale-factor", + "0.001", + "--tables", + "trip", + "--format", + "parquet", + "--output-dir", + &output_dir, + ]) + .assert() + .success(); + + let client = minio_client(&bucket); + let keys = list_keys(client.as_ref(), prefix).await; + assert!( + keys.iter().any(|k| k.ends_with(".parquet")), + "Expected at least one .parquet file in {output_dir}, found: {keys:?}" + ); + + // Verify the file is non-empty + for key in &keys { + let path = object_store::path::Path::from(key.as_str()); + let meta = client.head(&path).await.expect("Failed to HEAD object"); + assert!(meta.size > 0, "File {key} should be non-empty"); + } +} + +/// Verify CSV output to S3 works (exercises the WriterSink → finish() path, +/// which is different from the Parquet AsyncFinalize path). +#[tokio::test] +#[ignore] +async fn s3_csv_output() { + let bucket = test_bucket(); + let prefix = "s3-integration-test/csv"; + let output_dir = format!("s3://{bucket}/{prefix}/"); + + Command::cargo_bin("spatialbench-cli") + .expect("Binary not found") + .args([ + "--scale-factor", + "0.001", + "--tables", + "trip", + "--format", + "csv", + "--output-dir", + &output_dir, + ]) + .assert() + .success(); + + let client = minio_client(&bucket); + let keys = list_keys(client.as_ref(), prefix).await; + assert!( + keys.iter().any(|k| k.ends_with(".csv")), + "Expected at least one .csv file in {output_dir}, found: {keys:?}" + ); + + for key in &keys { + let path = object_store::path::Path::from(key.as_str()); + let meta = client.head(&path).await.expect("Failed to HEAD object"); + assert!(meta.size > 0, "File {key} should be non-empty"); + } +} + +/// Verify multi-part file generation works with S3 output. +#[tokio::test] +#[ignore] +async fn s3_parquet_multi_part_output() { + let bucket = test_bucket(); + let prefix = "s3-integration-test/parquet-parts"; + let output_dir = format!("s3://{bucket}/{prefix}/"); + + Command::cargo_bin("spatialbench-cli") + .expect("Binary not found") + .args([ + "--scale-factor", + "0.001", + "--tables", + "trip", + "--format", + "parquet", + "--parts", + "2", + "--output-dir", + &output_dir, + ]) + .assert() + .success(); + + let client = minio_client(&bucket); + let keys = list_keys(client.as_ref(), prefix).await; + let parquet_keys: Vec<_> = keys.iter().filter(|k| k.ends_with(".parquet")).collect(); + assert_eq!( + parquet_keys.len(), + 2, + "Expected 2 .parquet files with --parts 2, found: {parquet_keys:?}" + ); +} From 534a46c155f893aa6f22997f590b54dbd3fc8c59 Mon Sep 17 00:00:00 2001 From: Pranav Toggi Date: Thu, 12 Feb 2026 13:56:11 -0800 Subject: [PATCH 16/16] ci: add S3 integration test job with MinIO and path filtering --- .github/workflows/rust.yml | 66 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 82c8651..b8c8c7f 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -107,3 +107,69 @@ jobs: env: RUSTDOCFLAGS: "-D warnings" run: cargo doc --no-deps --workspace + + # S3 integration tests using MinIO (only when S3-related files change) + test-s3-integration: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Check for S3-related changes + id: changes + run: | + BASE_SHA=${{ github.event.pull_request.base.sha || github.event.before || 'HEAD~1' }} + if git diff --name-only "$BASE_SHA" HEAD | grep -qE \ + '^spatialbench-cli/src/(s3_writer|runner|output_plan|main)\.rs$|^spatialbench-cli/tests/s3_integration\.rs$|^\.github/workflows/rust\.yml$'; then + echo "s3=true" >> "$GITHUB_OUTPUT" + else + echo "s3=false" >> "$GITHUB_OUTPUT" + fi + + - name: Start MinIO + if: steps.changes.outputs.s3 == 'true' + run: | + docker run -d --name minio \ + -p 9000:9000 \ + -e MINIO_ROOT_USER=minioadmin \ + -e MINIO_ROOT_PASSWORD=minioadmin \ + minio/minio:latest server /data + # Wait for MinIO to be ready + for i in $(seq 1 30); do + if curl -sf http://localhost:9000/minio/health/live; then + echo "MinIO is ready" + exit 0 + fi + sleep 1 + done + echo "MinIO failed to start" + docker logs minio + exit 1 + + - name: Create MinIO test bucket + if: steps.changes.outputs.s3 == 'true' + run: | + curl -sL https://dl.min.io/client/mc/release/linux-amd64/mc -o /usr/local/bin/mc + chmod +x /usr/local/bin/mc + mc alias set local http://localhost:9000 minioadmin minioadmin + mc mb local/spatialbench-test + + - uses: dtolnay/rust-toolchain@stable + if: steps.changes.outputs.s3 == 'true' + + - uses: Swatinem/rust-cache@v2 + if: steps.changes.outputs.s3 == 'true' + with: + prefix-key: "rust-test-s3-v1" + + - name: Run S3 integration tests + if: steps.changes.outputs.s3 == 'true' + env: + AWS_ACCESS_KEY_ID: minioadmin + AWS_SECRET_ACCESS_KEY: minioadmin + AWS_ENDPOINT: http://localhost:9000 + AWS_REGION: us-east-1 + AWS_ALLOW_HTTP: "true" + S3_TEST_BUCKET: spatialbench-test + run: cargo test -p spatialbench-cli --test s3_integration -- --ignored