Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 155 additions & 30 deletions bindings/cpp/examples/example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.

#include <arrow/array/builder_binary.h>
#include <arrow/array/builder_primitive.h>
#include <arrow/record_batch.h>
#include <arrow/type.h>

#include <chrono>
#include <iostream>
Expand Down Expand Up @@ -140,6 +143,18 @@ int main() {
std::cout << "Row acknowledged by server" << std::endl;
}

// Append a row with all fields null (matches Rust log_table.rs all_supported_datatypes)
{
fluss::GenericRow row;
size_t field_count = 8;
for (size_t i = 0; i < field_count; ++i) {
row.SetNull(i);
}
check("append_null_row", writer.Append(row));
}
check("flush_null", writer.Flush());
std::cout << "Wrote row with all fields null" << std::endl;

// 6) Full scan — verify all column types including temporal
fluss::LogScanner scanner;
check("new_log_scanner", table.NewScan().CreateLogScanner(scanner));
Expand All @@ -155,41 +170,63 @@ int main() {

std::cout << "Scanned records: " << records.Size() << std::endl;
bool scan_ok = true;
for (const auto& rec : records.records) {
if (rec.row.GetType(4) != fluss::DatumType::Date) {
bool found_null_row = false;
for (const auto& rec : records) {
// Check if this is the all-null row (matches Rust: is_null_at for every column)
if (rec.row.IsNull(0)) {
found_null_row = true;
for (size_t i = 0; i < rec.row.FieldCount(); ++i) {
if (!rec.row.IsNull(i)) {
std::cerr << "ERROR: column " << i << " should be null" << std::endl;
scan_ok = false;
}
}
std::cout << " [null row] all " << rec.row.FieldCount() << " fields are null"
<< std::endl;
continue;
}

// Non-null rows: verify types
if (rec.row.GetType(4) != fluss::TypeId::Date) {
std::cerr << "ERROR: field 4 expected Date, got "
<< static_cast<int>(rec.row.GetType(4)) << std::endl;
scan_ok = false;
}
if (rec.row.GetType(5) != fluss::DatumType::Time) {
if (rec.row.GetType(5) != fluss::TypeId::Time) {
std::cerr << "ERROR: field 5 expected Time, got "
<< static_cast<int>(rec.row.GetType(5)) << std::endl;
scan_ok = false;
}
if (rec.row.GetType(6) != fluss::DatumType::TimestampNtz) {
std::cerr << "ERROR: field 6 expected TimestampNtz, got "
if (rec.row.GetType(6) != fluss::TypeId::Timestamp) {
std::cerr << "ERROR: field 6 expected Timestamp, got "
<< static_cast<int>(rec.row.GetType(6)) << std::endl;
scan_ok = false;
}
if (rec.row.GetType(7) != fluss::DatumType::TimestampLtz) {
if (rec.row.GetType(7) != fluss::TypeId::TimestampLtz) {
std::cerr << "ERROR: field 7 expected TimestampLtz, got "
<< static_cast<int>(rec.row.GetType(7)) << std::endl;
scan_ok = false;
}

auto date = rec.row.GetDate(4);
auto time = rec.row.GetTime(5);
auto ts_ntz = rec.row.GetTimestamp(6);
auto ts_ltz = rec.row.GetTimestamp(7);
// Name-based getters (equivalent to index-based above)
auto date = rec.row.GetDate("event_date");
auto time = rec.row.GetTime("event_time");
auto ts_ntz = rec.row.GetTimestamp("created_at");
auto ts_ltz = rec.row.GetTimestamp("updated_at");

std::cout << " id=" << rec.row.GetInt32(0) << " name=" << rec.row.GetString(1)
<< " score=" << rec.row.GetFloat32(2) << " age=" << rec.row.GetInt32(3)
std::cout << " id=" << rec.row.GetInt32("id") << " name=" << rec.row.GetString("name")
<< " score=" << rec.row.GetFloat32("score") << " age=" << rec.row.GetInt32("age")
<< " date=" << date.Year() << "-" << date.Month() << "-" << date.Day()
<< " time=" << time.Hour() << ":" << time.Minute() << ":" << time.Second()
<< " ts_ntz=" << ts_ntz.epoch_millis << " ts_ltz=" << ts_ltz.epoch_millis << "+"
<< ts_ltz.nano_of_millisecond << "ns" << std::endl;
}

if (!found_null_row) {
std::cerr << "ERROR: did not find the all-null row" << std::endl;
scan_ok = false;
}

if (!scan_ok) {
std::cerr << "Full scan type verification FAILED!" << std::endl;
std::exit(1);
Expand All @@ -210,18 +247,23 @@ int main() {
check("poll_projected", projected_scanner.Poll(5000, projected_records));

std::cout << "Projected records: " << projected_records.Size() << std::endl;
for (const auto& rec : projected_records.records) {
for (const auto& rec : projected_records) {
if (rec.row.FieldCount() != 2) {
std::cerr << "ERROR: expected 2 fields, got " << rec.row.FieldCount() << std::endl;
scan_ok = false;
continue;
}
if (rec.row.GetType(0) != fluss::DatumType::Int32) {
std::cerr << "ERROR: projected field 0 expected Int32, got "
// Skip the all-null row
if (rec.row.IsNull(0)) {
std::cout << " [null row] skipped" << std::endl;
continue;
}
if (rec.row.GetType(0) != fluss::TypeId::Int) {
std::cerr << "ERROR: projected field 0 expected Int, got "
<< static_cast<int>(rec.row.GetType(0)) << std::endl;
scan_ok = false;
}
if (rec.row.GetType(1) != fluss::DatumType::TimestampLtz) {
if (rec.row.GetType(1) != fluss::TypeId::TimestampLtz) {
std::cerr << "ERROR: projected field 1 expected TimestampLtz, got "
<< static_cast<int>(rec.row.GetType(1)) << std::endl;
scan_ok = false;
Expand All @@ -246,18 +288,23 @@ int main() {
check("poll_name_projected", name_projected_scanner.Poll(5000, name_projected_records));

std::cout << "Name-projected records: " << name_projected_records.Size() << std::endl;
for (const auto& rec : name_projected_records.records) {
for (const auto& rec : name_projected_records) {
if (rec.row.FieldCount() != 2) {
std::cerr << "ERROR: expected 2 fields, got " << rec.row.FieldCount() << std::endl;
scan_ok = false;
continue;
}
if (rec.row.GetType(0) != fluss::DatumType::Int32) {
std::cerr << "ERROR: name-projected field 0 expected Int32, got "
// Skip the all-null row
if (rec.row.IsNull(0)) {
std::cout << " [null row] skipped" << std::endl;
continue;
}
if (rec.row.GetType(0) != fluss::TypeId::Int) {
std::cerr << "ERROR: name-projected field 0 expected Int, got "
<< static_cast<int>(rec.row.GetType(0)) << std::endl;
scan_ok = false;
}
if (rec.row.GetType(1) != fluss::DatumType::TimestampLtz) {
if (rec.row.GetType(1) != fluss::TypeId::TimestampLtz) {
std::cerr << "ERROR: name-projected field 1 expected TimestampLtz, got "
<< static_cast<int>(rec.row.GetType(1)) << std::endl;
scan_ok = false;
Expand Down Expand Up @@ -404,7 +451,84 @@ int main() {
}
}

// 12) Decimal support example
// 12) AppendArrowBatch — write an Arrow RecordBatch directly
std::cout << "\n=== AppendArrowBatch Example ===" << std::endl;
{
// Build an Arrow RecordBatch matching sample_table_cpp_v1 schema:
// id:INT, name:STRING, score:FLOAT, age:INT,
// event_date:DATE, event_time:TIME, created_at:TIMESTAMP, updated_at:TIMESTAMP_LTZ
auto arrow_schema = arrow::schema({
arrow::field("id", arrow::int32()),
arrow::field("name", arrow::utf8()),
arrow::field("score", arrow::float32()),
arrow::field("age", arrow::int32()),
arrow::field("event_date", arrow::date32()),
arrow::field("event_time", arrow::time32(arrow::TimeUnit::MILLI)),
arrow::field("created_at", arrow::timestamp(arrow::TimeUnit::MICRO)),
arrow::field("updated_at", arrow::timestamp(arrow::TimeUnit::MICRO)),
});

arrow::Int32Builder id_builder;
arrow::StringBuilder name_builder;
arrow::FloatBuilder score_builder;
arrow::Int32Builder age_builder;
arrow::Date32Builder date_builder;
arrow::Time32Builder time_builder(arrow::time32(arrow::TimeUnit::MILLI),
arrow::default_memory_pool());
arrow::TimestampBuilder ts_ntz_builder(arrow::timestamp(arrow::TimeUnit::MICRO),
arrow::default_memory_pool());
arrow::TimestampBuilder ts_ltz_builder(arrow::timestamp(arrow::TimeUnit::MICRO),
arrow::default_memory_pool());

// Row 1
(void)id_builder.Append(200);
(void)name_builder.Append("ArrowAlice");
(void)score_builder.Append(88.5f);
(void)age_builder.Append(28);
(void)date_builder.Append(19888); // days since epoch (2024-06-15 ≈ 19888)
(void)time_builder.Append(52245000); // 14:30:45 in ms
(void)ts_ntz_builder.Append(1718467200000000); // micros
(void)ts_ltz_builder.Append(1718467200000000);

// Row 2
(void)id_builder.Append(201);
(void)name_builder.Append("ArrowBob");
(void)score_builder.Append(91.3f);
(void)age_builder.Append(33);
(void)date_builder.Append(20089); // 2025-01-02
(void)time_builder.Append(3600000); // 01:00:00
(void)ts_ntz_builder.Append(1735689600000000);
(void)ts_ltz_builder.Append(1735689600000000);

auto batch_result = arrow::RecordBatch::Make(
arrow_schema, 2,
{id_builder.Finish().ValueOrDie(), name_builder.Finish().ValueOrDie(),
score_builder.Finish().ValueOrDie(), age_builder.Finish().ValueOrDie(),
date_builder.Finish().ValueOrDie(), time_builder.Finish().ValueOrDie(),
ts_ntz_builder.Finish().ValueOrDie(), ts_ltz_builder.Finish().ValueOrDie()});

check("append_arrow_batch", writer.AppendArrowBatch(batch_result));
check("flush_arrow", writer.Flush());
std::cout << "Wrote 2 rows via AppendArrowBatch" << std::endl;

// Verify by scanning from latest offsets
fluss::LogScanner arrow_write_scanner;
check("new_arrow_write_scanner", table.NewScan().CreateLogScanner(arrow_write_scanner));
for (const auto& [bid, off] : latest_offsets) {
check("subscribe_arrow_write", arrow_write_scanner.Subscribe(bid, off));
}

fluss::ScanRecords arrow_write_records;
check("poll_arrow_write", arrow_write_scanner.Poll(5000, arrow_write_records));
std::cout << "Scanned " << arrow_write_records.Size()
<< " records written via AppendArrowBatch:" << std::endl;
for (const auto& rec : arrow_write_records) {
std::cout << " id=" << rec.row.GetInt32(0) << " name=" << rec.row.GetString(1)
<< " score=" << rec.row.GetFloat32(2) << std::endl;
}
}

// 13) Decimal support example
std::cout << "\n=== Decimal Support Example ===" << std::endl;

fluss::TablePath decimal_table_path("fluss", "decimal_table_cpp_v1");
Expand Down Expand Up @@ -469,12 +593,12 @@ int main() {

std::cout << "Scanned decimal records: " << decimal_records.Size() << std::endl;
for (const auto& rec : decimal_records) {
std::cout << " id=" << rec.row.GetInt32(0) << " price=" << rec.row.DecimalToString(1)
<< " amount=" << rec.row.DecimalToString(2)
std::cout << " id=" << rec.row.GetInt32(0) << " price=" << rec.row.GetDecimalString(1)
<< " amount=" << rec.row.GetDecimalString(2)
<< " is_decimal=" << rec.row.IsDecimal(1) << std::endl;
}

// 13) Partitioned table example
// 14) Partitioned table example
std::cout << "\n=== Partitioned Table Example ===" << std::endl;

fluss::TablePath partitioned_table_path("fluss", "partitioned_table_cpp_v1");
Expand Down Expand Up @@ -546,7 +670,7 @@ int main() {
check("flush_partitioned", partitioned_writer.Flush());
std::cout << "Wrote " << partitioned_rows.size() << " rows to partitioned table" << std::endl;

// 13.1) subscribe_partition_buckets: subscribe to each partition individually
// 14.1) subscribe_partition_buckets: subscribe to each partition individually
std::cout << "\n--- Testing SubscribePartitionBuckets ---" << std::endl;
fluss::LogScanner partition_scanner;
check("new_partition_scanner", partitioned_table.NewScan().CreateLogScanner(partition_scanner));
Expand All @@ -563,12 +687,13 @@ int main() {
<< std::endl;
for (size_t i = 0; i < partition_records.Size(); ++i) {
const auto& rec = partition_records[i];
std::cout << " Record " << i << ": id=" << rec.row.GetInt32(0)
<< ", region=" << rec.row.GetString(1) << ", value=" << rec.row.GetInt64(2)
<< std::endl;
std::cout << " Record " << i << ": partition_id="
<< (rec.partition_id.has_value() ? std::to_string(*rec.partition_id) : "none")
<< ", id=" << rec.row.GetInt32(0) << ", region=" << rec.row.GetString(1)
<< ", value=" << rec.row.GetInt64(2) << std::endl;
}

// 13.2) subscribe_partition_buckets: batch subscribe to all partitions at once
// 14.2) subscribe_partition_buckets: batch subscribe to all partitions at once
std::cout << "\n--- Testing SubscribePartitionBuckets (batch) ---" << std::endl;
fluss::LogScanner partition_batch_scanner;
check("new_partition_batch_scanner",
Expand All @@ -594,7 +719,7 @@ int main() {
<< std::endl;
}

// 13.3) UnsubscribePartition: unsubscribe from one partition, verify remaining
// 14.3) UnsubscribePartition: unsubscribe from one partition, verify remaining
std::cout << "\n--- Testing UnsubscribePartition ---" << std::endl;
fluss::LogScanner unsub_partition_scanner;
check("new_unsub_partition_scanner",
Expand Down
Loading
Loading