diff --git a/Cargo.lock b/Cargo.lock
index c7c103d279..24139b5967 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3565,6 +3565,7 @@ version = "0.7.0"
 dependencies = [
  "anyhow",
  "async-trait",
+ "dashmap",
  "datafusion",
  "expect-test",
  "futures",
diff --git a/Cargo.toml b/Cargo.toml
index 9904820dea..9d77d1799d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,6 +77,7 @@ fs-err = "3.1.0"
 futures = "0.3"
 hive_metastore = "0.2.0"
 home = "=0.5.11"
+dashmap = "6"
 http = "1.2"
 iceberg = { version = "0.7.0", path = "./crates/iceberg" }
 iceberg-catalog-glue = { version = "0.7.0", path = "./crates/catalog/glue" }
diff --git a/crates/iceberg/src/arrow/id_assigner.rs b/crates/iceberg/src/arrow/id_assigner.rs
new file mode 100644
index 0000000000..02dacb089d
--- /dev/null
+++ b/crates/iceberg/src/arrow/id_assigner.rs
@@ -0,0 +1,359 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Arrow schema field ID assignment using breadth-first traversal
+
+use std::sync::Arc;
+
+use arrow_schema::{DataType, Fields, Schema as ArrowSchema, TimeUnit};
+
+use super::get_field_doc;
+use crate::error::Result;
+use crate::spec::{
+    ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type,
+};
+use crate::{Error, ErrorKind};
+
+/// Helper for assigning field IDs using breadth-first traversal.
+///
+/// This struct implements BFS traversal to assign field IDs level by level,
+/// similar to how `ReassignFieldIds` works in the spec module. All fields at
+/// one level are assigned IDs before descending to nested fields.
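+///
+/// For example, given an Arrow schema `{a: int, b: struct<c: int>, d: int}`,
+/// the assigner produces `a = 1`, `b = 2`, `d = 3`, and only then `c = 4`.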
+pub(super) struct ArrowSchemaIdAssigner {
+    next_id: i32,
+}
+
+impl ArrowSchemaIdAssigner {
+    pub(super) fn new(start_id: i32) -> Self {
+        Self { next_id: start_id }
+    }
+
+    fn next_field_id(&mut self) -> i32 {
+        let id = self.next_id;
+        self.next_id += 1;
+        id
+    }
+
+    pub(super) fn convert_schema(&mut self, schema: &ArrowSchema) -> Result<Schema> {
+        let fields = self.convert_fields(schema.fields())?;
+        Schema::builder().with_fields(fields).build()
+    }
+
+    fn convert_fields(&mut self, fields: &Fields) -> Result<Vec<NestedFieldRef>> {
+        // First pass: convert all fields at this level and assign IDs
+        let fields_with_types: Vec<_> = fields
+            .iter()
+            .map(|field| {
+                let id = self.next_field_id();
+                let field_type = arrow_type_to_primitive_or_placeholder(field.data_type())?;
+                Ok((field, id, field_type))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Second pass: recursively process nested types
+        fields_with_types
+            .into_iter()
+            .map(|(field, id, field_type)| {
+                let final_type = self.process_nested_type(field.data_type(), field_type)?;
+                let doc = get_field_doc(field);
+                Ok(Arc::new(NestedField {
+                    id,
+                    doc,
+                    name: field.name().clone(),
+                    required: !field.is_nullable(),
+                    field_type: Box::new(final_type),
+                    initial_default: None,
+                    write_default: None,
+                }))
+            })
+            .collect()
+    }
+
+    fn process_nested_type(&mut self, arrow_type: &DataType, placeholder: Type) -> Result<Type> {
+        match arrow_type {
+            DataType::Struct(fields) => {
+                let nested_fields = self.convert_fields(fields)?;
+                Ok(Type::Struct(StructType::new(nested_fields)))
+            }
+            DataType::List(element_field)
+            | DataType::LargeList(element_field)
+            | DataType::FixedSizeList(element_field, _) => {
+                let element_id = self.next_field_id();
+                let element_type =
+                    arrow_type_to_primitive_or_placeholder(element_field.data_type())?;
+                let final_element_type =
+                    self.process_nested_type(element_field.data_type(), element_type)?;
+
+                let doc = get_field_doc(element_field);
+                let mut element = NestedField::list_element(
+                    element_id,
+                    final_element_type,
+                    !element_field.is_nullable(),
+                );
+                if let Some(doc) = doc {
+                    element = element.with_doc(doc);
+                }
+                Ok(Type::List(ListType {
+                    element_field: Arc::new(element),
+                }))
+            }
+            DataType::Map(field, _) => match field.data_type() {
+                DataType::Struct(fields) if fields.len() == 2 => {
+                    let key_field = &fields[0];
+                    let value_field = &fields[1];
+
+                    let key_id = self.next_field_id();
+                    let key_type = arrow_type_to_primitive_or_placeholder(key_field.data_type())?;
+                    let final_key_type =
+                        self.process_nested_type(key_field.data_type(), key_type)?;
+
+                    let value_id = self.next_field_id();
+                    let value_type =
+                        arrow_type_to_primitive_or_placeholder(value_field.data_type())?;
+                    let final_value_type =
+                        self.process_nested_type(value_field.data_type(), value_type)?;
+
+                    let key_doc = get_field_doc(key_field);
+                    let mut key = NestedField::map_key_element(key_id, final_key_type);
+                    if let Some(doc) = key_doc {
+                        key = key.with_doc(doc);
+                    }
+
+                    let value_doc = get_field_doc(value_field);
+                    let mut value = NestedField::map_value_element(
+                        value_id,
+                        final_value_type,
+                        !value_field.is_nullable(),
+                    );
+                    if let Some(doc) = value_doc {
+                        value = value.with_doc(doc);
+                    }
+
+                    Ok(Type::Map(MapType {
+                        key_field: Arc::new(key),
+                        value_field: Arc::new(value),
+                    }))
+                }
+                _ => Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Map field must have a struct type with exactly 2 fields",
+                )),
+            },
+            _ => Ok(placeholder), // Primitive type, return as-is
+        }
+    }
+}
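+
+// Note on the two-pass design: `arrow_type_to_primitive_or_placeholder` below returns
+// `Type::Primitive(PrimitiveType::Boolean)` as a stand-in for struct/list/map inputs so
+// that a whole level can be assigned IDs first; `process_nested_type` then replaces the
+// placeholder with the real nested type on the second pass.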
+/// Convert an Arrow type to an Iceberg type for primitives, or return a placeholder for complex types
+fn arrow_type_to_primitive_or_placeholder(ty: &DataType) -> Result<Type> {
+    match ty {
+        DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)),
+        DataType::Int8 | DataType::Int16 | DataType::Int32 => {
+            Ok(Type::Primitive(PrimitiveType::Int))
+        }
+        DataType::UInt8 | DataType::UInt16 => Ok(Type::Primitive(PrimitiveType::Int)),
+        DataType::UInt32 => Ok(Type::Primitive(PrimitiveType::Long)),
+        DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)),
+        DataType::UInt64 => Err(Error::new(
+            ErrorKind::DataInvalid,
+            "UInt64 is not supported. Use Int64 for values ≤ 9,223,372,036,854,775,807 or Decimal(20,0) for the full uint64 range.",
+        )),
+        DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)),
+        DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)),
+        DataType::Decimal128(p, s) => Type::decimal(*p as u32, *s as u32).map_err(|e| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Failed to create decimal type".to_string(),
+            )
+            .with_source(e)
+        }),
+        DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)),
+        DataType::Time64(unit) if unit == &TimeUnit::Microsecond => {
+            Ok(Type::Primitive(PrimitiveType::Time))
+        }
+        DataType::Timestamp(unit, None) if unit == &TimeUnit::Microsecond => {
+            Ok(Type::Primitive(PrimitiveType::Timestamp))
+        }
+        DataType::Timestamp(unit, None) if unit == &TimeUnit::Nanosecond => {
+            Ok(Type::Primitive(PrimitiveType::TimestampNs))
+        }
+        DataType::Timestamp(unit, Some(zone))
+            if unit == &TimeUnit::Microsecond
+                && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
+        {
+            Ok(Type::Primitive(PrimitiveType::Timestamptz))
+        }
+        DataType::Timestamp(unit, Some(zone))
+            if unit == &TimeUnit::Nanosecond
+                && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") =>
+        {
+            Ok(Type::Primitive(PrimitiveType::TimestamptzNs))
+        }
+        DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
+            Ok(Type::Primitive(PrimitiveType::Binary))
+        }
+        DataType::FixedSizeBinary(width) => {
+            Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64)))
+        }
+        DataType::Utf8View | DataType::Utf8 | DataType::LargeUtf8 => {
+            Ok(Type::Primitive(PrimitiveType::String))
+        }
+        // For complex types, return a placeholder that will be replaced
+        DataType::Struct(_)
+        | DataType::List(_)
+        | DataType::LargeList(_)
+        | DataType::FixedSizeList(_, _)
+        | DataType::Map(_, _) => {
+            Ok(Type::Primitive(PrimitiveType::Boolean)) // Placeholder
+        }
+        other => Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!("Unsupported Arrow data type: {other}"),
+        )),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow_schema::Field;
+
+    use super::*;
+    use crate::arrow::DEFAULT_MAP_FIELD_NAME;
+
+    #[test]
+    fn test_arrow_schema_to_schema_with_assigned_ids() {
+        // Create an Arrow schema without field IDs (like DataFusion CREATE TABLE)
+        // and include nested structures to test ID assignment in BFS order
+        let arrow_schema = ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+            // Struct field with nested fields
+            Field::new(
+                "address",
+                DataType::Struct(Fields::from(vec![
+                    Field::new("street", DataType::Utf8, false),
+                    Field::new("city", DataType::Utf8, false),
+                    Field::new("zip", DataType::Int32, true),
+                ])),
+                true,
+            ),
+            // List field
+            Field::new(
+                "tags",
+                DataType::List(Arc::new(Field::new("element", DataType::Utf8, false))),
+                true,
+            ),
+            // Map field
+            Field::new(
+                "properties",
+                DataType::Map(
+                    Arc::new(Field::new(
+                        DEFAULT_MAP_FIELD_NAME,
+                        DataType::Struct(Fields::from(vec![
+                            Field::new("key", DataType::Utf8, false),
+ Field::new("value", DataType::Int32, true), + ])), + false, + )), + false, + ), + false, + ), + Field::new("value", DataType::Float64, false), + ]); + + // Convert to Iceberg schema with auto-assigned IDs + let mut assigner = ArrowSchemaIdAssigner::new(1); + let iceberg_schema = assigner.convert_schema(&arrow_schema).unwrap(); + + // Verify the schema structure + let fields = iceberg_schema.as_struct().fields(); + assert_eq!(fields.len(), 6); + + // BFS ordering: top-level fields get IDs 1-6, then nested fields get IDs 7+ + + // Check field 1: id + assert_eq!(fields[0].id, 1); + assert_eq!(fields[0].name, "id"); + assert!(fields[0].required); + assert!(matches!( + fields[0].field_type.as_ref(), + Type::Primitive(PrimitiveType::Int) + )); + + // Check field 2: name + assert_eq!(fields[1].id, 2); + assert_eq!(fields[1].name, "name"); + assert!(!fields[1].required); + assert!(matches!( + fields[1].field_type.as_ref(), + Type::Primitive(PrimitiveType::String) + )); + + // Check field 3: address (struct with nested fields) + assert_eq!(fields[2].id, 3); + assert_eq!(fields[2].name, "address"); + assert!(!fields[2].required); + if let Type::Struct(struct_type) = fields[2].field_type.as_ref() { + let nested_fields = struct_type.fields(); + assert_eq!(nested_fields.len(), 3); + // Nested field IDs are assigned after all top-level fields (7, 8, 9) + assert_eq!(nested_fields[0].id, 7); + assert_eq!(nested_fields[0].name, "street"); + assert_eq!(nested_fields[1].id, 8); + assert_eq!(nested_fields[1].name, "city"); + assert_eq!(nested_fields[2].id, 9); + assert_eq!(nested_fields[2].name, "zip"); + } else { + panic!("Expected struct type for address field"); + } + + // Check field 4: tags (list) + assert_eq!(fields[3].id, 4); + assert_eq!(fields[3].name, "tags"); + assert!(!fields[3].required); + if let Type::List(list_type) = fields[3].field_type.as_ref() { + // List element ID is assigned after top-level fields + assert_eq!(list_type.element_field.id, 10); + assert!(list_type.element_field.required); + } else { + panic!("Expected list type for tags field"); + } + + // Check field 5: properties (map) + assert_eq!(fields[4].id, 5); + assert_eq!(fields[4].name, "properties"); + assert!(fields[4].required); + if let Type::Map(map_type) = fields[4].field_type.as_ref() { + // Map key and value IDs are assigned after top-level fields + assert_eq!(map_type.key_field.id, 11); + assert_eq!(map_type.value_field.id, 12); + assert!(!map_type.value_field.required); + } else { + panic!("Expected map type for properties field"); + } + + // Check field 6: value + assert_eq!(fields[5].id, 6); + assert_eq!(fields[5].name, "value"); + assert!(fields[5].required); + assert!(matches!( + fields[5].field_type.as_ref(), + Type::Primitive(PrimitiveType::Double) + )); + } +} diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index c091c45177..545454887e 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -17,6 +17,7 @@ //! 
diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs
index c091c45177..545454887e 100644
--- a/crates/iceberg/src/arrow/mod.rs
+++ b/crates/iceberg/src/arrow/mod.rs
@@ -17,6 +17,7 @@
 
 //! Conversion between Iceberg and Arrow schema
 
+mod id_assigner;
 mod schema;
 pub use schema::*;
 
diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs
index 4f4f083c73..ec97f1e846 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -33,6 +33,7 @@ use parquet::file::statistics::Statistics;
 use rust_decimal::prelude::ToPrimitive;
 use uuid::Uuid;
 
+use super::id_assigner::ArrowSchemaIdAssigner;
 use crate::error::Result;
 use crate::spec::{
     Datum, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, PrimitiveType, Schema,
@@ -221,6 +222,17 @@ pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
     visit_schema(schema, &mut visitor)
 }
 
+/// Convert an Arrow schema to an Iceberg schema with auto-assigned field IDs.
+///
+/// This function is useful when converting Arrow schemas that don't have field IDs
+/// in their metadata (e.g., from DataFusion CREATE TABLE statements). Field IDs
+/// are assigned sequentially starting from 1, using breadth-first traversal to assign
+/// IDs level by level (all fields at one level before descending to nested fields).
+pub fn arrow_schema_to_schema_with_assigned_ids(schema: &ArrowSchema) -> Result<Schema> {
+    let mut assigner = ArrowSchemaIdAssigner::new(1);
+    assigner.convert_schema(schema)
+}
+
 /// Convert Arrow type to iceberg type.
 pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
     let mut visitor = ArrowSchemaConverter::new();
@@ -246,7 +258,7 @@ pub(super) fn get_field_id(field: &Field) -> Result<i32> {
     ))
 }
 
-fn get_field_doc(field: &Field) -> Option<String> {
+pub(super) fn get_field_doc(field: &Field) -> Option<String> {
     if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) {
         return Some(value.clone());
     }
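A sketch of how a caller might use the new entry point (the function name
`convert_without_ids` and the field names are illustrative; the assertions mirror the
unit test above):

    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
    use iceberg::arrow::arrow_schema_to_schema_with_assigned_ids;

    fn convert_without_ids() -> iceberg::Result<()> {
        // Two top-level fields with no field-ID metadata attached.
        let arrow_schema = ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let schema = arrow_schema_to_schema_with_assigned_ids(&arrow_schema)?;
        // IDs are assigned sequentially from 1 in BFS order.
        assert_eq!(schema.as_struct().fields()[0].id, 1);
        assert_eq!(schema.as_struct().fields()[1].id, 2);
        Ok(())
    }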
diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index 0ee1738b4f..fd3e489e4b 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -31,6 +31,7 @@ repository = { workspace = true }
 
 [dependencies]
 anyhow = { workspace = true }
 async-trait = { workspace = true }
+dashmap = { workspace = true }
 datafusion = { workspace = true }
 futures = { workspace = true }
 iceberg = { workspace = true }
diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs
index 31bbdbd67f..3f76e32327 100644
--- a/crates/integrations/datafusion/src/schema.rs
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -16,16 +16,17 @@ // under the License.
 
 use std::any::Any;
-use std::collections::HashMap;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use dashmap::DashMap;
 use datafusion::catalog::SchemaProvider;
 use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result as DFResult};
 use futures::future::try_join_all;
+use iceberg::arrow::arrow_schema_to_schema_with_assigned_ids;
 use iceberg::inspect::MetadataTableType;
-use iceberg::{Catalog, NamespaceIdent, Result};
+use iceberg::{Catalog, NamespaceIdent, Result, TableCreation};
 
 use crate::table::IcebergTableProvider;
 use crate::to_datafusion_error;
@@ -34,10 +35,14 @@ use crate::to_datafusion_error;
 /// access to table providers within a specific namespace.
 #[derive(Debug)]
 pub(crate) struct IcebergSchemaProvider {
-    /// A `HashMap` where keys are table names
+    /// Reference to the Iceberg catalog
+    catalog: Arc<dyn Catalog>,
+    /// The namespace this schema represents
+    namespace: NamespaceIdent,
+    /// A concurrent map where keys are table names
     /// and values are dynamic references to objects implementing the
     /// [`TableProvider`] trait.
-    tables: HashMap<String, Arc<IcebergTableProvider>>,
+    tables: Arc<DashMap<String, Arc<IcebergTableProvider>>>,
 }
 
 impl IcebergSchemaProvider {
@@ -71,13 +76,16 @@
         )
         .await?;
 
-        let tables: HashMap<String, Arc<IcebergTableProvider>> = table_names
-            .into_iter()
-            .zip(providers.into_iter())
-            .map(|(name, provider)| (name, Arc::new(provider)))
-            .collect();
+        let tables = DashMap::new();
+        for (name, provider) in table_names.into_iter().zip(providers.into_iter()) {
+            tables.insert(name, Arc::new(provider));
+        }
 
-        Ok(IcebergSchemaProvider { tables })
+        Ok(IcebergSchemaProvider {
+            catalog: client,
+            namespace,
+            tables: Arc::new(tables),
+        })
     }
 }
 
@@ -89,13 +97,16 @@
     fn table_names(&self) -> Vec<String> {
         self.tables
-            .keys()
-            .flat_map(|table_name| {
+            .iter()
+            .flat_map(|entry| {
+                let table_name = entry.key().clone();
                 [table_name.clone()]
                     .into_iter()
-                    .chain(MetadataTableType::all_types().map(|metadata_table_name| {
-                        format!("{}${}", table_name.clone(), metadata_table_name.as_str())
-                    }))
+                    .chain(
+                        MetadataTableType::all_types().map(move |metadata_table_name| {
+                            format!("{}${}", table_name, metadata_table_name.as_str())
+                        }),
+                    )
             })
             .collect()
     }
 
@@ -127,7 +138,97 @@
         Ok(self
             .tables
             .get(name)
-            .cloned()
-            .map(|t| t as Arc<dyn TableProvider>))
+            .map(|entry| entry.value().clone() as Arc<dyn TableProvider>))
+    }
+
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> DFResult<Option<Arc<dyn TableProvider>>> {
+        // Convert the DataFusion schema to an Iceberg schema. DataFusion schemas
+        // don't carry field IDs, so use the conversion that assigns them automatically.
+        let df_schema = table.schema();
+        let iceberg_schema = arrow_schema_to_schema_with_assigned_ids(df_schema.as_ref())
+            .map_err(to_datafusion_error)?;
+
+        // Create the table in the Iceberg catalog
+        let table_creation = TableCreation::builder()
+            .name(name.clone())
+            .schema(iceberg_schema)
+            .build();
+
+        let catalog = self.catalog.clone();
+        let namespace = self.namespace.clone();
+        let tables = self.tables.clone();
+        let name_clone = name.clone();
+
+        // Use tokio's spawn_blocking to run the async work on the blocking thread pool;
+        // this avoids the "cannot block within async runtime" panic.
+        let result = tokio::task::spawn_blocking(move || {
+            // Grab a handle to the current runtime to execute the async work
+            let rt = tokio::runtime::Handle::current();
+            rt.block_on(async move {
+                catalog
+                    .create_table(&namespace, table_creation)
+                    .await
+                    .map_err(to_datafusion_error)?;
+
+                // Create a new table provider using the catalog reference
+                let table_provider = IcebergTableProvider::try_new(
+                    catalog.clone(),
+                    namespace.clone(),
+                    name_clone.clone(),
+                )
+                .await
+                .map_err(to_datafusion_error)?;
+
+                // Store the new table provider
+                let old_table = tables.insert(name_clone, Arc::new(table_provider));
+
+                Ok(old_table.map(|t| t as Arc<dyn TableProvider>))
+            })
+        });
+
+        // Block on the spawned task to get the result; the blocking work itself
+        // runs on tokio's dedicated blocking thread pool.
+        futures::executor::block_on(result).map_err(|e| {
+            DataFusionError::Execution(format!("Failed to create Iceberg table: {}", e))
+        })?
+    }
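+
+    // NOTE: `futures::executor::block_on(result)` parks the calling thread until the
+    // spawned task finishes. On a multi-thread runtime, an alternative sketch is
+    // `tokio::task::block_in_place(|| Handle::current().block_on(...))`, which lets
+    // the runtime know this worker is about to block.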
+
+    fn deregister_table(&self, name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
+        // Don't allow dropping metadata tables directly
+        if name.contains('$') {
+            return Err(DataFusionError::Plan(
+                "Cannot drop metadata tables directly. Drop the parent table instead.".to_string(),
+            ));
+        }
+
+        let catalog = self.catalog.clone();
+        let namespace = self.namespace.clone();
+        let tables = self.tables.clone();
+        let name = name.to_string();
+
+        let result = tokio::task::spawn_blocking(move || {
+            let rt = tokio::runtime::Handle::current();
+            rt.block_on(async move {
+                let table_ident = iceberg::TableIdent::new(namespace.clone(), name.clone());
+
+                // Drop the table from the Iceberg catalog
+                catalog
+                    .drop_table(&table_ident)
+                    .await
+                    .map_err(to_datafusion_error)?;
+
+                // Remove from the local cache and return the old provider
+                let old_table = tables.remove(&name);
+                Ok(old_table.map(|(_, provider)| provider as Arc<dyn TableProvider>))
+            })
+        });
+
+        futures::executor::block_on(result).map_err(|e| {
+            DataFusionError::Execution(format!("Failed to drop Iceberg table: {}", e))
+        })?
+    }
 }
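With `register_table` and `deregister_table` in place, DataFusion's DDL planner can
drive the Iceberg catalog directly. A minimal sketch of the flow, assuming the catalog
has been wrapped in this crate's `IcebergCatalogProvider` (the setup shown is
illustrative, not part of the patch):

    use std::sync::Arc;

    use datafusion::error::DataFusionError;
    use datafusion::prelude::SessionContext;
    use iceberg_datafusion::IcebergCatalogProvider;

    async fn ddl_roundtrip(catalog: Arc<dyn iceberg::Catalog>) -> datafusion::error::Result<()> {
        let provider = IcebergCatalogProvider::try_new(catalog)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?;
        let ctx = SessionContext::new();
        ctx.register_catalog("default", Arc::new(provider));

        // CREATE TABLE flows through IcebergSchemaProvider::register_table ...
        ctx.sql("CREATE TABLE default.default.t (id INT, name STRING)").await?;
        // ... and DROP TABLE flows through deregister_table.
        ctx.sql("DROP TABLE default.default.t").await?;
        Ok(())
    }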
diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs
index e3402dfa97..0288a98ebe 100644
--- a/crates/sqllogictest/src/engine/datafusion.rs
+++ b/crates/sqllogictest/src/engine/datafusion.rs
@@ -85,12 +85,12 @@
         )
         .await?;
 
-        // Create a test namespace for INSERT INTO tests
+        // Create a test namespace
         let namespace = NamespaceIdent::new("default".to_string());
         catalog.create_namespace(&namespace, HashMap::new()).await?;
 
-        // Create test tables
-        Self::create_unpartitioned_table(&catalog, &namespace).await?;
+        // Create the partitioned table programmatically (it can't be created via SQL yet).
+        // The table is automatically registered with DataFusion via IcebergCatalogProvider.
         Self::create_partitioned_table(&catalog, &namespace).await?;
 
         Ok(Arc::new(
@@ -98,35 +98,10 @@
         ))
     }
 
-    /// Create an unpartitioned test table with id and name columns
-    /// TODO: this can be removed when we support CREATE TABLE
-    async fn create_unpartitioned_table(
-        catalog: &impl Catalog,
-        namespace: &NamespaceIdent,
-    ) -> anyhow::Result<()> {
-        let schema = Schema::builder()
-            .with_fields(vec![
-                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
-                NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
-            ])
-            .build()?;
-
-        catalog
-            .create_table(
-                namespace,
-                TableCreation::builder()
-                    .name("test_unpartitioned_table".to_string())
-                    .schema(schema)
-                    .build(),
-            )
-            .await?;
-
-        Ok(())
-    }
-
     /// Create a partitioned test table with id, category, and value columns
     /// Partitioned by category using identity transform
-    /// TODO: this can be removed when we support CREATE TABLE
+    /// Note: partitioned tables can't be created via SQL yet, so we create them programmatically.
+    /// The table is automatically registered with DataFusion via IcebergCatalogProvider.
     async fn create_partitioned_table(
         catalog: &impl Catalog,
         namespace: &NamespaceIdent,
diff --git a/crates/sqllogictest/testdata/schedules/df_test.toml b/crates/sqllogictest/testdata/schedules/df_test.toml
index df5e638d5a..427e63bd37 100644
--- a/crates/sqllogictest/testdata/schedules/df_test.toml
+++ b/crates/sqllogictest/testdata/schedules/df_test.toml
@@ -18,10 +18,26 @@
 
 [engines]
 df = { type = "datafusion" }
 
+# Step 1: Create tables first
+[[steps]]
+engine = "df"
+slt = "df_test/create_table.slt"
+
+# Step 2: Verify the tables exist
 [[steps]]
 engine = "df"
 slt = "df_test/show_tables.slt"
 
+# Step 3: Insert data and verify (covers both unpartitioned and partitioned tables)
 [[steps]]
 engine = "df"
 slt = "df_test/insert_into.slt"
+
+# Step 4: Test DROP TABLE
+[[steps]]
+engine = "df"
+slt = "df_test/drop_table.slt"
+
+# Note: CREATE EXTERNAL TABLE tests live in
+# crates/integrations/datafusion/src/table/table_provider_factory.rs;
+# they require absolute paths, for which sqllogictest has no variable substitution.
diff --git a/crates/sqllogictest/testdata/slts/df_test/create_table.slt b/crates/sqllogictest/testdata/slts/df_test/create_table.slt
new file mode 100644
index 0000000000..2ab64081f5
--- /dev/null
+++ b/crates/sqllogictest/testdata/slts/df_test/create_table.slt
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test CREATE TABLE with a simple schema
+statement ok
+CREATE TABLE default.default.test_create_simple (
+    id INT,
+    name STRING
+)
+
+# Verify the table was created and is empty
+query IT rowsort
+SELECT * FROM default.default.test_create_simple
+----
+
+# Insert data to verify the table works
+query I
+INSERT INTO default.default.test_create_simple VALUES (1, 'test')
+----
+1
+
+# Verify the insert worked
+query IT rowsort
+SELECT * FROM default.default.test_create_simple
+----
+1 test
+
+# Note: CREATE TABLE with PARTITIONED BY is not supported in DataFusion SQL syntax.
+# Partitioned tables must be created using the Iceberg catalog API directly;
+# see crates/sqllogictest/src/engine/datafusion.rs for an example.
diff --git a/crates/sqllogictest/testdata/slts/df_test/drop_table.slt b/crates/sqllogictest/testdata/slts/df_test/drop_table.slt
new file mode 100644
index 0000000000..3f82387b04
--- /dev/null
+++ b/crates/sqllogictest/testdata/slts/df_test/drop_table.slt
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test DROP TABLE functionality
+
+# Drop the tables created in create_table.slt and used in insert_into.slt:
+# - test_create_simple: created via CREATE TABLE
+# - test_partitioned_table: created programmatically
+
+# Drop test_create_simple
+statement ok
+DROP TABLE default.default.test_create_simple
+
+# Verify test_create_simple no longer exists (should error)
+statement error
+SELECT * FROM default.default.test_create_simple
+
+# Drop test_partitioned_table
+statement ok
+DROP TABLE default.default.test_partitioned_table
+
+# Verify test_partitioned_table no longer exists (should error)
+statement error
+SELECT * FROM default.default.test_partitioned_table
+
+# Verify the tables are gone with SHOW TABLES.
+# Only information_schema tables should remain in each catalog.
+query TTTT rowsort
+SHOW TABLES
+----
+datafusion information_schema columns VIEW
+datafusion information_schema df_settings VIEW
+datafusion information_schema parameters VIEW
+datafusion information_schema routines VIEW
+datafusion information_schema schemata VIEW
+datafusion information_schema tables VIEW
+datafusion information_schema views VIEW
+default information_schema columns VIEW
+default information_schema df_settings VIEW
+default information_schema parameters VIEW
+default information_schema routines VIEW
+default information_schema schemata VIEW
+default information_schema tables VIEW
+default information_schema views VIEW
+
+# Test DROP TABLE IF EXISTS on a non-existent table (should not error)
+statement ok
+DROP TABLE IF EXISTS default.default.test_create_simple
diff --git a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt
index 2ba33afcd1..124ab60458 100644
--- a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt
@@ -15,53 +15,62 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# Verify the table is initially empty
-query IT rowsort
-SELECT * FROM default.default.test_unpartitioned_table
-----
+# ============================================
+# Test INSERT INTO for the unpartitioned table
+# Uses test_create_simple created by create_table.slt
+# ============================================
 
-# Insert a single row and verify the count
+# Note: test_create_simple already has (1, 'test') from create_table.slt
+# Insert more rows
 query I
-INSERT INTO default.default.test_unpartitioned_table VALUES (1, 'Alice')
+INSERT INTO default.default.test_create_simple VALUES (2, 'Alice')
 ----
 1
 
 # Verify the inserted row
 query IT rowsort
-SELECT * FROM default.default.test_unpartitioned_table
+SELECT * FROM default.default.test_create_simple
 ----
-1 Alice
+1 test
+2 Alice
 
 # Insert multiple rows and verify the count
 query I
-INSERT INTO default.default.test_unpartitioned_table VALUES (2, 'Bob'), (3, 'Charlie')
+INSERT INTO default.default.test_create_simple VALUES (3, 'Bob'), (4, 'Charlie')
 ----
 2
 
 # Verify all rows
 query IT rowsort
-SELECT * FROM default.default.test_unpartitioned_table
+SELECT * FROM default.default.test_create_simple
 ----
-1 Alice
-2 Bob
-3 Charlie
+1 test
+2 Alice
+3 Bob
+4 Charlie
 
 # Insert with NULL value and verify the count
 query I
-INSERT INTO default.default.test_unpartitioned_table VALUES (4, NULL)
+INSERT INTO default.default.test_create_simple VALUES (5, NULL)
 ----
 1
 
 # Verify NULL handling
 query IT rowsort
-SELECT * FROM default.default.test_unpartitioned_table
+SELECT * FROM default.default.test_create_simple
 ----
-1 Alice
-2 Bob
-3 Charlie
-4 NULL
+1 test
+2 Alice
+3 Bob
+4 Charlie
+5 NULL
+
+# ============================================
+# Test INSERT INTO for the partitioned table
+# Uses test_partitioned_table
+# ============================================
 
-# Test partitioned table - verify initially empty
+# Verify the partitioned table is initially empty
 query ITT rowsort
 SELECT * FROM default.default.test_partitioned_table
 ----
diff --git a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
index c5da5f6276..0e671506d7 100644
--- a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt
@@ -15,6 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
+# Verify tables created in create_table.slt and programmatically in datafusion.rs:
+# - test_create_simple: created via CREATE TABLE in create_table.slt
+# - test_partitioned_table: created programmatically (partitioned tables can't be created via SQL yet)
 query TTTT rowsort
 SHOW TABLES
 ----
@@ -25,12 +28,12 @@ datafusion information_schema routines VIEW
 datafusion information_schema schemata VIEW
 datafusion information_schema tables VIEW
 datafusion information_schema views VIEW
+default default test_create_simple BASE TABLE
+default default test_create_simple$manifests BASE TABLE
+default default test_create_simple$snapshots BASE TABLE
 default default test_partitioned_table BASE TABLE
 default default test_partitioned_table$manifests BASE TABLE
 default default test_partitioned_table$snapshots BASE TABLE
-default default test_unpartitioned_table BASE TABLE
-default default test_unpartitioned_table$manifests BASE TABLE
-default default test_unpartitioned_table$snapshots BASE TABLE
 default information_schema columns VIEW
 default information_schema df_settings VIEW
 default information_schema parameters VIEW
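For reference, the `$manifests`/`$snapshots` names listed above are derived rather than
stored: `table_names()` synthesizes one entry per metadata table type for each base table.
A sketch of that expansion (the free function is illustrative, not part of the patch):

    use iceberg::inspect::MetadataTableType;

    fn expand(table_name: &str) -> Vec<String> {
        // One entry for the base table, plus one "<name>$<type>" entry per metadata table.
        std::iter::once(table_name.to_string())
            .chain(
                MetadataTableType::all_types()
                    .map(|t| format!("{}${}", table_name, t.as_str())),
            )
            .collect()
    }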