diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e9048fb0..20bee877 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -137,3 +137,45 @@ jobs: env: RUST_LOG: DEBUG RUST_BACKTRACE: full + + python-integration-test: + timeout-minutes: 60 + runs-on: ubuntu-latest + strategy: + matrix: + python: ["3.9", "3.10", "3.11", "3.12"] + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python }} + + - name: Install uv + uses: astral-sh/setup-uv@v4 + + - name: Install protoc + run: sudo apt-get update && sudo apt-get install -y protobuf-compiler + + - name: Rust Cache + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: python-test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }} + + - name: Build Python bindings + working-directory: bindings/python + run: | + uv sync --extra dev + uv run maturin develop + + - name: Run Python integration tests + working-directory: bindings/python + run: uv run pytest test/ -v + env: + RUST_LOG: DEBUG + RUST_BACKTRACE: full diff --git a/.gitignore b/.gitignore index 476f84e9..f251aab3 100644 --- a/.gitignore +++ b/.gitignore @@ -28,9 +28,12 @@ __pycache__/ *.egg-info/ dist/ build/ +.venv/ +uv.lock # CPP *CMakeFiles/ +.cache/ # Website (Docusaurus) website/node_modules diff --git a/bindings/python/GENERATED_README.md b/bindings/python/GENERATED_README.md new file mode 100644 index 00000000..0a011ba6 --- /dev/null +++ b/bindings/python/GENERATED_README.md @@ -0,0 +1 @@ +This readme can be automatically generated by generate_readme.py. \ No newline at end of file diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml index 0be25a03..f5b0b68d 100644 --- a/bindings/python/pyproject.toml +++ b/bindings/python/pyproject.toml @@ -52,6 +52,7 @@ dev = [ "pytest-asyncio>=0.25.3", "ruff>=0.9.10", "maturin>=1.8.2", + "testcontainers>=4.0.0", ] docs = [ "pdoc>=15.0.4", @@ -90,6 +91,10 @@ docstring-code-format = true [tool.ruff.lint.isort] known-first-party = ["fluss"] +[tool.pytest.ini_options] +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "session" + [tool.mypy] python_version = "3.9" warn_return_any = true diff --git a/bindings/python/test/conftest.py b/bindings/python/test/conftest.py new file mode 100644 index 00000000..fbd7396e --- /dev/null +++ b/bindings/python/test/conftest.py @@ -0,0 +1,137 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Shared fixtures for Fluss Python integration tests. + +If FLUSS_BOOTSTRAP_SERVERS is set, tests connect to an existing cluster. +Otherwise, a Fluss cluster is started automatically via testcontainers. 
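+Docker must be available for the testcontainers-based setup.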
+ +Run with: + uv run maturin develop && uv run pytest test/ -v +""" + +import os +import socket +import time + +import pytest +import pytest_asyncio + +import fluss + +FLUSS_VERSION = "0.7.0" +BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS") + + +def _wait_for_port(host, port, timeout=60): + """Wait for a TCP port to become available.""" + start = time.time() + while time.time() - start < timeout: + try: + with socket.create_connection((host, port), timeout=1): + return + except (ConnectionRefusedError, TimeoutError, OSError): + time.sleep(1) + raise TimeoutError(f"Port {port} on {host} not available after {timeout}s") + + +@pytest.fixture(scope="session") +def fluss_cluster(): + """Start a Fluss cluster using testcontainers, or use an existing one.""" + if BOOTSTRAP_SERVERS_ENV: + yield BOOTSTRAP_SERVERS_ENV + return + + from testcontainers.core.container import DockerContainer + from testcontainers.core.network import Network + + network = Network() + network.create() + + zookeeper = ( + DockerContainer("zookeeper:3.9.2") + .with_network(network) + .with_name("zookeeper-python-test") + ) + + coordinator_props = "\n".join([ + "zookeeper.address: zookeeper-python-test:2181", + "bind.listeners: INTERNAL://coordinator-server-python-test:0," + " CLIENT://coordinator-server-python-test:9123", + "advertised.listeners: CLIENT://localhost:9123", + "internal.listener.name: INTERNAL", + "netty.server.num-network-threads: 1", + "netty.server.num-worker-threads: 3", + ]) + coordinator = ( + DockerContainer(f"fluss/fluss:{FLUSS_VERSION}") + .with_network(network) + .with_name("coordinator-server-python-test") + .with_bind_ports(9123, 9123) + .with_command("coordinatorServer") + .with_env("FLUSS_PROPERTIES", coordinator_props) + ) + + tablet_props = "\n".join([ + "zookeeper.address: zookeeper-python-test:2181", + "bind.listeners: INTERNAL://tablet-server-python-test:0," + " CLIENT://tablet-server-python-test:9123", + "advertised.listeners: CLIENT://localhost:9124", + "internal.listener.name: INTERNAL", + "tablet-server.id: 0", + "netty.server.num-network-threads: 1", + "netty.server.num-worker-threads: 3", + ]) + tablet_server = ( + DockerContainer(f"fluss/fluss:{FLUSS_VERSION}") + .with_network(network) + .with_name("tablet-server-python-test") + .with_bind_ports(9123, 9124) + .with_command("tabletServer") + .with_env("FLUSS_PROPERTIES", tablet_props) + ) + + zookeeper.start() + coordinator.start() + tablet_server.start() + + _wait_for_port("localhost", 9123) + _wait_for_port("localhost", 9124) + # Extra wait for cluster to fully initialize + time.sleep(10) + + yield "127.0.0.1:9123" + + tablet_server.stop() + coordinator.stop() + zookeeper.stop() + network.remove() + + +@pytest_asyncio.fixture(scope="session") +async def connection(fluss_cluster): + """Session-scoped connection to the Fluss cluster.""" + config = fluss.Config({"bootstrap.servers": fluss_cluster}) + conn = await fluss.FlussConnection.create(config) + yield conn + conn.close() + + +@pytest_asyncio.fixture(scope="session") +async def admin(connection): + """Session-scoped admin client.""" + return await connection.get_admin() diff --git a/bindings/python/test/test_admin.py b/bindings/python/test/test_admin.py new file mode 100644 index 00000000..f203400f --- /dev/null +++ b/bindings/python/test/test_admin.py @@ -0,0 +1,301 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Integration tests for FlussAdmin operations. + +Mirrors the Rust integration tests in crates/fluss/tests/integration/admin.rs. +""" + +import pyarrow as pa +import pytest + +import fluss + + +async def test_create_database(admin): + """Test database create, exists, get_info, and drop lifecycle.""" + db_name = "py_test_create_database" + + # Cleanup in case of prior failed run + await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True) + + assert not await admin.database_exists(db_name) + + db_descriptor = fluss.DatabaseDescriptor( + comment="test_db", + custom_properties={"k1": "v1", "k2": "v2"}, + ) + await admin.create_database(db_name, db_descriptor, ignore_if_exists=False) + + assert await admin.database_exists(db_name) + + db_info = await admin.get_database_info(db_name) + assert db_info.database_name == db_name + + descriptor = db_info.get_database_descriptor() + assert descriptor.comment == "test_db" + assert descriptor.get_custom_properties() == {"k1": "v1", "k2": "v2"} + + await admin.drop_database(db_name, ignore_if_not_exists=False, cascade=True) + + assert not await admin.database_exists(db_name) + + +async def test_create_table(admin): + """Test table create, exists, get_info, list, and drop lifecycle.""" + db_name = "py_test_create_table_db" + + await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True) + + assert not await admin.database_exists(db_name) + await admin.create_database( + db_name, + fluss.DatabaseDescriptor(comment="Database for test_create_table"), + ignore_if_exists=False, + ) + + table_name = "test_user_table" + table_path = fluss.TablePath(db_name, table_name) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("age", pa.int32()), + pa.field("email", pa.string()), + ] + ), + primary_keys=["id"], + ) + + table_descriptor = fluss.TableDescriptor( + schema, + bucket_count=3, + bucket_keys=["id"], + comment="Test table for user data (id, name, age, email)", + log_format="arrow", + kv_format="indexed", + properties={"table.replication.factor": "1"}, + ) + + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + assert await admin.table_exists(table_path) + + tables = await admin.list_tables(db_name) + assert len(tables) == 1 + assert table_name in tables + + table_info = await admin.get_table_info(table_path) + + assert table_info.comment == "Test table for user data (id, name, age, email)" + assert table_info.get_primary_keys() == ["id"] + assert table_info.num_buckets == 3 + assert table_info.get_bucket_keys() == ["id"] + assert table_info.get_column_names() == ["id", "name", "age", "email"] + + await admin.drop_table(table_path, ignore_if_not_exists=False) + assert not await admin.table_exists(table_path) + + await admin.drop_database(db_name, 
ignore_if_not_exists=False, cascade=True) + assert not await admin.database_exists(db_name) + + +async def test_partition_apis(admin): + """Test partition create, list, and drop lifecycle.""" + db_name = "py_test_partition_apis_db" + + await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True) + await admin.create_database( + db_name, + fluss.DatabaseDescriptor(comment="Database for test_partition_apis"), + ignore_if_exists=True, + ) + + table_path = fluss.TablePath(db_name, "partitioned_table") + + schema = fluss.Schema( + pa.schema( + [ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("dt", pa.string()), + pa.field("region", pa.string()), + ] + ), + primary_keys=["id", "dt", "region"], + ) + + table_descriptor = fluss.TableDescriptor( + schema, + partition_keys=["dt", "region"], + bucket_count=3, + bucket_keys=["id"], + log_format="arrow", + kv_format="compacted", + properties={"table.replication.factor": "1"}, + ) + + await admin.create_table(table_path, table_descriptor, ignore_if_exists=True) + + # Initially no partitions + partitions = await admin.list_partition_infos(table_path) + assert len(partitions) == 0 + + # Create a partition + await admin.create_partition( + table_path, + {"dt": "2024-01-15", "region": "EMEA"}, + ignore_if_exists=False, + ) + + partitions = await admin.list_partition_infos(table_path) + assert len(partitions) == 1 + assert partitions[0].partition_name == "2024-01-15$EMEA" + + # Drop the partition + await admin.drop_partition( + table_path, + {"dt": "2024-01-15", "region": "EMEA"}, + ignore_if_not_exists=False, + ) + + partitions = await admin.list_partition_infos(table_path) + assert len(partitions) == 0 + + await admin.drop_table(table_path, ignore_if_not_exists=True) + await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True) + + +async def test_fluss_error_response(admin): + """Test that API errors are raised as FlussError with correct error codes.""" + table_path = fluss.TablePath("fluss", "py_not_exist") + + with pytest.raises(fluss.FlussError) as exc_info: + await admin.get_table_info(table_path) + + assert exc_info.value.error_code == fluss.ErrorCode.TABLE_NOT_EXIST + + +async def test_error_database_not_exist(admin): + """Test error handling for non-existent database operations.""" + # get_database_info + with pytest.raises(fluss.FlussError) as exc_info: + await admin.get_database_info("py_no_such_db") + assert exc_info.value.error_code == fluss.ErrorCode.DATABASE_NOT_EXIST + + # drop_database without ignore flag + with pytest.raises(fluss.FlussError) as exc_info: + await admin.drop_database("py_no_such_db", ignore_if_not_exists=False) + assert exc_info.value.error_code == fluss.ErrorCode.DATABASE_NOT_EXIST + + # list_tables for non-existent database + with pytest.raises(fluss.FlussError) as exc_info: + await admin.list_tables("py_no_such_db") + assert exc_info.value.error_code == fluss.ErrorCode.DATABASE_NOT_EXIST + + +async def test_error_database_already_exist(admin): + """Test error when creating a database that already exists.""" + db_name = "py_test_error_db_already_exist" + + await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True) + await admin.create_database(db_name, ignore_if_exists=False) + + # Create same database again without ignore flag + with pytest.raises(fluss.FlussError) as exc_info: + await admin.create_database(db_name, ignore_if_exists=False) + assert exc_info.value.error_code == fluss.ErrorCode.DATABASE_ALREADY_EXIST + + # With ignore flag should 
succeed + await admin.create_database(db_name, ignore_if_exists=True) + + await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True) + + +async def test_error_table_already_exist(admin): + """Test error when creating a table that already exists.""" + db_name = "py_test_error_tbl_already_exist_db" + + await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True) + await admin.create_database(db_name, ignore_if_exists=True) + + table_path = fluss.TablePath(db_name, "my_table") + schema = fluss.Schema( + pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]) + ) + table_descriptor = fluss.TableDescriptor( + schema, + bucket_count=1, + properties={"table.replication.factor": "1"}, + ) + + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + # Create same table again without ignore flag + with pytest.raises(fluss.FlussError) as exc_info: + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + assert exc_info.value.error_code == fluss.ErrorCode.TABLE_ALREADY_EXIST + + # With ignore flag should succeed + await admin.create_table(table_path, table_descriptor, ignore_if_exists=True) + + await admin.drop_table(table_path, ignore_if_not_exists=True) + await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True) + + +async def test_error_table_not_exist(admin): + """Test error handling for non-existent table operations.""" + table_path = fluss.TablePath("fluss", "py_no_such_table") + + # drop without ignore flag + with pytest.raises(fluss.FlussError) as exc_info: + await admin.drop_table(table_path, ignore_if_not_exists=False) + assert exc_info.value.error_code == fluss.ErrorCode.TABLE_NOT_EXIST + + # drop with ignore flag should succeed + await admin.drop_table(table_path, ignore_if_not_exists=True) + + +async def test_error_table_not_partitioned(admin): + """Test error when calling partition operations on non-partitioned table.""" + db_name = "py_test_error_not_partitioned_db" + + await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True) + await admin.create_database(db_name, ignore_if_exists=True) + + table_path = fluss.TablePath(db_name, "non_partitioned_table") + schema = fluss.Schema( + pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]) + ) + table_descriptor = fluss.TableDescriptor( + schema, + bucket_count=1, + properties={"table.replication.factor": "1"}, + ) + + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + with pytest.raises(fluss.FlussError) as exc_info: + await admin.list_partition_infos(table_path) + assert ( + exc_info.value.error_code == fluss.ErrorCode.TABLE_NOT_PARTITIONED_EXCEPTION + ) + + await admin.drop_table(table_path, ignore_if_not_exists=True) + await admin.drop_database(db_name, ignore_if_not_exists=True, cascade=True) diff --git a/bindings/python/test/test_kv_table.py b/bindings/python/test/test_kv_table.py new file mode 100644 index 00000000..98b0cee9 --- /dev/null +++ b/bindings/python/test/test_kv_table.py @@ -0,0 +1,428 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Integration tests for KV (primary key) table operations. + +Mirrors the Rust integration tests in crates/fluss/tests/integration/kv_table.rs. +""" + +import math +from datetime import date, datetime, timezone +from datetime import time as dt_time +from decimal import Decimal + +import pyarrow as pa + +import fluss + + +async def test_upsert_delete_and_lookup(connection, admin): + """Test upsert, lookup, update, delete, and non-existent key lookup.""" + table_path = fluss.TablePath("fluss", "py_test_upsert_and_lookup") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("age", pa.int64()), + ] + ), + primary_keys=["id"], + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + upsert_writer = table.new_upsert().create_writer() + + test_data = [(1, "Verso", 32), (2, "Noco", 25), (3, "Esquie", 35)] + + # Upsert rows (fire-and-forget, then flush) + for id_, name, age in test_data: + upsert_writer.upsert({"id": id_, "name": name, "age": age}) + await upsert_writer.flush() + + # Lookup and verify + lookuper = table.new_lookup().create_lookuper() + + for id_, expected_name, expected_age in test_data: + result = await lookuper.lookup({"id": id_}) + assert result is not None, f"Row with id={id_} should exist" + assert result["id"] == id_ + assert result["name"] == expected_name + assert result["age"] == expected_age + + # Update record with id=1 (await acknowledgment) + handle = upsert_writer.upsert({"id": 1, "name": "Verso", "age": 33}) + await handle.wait() + + result = await lookuper.lookup({"id": 1}) + assert result is not None + assert result["age"] == 33 + assert result["name"] == "Verso" + + # Delete record with id=1 (await acknowledgment) + handle = upsert_writer.delete({"id": 1}) + await handle.wait() + + result = await lookuper.lookup({"id": 1}) + assert result is None, "Record 1 should not exist after delete" + + # Verify other records still exist + for id_ in [2, 3]: + result = await lookuper.lookup({"id": id_}) + assert result is not None, f"Record {id_} should still exist" + + # Lookup non-existent key + result = await lookuper.lookup({"id": 999}) + assert result is None, "Non-existent key should return None" + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_composite_primary_keys(connection, admin): + """Test upsert and lookup with composite (multi-column) primary keys.""" + table_path = fluss.TablePath("fluss", "py_test_composite_pk") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("region", pa.string()), + pa.field("user_id", pa.int32()), + pa.field("score", pa.int64()), + ] + ), + primary_keys=["region", "user_id"], + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + 
upsert_writer = table.new_upsert().create_writer() + + test_data = [ + ("US", 1, 100), + ("US", 2, 200), + ("EU", 1, 150), + ("EU", 2, 250), + ] + + for region, user_id, score in test_data: + upsert_writer.upsert({"region": region, "user_id": user_id, "score": score}) + await upsert_writer.flush() + + lookuper = table.new_lookup().create_lookuper() + + # Lookup (US, 1) -> score 100 + result = await lookuper.lookup({"region": "US", "user_id": 1}) + assert result is not None + assert result["score"] == 100 + + # Lookup (EU, 2) -> score 250 + result = await lookuper.lookup({"region": "EU", "user_id": 2}) + assert result is not None + assert result["score"] == 250 + + # Update (US, 1) score (await acknowledgment) + handle = upsert_writer.upsert({"region": "US", "user_id": 1, "score": 500}) + await handle.wait() + + result = await lookuper.lookup({"region": "US", "user_id": 1}) + assert result is not None + assert result["score"] == 500 + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_partial_update(connection, admin): + """Test partial column update via partial_update_by_name.""" + table_path = fluss.TablePath("fluss", "py_test_partial_update") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("age", pa.int64()), + pa.field("score", pa.int64()), + ] + ), + primary_keys=["id"], + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + + # Insert initial record + upsert_writer = table.new_upsert().create_writer() + handle = upsert_writer.upsert( + {"id": 1, "name": "Verso", "age": 32, "score": 6942} + ) + await handle.wait() + + lookuper = table.new_lookup().create_lookuper() + result = await lookuper.lookup({"id": 1}) + assert result is not None + assert result["id"] == 1 + assert result["name"] == "Verso" + assert result["age"] == 32 + assert result["score"] == 6942 + + # Partial update: only update score column + partial_writer = ( + table.new_upsert().partial_update_by_name(["id", "score"]).create_writer() + ) + handle = partial_writer.upsert({"id": 1, "score": 420}) + await handle.wait() + + result = await lookuper.lookup({"id": 1}) + assert result is not None + assert result["id"] == 1 + assert result["name"] == "Verso", "name should remain unchanged" + assert result["age"] == 32, "age should remain unchanged" + assert result["score"] == 420, "score should be updated to 420" + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_partial_update_by_index(connection, admin): + """Test partial column update via partial_update_by_index.""" + table_path = fluss.TablePath("fluss", "py_test_partial_update_by_index") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("id", pa.int32()), + pa.field("name", pa.string()), + pa.field("age", pa.int64()), + pa.field("score", pa.int64()), + ] + ), + primary_keys=["id"], + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + + upsert_writer = table.new_upsert().create_writer() + handle = upsert_writer.upsert( + {"id": 1, "name": "Verso", "age": 32, "score": 6942} + ) + await handle.wait() + + # Partial update by indices: columns 
0=id (PK), 1=name + partial_writer = ( + table.new_upsert().partial_update_by_index([0, 1]).create_writer() + ) + handle = partial_writer.upsert([1, "Verso Renamed"]) + await handle.wait() + + lookuper = table.new_lookup().create_lookuper() + result = await lookuper.lookup({"id": 1}) + assert result is not None + assert result["name"] == "Verso Renamed", "name should be updated" + assert result["score"] == 6942, "score should remain unchanged" + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_partitioned_table_upsert_and_lookup(connection, admin): + """Test upsert/lookup/delete on a partitioned KV table.""" + table_path = fluss.TablePath("fluss", "py_test_partitioned_kv_table") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("region", pa.string()), + pa.field("user_id", pa.int32()), + pa.field("name", pa.string()), + pa.field("score", pa.int64()), + ] + ), + primary_keys=["region", "user_id"], + ) + table_descriptor = fluss.TableDescriptor( + schema, + partition_keys=["region"], + ) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + # Create partitions + for region in ["US", "EU", "APAC"]: + await admin.create_partition( + table_path, {"region": region}, ignore_if_exists=True + ) + + table = await connection.get_table(table_path) + upsert_writer = table.new_upsert().create_writer() + + test_data = [ + ("US", 1, "Gustave", 100), + ("US", 2, "Lune", 200), + ("EU", 1, "Sciel", 150), + ("EU", 2, "Maelle", 250), + ("APAC", 1, "Noco", 300), + ] + + for region, user_id, name, score in test_data: + upsert_writer.upsert( + {"region": region, "user_id": user_id, "name": name, "score": score} + ) + await upsert_writer.flush() + + lookuper = table.new_lookup().create_lookuper() + + # Verify all rows across partitions + for region, user_id, expected_name, expected_score in test_data: + result = await lookuper.lookup({"region": region, "user_id": user_id}) + assert result is not None, f"Row ({region}, {user_id}) should exist" + assert result["region"] == region + assert result["user_id"] == user_id + assert result["name"] == expected_name + assert result["score"] == expected_score + + # Update within a partition (await acknowledgment) + handle = upsert_writer.upsert( + {"region": "US", "user_id": 1, "name": "Gustave Updated", "score": 999} + ) + await handle.wait() + + result = await lookuper.lookup({"region": "US", "user_id": 1}) + assert result is not None + assert result["name"] == "Gustave Updated" + assert result["score"] == 999 + + # Lookup in non-existent partition should return None + result = await lookuper.lookup({"region": "UNKNOWN_REGION", "user_id": 1}) + assert result is None, "Lookup in non-existent partition should return None" + + # Delete within a partition (await acknowledgment) + handle = upsert_writer.delete({"region": "EU", "user_id": 1}) + await handle.wait() + + result = await lookuper.lookup({"region": "EU", "user_id": 1}) + assert result is None, "Deleted record should not exist" + + # Verify sibling record still exists + result = await lookuper.lookup({"region": "EU", "user_id": 2}) + assert result is not None + assert result["name"] == "Maelle" + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_all_supported_datatypes(connection, admin): + """Test upsert/lookup for all supported data types, including nulls.""" + table_path = fluss.TablePath("fluss", "py_test_kv_all_datatypes") + await 
admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("pk_int", pa.int32()), + pa.field("col_boolean", pa.bool_()), + pa.field("col_tinyint", pa.int8()), + pa.field("col_smallint", pa.int16()), + pa.field("col_int", pa.int32()), + pa.field("col_bigint", pa.int64()), + pa.field("col_float", pa.float32()), + pa.field("col_double", pa.float64()), + pa.field("col_string", pa.string()), + pa.field("col_decimal", pa.decimal128(10, 2)), + pa.field("col_date", pa.date32()), + pa.field("col_time", pa.time32("ms")), + pa.field("col_timestamp_ntz", pa.timestamp("us")), + pa.field("col_timestamp_ltz", pa.timestamp("us", tz="UTC")), + pa.field("col_bytes", pa.binary()), + ] + ), + primary_keys=["pk_int"], + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + upsert_writer = table.new_upsert().create_writer() + + # Test data for all types + row_data = { + "pk_int": 1, + "col_boolean": True, + "col_tinyint": 127, + "col_smallint": 32767, + "col_int": 2147483647, + "col_bigint": 9223372036854775807, + "col_float": 3.14, + "col_double": 2.718281828459045, + "col_string": "world of fluss python client", + "col_decimal": Decimal("123.45"), + "col_date": date(2026, 1, 23), + "col_time": dt_time(10, 13, 47, 123000), # millisecond precision + "col_timestamp_ntz": datetime(2026, 1, 23, 10, 13, 47, 123000), + "col_timestamp_ltz": datetime(2026, 1, 23, 10, 13, 47, 123000), + "col_bytes": b"binary data", + } + + handle = upsert_writer.upsert(row_data) + await handle.wait() + + lookuper = table.new_lookup().create_lookuper() + result = await lookuper.lookup({"pk_int": 1}) + assert result is not None, "Row should exist" + + assert result["pk_int"] == 1 + assert result["col_boolean"] is True + assert result["col_tinyint"] == 127 + assert result["col_smallint"] == 32767 + assert result["col_int"] == 2147483647 + assert result["col_bigint"] == 9223372036854775807 + assert math.isclose(result["col_float"], 3.14, rel_tol=1e-6) + assert math.isclose(result["col_double"], 2.718281828459045, rel_tol=1e-15) + assert result["col_string"] == "world of fluss python client" + assert result["col_decimal"] == Decimal("123.45") + assert result["col_date"] == date(2026, 1, 23) + assert result["col_time"] == dt_time(10, 13, 47, 123000) + assert result["col_timestamp_ntz"] == datetime(2026, 1, 23, 10, 13, 47, 123000) + assert result["col_timestamp_ltz"] == datetime( + 2026, 1, 23, 10, 13, 47, 123000, tzinfo=timezone.utc + ) + assert result["col_bytes"] == b"binary data" + + # Test with null values for all nullable columns + null_row = {"pk_int": 2} + for col in row_data: + if col != "pk_int": + null_row[col] = None + handle = upsert_writer.upsert(null_row) + await handle.wait() + + result = await lookuper.lookup({"pk_int": 2}) + assert result is not None, "Row with nulls should exist" + assert result["pk_int"] == 2 + for col in row_data: + if col != "pk_int": + assert result[col] is None, f"{col} should be null" + + await admin.drop_table(table_path, ignore_if_not_exists=False) diff --git a/bindings/python/test/test_log_table.py b/bindings/python/test/test_log_table.py new file mode 100644 index 00000000..3219f03c --- /dev/null +++ b/bindings/python/test/test_log_table.py @@ -0,0 +1,675 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Integration tests for log (append-only) table operations. + +Mirrors the Rust integration tests in crates/fluss/tests/integration/log_table.rs. +""" + +import asyncio +import time + +import pyarrow as pa + +import fluss + + +async def test_append_and_scan(connection, admin): + """Test appending record batches and scanning with a record-based scanner.""" + table_path = fluss.TablePath("fluss", "py_test_append_and_scan") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema([pa.field("c1", pa.int32()), pa.field("c2", pa.string())]) + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + append_writer = table.new_append().create_writer() + + batch1 = pa.RecordBatch.from_arrays( + [pa.array([1, 2, 3], type=pa.int32()), pa.array(["a1", "a2", "a3"])], + schema=pa.schema([pa.field("c1", pa.int32()), pa.field("c2", pa.string())]), + ) + append_writer.write_arrow_batch(batch1) + + batch2 = pa.RecordBatch.from_arrays( + [pa.array([4, 5, 6], type=pa.int32()), pa.array(["a4", "a5", "a6"])], + schema=pa.schema([pa.field("c1", pa.int32()), pa.field("c2", pa.string())]), + ) + append_writer.write_arrow_batch(batch2) + + await append_writer.flush() + + # Scan with record-based scanner + scanner = await table.new_scan().create_log_scanner() + num_buckets = (await admin.get_table_info(table_path)).num_buckets + scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + + records = _poll_records(scanner, expected_count=6) + + assert len(records) == 6, f"Expected 6 records, got {len(records)}" + + records.sort(key=lambda r: r.row["c1"]) + + expected_c1 = [1, 2, 3, 4, 5, 6] + expected_c2 = ["a1", "a2", "a3", "a4", "a5", "a6"] + for i, record in enumerate(records): + assert record.row["c1"] == expected_c1[i], f"c1 mismatch at row {i}" + assert record.row["c2"] == expected_c2[i], f"c2 mismatch at row {i}" + + # Test unsubscribe + scanner.unsubscribe(bucket_id=0) + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_append_dict_rows(connection, admin): + """Test appending rows as dicts and scanning.""" + table_path = fluss.TablePath("fluss", "py_test_append_dict_rows") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]) + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + append_writer = table.new_append().create_writer() + + # Append using dicts + append_writer.append({"id": 1, "name": "Alice"}) + append_writer.append({"id": 2, "name": "Bob"}) + # 
Append using lists + append_writer.append([3, "Charlie"]) + await append_writer.flush() + + scanner = await table.new_scan().create_log_scanner() + num_buckets = (await admin.get_table_info(table_path)).num_buckets + scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + + records = _poll_records(scanner, expected_count=3) + assert len(records) == 3 + + rows = sorted([r.row for r in records], key=lambda r: r["id"]) + assert rows[0] == {"id": 1, "name": "Alice"} + assert rows[1] == {"id": 2, "name": "Bob"} + assert rows[2] == {"id": 3, "name": "Charlie"} + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_list_offsets(connection, admin): + """Test listing earliest, latest, and timestamp-based offsets.""" + table_path = fluss.TablePath("fluss", "py_test_list_offsets") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]) + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + await asyncio.sleep(2) # Wait for table initialization + + # Earliest offset should be 0 for empty table + earliest = await admin.list_offsets( + table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.earliest() + ) + assert earliest[0] == 0 + + # Latest offset should be 0 for empty table + latest = await admin.list_offsets( + table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.latest() + ) + assert latest[0] == 0 + + before_append_ms = int(time.time() * 1000) + + # Append some records + table = await connection.get_table(table_path) + append_writer = table.new_append().create_writer() + batch = pa.RecordBatch.from_arrays( + [ + pa.array([1, 2, 3], type=pa.int32()), + pa.array(["alice", "bob", "charlie"]), + ], + schema=pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]), + ) + append_writer.write_arrow_batch(batch) + await append_writer.flush() + + await asyncio.sleep(1) + + after_append_ms = int(time.time() * 1000) + + # Latest offset should be 3 after appending 3 records + latest_after = await admin.list_offsets( + table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.latest() + ) + assert latest_after[0] == 3 + + # Earliest offset should still be 0 + earliest_after = await admin.list_offsets( + table_path, bucket_ids=[0], offset_spec=fluss.OffsetSpec.earliest() + ) + assert earliest_after[0] == 0 + + # Timestamp before append should resolve to offset 0 + ts_before = await admin.list_offsets( + table_path, + bucket_ids=[0], + offset_spec=fluss.OffsetSpec.timestamp(before_append_ms), + ) + assert ts_before[0] == 0 + + # Timestamp after append should resolve to offset 3 + ts_after = await admin.list_offsets( + table_path, + bucket_ids=[0], + offset_spec=fluss.OffsetSpec.timestamp(after_append_ms), + ) + assert ts_after[0] == 3 + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_project(connection, admin): + """Test column projection by name and by index.""" + table_path = fluss.TablePath("fluss", "py_test_project") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("col_a", pa.int32()), + pa.field("col_b", pa.string()), + pa.field("col_c", pa.int32()), + ] + ) + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await 
connection.get_table(table_path) + append_writer = table.new_append().create_writer() + + batch = pa.RecordBatch.from_arrays( + [ + pa.array([1, 2, 3], type=pa.int32()), + pa.array(["x", "y", "z"]), + pa.array([10, 20, 30], type=pa.int32()), + ], + schema=pa.schema( + [ + pa.field("col_a", pa.int32()), + pa.field("col_b", pa.string()), + pa.field("col_c", pa.int32()), + ] + ), + ) + append_writer.write_arrow_batch(batch) + await append_writer.flush() + + # Test project_by_name: select col_b and col_c only + scan = table.new_scan().project_by_name(["col_b", "col_c"]) + scanner = await scan.create_log_scanner() + scanner.subscribe_buckets({0: 0}) + + records = _poll_records(scanner, expected_count=3) + assert len(records) == 3 + + records.sort(key=lambda r: r.row["col_c"]) + expected_col_b = ["x", "y", "z"] + expected_col_c = [10, 20, 30] + for i, record in enumerate(records): + assert record.row["col_b"] == expected_col_b[i] + assert record.row["col_c"] == expected_col_c[i] + # col_a should not be present in projected results + assert "col_a" not in record.row + + # Test project by indices [1, 0] -> (col_b, col_a) + scanner2 = await table.new_scan().project([1, 0]).create_log_scanner() + scanner2.subscribe_buckets({0: 0}) + + records2 = _poll_records(scanner2, expected_count=3) + assert len(records2) == 3 + + records2.sort(key=lambda r: r.row["col_a"]) + for i, record in enumerate(records2): + assert record.row["col_b"] == expected_col_b[i] + assert record.row["col_a"] == [1, 2, 3][i] + assert "col_c" not in record.row + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_poll_batches(connection, admin): + """Test batch-based scanning with poll_arrow and poll_record_batch.""" + table_path = fluss.TablePath("fluss", "py_test_poll_batches") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]) + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + await asyncio.sleep(1) + + table = await connection.get_table(table_path) + scanner = await table.new_scan().create_record_batch_log_scanner() + scanner.subscribe(bucket_id=0, start_offset=0) + + # Empty table should return empty result + result = scanner.poll_arrow(500) + assert result.num_rows == 0 + + writer = table.new_append().create_writer() + pa_schema = pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]) + writer.write_arrow_batch( + pa.RecordBatch.from_arrays( + [pa.array([1, 2], type=pa.int32()), pa.array(["a", "b"])], + schema=pa_schema, + ) + ) + writer.write_arrow_batch( + pa.RecordBatch.from_arrays( + [pa.array([3, 4], type=pa.int32()), pa.array(["c", "d"])], + schema=pa_schema, + ) + ) + writer.write_arrow_batch( + pa.RecordBatch.from_arrays( + [pa.array([5, 6], type=pa.int32()), pa.array(["e", "f"])], + schema=pa_schema, + ) + ) + await writer.flush() + + # Poll until we get all 6 records + all_ids = _poll_arrow_ids(scanner, expected_count=6) + assert all_ids == [1, 2, 3, 4, 5, 6] + + # Append more and verify offset continuation (no duplicates) + writer.write_arrow_batch( + pa.RecordBatch.from_arrays( + [pa.array([7, 8], type=pa.int32()), pa.array(["g", "h"])], + schema=pa_schema, + ) + ) + await writer.flush() + + new_ids = _poll_arrow_ids(scanner, expected_count=2) + assert new_ids == [7, 8] + + # Subscribe from mid-offset should truncate (skip earlier records) + trunc_scanner = await 
table.new_scan().create_record_batch_log_scanner() + trunc_scanner.subscribe(bucket_id=0, start_offset=3) + + trunc_ids = _poll_arrow_ids(trunc_scanner, expected_count=5) + assert trunc_ids == [4, 5, 6, 7, 8] + + # Projection with batch scanner + proj_scanner = ( + await table.new_scan() + .project_by_name(["id"]) + .create_record_batch_log_scanner() + ) + proj_scanner.subscribe(bucket_id=0, start_offset=0) + batches = proj_scanner.poll_record_batch(10000) + assert len(batches) > 0 + assert batches[0].batch.num_columns == 1 + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_to_arrow_and_to_pandas(connection, admin): + """Test to_arrow() and to_pandas() convenience methods.""" + table_path = fluss.TablePath("fluss", "py_test_to_arrow_pandas") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]) + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + writer = table.new_append().create_writer() + + pa_schema = pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]) + writer.write_arrow_batch( + pa.RecordBatch.from_arrays( + [pa.array([1, 2, 3], type=pa.int32()), pa.array(["a", "b", "c"])], + schema=pa_schema, + ) + ) + await writer.flush() + + num_buckets = (await admin.get_table_info(table_path)).num_buckets + + # to_arrow() + scanner = await table.new_scan().create_record_batch_log_scanner() + scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + arrow_table = scanner.to_arrow() + assert arrow_table.num_rows == 3 + assert arrow_table.schema.names == ["id", "name"] + + # to_pandas() + scanner2 = await table.new_scan().create_record_batch_log_scanner() + scanner2.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + df = scanner2.to_pandas() + assert len(df) == 3 + assert list(df.columns) == ["id", "name"] + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_partitioned_table_append_scan(connection, admin): + """Test append and scan on a partitioned log table.""" + table_path = fluss.TablePath("fluss", "py_test_partitioned_log_append") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("id", pa.int32()), + pa.field("region", pa.string()), + pa.field("value", pa.int64()), + ] + ) + ) + table_descriptor = fluss.TableDescriptor( + schema, + partition_keys=["region"], + ) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + # Create partitions + for region in ["US", "EU"]: + await admin.create_partition( + table_path, {"region": region}, ignore_if_exists=True + ) + + await asyncio.sleep(2) # Wait for partitions to be available + + table = await connection.get_table(table_path) + append_writer = table.new_append().create_writer() + + # Append rows + test_data = [ + (1, "US", 100), + (2, "US", 200), + (3, "EU", 300), + (4, "EU", 400), + ] + for id_, region, value in test_data: + append_writer.append({"id": id_, "region": region, "value": value}) + await append_writer.flush() + + # Append arrow batches per partition + pa_schema = pa.schema( + [ + pa.field("id", pa.int32()), + pa.field("region", pa.string()), + pa.field("value", pa.int64()), + ] + ) + us_batch = pa.RecordBatch.from_arrays( + [ + pa.array([5, 6], 
type=pa.int32()), + pa.array(["US", "US"]), + pa.array([500, 600], type=pa.int64()), + ], + schema=pa_schema, + ) + append_writer.write_arrow_batch(us_batch) + + eu_batch = pa.RecordBatch.from_arrays( + [ + pa.array([7, 8], type=pa.int32()), + pa.array(["EU", "EU"]), + pa.array([700, 800], type=pa.int64()), + ], + schema=pa_schema, + ) + append_writer.write_arrow_batch(eu_batch) + await append_writer.flush() + + # Verify partition offsets + us_offsets = await admin.list_partition_offsets( + table_path, + partition_name="US", + bucket_ids=[0], + offset_spec=fluss.OffsetSpec.latest(), + ) + assert us_offsets[0] == 4, "US partition should have 4 records" + + eu_offsets = await admin.list_partition_offsets( + table_path, + partition_name="EU", + bucket_ids=[0], + offset_spec=fluss.OffsetSpec.latest(), + ) + assert eu_offsets[0] == 4, "EU partition should have 4 records" + + # Scan all partitions + scanner = await table.new_scan().create_log_scanner() + partition_infos = await admin.list_partition_infos(table_path) + for p in partition_infos: + scanner.subscribe_partition( + partition_id=p.partition_id, bucket_id=0, start_offset=0 + ) + + expected = [ + (1, "US", 100), + (2, "US", 200), + (3, "EU", 300), + (4, "EU", 400), + (5, "US", 500), + (6, "US", 600), + (7, "EU", 700), + (8, "EU", 800), + ] + + records = _poll_records(scanner, expected_count=8) + assert len(records) == 8 + + collected = sorted( + [(r.row["id"], r.row["region"], r.row["value"]) for r in records], + key=lambda x: x[0], + ) + assert collected == expected + + # Test unsubscribe_partition: unsubscribe from EU, only US data should remain + unsub_scanner = await table.new_scan().create_log_scanner() + eu_partition_id = next( + p.partition_id for p in partition_infos if p.partition_name == "EU" + ) + for p in partition_infos: + unsub_scanner.subscribe_partition(p.partition_id, 0, 0) + unsub_scanner.unsubscribe_partition(eu_partition_id, 0) + + remaining = _poll_records(unsub_scanner, expected_count=4, timeout_s=5) + assert len(remaining) == 4 + assert all(r.row["region"] == "US" for r in remaining) + + # Test subscribe_partition_buckets (batch subscribe) + batch_scanner = await table.new_scan().create_log_scanner() + partition_bucket_offsets = { + (p.partition_id, 0): fluss.EARLIEST_OFFSET for p in partition_infos + } + batch_scanner.subscribe_partition_buckets(partition_bucket_offsets) + + batch_records = _poll_records(batch_scanner, expected_count=8) + assert len(batch_records) == 8 + batch_collected = sorted( + [(r.row["id"], r.row["region"], r.row["value"]) for r in batch_records], + key=lambda x: x[0], + ) + assert batch_collected == expected + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_write_arrow(connection, admin): + """Test writing a full PyArrow Table via write_arrow().""" + table_path = fluss.TablePath("fluss", "py_test_write_arrow") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]) + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + writer = table.new_append().create_writer() + + pa_schema = pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]) + arrow_table = pa.table( + { + "id": pa.array([1, 2, 3, 4, 5], type=pa.int32()), + "name": pa.array(["alice", "bob", "charlie", "dave", "eve"]), + }, + schema=pa_schema, + 
) + writer.write_arrow(arrow_table) + await writer.flush() + + num_buckets = (await admin.get_table_info(table_path)).num_buckets + scanner = await table.new_scan().create_record_batch_log_scanner() + scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + + result = scanner.to_arrow() + assert result.num_rows == 5 + + ids = sorted(result.column("id").to_pylist()) + names = [ + n + for _, n in sorted( + zip(result.column("id").to_pylist(), result.column("name").to_pylist()) + ) + ] + assert ids == [1, 2, 3, 4, 5] + assert names == ["alice", "bob", "charlie", "dave", "eve"] + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_write_pandas(connection, admin): + """Test writing a Pandas DataFrame via write_pandas().""" + import pandas as pd + + table_path = fluss.TablePath("fluss", "py_test_write_pandas") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]) + ) + table_descriptor = fluss.TableDescriptor(schema) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + writer = table.new_append().create_writer() + + df = pd.DataFrame({"id": [10, 20, 30], "name": ["x", "y", "z"]}) + writer.write_pandas(df) + await writer.flush() + + num_buckets = (await admin.get_table_info(table_path)).num_buckets + scanner = await table.new_scan().create_record_batch_log_scanner() + scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + + result = scanner.to_pandas() + assert len(result) == 3 + + result_sorted = result.sort_values("id").reset_index(drop=True) + assert result_sorted["id"].tolist() == [10, 20, 30] + assert result_sorted["name"].tolist() == ["x", "y", "z"] + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_partitioned_table_to_arrow(connection, admin): + """Test to_arrow() on partitioned tables.""" + table_path = fluss.TablePath("fluss", "py_test_partitioned_to_arrow") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("id", pa.int32()), + pa.field("region", pa.string()), + pa.field("value", pa.int64()), + ] + ) + ) + table_descriptor = fluss.TableDescriptor(schema, partition_keys=["region"]) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + for region in ["US", "EU"]: + await admin.create_partition( + table_path, {"region": region}, ignore_if_exists=True + ) + + await asyncio.sleep(2) + + table = await connection.get_table(table_path) + writer = table.new_append().create_writer() + writer.append({"id": 1, "region": "US", "value": 100}) + writer.append({"id": 2, "region": "EU", "value": 200}) + await writer.flush() + + scanner = await table.new_scan().create_record_batch_log_scanner() + partition_infos = await admin.list_partition_infos(table_path) + for p in partition_infos: + scanner.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET) + + arrow_table = scanner.to_arrow() + assert arrow_table.num_rows == 2 + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _poll_records(scanner, expected_count, timeout_s=10): + """Poll a record-based scanner until expected_count records are 
collected.""" + collected = [] + deadline = time.monotonic() + timeout_s + while len(collected) < expected_count and time.monotonic() < deadline: + records = scanner.poll(5000) + collected.extend(records) + return collected + + +def _poll_arrow_ids(scanner, expected_count, timeout_s=10): + """Poll a batch scanner and extract 'id' column values.""" + all_ids = [] + deadline = time.monotonic() + timeout_s + while len(all_ids) < expected_count and time.monotonic() < deadline: + arrow_table = scanner.poll_arrow(5000) + if arrow_table.num_rows > 0: + all_ids.extend(arrow_table.column("id").to_pylist()) + return all_ids diff --git a/website/docs/developer-guide/contributing.md b/website/docs/developer-guide/contributing.md index eced106a..38b792e8 100644 --- a/website/docs/developer-guide/contributing.md +++ b/website/docs/developer-guide/contributing.md @@ -82,7 +82,7 @@ cargo build --workspace --all-targets # Run unit tests cargo test --workspace -# Run integration tests (requires a running Fluss cluster) +# Run integration tests (requires Docker) RUST_TEST_THREADS=1 cargo test --features integration_tests --workspace # Run a single test @@ -93,9 +93,15 @@ cargo test test_name ```bash cd bindings/python -pip install maturin -pip install -e ".[dev]" -maturin develop + +# Install dev dependencies and build the extension +uv sync --extra dev && uv run maturin develop + +# Run integration tests (requires Docker) +uv run pytest test/ -v + +# To run against an existing cluster instead +FLUSS_BOOTSTRAP_SERVERS=127.0.0.1:9123 uv run pytest test/ -v ``` ### C++ Bindings