diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp index 47087e54..59f1ed03 100644 --- a/bindings/cpp/examples/example.cpp +++ b/bindings/cpp/examples/example.cpp @@ -79,7 +79,7 @@ int main() { // 5) Write rows with scalar and temporal values fluss::AppendWriter writer; - check("new_append_writer", table.NewAppendWriter(writer)); + check("new_append_writer", table.NewAppend().CreateWriter(writer)); struct RowData { int id; @@ -423,7 +423,7 @@ int main() { check("get_decimal_table", conn.GetTable(decimal_table_path, decimal_table)); fluss::AppendWriter decimal_writer; - check("new_decimal_writer", decimal_table.NewAppendWriter(decimal_writer)); + check("new_decimal_writer", decimal_table.NewAppend().CreateWriter(decimal_writer)); // Just provide the value — Rust resolves (p,s) from schema { @@ -512,7 +512,7 @@ int main() { check("get_partitioned_table", conn.GetTable(partitioned_table_path, partitioned_table)); fluss::AppendWriter partitioned_writer; - check("new_partitioned_writer", partitioned_table.NewAppendWriter(partitioned_writer)); + check("new_partitioned_writer", partitioned_table.NewAppend().CreateWriter(partitioned_writer)); struct PartitionedRow { int id; diff --git a/bindings/cpp/examples/kv_example.cpp b/bindings/cpp/examples/kv_example.cpp index daebfb26..2a40db3e 100644 --- a/bindings/cpp/examples/kv_example.cpp +++ b/bindings/cpp/examples/kv_example.cpp @@ -76,7 +76,7 @@ int main() { // - Set("last_seen", ts) auto-routes to SetTimestampLtz (schema-aware) std::cout << "\n--- Upsert Rows ---" << std::endl; fluss::UpsertWriter upsert_writer; - check("new_upsert_writer", kv_table.NewUpsertWriter(upsert_writer)); + check("new_upsert_writer", kv_table.NewUpsert().CreateWriter(upsert_writer)); // Fire-and-forget upserts { @@ -130,7 +130,7 @@ int main() { // 4) Lookup by primary key — verify all types round-trip std::cout << "\n--- Lookup by Primary Key ---" << std::endl; fluss::Lookuper lookuper; - check("new_lookuper", kv_table.NewLookuper(lookuper)); + check("new_lookuper", kv_table.NewLookup().CreateLookuper(lookuper)); // Lookup existing key { @@ -242,9 +242,9 @@ int main() { // 7) Partial update by column names std::cout << "\n--- Partial Update by Column Names ---" << std::endl; fluss::UpsertWriter partial_writer; - check("new_partial_upsert_writer", - kv_table.NewUpsertWriter(partial_writer, - std::vector{"user_id", "balance", "last_seen"})); + check("new_partial_upsert_writer", kv_table.NewUpsert() + .PartialUpdateByName({"user_id", "balance", "last_seen"}) + .CreateWriter(partial_writer)); { auto row = kv_table.NewRow(); @@ -282,7 +282,7 @@ int main() { fluss::UpsertWriter partial_writer_idx; // Columns: 0=user_id (PK), 1=name — update name only check("new_partial_upsert_writer_idx", - kv_table.NewUpsertWriter(partial_writer_idx, std::vector{0, 1})); + kv_table.NewUpsert().PartialUpdateByIndex({0, 1}).CreateWriter(partial_writer_idx)); { // Index-based setters: lighter than name-based, useful for hot paths @@ -321,37 +321,39 @@ int main() { // 9) Partitioned KV table std::cout << "\n--- Partitioned KV Table ---" << std::endl; - fluss::TablePath part_kv_path("fluss", "partitioned_kv_cpp_v1"); - admin.DropTable(part_kv_path, true); - - auto part_kv_schema = fluss::Schema::NewBuilder() - .AddColumn("region", fluss::DataType::String()) - .AddColumn("user_id", fluss::DataType::Int()) - .AddColumn("name", fluss::DataType::String()) - .AddColumn("score", fluss::DataType::BigInt()) - .SetPrimaryKeys({"region", "user_id"}) - .Build(); - - auto part_kv_descriptor = fluss::TableDescriptor::NewBuilder() - .SetSchema(part_kv_schema) - .SetPartitionKeys({"region"}) - .SetComment("partitioned kv table example") - .Build(); - - check("create_part_kv", admin.CreateTable(part_kv_path, part_kv_descriptor, false)); + fluss::TablePath partitioned_kv_path("fluss", "partitioned_kv_cpp_v1"); + admin.DropTable(partitioned_kv_path, true); + + auto partitioned_kv_schema = fluss::Schema::NewBuilder() + .AddColumn("region", fluss::DataType::String()) + .AddColumn("user_id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("score", fluss::DataType::BigInt()) + .SetPrimaryKeys({"region", "user_id"}) + .Build(); + + auto partitioned_kv_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(partitioned_kv_schema) + .SetPartitionKeys({"region"}) + .SetComment("partitioned kv table example") + .Build(); + + check("create_partitioned_kv", + admin.CreateTable(partitioned_kv_path, partitioned_kv_descriptor, false)); std::cout << "Created partitioned KV table" << std::endl; // Create partitions - check("create_US", admin.CreatePartition(part_kv_path, {{"region", "US"}})); - check("create_EU", admin.CreatePartition(part_kv_path, {{"region", "EU"}})); - check("create_APAC", admin.CreatePartition(part_kv_path, {{"region", "APAC"}})); + check("create_US", admin.CreatePartition(partitioned_kv_path, {{"region", "US"}})); + check("create_EU", admin.CreatePartition(partitioned_kv_path, {{"region", "EU"}})); + check("create_APAC", admin.CreatePartition(partitioned_kv_path, {{"region", "APAC"}})); std::cout << "Created partitions: US, EU, APAC" << std::endl; - fluss::Table part_kv_table; - check("get_part_kv_table", conn.GetTable(part_kv_path, part_kv_table)); + fluss::Table partitioned_kv_table; + check("get_partitioned_kv_table", conn.GetTable(partitioned_kv_path, partitioned_kv_table)); - fluss::UpsertWriter part_writer; - check("new_part_writer", part_kv_table.NewUpsertWriter(part_writer)); + fluss::UpsertWriter partitioned_writer; + check("new_partitioned_writer", + partitioned_kv_table.NewUpsert().CreateWriter(partitioned_writer)); // Upsert rows across partitions struct TestRow { @@ -366,28 +368,29 @@ int main() { }; for (const auto& td : test_data) { - auto row = part_kv_table.NewRow(); + auto row = partitioned_kv_table.NewRow(); row.Set("region", td.region); row.Set("user_id", td.user_id); row.Set("name", td.name); row.Set("score", td.score); - check("part_upsert", part_writer.Upsert(row)); + check("partitioned_upsert", partitioned_writer.Upsert(row)); } - check("part_flush", part_writer.Flush()); + check("partitioned_flush", partitioned_writer.Flush()); std::cout << "Upserted 5 rows across 3 partitions" << std::endl; // Lookup all rows - fluss::Lookuper part_lookuper; - check("new_part_lookuper", part_kv_table.NewLookuper(part_lookuper)); + fluss::Lookuper partitioned_lookuper; + check("new_partitioned_lookuper", + partitioned_kv_table.NewLookup().CreateLookuper(partitioned_lookuper)); for (const auto& td : test_data) { - auto pk = part_kv_table.NewRow(); + auto pk = partitioned_kv_table.NewRow(); pk.Set("region", td.region); pk.Set("user_id", td.user_id); bool found = false; fluss::GenericRow result; - check("part_lookup", part_lookuper.Lookup(pk, found, result)); + check("partitioned_lookup", partitioned_lookuper.Lookup(pk, found, result)); if (!found) { std::cerr << "ERROR: Expected to find region=" << td.region << " user_id=" << td.user_id << std::endl; @@ -403,22 +406,22 @@ int main() { // Update within a partition { - auto row = part_kv_table.NewRow(); + auto row = partitioned_kv_table.NewRow(); row.Set("region", "US"); row.Set("user_id", 1); row.Set("name", "Gustave Updated"); row.Set("score", static_cast(999)); fluss::WriteResult wr; - check("part_update", part_writer.Upsert(row, wr)); - check("part_update_wait", wr.Wait()); + check("partitioned_update", partitioned_writer.Upsert(row, wr)); + check("partitioned_update_wait", wr.Wait()); } { - auto pk = part_kv_table.NewRow(); + auto pk = partitioned_kv_table.NewRow(); pk.Set("region", "US"); pk.Set("user_id", 1); bool found = false; fluss::GenericRow result; - check("part_lookup_updated", part_lookuper.Lookup(pk, found, result)); + check("partitioned_lookup_updated", partitioned_lookuper.Lookup(pk, found, result)); if (!found || result.GetString(2) != "Gustave Updated" || result.GetInt64(3) != 999) { std::cerr << "ERROR: Partition update verification failed" << std::endl; std::exit(1); @@ -429,12 +432,12 @@ int main() { // Lookup in non-existent partition { - auto pk = part_kv_table.NewRow(); + auto pk = partitioned_kv_table.NewRow(); pk.Set("region", "UNKNOWN"); pk.Set("user_id", 1); bool found = false; fluss::GenericRow result; - check("part_lookup_unknown", part_lookuper.Lookup(pk, found, result)); + check("partitioned_lookup_unknown", partitioned_lookuper.Lookup(pk, found, result)); if (found) { std::cerr << "ERROR: Expected UNKNOWN partition lookup to return not found" << std::endl; @@ -445,20 +448,20 @@ int main() { // Delete within a partition { - auto pk = part_kv_table.NewRow(); + auto pk = partitioned_kv_table.NewRow(); pk.Set("region", "EU"); pk.Set("user_id", 1); fluss::WriteResult wr; - check("part_delete", part_writer.Delete(pk, wr)); - check("part_delete_wait", wr.Wait()); + check("partitioned_delete", partitioned_writer.Delete(pk, wr)); + check("partitioned_delete_wait", wr.Wait()); } { - auto pk = part_kv_table.NewRow(); + auto pk = partitioned_kv_table.NewRow(); pk.Set("region", "EU"); pk.Set("user_id", 1); bool found = false; fluss::GenericRow result; - check("part_lookup_deleted", part_lookuper.Lookup(pk, found, result)); + check("partitioned_lookup_deleted", partitioned_lookuper.Lookup(pk, found, result)); if (found) { std::cerr << "ERROR: Expected EU/1 to be deleted" << std::endl; std::exit(1); @@ -468,12 +471,12 @@ int main() { // Verify other record in same partition still exists { - auto pk = part_kv_table.NewRow(); + auto pk = partitioned_kv_table.NewRow(); pk.Set("region", "EU"); pk.Set("user_id", 2); bool found = false; fluss::GenericRow result; - check("part_lookup_eu2", part_lookuper.Lookup(pk, found, result)); + check("partitioned_lookup_eu2", partitioned_lookuper.Lookup(pk, found, result)); if (!found || result.GetString(2) != "Maelle") { std::cerr << "ERROR: Expected EU/2 (Maelle) to still exist" << std::endl; std::exit(1); @@ -481,7 +484,7 @@ int main() { std::cout << "EU/2 still exists: name=" << result.GetString(2) << std::endl; } - check("drop_part_kv", admin.DropTable(part_kv_path, true)); + check("drop_partitioned_kv", admin.DropTable(partitioned_kv_path, true)); std::cout << "\nKV table example completed successfully!" << std::endl; return 0; diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 41aae670..18066169 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -804,6 +804,9 @@ class WriteResult; class LogScanner; class Admin; class Table; +class TableAppend; +class TableUpsert; +class TableLookup; class TableScan; class Connection { @@ -909,11 +912,9 @@ class Table { GenericRow NewRow() const; - Result NewAppendWriter(AppendWriter& out); - Result NewUpsertWriter(UpsertWriter& out); - Result NewUpsertWriter(UpsertWriter& out, const std::vector& column_names); - Result NewUpsertWriter(UpsertWriter& out, const std::vector& column_indices); - Result NewLookuper(Lookuper& out); + TableAppend NewAppend(); + TableUpsert NewUpsert(); + TableLookup NewLookup(); TableScan NewScan(); TableInfo GetTableInfo() const; @@ -922,6 +923,9 @@ class Table { private: friend class Connection; + friend class TableAppend; + friend class TableUpsert; + friend class TableLookup; friend class TableScan; Table(ffi::Table* table) noexcept; @@ -932,6 +936,61 @@ class Table { mutable std::shared_ptr column_map_; }; +class TableAppend { + public: + TableAppend(const TableAppend&) = delete; + TableAppend& operator=(const TableAppend&) = delete; + TableAppend(TableAppend&&) noexcept = default; + TableAppend& operator=(TableAppend&&) noexcept = default; + + Result CreateWriter(AppendWriter& out); + + private: + friend class Table; + explicit TableAppend(ffi::Table* table) noexcept; + + ffi::Table* table_{nullptr}; +}; + +class TableUpsert { + public: + TableUpsert(const TableUpsert&) = delete; + TableUpsert& operator=(const TableUpsert&) = delete; + TableUpsert(TableUpsert&&) noexcept = default; + TableUpsert& operator=(TableUpsert&&) noexcept = default; + + TableUpsert& PartialUpdateByIndex(std::vector column_indices); + TableUpsert& PartialUpdateByName(std::vector column_names); + + Result CreateWriter(UpsertWriter& out); + + private: + friend class Table; + explicit TableUpsert(ffi::Table* table) noexcept; + + std::vector ResolveNameProjection() const; + + ffi::Table* table_{nullptr}; + std::vector column_indices_; + std::vector column_names_; +}; + +class TableLookup { + public: + TableLookup(const TableLookup&) = delete; + TableLookup& operator=(const TableLookup&) = delete; + TableLookup(TableLookup&&) noexcept = default; + TableLookup& operator=(TableLookup&&) noexcept = default; + + Result CreateLookuper(Lookuper& out); + + private: + friend class Table; + explicit TableLookup(ffi::Table* table) noexcept; + + ffi::Table* table_{nullptr}; +}; + class TableScan { public: TableScan(const TableScan&) = delete; @@ -999,6 +1058,7 @@ class AppendWriter { private: friend class Table; + friend class TableAppend; AppendWriter(ffi::AppendWriter* writer) noexcept; void Destroy() noexcept; @@ -1025,6 +1085,7 @@ class UpsertWriter { private: friend class Table; + friend class TableUpsert; UpsertWriter(ffi::UpsertWriter* writer) noexcept; void Destroy() noexcept; ffi::UpsertWriter* writer_{nullptr}; @@ -1046,6 +1107,7 @@ class Lookuper { private: friend class Table; + friend class TableLookup; Lookuper(ffi::Lookuper* lookuper) noexcept; void Destroy() noexcept; ffi::Lookuper* lookuper_{nullptr}; diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp index 77c95d31..7925256e 100644 --- a/bindings/cpp/src/admin.cpp +++ b/bindings/cpp/src/admin.cpp @@ -17,9 +17,9 @@ * under the License. */ +#include "ffi_converter.hpp" #include "fluss.hpp" #include "lib.rs.h" -#include "ffi_converter.hpp" #include "rust/cxx.h" namespace fluss { @@ -37,9 +37,7 @@ void Admin::Destroy() noexcept { } } -Admin::Admin(Admin&& other) noexcept : admin_(other.admin_) { - other.admin_ = nullptr; -} +Admin::Admin(Admin&& other) noexcept : admin_(other.admin_) { other.admin_ = nullptr; } Admin& Admin::operator=(Admin&& other) noexcept { if (this != &other) { @@ -52,8 +50,7 @@ Admin& Admin::operator=(Admin&& other) noexcept { bool Admin::Available() const { return admin_ != nullptr; } -Result Admin::CreateTable(const TablePath& table_path, - const TableDescriptor& descriptor, +Result Admin::CreateTable(const TablePath& table_path, const TableDescriptor& descriptor, bool ignore_if_exists) { if (!Available()) { return utils::make_error(1, "Admin not available"); @@ -109,17 +106,16 @@ Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& o } // function for common list offsets functionality -Result Admin::DoListOffsets(const TablePath& table_path, - const std::vector& bucket_ids, - const OffsetQuery& offset_query, - std::unordered_map& out, - const std::string* partition_name) { +Result Admin::DoListOffsets(const TablePath& table_path, const std::vector& bucket_ids, + const OffsetQuery& offset_query, + std::unordered_map& out, + const std::string* partition_name) { if (!Available()) { return utils::make_error(1, "Admin not available"); } auto ffi_path = utils::to_ffi_table_path(table_path); - + rust::Vec rust_bucket_ids; for (int32_t id : bucket_ids) { rust_bucket_ids.push_back(id); @@ -131,11 +127,12 @@ Result Admin::DoListOffsets(const TablePath& table_path, ffi::FfiListOffsetsResult ffi_result; if (partition_name != nullptr) { - ffi_result = admin_->list_partition_offsets(ffi_path, rust::String(*partition_name), std::move(rust_bucket_ids), ffi_query); + ffi_result = admin_->list_partition_offsets(ffi_path, rust::String(*partition_name), + std::move(rust_bucket_ids), ffi_query); } else { ffi_result = admin_->list_offsets(ffi_path, std::move(rust_bucket_ids), ffi_query); } - + auto result = utils::from_ffi_result(ffi_result.result); if (result.Ok()) { out.clear(); @@ -147,23 +144,20 @@ Result Admin::DoListOffsets(const TablePath& table_path, return result; } -Result Admin::ListOffsets(const TablePath& table_path, - const std::vector& bucket_ids, +Result Admin::ListOffsets(const TablePath& table_path, const std::vector& bucket_ids, const OffsetQuery& offset_query, std::unordered_map& out) { return DoListOffsets(table_path, bucket_ids, offset_query, out); } -Result Admin::ListPartitionOffsets(const TablePath& table_path, - const std::string& partition_name, - const std::vector& bucket_ids, - const OffsetQuery& offset_query, - std::unordered_map& out) { +Result Admin::ListPartitionOffsets(const TablePath& table_path, const std::string& partition_name, + const std::vector& bucket_ids, + const OffsetQuery& offset_query, + std::unordered_map& out) { return DoListOffsets(table_path, bucket_ids, offset_query, out, &partition_name); } -Result Admin::ListPartitionInfos(const TablePath& table_path, - std::vector& out) { +Result Admin::ListPartitionInfos(const TablePath& table_path, std::vector& out) { if (!Available()) { return utils::make_error(1, "Admin not available"); } @@ -221,21 +215,18 @@ Result Admin::DropPartition(const TablePath& table_path, rust_spec.push_back(std::move(kv)); } - auto ffi_result = - admin_->drop_partition(ffi_path, std::move(rust_spec), ignore_if_not_exists); + auto ffi_result = admin_->drop_partition(ffi_path, std::move(rust_spec), ignore_if_not_exists); return utils::from_ffi_result(ffi_result); } -Result Admin::CreateDatabase(const std::string& database_name, - const DatabaseDescriptor& descriptor, +Result Admin::CreateDatabase(const std::string& database_name, const DatabaseDescriptor& descriptor, bool ignore_if_exists) { if (!Available()) { return utils::make_error(1, "Admin not available"); } auto ffi_desc = utils::to_ffi_database_descriptor(descriptor); - auto ffi_result = - admin_->create_database(rust::Str(database_name), ffi_desc, ignore_if_exists); + auto ffi_result = admin_->create_database(rust::Str(database_name), ffi_desc, ignore_if_exists); return utils::from_ffi_result(ffi_result); } diff --git a/bindings/cpp/src/connection.cpp b/bindings/cpp/src/connection.cpp index ea884cdb..4fbfafb1 100644 --- a/bindings/cpp/src/connection.cpp +++ b/bindings/cpp/src/connection.cpp @@ -17,9 +17,9 @@ * under the License. */ +#include "ffi_converter.hpp" #include "fluss.hpp" #include "lib.rs.h" -#include "ffi_converter.hpp" #include "rust/cxx.h" namespace fluss { @@ -35,9 +35,7 @@ void Connection::Destroy() noexcept { } } -Connection::Connection(Connection&& other) noexcept : conn_(other.conn_) { - other.conn_ = nullptr; -} +Connection::Connection(Connection&& other) noexcept : conn_(other.conn_) { other.conn_ = nullptr; } Connection& Connection::operator=(Connection&& other) noexcept { if (this != &other) { diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index ee7f1d8f..9b1b5ef8 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -325,25 +325,15 @@ mod ffi { // Table unsafe fn delete_table(table: *mut Table); fn new_append_writer(self: &Table) -> Result<*mut AppendWriter>; - fn new_log_scanner(self: &Table) -> Result<*mut LogScanner>; - fn new_log_scanner_with_projection( - self: &Table, - column_indices: Vec, - ) -> Result<*mut LogScanner>; - fn new_record_batch_log_scanner(self: &Table) -> Result<*mut LogScanner>; - fn new_record_batch_log_scanner_with_projection( + fn create_scanner( self: &Table, column_indices: Vec, + batch: bool, ) -> Result<*mut LogScanner>; fn get_table_info_from_table(self: &Table) -> FfiTableInfo; fn get_table_path(self: &Table) -> FfiTablePath; fn has_primary_key(self: &Table) -> bool; - fn new_upsert_writer(self: &Table) -> Result<*mut UpsertWriter>; - fn new_upsert_writer_with_column_names( - self: &Table, - column_names: Vec, - ) -> Result<*mut UpsertWriter>; - fn new_upsert_writer_with_column_indices( + fn create_upsert_writer( self: &Table, column_indices: Vec, ) -> Result<*mut UpsertWriter>; @@ -919,153 +909,86 @@ unsafe fn delete_table(table: *mut Table) { } impl Table { - fn new_append_writer(&self) -> Result<*mut AppendWriter, String> { - let _enter = RUNTIME.enter(); - - let fluss_table = fcore::client::FlussTable::new( + fn fluss_table(&self) -> fcore::client::FlussTable<'_> { + fcore::client::FlussTable::new( &self.connection, self.metadata.clone(), self.table_info.clone(), - ); - - let table_append = match fluss_table.new_append() { - Ok(a) => a, - Err(e) => return Err(format!("Failed to create append: {e}")), - }; - - let writer = match table_append.create_writer() { - Ok(w) => w, - Err(e) => return Err(format!("Failed to create writer: {e}")), - }; - let writer = Box::into_raw(Box::new(AppendWriter { - inner: writer, - table_info: self.table_info.clone(), - })); - Ok(writer) + ) } - fn new_log_scanner(&self) -> Result<*mut LogScanner, String> { - RUNTIME.block_on(async { - let fluss_table = fcore::client::FlussTable::new( - &self.connection, - self.metadata.clone(), - self.table_info.clone(), - ); - - let scanner = fluss_table - .new_scan() - .create_log_scanner() - .map_err(|e| format!("Failed to create log scanner: {e}"))?; - - let scanner_ptr = Box::into_raw(Box::new(LogScanner { - inner: Some(scanner), - inner_batch: None, - projected_columns: self.table_info.get_schema().columns().to_vec(), - })); - - Ok(scanner_ptr) - }) - } - - fn new_log_scanner_with_projection( + fn resolve_projected_columns( &self, - column_indices: Vec, - ) -> Result<*mut LogScanner, String> { - RUNTIME.block_on(async { - let fluss_table = fcore::client::FlussTable::new( - &self.connection, - self.metadata.clone(), - self.table_info.clone(), - ); - - let all_columns = self.table_info.get_schema().columns(); - let projected_columns: Vec<_> = column_indices - .iter() - .map(|&i| { - all_columns.get(i).cloned().ok_or_else(|| { - format!( - "Invalid column index {i}: schema has {} columns", - all_columns.len() - ) - }) + indices: &[usize], + ) -> Result, String> { + let all_columns = self.table_info.get_schema().columns(); + indices + .iter() + .map(|&i| { + all_columns.get(i).cloned().ok_or_else(|| { + format!( + "Invalid column index {i}: schema has {} columns", + all_columns.len() + ) }) - .collect::>()?; - - let log_scanner = fluss_table - .new_scan() - .project(&column_indices) - .map_err(|e| format!("Failed to project columns: {e}"))? - .create_log_scanner() - .map_err(|e| format!("Failed to create log scanner: {e}"))?; - - let scanner = Box::into_raw(Box::new(LogScanner { - inner: Some(log_scanner), - inner_batch: None, - projected_columns, - })); - Ok(scanner) - }) + }) + .collect() } - fn new_record_batch_log_scanner(&self) -> Result<*mut LogScanner, String> { - RUNTIME.block_on(async { - let fluss_table = fcore::client::FlussTable::new( - &self.connection, - self.metadata.clone(), - self.table_info.clone(), - ); - - let batch_scanner = fluss_table - .new_scan() - .create_record_batch_log_scanner() - .map_err(|e| format!("Failed to create record batch log scanner: {e}"))?; - - let scanner = Box::into_raw(Box::new(LogScanner { - inner: None, - inner_batch: Some(batch_scanner), - projected_columns: self.table_info.get_schema().columns().to_vec(), - })); - Ok(scanner) - }) + fn new_append_writer(&self) -> Result<*mut AppendWriter, String> { + let _enter = RUNTIME.enter(); + + let table_append = self + .fluss_table() + .new_append() + .map_err(|e| format!("Failed to create append: {e}"))?; + + let writer = table_append + .create_writer() + .map_err(|e| format!("Failed to create writer: {e}"))?; + + Ok(Box::into_raw(Box::new(AppendWriter { + inner: writer, + table_info: self.table_info.clone(), + }))) } - fn new_record_batch_log_scanner_with_projection( + fn create_scanner( &self, column_indices: Vec, + batch: bool, ) -> Result<*mut LogScanner, String> { RUNTIME.block_on(async { - let fluss_table = fcore::client::FlussTable::new( - &self.connection, - self.metadata.clone(), - self.table_info.clone(), - ); - - let all_columns = self.table_info.get_schema().columns(); - let projected_columns: Vec<_> = column_indices - .iter() - .map(|&i| { - all_columns.get(i).cloned().ok_or_else(|| { - format!( - "Invalid column index {i}: schema has {} columns", - all_columns.len() - ) - }) - }) - .collect::>()?; - - let batch_scanner = fluss_table - .new_scan() - .project(&column_indices) - .map_err(|e| format!("Failed to project columns: {e}"))? - .create_record_batch_log_scanner() - .map_err(|e| format!("Failed to create record batch log scanner: {e}"))?; - - let scanner = Box::into_raw(Box::new(LogScanner { - inner: None, - inner_batch: Some(batch_scanner), + let fluss_table = self.fluss_table(); + let scan = fluss_table.new_scan(); + + let (projected_columns, scan) = if column_indices.is_empty() { + (self.table_info.get_schema().columns().to_vec(), scan) + } else { + let cols = self.resolve_projected_columns(&column_indices)?; + let scan = scan + .project(&column_indices) + .map_err(|e| format!("Failed to project columns: {e}"))?; + (cols, scan) + }; + + let (inner, inner_batch) = if batch { + let batch_scanner = scan + .create_record_batch_log_scanner() + .map_err(|e| format!("Failed to create record batch log scanner: {e}"))?; + (None, Some(batch_scanner)) + } else { + let log_scanner = scan + .create_log_scanner() + .map_err(|e| format!("Failed to create log scanner: {e}"))?; + (Some(log_scanner), None) + }; + + Ok(Box::into_raw(Box::new(LogScanner { + inner, + inner_batch, projected_columns, - })); - Ok(scanner) + }))) }) } @@ -1084,79 +1007,24 @@ impl Table { self.has_pk } - fn new_upsert_writer(&self) -> Result<*mut UpsertWriter, String> { - let _enter = RUNTIME.enter(); - - let fluss_table = fcore::client::FlussTable::new( - &self.connection, - self.metadata.clone(), - self.table_info.clone(), - ); - - let table_upsert = fluss_table - .new_upsert() - .map_err(|e| format!("Failed to create upsert: {e}"))?; - - let writer = table_upsert - .create_writer() - .map_err(|e| format!("Failed to create upsert writer: {e}"))?; - - Ok(Box::into_raw(Box::new(UpsertWriter { - inner: writer, - table_info: self.table_info.clone(), - }))) - } - - fn new_upsert_writer_with_column_names( - &self, - column_names: Vec, - ) -> Result<*mut UpsertWriter, String> { - let _enter = RUNTIME.enter(); - - let fluss_table = fcore::client::FlussTable::new( - &self.connection, - self.metadata.clone(), - self.table_info.clone(), - ); - - let table_upsert = fluss_table - .new_upsert() - .map_err(|e| format!("Failed to create upsert: {e}"))?; - - let col_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect(); - let table_upsert = table_upsert - .partial_update_with_column_names(&col_refs) - .map_err(|e| format!("Failed to set partial update columns: {e}"))?; - - let writer = table_upsert - .create_writer() - .map_err(|e| format!("Failed to create upsert writer: {e}"))?; - - Ok(Box::into_raw(Box::new(UpsertWriter { - inner: writer, - table_info: self.table_info.clone(), - }))) - } - - fn new_upsert_writer_with_column_indices( + fn create_upsert_writer( &self, column_indices: Vec, ) -> Result<*mut UpsertWriter, String> { let _enter = RUNTIME.enter(); - let fluss_table = fcore::client::FlussTable::new( - &self.connection, - self.metadata.clone(), - self.table_info.clone(), - ); - - let table_upsert = fluss_table + let table_upsert = self + .fluss_table() .new_upsert() .map_err(|e| format!("Failed to create upsert: {e}"))?; - let table_upsert = table_upsert - .partial_update(Some(column_indices)) - .map_err(|e| format!("Failed to set partial update columns: {e}"))?; + let table_upsert = if column_indices.is_empty() { + table_upsert + } else { + table_upsert + .partial_update(Some(column_indices)) + .map_err(|e| format!("Failed to set partial update columns: {e}"))? + }; let writer = table_upsert .create_writer() @@ -1171,13 +1039,8 @@ impl Table { fn new_lookuper(&self) -> Result<*mut Lookuper, String> { let _enter = RUNTIME.enter(); - let fluss_table = fcore::client::FlussTable::new( - &self.connection, - self.metadata.clone(), - self.table_info.clone(), - ); - - let table_lookup = fluss_table + let table_lookup = self + .fluss_table() .new_lookup() .map_err(|e| format!("Failed to create lookup: {e}"))?; diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp index a2663630..da4dc306 100644 --- a/bindings/cpp/src/table.cpp +++ b/bindings/cpp/src/table.cpp @@ -109,8 +109,19 @@ Table& Table::operator=(Table&& other) noexcept { bool Table::Available() const { return table_ != nullptr; } -Result Table::NewAppendWriter(AppendWriter& out) { - if (!Available()) { +TableAppend Table::NewAppend() { return TableAppend(table_); } + +TableUpsert Table::NewUpsert() { return TableUpsert(table_); } + +TableLookup Table::NewLookup() { return TableLookup(table_); } + +TableScan Table::NewScan() { return TableScan(table_); } + +// TableAppend implementation +TableAppend::TableAppend(ffi::Table* table) noexcept : table_(table) {} + +Result TableAppend::CreateWriter(AppendWriter& out) { + if (table_ == nullptr) { return utils::make_error(1, "Table not available"); } @@ -124,7 +135,86 @@ Result Table::NewAppendWriter(AppendWriter& out) { } } -TableScan Table::NewScan() { return TableScan(table_); } +// TableUpsert implementation +TableUpsert::TableUpsert(ffi::Table* table) noexcept : table_(table) {} + +TableUpsert& TableUpsert::PartialUpdateByIndex(std::vector column_indices) { + if (column_indices.empty()) { + throw std::invalid_argument("PartialUpdateByIndex requires at least one column"); + } + column_indices_ = std::move(column_indices); + column_names_.clear(); + return *this; +} + +TableUpsert& TableUpsert::PartialUpdateByName(std::vector column_names) { + if (column_names.empty()) { + throw std::invalid_argument("PartialUpdateByName requires at least one column"); + } + column_names_ = std::move(column_names); + column_indices_.clear(); + return *this; +} + +std::vector TableUpsert::ResolveNameProjection() const { + auto ffi_info = table_->get_table_info_from_table(); + const auto& columns = ffi_info.schema.columns; + + std::vector indices; + for (const auto& name : column_names_) { + bool found = false; + for (size_t i = 0; i < columns.size(); ++i) { + if (std::string(columns[i].name) == name) { + indices.push_back(i); + found = true; + break; + } + } + if (!found) { + throw std::runtime_error("Column '" + name + "' not found"); + } + } + return indices; +} + +Result TableUpsert::CreateWriter(UpsertWriter& out) { + if (table_ == nullptr) { + return utils::make_error(1, "Table not available"); + } + + try { + auto resolved_indices = !column_names_.empty() ? ResolveNameProjection() : column_indices_; + + rust::Vec rust_indices; + for (size_t idx : resolved_indices) { + rust_indices.push_back(idx); + } + out = UpsertWriter(table_->create_upsert_writer(std::move(rust_indices))); + return utils::make_ok(); + } catch (const rust::Error& e) { + return utils::make_error(1, e.what()); + } catch (const std::exception& e) { + return utils::make_error(1, e.what()); + } +} + +// TableLookup implementation +TableLookup::TableLookup(ffi::Table* table) noexcept : table_(table) {} + +Result TableLookup::CreateLookuper(Lookuper& out) { + if (table_ == nullptr) { + return utils::make_error(1, "Table not available"); + } + + try { + out = Lookuper(table_->new_lookuper()); + return utils::make_ok(); + } catch (const rust::Error& e) { + return utils::make_error(1, e.what()); + } catch (const std::exception& e) { + return utils::make_error(1, e.what()); + } +} // TableScan implementation TableScan::TableScan(ffi::Table* table) noexcept : table_(table) {} @@ -169,15 +259,11 @@ Result TableScan::CreateLogScanner(LogScanner& out) { try { auto resolved_indices = !name_projection_.empty() ? ResolveNameProjection() : projection_; - if (!resolved_indices.empty()) { - rust::Vec rust_indices; - for (size_t idx : resolved_indices) { - rust_indices.push_back(idx); - } - out.scanner_ = table_->new_log_scanner_with_projection(std::move(rust_indices)); - } else { - out.scanner_ = table_->new_log_scanner(); + rust::Vec rust_indices; + for (size_t idx : resolved_indices) { + rust_indices.push_back(idx); } + out.scanner_ = table_->create_scanner(std::move(rust_indices), false); return utils::make_ok(); } catch (const rust::Error& e) { return utils::make_error(1, e.what()); @@ -193,16 +279,11 @@ Result TableScan::CreateRecordBatchScanner(LogScanner& out) { try { auto resolved_indices = !name_projection_.empty() ? ResolveNameProjection() : projection_; - if (!resolved_indices.empty()) { - rust::Vec rust_indices; - for (size_t idx : resolved_indices) { - rust_indices.push_back(idx); - } - out.scanner_ = - table_->new_record_batch_log_scanner_with_projection(std::move(rust_indices)); - } else { - out.scanner_ = table_->new_record_batch_log_scanner(); + rust::Vec rust_indices; + for (size_t idx : resolved_indices) { + rust_indices.push_back(idx); } + out.scanner_ = table_->create_scanner(std::move(rust_indices), true); return utils::make_ok(); } catch (const rust::Error& e) { return utils::make_error(1, e.what()); @@ -489,75 +570,6 @@ Result Lookuper::Lookup(const GenericRow& pk_row, bool& found, GenericRow& out) } } -// Table KV methods -Result Table::NewUpsertWriter(UpsertWriter& out) { - if (!Available()) { - return utils::make_error(1, "Table not available"); - } - - try { - out = UpsertWriter(table_->new_upsert_writer()); - return utils::make_ok(); - } catch (const rust::Error& e) { - return utils::make_error(1, e.what()); - } catch (const std::exception& e) { - return utils::make_error(1, e.what()); - } -} - -Result Table::NewUpsertWriter(UpsertWriter& out, const std::vector& column_names) { - if (!Available()) { - return utils::make_error(1, "Table not available"); - } - - try { - rust::Vec rust_names; - for (const auto& name : column_names) { - rust_names.push_back(rust::String(name)); - } - out = UpsertWriter(table_->new_upsert_writer_with_column_names(std::move(rust_names))); - return utils::make_ok(); - } catch (const rust::Error& e) { - return utils::make_error(1, e.what()); - } catch (const std::exception& e) { - return utils::make_error(1, e.what()); - } -} - -Result Table::NewUpsertWriter(UpsertWriter& out, const std::vector& column_indices) { - if (!Available()) { - return utils::make_error(1, "Table not available"); - } - - try { - rust::Vec rust_indices; - for (size_t idx : column_indices) { - rust_indices.push_back(idx); - } - out = UpsertWriter(table_->new_upsert_writer_with_column_indices(std::move(rust_indices))); - return utils::make_ok(); - } catch (const rust::Error& e) { - return utils::make_error(1, e.what()); - } catch (const std::exception& e) { - return utils::make_error(1, e.what()); - } -} - -Result Table::NewLookuper(Lookuper& out) { - if (!Available()) { - return utils::make_error(1, "Table not available"); - } - - try { - out = Lookuper(table_->new_lookuper()); - return utils::make_ok(); - } catch (const rust::Error& e) { - return utils::make_error(1, e.what()); - } catch (const std::exception& e) { - return utils::make_error(1, e.what()); - } -} - // LogScanner implementation LogScanner::LogScanner() noexcept = default;