Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bindings/cpp/examples/example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ int main() {

// 5) Write rows with scalar and temporal values
fluss::AppendWriter writer;
check("new_append_writer", table.NewAppendWriter(writer));
check("new_append_writer", table.NewAppend().CreateWriter(writer));

struct RowData {
int id;
Expand Down Expand Up @@ -423,7 +423,7 @@ int main() {
check("get_decimal_table", conn.GetTable(decimal_table_path, decimal_table));

fluss::AppendWriter decimal_writer;
check("new_decimal_writer", decimal_table.NewAppendWriter(decimal_writer));
check("new_decimal_writer", decimal_table.NewAppend().CreateWriter(decimal_writer));

// Just provide the value — Rust resolves (p,s) from schema
{
Expand Down Expand Up @@ -512,7 +512,7 @@ int main() {
check("get_partitioned_table", conn.GetTable(partitioned_table_path, partitioned_table));

fluss::AppendWriter partitioned_writer;
check("new_partitioned_writer", partitioned_table.NewAppendWriter(partitioned_writer));
check("new_partitioned_writer", partitioned_table.NewAppend().CreateWriter(partitioned_writer));

struct PartitionedRow {
int id;
Expand Down
109 changes: 56 additions & 53 deletions bindings/cpp/examples/kv_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ int main() {
// - Set("last_seen", ts) auto-routes to SetTimestampLtz (schema-aware)
std::cout << "\n--- Upsert Rows ---" << std::endl;
fluss::UpsertWriter upsert_writer;
check("new_upsert_writer", kv_table.NewUpsertWriter(upsert_writer));
check("new_upsert_writer", kv_table.NewUpsert().CreateWriter(upsert_writer));

// Fire-and-forget upserts
{
Expand Down Expand Up @@ -130,7 +130,7 @@ int main() {
// 4) Lookup by primary key — verify all types round-trip
std::cout << "\n--- Lookup by Primary Key ---" << std::endl;
fluss::Lookuper lookuper;
check("new_lookuper", kv_table.NewLookuper(lookuper));
check("new_lookuper", kv_table.NewLookup().CreateLookuper(lookuper));

// Lookup existing key
{
Expand Down Expand Up @@ -242,9 +242,9 @@ int main() {
// 7) Partial update by column names
std::cout << "\n--- Partial Update by Column Names ---" << std::endl;
fluss::UpsertWriter partial_writer;
check("new_partial_upsert_writer",
kv_table.NewUpsertWriter(partial_writer,
std::vector<std::string>{"user_id", "balance", "last_seen"}));
check("new_partial_upsert_writer", kv_table.NewUpsert()
.PartialUpdateByName({"user_id", "balance", "last_seen"})
.CreateWriter(partial_writer));

{
auto row = kv_table.NewRow();
Expand Down Expand Up @@ -282,7 +282,7 @@ int main() {
fluss::UpsertWriter partial_writer_idx;
// Columns: 0=user_id (PK), 1=name — update name only
check("new_partial_upsert_writer_idx",
kv_table.NewUpsertWriter(partial_writer_idx, std::vector<size_t>{0, 1}));
kv_table.NewUpsert().PartialUpdateByIndex({0, 1}).CreateWriter(partial_writer_idx));

{
// Index-based setters: lighter than name-based, useful for hot paths
Expand Down Expand Up @@ -321,37 +321,39 @@ int main() {

// 9) Partitioned KV table
std::cout << "\n--- Partitioned KV Table ---" << std::endl;
fluss::TablePath part_kv_path("fluss", "partitioned_kv_cpp_v1");
admin.DropTable(part_kv_path, true);

auto part_kv_schema = fluss::Schema::NewBuilder()
.AddColumn("region", fluss::DataType::String())
.AddColumn("user_id", fluss::DataType::Int())
.AddColumn("name", fluss::DataType::String())
.AddColumn("score", fluss::DataType::BigInt())
.SetPrimaryKeys({"region", "user_id"})
.Build();

auto part_kv_descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(part_kv_schema)
.SetPartitionKeys({"region"})
.SetComment("partitioned kv table example")
.Build();

check("create_part_kv", admin.CreateTable(part_kv_path, part_kv_descriptor, false));
fluss::TablePath partitioned_kv_path("fluss", "partitioned_kv_cpp_v1");
admin.DropTable(partitioned_kv_path, true);

auto partitioned_kv_schema = fluss::Schema::NewBuilder()
.AddColumn("region", fluss::DataType::String())
.AddColumn("user_id", fluss::DataType::Int())
.AddColumn("name", fluss::DataType::String())
.AddColumn("score", fluss::DataType::BigInt())
.SetPrimaryKeys({"region", "user_id"})
.Build();

auto partitioned_kv_descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(partitioned_kv_schema)
.SetPartitionKeys({"region"})
.SetComment("partitioned kv table example")
.Build();

check("create_partitioned_kv",
admin.CreateTable(partitioned_kv_path, partitioned_kv_descriptor, false));
std::cout << "Created partitioned KV table" << std::endl;

// Create partitions
check("create_US", admin.CreatePartition(part_kv_path, {{"region", "US"}}));
check("create_EU", admin.CreatePartition(part_kv_path, {{"region", "EU"}}));
check("create_APAC", admin.CreatePartition(part_kv_path, {{"region", "APAC"}}));
check("create_US", admin.CreatePartition(partitioned_kv_path, {{"region", "US"}}));
check("create_EU", admin.CreatePartition(partitioned_kv_path, {{"region", "EU"}}));
check("create_APAC", admin.CreatePartition(partitioned_kv_path, {{"region", "APAC"}}));
std::cout << "Created partitions: US, EU, APAC" << std::endl;

fluss::Table part_kv_table;
check("get_part_kv_table", conn.GetTable(part_kv_path, part_kv_table));
fluss::Table partitioned_kv_table;
check("get_partitioned_kv_table", conn.GetTable(partitioned_kv_path, partitioned_kv_table));

fluss::UpsertWriter part_writer;
check("new_part_writer", part_kv_table.NewUpsertWriter(part_writer));
fluss::UpsertWriter partitioned_writer;
check("new_partitioned_writer",
partitioned_kv_table.NewUpsert().CreateWriter(partitioned_writer));

// Upsert rows across partitions
struct TestRow {
Expand All @@ -366,28 +368,29 @@ int main() {
};

for (const auto& td : test_data) {
auto row = part_kv_table.NewRow();
auto row = partitioned_kv_table.NewRow();
row.Set("region", td.region);
row.Set("user_id", td.user_id);
row.Set("name", td.name);
row.Set("score", td.score);
check("part_upsert", part_writer.Upsert(row));
check("partitioned_upsert", partitioned_writer.Upsert(row));
}
check("part_flush", part_writer.Flush());
check("partitioned_flush", partitioned_writer.Flush());
std::cout << "Upserted 5 rows across 3 partitions" << std::endl;

// Lookup all rows
fluss::Lookuper part_lookuper;
check("new_part_lookuper", part_kv_table.NewLookuper(part_lookuper));
fluss::Lookuper partitioned_lookuper;
check("new_partitioned_lookuper",
partitioned_kv_table.NewLookup().CreateLookuper(partitioned_lookuper));

for (const auto& td : test_data) {
auto pk = part_kv_table.NewRow();
auto pk = partitioned_kv_table.NewRow();
pk.Set("region", td.region);
pk.Set("user_id", td.user_id);

bool found = false;
fluss::GenericRow result;
check("part_lookup", part_lookuper.Lookup(pk, found, result));
check("partitioned_lookup", partitioned_lookuper.Lookup(pk, found, result));
if (!found) {
std::cerr << "ERROR: Expected to find region=" << td.region << " user_id=" << td.user_id
<< std::endl;
Expand All @@ -403,22 +406,22 @@ int main() {

// Update within a partition
{
auto row = part_kv_table.NewRow();
auto row = partitioned_kv_table.NewRow();
row.Set("region", "US");
row.Set("user_id", 1);
row.Set("name", "Gustave Updated");
row.Set("score", static_cast<int64_t>(999));
fluss::WriteResult wr;
check("part_update", part_writer.Upsert(row, wr));
check("part_update_wait", wr.Wait());
check("partitioned_update", partitioned_writer.Upsert(row, wr));
check("partitioned_update_wait", wr.Wait());
}
{
auto pk = part_kv_table.NewRow();
auto pk = partitioned_kv_table.NewRow();
pk.Set("region", "US");
pk.Set("user_id", 1);
bool found = false;
fluss::GenericRow result;
check("part_lookup_updated", part_lookuper.Lookup(pk, found, result));
check("partitioned_lookup_updated", partitioned_lookuper.Lookup(pk, found, result));
if (!found || result.GetString(2) != "Gustave Updated" || result.GetInt64(3) != 999) {
std::cerr << "ERROR: Partition update verification failed" << std::endl;
std::exit(1);
Expand All @@ -429,12 +432,12 @@ int main() {

// Lookup in non-existent partition
{
auto pk = part_kv_table.NewRow();
auto pk = partitioned_kv_table.NewRow();
pk.Set("region", "UNKNOWN");
pk.Set("user_id", 1);
bool found = false;
fluss::GenericRow result;
check("part_lookup_unknown", part_lookuper.Lookup(pk, found, result));
check("partitioned_lookup_unknown", partitioned_lookuper.Lookup(pk, found, result));
if (found) {
std::cerr << "ERROR: Expected UNKNOWN partition lookup to return not found"
<< std::endl;
Expand All @@ -445,20 +448,20 @@ int main() {

// Delete within a partition
{
auto pk = part_kv_table.NewRow();
auto pk = partitioned_kv_table.NewRow();
pk.Set("region", "EU");
pk.Set("user_id", 1);
fluss::WriteResult wr;
check("part_delete", part_writer.Delete(pk, wr));
check("part_delete_wait", wr.Wait());
check("partitioned_delete", partitioned_writer.Delete(pk, wr));
check("partitioned_delete_wait", wr.Wait());
}
{
auto pk = part_kv_table.NewRow();
auto pk = partitioned_kv_table.NewRow();
pk.Set("region", "EU");
pk.Set("user_id", 1);
bool found = false;
fluss::GenericRow result;
check("part_lookup_deleted", part_lookuper.Lookup(pk, found, result));
check("partitioned_lookup_deleted", partitioned_lookuper.Lookup(pk, found, result));
if (found) {
std::cerr << "ERROR: Expected EU/1 to be deleted" << std::endl;
std::exit(1);
Expand All @@ -468,20 +471,20 @@ int main() {

// Verify other record in same partition still exists
{
auto pk = part_kv_table.NewRow();
auto pk = partitioned_kv_table.NewRow();
pk.Set("region", "EU");
pk.Set("user_id", 2);
bool found = false;
fluss::GenericRow result;
check("part_lookup_eu2", part_lookuper.Lookup(pk, found, result));
check("partitioned_lookup_eu2", partitioned_lookuper.Lookup(pk, found, result));
if (!found || result.GetString(2) != "Maelle") {
std::cerr << "ERROR: Expected EU/2 (Maelle) to still exist" << std::endl;
std::exit(1);
}
std::cout << "EU/2 still exists: name=" << result.GetString(2) << std::endl;
}

check("drop_part_kv", admin.DropTable(part_kv_path, true));
check("drop_partitioned_kv", admin.DropTable(partitioned_kv_path, true));
std::cout << "\nKV table example completed successfully!" << std::endl;

return 0;
Expand Down
72 changes: 67 additions & 5 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,9 @@ class WriteResult;
class LogScanner;
class Admin;
class Table;
class TableAppend;
class TableUpsert;
class TableLookup;
class TableScan;

class Connection {
Expand Down Expand Up @@ -909,11 +912,9 @@ class Table {

GenericRow NewRow() const;

Result NewAppendWriter(AppendWriter& out);
Result NewUpsertWriter(UpsertWriter& out);
Result NewUpsertWriter(UpsertWriter& out, const std::vector<std::string>& column_names);
Result NewUpsertWriter(UpsertWriter& out, const std::vector<size_t>& column_indices);
Result NewLookuper(Lookuper& out);
TableAppend NewAppend();
TableUpsert NewUpsert();
TableLookup NewLookup();
TableScan NewScan();

TableInfo GetTableInfo() const;
Expand All @@ -922,6 +923,9 @@ class Table {

private:
friend class Connection;
friend class TableAppend;
friend class TableUpsert;
friend class TableLookup;
friend class TableScan;
Table(ffi::Table* table) noexcept;

Expand All @@ -932,6 +936,61 @@ class Table {
mutable std::shared_ptr<GenericRow::ColumnMap> column_map_;
};

class TableAppend {
public:
TableAppend(const TableAppend&) = delete;
TableAppend& operator=(const TableAppend&) = delete;
TableAppend(TableAppend&&) noexcept = default;
TableAppend& operator=(TableAppend&&) noexcept = default;

Result CreateWriter(AppendWriter& out);

private:
friend class Table;
explicit TableAppend(ffi::Table* table) noexcept;

ffi::Table* table_{nullptr};
};

class TableUpsert {
public:
TableUpsert(const TableUpsert&) = delete;
TableUpsert& operator=(const TableUpsert&) = delete;
TableUpsert(TableUpsert&&) noexcept = default;
TableUpsert& operator=(TableUpsert&&) noexcept = default;

TableUpsert& PartialUpdateByIndex(std::vector<size_t> column_indices);
TableUpsert& PartialUpdateByName(std::vector<std::string> column_names);

Result CreateWriter(UpsertWriter& out);

private:
friend class Table;
explicit TableUpsert(ffi::Table* table) noexcept;

std::vector<size_t> ResolveNameProjection() const;

ffi::Table* table_{nullptr};
std::vector<size_t> column_indices_;
std::vector<std::string> column_names_;
};

class TableLookup {
public:
TableLookup(const TableLookup&) = delete;
TableLookup& operator=(const TableLookup&) = delete;
TableLookup(TableLookup&&) noexcept = default;
TableLookup& operator=(TableLookup&&) noexcept = default;

Result CreateLookuper(Lookuper& out);

private:
friend class Table;
explicit TableLookup(ffi::Table* table) noexcept;

ffi::Table* table_{nullptr};
};

class TableScan {
public:
TableScan(const TableScan&) = delete;
Expand Down Expand Up @@ -999,6 +1058,7 @@ class AppendWriter {

private:
friend class Table;
friend class TableAppend;
AppendWriter(ffi::AppendWriter* writer) noexcept;

void Destroy() noexcept;
Expand All @@ -1025,6 +1085,7 @@ class UpsertWriter {

private:
friend class Table;
friend class TableUpsert;
UpsertWriter(ffi::UpsertWriter* writer) noexcept;
void Destroy() noexcept;
ffi::UpsertWriter* writer_{nullptr};
Expand All @@ -1046,6 +1107,7 @@ class Lookuper {

private:
friend class Table;
friend class TableLookup;
Lookuper(ffi::Lookuper* lookuper) noexcept;
void Destroy() noexcept;
ffi::Lookuper* lookuper_{nullptr};
Expand Down
Loading
Loading