From 2b0e2cd8ba3d8c12d120317b6c3e5eb020c8d0f3 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Wed, 18 Feb 2026 19:50:32 +0000 Subject: [PATCH 1/8] C++ integration test --- .github/workflows/ci_bindings_cpp.yml | 78 ++ .github/workflows/ci_bindings_python.yml | 81 ++ .github/workflows/ci_check.yml | 60 ++ .github/workflows/{ci.yml => ci_rust.yml} | 73 +- bindings/cpp/CMakeLists.txt | 23 + bindings/cpp/test/test_admin.cpp | 331 +++++++++ bindings/cpp/test/test_kv_table.cpp | 643 ++++++++++++++++ bindings/cpp/test/test_log_table.cpp | 854 ++++++++++++++++++++++ bindings/cpp/test/test_main.cpp | 31 + bindings/cpp/test/test_utils.h | 342 +++++++++ 10 files changed, 2448 insertions(+), 68 deletions(-) create mode 100644 .github/workflows/ci_bindings_cpp.yml create mode 100644 .github/workflows/ci_bindings_python.yml create mode 100644 .github/workflows/ci_check.yml rename .github/workflows/{ci.yml => ci_rust.yml} (65%) create mode 100644 bindings/cpp/test/test_admin.cpp create mode 100644 bindings/cpp/test/test_kv_table.cpp create mode 100644 bindings/cpp/test/test_log_table.cpp create mode 100644 bindings/cpp/test/test_main.cpp create mode 100644 bindings/cpp/test/test_utils.h diff --git a/.github/workflows/ci_bindings_cpp.yml b/.github/workflows/ci_bindings_cpp.yml new file mode 100644 index 00000000..1404d8f3 --- /dev/null +++ b/.github/workflows/ci_bindings_cpp.yml @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Bindings C++ + +on: + push: + branches: + - main + paths-ignore: + - 'website/**' + - '**/*.md' + pull_request: + branches: + - main + paths-ignore: + - 'website/**' + - '**/*.md' + - 'bindings/python/**' + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} + cancel-in-progress: true + +jobs: + cpp-integration-test: + timeout-minutes: 60 + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install protoc + run: sudo apt-get update && sudo apt-get install -y protobuf-compiler + + - name: Install Apache Arrow C++ + run: | + sudo apt-get install -y -V ca-certificates lsb-release wget + wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + sudo apt-get install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb + sudo apt-get update + sudo apt-get install -y -V libarrow-dev + + - name: Rust Cache + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: cpp-test-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} + + - name: Build C++ bindings and tests + working-directory: bindings/cpp + run: | + cmake -B build -DFLUSS_ENABLE_TESTING=ON -DCMAKE_BUILD_TYPE=Debug + cmake --build build --parallel + + - name: Run C++ integration tests + working-directory: bindings/cpp + run: cd build && ctest --output-on-failure --timeout 300 + env: + RUST_LOG: DEBUG + RUST_BACKTRACE: full diff --git 
a/.github/workflows/ci_bindings_python.yml b/.github/workflows/ci_bindings_python.yml new file mode 100644 index 00000000..17005fa1 --- /dev/null +++ b/.github/workflows/ci_bindings_python.yml @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +name: Bindings Python + +on: + push: + branches: + - main + paths-ignore: + - 'website/**' + - '**/*.md' + pull_request: + branches: + - main + paths-ignore: + - 'website/**' + - '**/*.md' + - 'bindings/cpp/**' + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} + cancel-in-progress: true + +jobs: + python-integration-test: + timeout-minutes: 60 + runs-on: ubuntu-latest + strategy: + matrix: + python: ["3.9", "3.10", "3.11", "3.12"] + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python }} + + - name: Install uv + uses: astral-sh/setup-uv@v4 + + - name: Install protoc + run: sudo apt-get update && sudo apt-get install -y protobuf-compiler + + - name: Rust Cache + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: python-test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }} + + - name: Build Python bindings + working-directory: bindings/python + run: | + uv sync --extra dev + uv run maturin develop + + - name: Run Python integration tests + working-directory: bindings/python + run: uv run pytest test/ -v + env: + RUST_LOG: DEBUG + RUST_BACKTRACE: full diff --git a/.github/workflows/ci_check.yml b/.github/workflows/ci_check.yml new file mode 100644 index 00000000..00746d2b --- /dev/null +++ b/.github/workflows/ci_check.yml @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Check + +on: + push: + branches: + - main + paths-ignore: + - 'website/**' + - '**/*.md' + pull_request: + branches: + - main + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} + cancel-in-progress: true + +jobs: + check: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Check License Header + uses: apache/skywalking-eyes/header@v0.6.0 + + - name: Install cargo-deny + uses: taiki-e/install-action@v2 + with: + tool: cargo-deny@0.14.22 + + - name: Check dependency licenses (Apache-compatible) + run: cargo deny check licenses + + - name: Install protoc + run: sudo apt-get update && sudo apt-get install -y protobuf-compiler + + - name: Format + run: cargo fmt --all -- --check + + - name: Clippy + run: cargo clippy --all-targets --workspace -- -D warnings diff --git a/.github/workflows/ci.yml b/.github/workflows/ci_rust.yml similarity index 65% rename from .github/workflows/ci.yml rename to .github/workflows/ci_rust.yml index d51e3c07..f4cb7db9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci_rust.yml @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. 
-name: CI +name: Rust on: push: @@ -30,37 +30,15 @@ on: paths-ignore: - 'website/**' - '**/*.md' + - 'bindings/python/**' + - 'bindings/cpp/**' + workflow_dispatch: concurrency: group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} cancel-in-progress: true jobs: - check: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - - name: Check License Header - uses: apache/skywalking-eyes/header@v0.6.0 - - - name: Install cargo-deny - uses: taiki-e/install-action@v2 - with: - tool: cargo-deny@0.14.22 - - - name: Check dependency licenses (Apache-compatible) - run: cargo deny check licenses - - - name: Install protoc - run: sudo apt-get update && sudo apt-get install -y protobuf-compiler - - - name: Format - run: cargo fmt --all -- --check - - - name: Clippy - run: cargo clippy --all-targets --workspace -- -D warnings - build: runs-on: ${{ matrix.os }} strategy: @@ -98,6 +76,7 @@ jobs: run: cargo build --workspace --all-targets test: + timeout-minutes: 60 runs-on: ${{ matrix.os }} strategy: matrix: @@ -143,45 +122,3 @@ jobs: env: RUST_LOG: DEBUG RUST_BACKTRACE: full - - python-integration-test: - timeout-minutes: 60 - runs-on: ubuntu-latest - strategy: - matrix: - python: ["3.9", "3.10", "3.11", "3.12"] - steps: - - uses: actions/checkout@v4 - - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python }} - - - name: Install uv - uses: astral-sh/setup-uv@v4 - - - name: Install protoc - run: sudo apt-get update && sudo apt-get install -y protobuf-compiler - - - name: Rust Cache - uses: actions/cache@v4 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: python-test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }} - - - name: Build Python bindings - working-directory: bindings/python - run: | - uv sync --extra dev - uv run maturin develop - - - name: Run Python integration tests - working-directory: bindings/python - run: uv run pytest test/ -v - env: - RUST_LOG: 
DEBUG - RUST_BACKTRACE: full diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt index a8f527ed..6bd9fc79 100644 --- a/bindings/cpp/CMakeLists.txt +++ b/bindings/cpp/CMakeLists.txt @@ -123,3 +123,26 @@ if (FLUSS_ENABLE_ADDRESS_SANITIZER) target_compile_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined -fno-omit-frame-pointer -fno-common -O1) target_link_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined) endif() + +if (FLUSS_ENABLE_TESTING) + FetchContent_Declare( + googletest + URL https://github.com/google/googletest/archive/refs/tags/v${FLUSS_GOOGLETEST_VERSION}.tar.gz + ) + set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) + FetchContent_MakeAvailable(googletest) + + enable_testing() + + file(GLOB TEST_SOURCE_FILES "test/*.cpp") + add_executable(fluss_cpp_test ${TEST_SOURCE_FILES}) + target_link_libraries(fluss_cpp_test PRIVATE fluss_cpp GTest::gtest) + target_link_libraries(fluss_cpp_test PRIVATE Arrow::arrow_shared) + target_compile_definitions(fluss_cpp_test PRIVATE ARROW_FOUND) + target_include_directories(fluss_cpp_test PRIVATE + ${CPP_INCLUDE_DIR} + ${PROJECT_SOURCE_DIR}/test + ) + + add_test(NAME fluss_cpp_integration_tests COMMAND fluss_cpp_test) +endif() diff --git a/bindings/cpp/test/test_admin.cpp b/bindings/cpp/test/test_admin.cpp new file mode 100644 index 00000000..b6bb25b7 --- /dev/null +++ b/bindings/cpp/test/test_admin.cpp @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include "test_utils.h" + +class AdminTest : public ::testing::Test { + protected: + fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); } +}; + +TEST_F(AdminTest, CreateDatabase) { + auto& adm = admin(); + + std::string db_name = "test_create_database_cpp"; + + // Database should not exist initially + bool exists = true; + ASSERT_OK(adm.DatabaseExists(db_name, exists)); + ASSERT_FALSE(exists); + + // Create database with descriptor + fluss::DatabaseDescriptor descriptor; + descriptor.comment = "test_db"; + descriptor.properties = {{"k1", "v1"}, {"k2", "v2"}}; + ASSERT_OK(adm.CreateDatabase(db_name, descriptor, false)); + + // Database should exist now + ASSERT_OK(adm.DatabaseExists(db_name, exists)); + ASSERT_TRUE(exists); + + // Get database info + fluss::DatabaseInfo db_info; + ASSERT_OK(adm.GetDatabaseInfo(db_name, db_info)); + EXPECT_EQ(db_info.database_name, db_name); + EXPECT_EQ(db_info.comment, "test_db"); + EXPECT_EQ(db_info.properties.at("k1"), "v1"); + EXPECT_EQ(db_info.properties.at("k2"), "v2"); + + // Drop database + ASSERT_OK(adm.DropDatabase(db_name, false, true)); + + // Database should not exist now + ASSERT_OK(adm.DatabaseExists(db_name, exists)); + ASSERT_FALSE(exists); +} + +TEST_F(AdminTest, CreateTable) { + auto& adm = admin(); + + std::string db_name = "test_create_table_cpp_db"; + fluss::DatabaseDescriptor db_desc; + db_desc.comment = "Database for test_create_table"; + + bool exists = false; + ASSERT_OK(adm.DatabaseExists(db_name, exists)); + ASSERT_FALSE(exists); + 
+ ASSERT_OK(adm.CreateDatabase(db_name, db_desc, false)); + + std::string table_name = "test_user_table"; + fluss::TablePath table_path(db_name, table_name); + + // Build schema + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("age", fluss::DataType::Int(), "User's age (optional)") + .AddColumn("email", fluss::DataType::String()) + .SetPrimaryKeys({"id"}) + .Build(); + + // Build table descriptor + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetComment("Test table for user data (id, name, age, email)") + .SetBucketCount(3) + .SetBucketKeys({"id"}) + .SetProperty("table.replication.factor", "1") + .SetLogFormat("arrow") + .SetKvFormat("indexed") + .Build(); + + // Create table + ASSERT_OK(adm.CreateTable(table_path, table_descriptor, false)); + + // Table should exist + ASSERT_OK(adm.TableExists(table_path, exists)); + ASSERT_TRUE(exists); + + // List tables + std::vector tables; + ASSERT_OK(adm.ListTables(db_name, tables)); + ASSERT_EQ(tables.size(), 1u); + EXPECT_TRUE(std::find(tables.begin(), tables.end(), table_name) != tables.end()); + + // Get table info + fluss::TableInfo table_info; + ASSERT_OK(adm.GetTableInfo(table_path, table_info)); + + EXPECT_EQ(table_info.comment, "Test table for user data (id, name, age, email)"); + EXPECT_EQ(table_info.primary_keys, std::vector{"id"}); + EXPECT_EQ(table_info.num_buckets, 3); + EXPECT_EQ(table_info.bucket_keys, std::vector{"id"}); + + // Drop table + ASSERT_OK(adm.DropTable(table_path, false)); + ASSERT_OK(adm.TableExists(table_path, exists)); + ASSERT_FALSE(exists); + + // Drop database + ASSERT_OK(adm.DropDatabase(db_name, false, true)); + ASSERT_OK(adm.DatabaseExists(db_name, exists)); + ASSERT_FALSE(exists); +} + +TEST_F(AdminTest, PartitionApis) { + auto& adm = admin(); + + std::string db_name = "test_partition_apis_cpp_db"; + fluss::DatabaseDescriptor db_desc; + 
db_desc.comment = "Database for test_partition_apis"; + ASSERT_OK(adm.CreateDatabase(db_name, db_desc, true)); + + fluss::TablePath table_path(db_name, "partitioned_table"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("dt", fluss::DataType::String()) + .AddColumn("region", fluss::DataType::String()) + .SetPrimaryKeys({"id", "dt", "region"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetBucketCount(3) + .SetBucketKeys({"id"}) + .SetPartitionKeys({"dt", "region"}) + .SetProperty("table.replication.factor", "1") + .SetLogFormat("arrow") + .SetKvFormat("compacted") + .Build(); + + ASSERT_OK(adm.CreateTable(table_path, table_descriptor, true)); + + // No partitions initially + std::vector partitions; + ASSERT_OK(adm.ListPartitionInfos(table_path, partitions)); + ASSERT_TRUE(partitions.empty()); + + // Create a partition + std::unordered_map partition_spec = { + {"dt", "2024-01-15"}, {"region", "EMEA"}}; + ASSERT_OK(adm.CreatePartition(table_path, partition_spec, false)); + + // Should have one partition + ASSERT_OK(adm.ListPartitionInfos(table_path, partitions)); + ASSERT_EQ(partitions.size(), 1u); + EXPECT_EQ(partitions[0].partition_name, "2024-01-15$EMEA"); + + // List with partial spec filter - should find the partition + std::unordered_map partial_spec = {{"dt", "2024-01-15"}}; + std::vector partitions_with_spec; + ASSERT_OK(adm.ListPartitionInfos(table_path, partial_spec, partitions_with_spec)); + ASSERT_EQ(partitions_with_spec.size(), 1u); + EXPECT_EQ(partitions_with_spec[0].partition_name, "2024-01-15$EMEA"); + + // List with non-matching spec - should find no partitions + std::unordered_map non_matching_spec = {{"dt", "2024-01-16"}}; + std::vector empty_partitions; + ASSERT_OK(adm.ListPartitionInfos(table_path, non_matching_spec, empty_partitions)); + ASSERT_TRUE(empty_partitions.empty()); + + // Drop 
partition + ASSERT_OK(adm.DropPartition(table_path, partition_spec, false)); + + ASSERT_OK(adm.ListPartitionInfos(table_path, partitions)); + ASSERT_TRUE(partitions.empty()); + + // Cleanup + ASSERT_OK(adm.DropTable(table_path, true)); + ASSERT_OK(adm.DropDatabase(db_name, true, true)); +} + +TEST_F(AdminTest, FlussErrorResponse) { + auto& adm = admin(); + + fluss::TablePath table_path("fluss", "not_exist_cpp"); + + fluss::TableInfo info; + auto result = adm.GetTableInfo(table_path, info); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_NOT_EXIST); +} + +TEST_F(AdminTest, ErrorDatabaseNotExist) { + auto& adm = admin(); + + // get_database_info for non-existent database + fluss::DatabaseInfo info; + auto result = adm.GetDatabaseInfo("no_such_db_cpp", info); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_NOT_EXIST); + + // drop_database without ignore flag + result = adm.DropDatabase("no_such_db_cpp", false, false); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_NOT_EXIST); + + // list_tables for non-existent database + std::vector tables; + result = adm.ListTables("no_such_db_cpp", tables); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_NOT_EXIST); +} + +TEST_F(AdminTest, ErrorDatabaseAlreadyExist) { + auto& adm = admin(); + + std::string db_name = "test_error_db_already_exist_cpp"; + fluss::DatabaseDescriptor descriptor; + + ASSERT_OK(adm.CreateDatabase(db_name, descriptor, false)); + + // Create same database again without ignore flag + auto result = adm.CreateDatabase(db_name, descriptor, false); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::DATABASE_ALREADY_EXIST); + + // With ignore flag should succeed + ASSERT_OK(adm.CreateDatabase(db_name, descriptor, true)); + + // Cleanup + ASSERT_OK(adm.DropDatabase(db_name, true, true)); +} + +TEST_F(AdminTest, ErrorTableAlreadyExist) { 
+ auto& adm = admin(); + + std::string db_name = "test_error_tbl_already_exist_cpp_db"; + fluss::DatabaseDescriptor db_desc; + ASSERT_OK(adm.CreateDatabase(db_name, db_desc, true)); + + fluss::TablePath table_path(db_name, "my_table"); + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .Build(); + auto table_desc = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetBucketCount(1) + .SetProperty("table.replication.factor", "1") + .Build(); + + ASSERT_OK(adm.CreateTable(table_path, table_desc, false)); + + // Create same table again without ignore flag + auto result = adm.CreateTable(table_path, table_desc, false); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_ALREADY_EXIST); + + // With ignore flag should succeed + ASSERT_OK(adm.CreateTable(table_path, table_desc, true)); + + // Cleanup + ASSERT_OK(adm.DropTable(table_path, true)); + ASSERT_OK(adm.DropDatabase(db_name, true, true)); +} + +TEST_F(AdminTest, ErrorTableNotExist) { + auto& adm = admin(); + + fluss::TablePath table_path("fluss", "no_such_table_cpp"); + + // Drop without ignore flag + auto result = adm.DropTable(table_path, false); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_NOT_EXIST); + + // Drop with ignore flag should succeed + ASSERT_OK(adm.DropTable(table_path, true)); +} + +TEST_F(AdminTest, ErrorTableNotPartitioned) { + auto& adm = admin(); + + std::string db_name = "test_error_not_partitioned_cpp_db"; + fluss::DatabaseDescriptor db_desc; + ASSERT_OK(adm.CreateDatabase(db_name, db_desc, true)); + + fluss::TablePath table_path(db_name, "non_partitioned_table"); + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .Build(); + auto table_desc = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetBucketCount(1) + 
.SetProperty("table.replication.factor", "1") + .Build(); + + ASSERT_OK(adm.CreateTable(table_path, table_desc, false)); + + // list_partition_infos on non-partitioned table + std::vector partitions; + auto result = adm.ListPartitionInfos(table_path, partitions); + ASSERT_FALSE(result.Ok()); + EXPECT_EQ(result.error_code, fluss::ErrorCode::TABLE_NOT_PARTITIONED_EXCEPTION); + + // Cleanup + ASSERT_OK(adm.DropTable(table_path, true)); + ASSERT_OK(adm.DropDatabase(db_name, true, true)); +} diff --git a/bindings/cpp/test/test_kv_table.cpp b/bindings/cpp/test/test_kv_table.cpp new file mode 100644 index 00000000..1bb65e0d --- /dev/null +++ b/bindings/cpp/test/test_kv_table.cpp @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include + +#include "test_utils.h" + +class KvTableTest : public ::testing::Test { + protected: + fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); } + + fluss::Connection& connection() { + return fluss_test::FlussTestEnvironment::Instance()->GetConnection(); + } +}; + +TEST_F(KvTableTest, UpsertDeleteAndLookup) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_upsert_and_lookup_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("age", fluss::DataType::BigInt()) + .SetPrimaryKeys({"id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + // Create upsert writer + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + // Upsert 3 rows (fire-and-forget, then flush) + struct TestData { + int32_t id; + std::string name; + int64_t age; + }; + std::vector test_data = {{1, "Verso", 32}, {2, "Noco", 25}, {3, "Esquie", 35}}; + + for (const auto& d : test_data) { + fluss::GenericRow row(3); + row.SetInt32(0, d.id); + row.SetString(1, d.name); + row.SetInt64(2, d.age); + ASSERT_OK(upsert_writer.Upsert(row)); + } + ASSERT_OK(upsert_writer.Flush()); + + // Create lookuper + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + // Verify lookup results + for (const auto& d : test_data) { + fluss::GenericRow key(3); + key.SetInt32(0, d.id); + + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()) << "Row with id=" << d.id << " should exist"; + + EXPECT_EQ(result.GetInt32(0), 
d.id) << "id mismatch"; + EXPECT_EQ(result.GetString(1), d.name) << "name mismatch"; + EXPECT_EQ(result.GetInt64(2), d.age) << "age mismatch"; + } + + // Update record with id=1 (await acknowledgment) + { + fluss::GenericRow updated_row(3); + updated_row.SetInt32(0, 1); + updated_row.SetString(1, "Verso"); + updated_row.SetInt64(2, 33); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(updated_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify the update + { + fluss::GenericRow key(3); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt64(2), 33) << "Age should be updated"; + EXPECT_EQ(result.GetString(1), "Verso") << "Name should remain unchanged"; + } + + // Delete record with id=1 (await acknowledgment) + { + fluss::GenericRow delete_row(3); + delete_row.SetInt32(0, 1); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Delete(delete_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify deletion + { + fluss::GenericRow key(3); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_FALSE(result.Found()) << "Record 1 should not exist after delete"; + } + + // Verify other records still exist + for (int id : {2, 3}) { + fluss::GenericRow key(3); + key.SetInt32(0, id); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()) << "Record " << id + << " should still exist after deleting record 1"; + } + + // Lookup non-existent key + { + fluss::GenericRow key(3); + key.SetInt32(0, 999); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_FALSE(result.Found()) << "Non-existent key should return not found"; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(KvTableTest, CompositePrimaryKeys) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_composite_pk_cpp"); + + auto schema 
= fluss::Schema::NewBuilder() + .AddColumn("region", fluss::DataType::String()) + .AddColumn("score", fluss::DataType::BigInt()) + .AddColumn("user_id", fluss::DataType::Int()) + .SetPrimaryKeys({"region", "user_id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + // Insert records with composite keys + struct TestData { + std::string region; + int32_t user_id; + int64_t score; + }; + std::vector test_data = { + {"US", 1, 100}, {"US", 2, 200}, {"EU", 1, 150}, {"EU", 2, 250}}; + + for (const auto& d : test_data) { + auto row = table.NewRow(); + row.Set("region", d.region); + row.Set("score", d.score); + row.Set("user_id", d.user_id); + ASSERT_OK(upsert_writer.Upsert(row)); + } + ASSERT_OK(upsert_writer.Flush()); + + // Create lookuper + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + // Lookup (US, 1) - should return score 100 + { + auto key = table.NewRow(); + key.Set("region", "US"); + key.Set("user_id", 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt64("score"), 100) << "Score for (US, 1) should be 100"; + } + + // Lookup (EU, 2) - should return score 250 + { + auto key = table.NewRow(); + key.Set("region", "EU"); + key.Set("user_id", 2); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt64("score"), 250) << "Score for (EU, 2) should be 250"; + } + + // Update (US, 1) score (await acknowledgment) + { + auto update_row = table.NewRow(); + update_row.Set("region", "US"); + 
update_row.Set("user_id", 1); + update_row.Set("score", static_cast(500)); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(update_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify update + { + auto key = table.NewRow(); + key.Set("region", "US"); + key.Set("user_id", 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt64("score"), 500) << "Row score should be updated"; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(KvTableTest, PartialUpdate) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_partial_update_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("age", fluss::DataType::BigInt()) + .AddColumn("score", fluss::DataType::BigInt()) + .SetPrimaryKeys({"id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + // Insert initial record with all columns + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + { + fluss::GenericRow row(4); + row.SetInt32(0, 1); + row.SetString(1, "Verso"); + row.SetInt64(2, 32); + row.SetInt64(3, 6942); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify initial record + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + { + fluss::GenericRow key(4); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt32(0), 1); + EXPECT_EQ(result.GetString(1), "Verso"); + 
EXPECT_EQ(result.GetInt64(2), 32); + EXPECT_EQ(result.GetInt64(3), 6942); + } + + // Create partial update writer to update only score column + auto partial_upsert = table.NewUpsert(); + partial_upsert.PartialUpdateByName({"id", "score"}); + fluss::UpsertWriter partial_writer; + ASSERT_OK(partial_upsert.CreateWriter(partial_writer)); + + // Update only the score column (await acknowledgment) + { + fluss::GenericRow partial_row(4); + partial_row.SetInt32(0, 1); + partial_row.SetNull(1); // not in partial update + partial_row.SetNull(2); // not in partial update + partial_row.SetInt64(3, 420); + fluss::WriteResult wr; + ASSERT_OK(partial_writer.Upsert(partial_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify partial update - name and age should remain unchanged + { + fluss::GenericRow key(4); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt32(0), 1) << "id should remain 1"; + EXPECT_EQ(result.GetString(1), "Verso") << "name should remain unchanged"; + EXPECT_EQ(result.GetInt64(2), 32) << "age should remain unchanged"; + EXPECT_EQ(result.GetInt64(3), 420) << "score should be updated to 420"; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(KvTableTest, PartitionedTableUpsertAndLookup) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_partitioned_kv_table_cpp"); + + // Create a partitioned KV table with region as partition key + auto schema = fluss::Schema::NewBuilder() + .AddColumn("region", fluss::DataType::String()) + .AddColumn("user_id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("score", fluss::DataType::BigInt()) + .SetPrimaryKeys({"region", "user_id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetPartitionKeys({"region"}) + .SetProperty("table.replication.factor", "1") + .Build(); + + 
fluss_test::CreateTable(adm, table_path, table_descriptor); + + // Create partitions + fluss_test::CreatePartitions(adm, table_path, "region", {"US", "EU", "APAC"}); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + // Insert records with different partitions + struct TestData { + std::string region; + int32_t user_id; + std::string name; + int64_t score; + }; + std::vector test_data = {{"US", 1, "Gustave", 100}, {"US", 2, "Lune", 200}, + {"EU", 1, "Sciel", 150}, {"EU", 2, "Maelle", 250}, + {"APAC", 1, "Noco", 300}}; + + for (const auto& d : test_data) { + fluss::GenericRow row(4); + row.SetString(0, d.region); + row.SetInt32(1, d.user_id); + row.SetString(2, d.name); + row.SetInt64(3, d.score); + ASSERT_OK(upsert_writer.Upsert(row)); + } + ASSERT_OK(upsert_writer.Flush()); + + // Create lookuper + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + // Lookup records + for (const auto& d : test_data) { + fluss::GenericRow key(4); + key.SetString(0, d.region); + key.SetInt32(1, d.user_id); + + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + + EXPECT_EQ(std::string(result.GetString(0)), d.region) << "region mismatch"; + EXPECT_EQ(result.GetInt32(1), d.user_id) << "user_id mismatch"; + EXPECT_EQ(std::string(result.GetString(2)), d.name) << "name mismatch"; + EXPECT_EQ(result.GetInt64(3), d.score) << "score mismatch"; + } + + // Update within a partition (await acknowledgment) + { + fluss::GenericRow updated_row(4); + updated_row.SetString(0, "US"); + updated_row.SetInt32(1, 1); + updated_row.SetString(2, "Gustave Updated"); + updated_row.SetInt64(3, 999); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(updated_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify the update + { + fluss::GenericRow key(4); + 
key.SetString(0, "US"); + key.SetInt32(1, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(std::string(result.GetString(2)), "Gustave Updated"); + EXPECT_EQ(result.GetInt64(3), 999); + } + + // Lookup in non-existent partition should return not found + { + fluss::GenericRow key(4); + key.SetString(0, "UNKNOWN_REGION"); + key.SetInt32(1, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_FALSE(result.Found()) << "Lookup in non-existent partition should return not found"; + } + + // Delete a record within a partition (await acknowledgment) + { + fluss::GenericRow delete_key(4); + delete_key.SetString(0, "EU"); + delete_key.SetInt32(1, 1); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Delete(delete_key, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify deletion + { + fluss::GenericRow key(4); + key.SetString(0, "EU"); + key.SetInt32(1, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_FALSE(result.Found()) << "Deleted record should not exist"; + } + + // Verify other records in same partition still exist + { + fluss::GenericRow key(4); + key.SetString(0, "EU"); + key.SetInt32(1, 2); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(std::string(result.GetString(2)), "Maelle"); + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(KvTableTest, AllSupportedDatatypes) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_all_datatypes_cpp"); + + // Create a table with all supported datatypes + auto schema = fluss::Schema::NewBuilder() + .AddColumn("pk_int", fluss::DataType::Int()) + .AddColumn("col_boolean", fluss::DataType::Boolean()) + .AddColumn("col_tinyint", fluss::DataType::TinyInt()) + .AddColumn("col_smallint", fluss::DataType::SmallInt()) + .AddColumn("col_int", fluss::DataType::Int()) + 
.AddColumn("col_bigint", fluss::DataType::BigInt()) + .AddColumn("col_float", fluss::DataType::Float()) + .AddColumn("col_double", fluss::DataType::Double()) + .AddColumn("col_char", fluss::DataType::Char(10)) + .AddColumn("col_string", fluss::DataType::String()) + .AddColumn("col_decimal", fluss::DataType::Decimal(10, 2)) + .AddColumn("col_date", fluss::DataType::Date()) + .AddColumn("col_time", fluss::DataType::Time()) + .AddColumn("col_timestamp", fluss::DataType::Timestamp()) + .AddColumn("col_timestamp_ltz", fluss::DataType::TimestampLtz()) + .AddColumn("col_bytes", fluss::DataType::Bytes()) + .AddColumn("col_binary", fluss::DataType::Binary(20)) + .SetPrimaryKeys({"pk_int"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + // Test data + int32_t pk_int = 1; + bool col_boolean = true; + int32_t col_tinyint = 127; + int32_t col_smallint = 32767; + int32_t col_int = 2147483647; + int64_t col_bigint = 9223372036854775807LL; + float col_float = 3.14f; + double col_double = 2.718281828459045; + std::string col_char = "hello"; + std::string col_string = "world of fluss rust client"; + std::string col_decimal = "123.45"; + auto col_date = fluss::Date::FromDays(20476); // 2026-01-23 + auto col_time = fluss::Time::FromMillis(36827000); // 10:13:47 + auto col_timestamp = fluss::Timestamp::FromMillis(1769163227123); // 2026-01-23 10:13:47.123 + auto col_timestamp_ltz = fluss::Timestamp::FromMillis(1769163227123); + std::vector col_bytes = {'b', 'i', 'n', 'a', 'r', 'y', ' ', 'd', 'a', 't', 'a'}; + std::vector col_binary = {'f', 'i', 'x', 'e', 'd', ' ', 'b', 'i', 'n', 'a', + 'r', 'y', ' ', 'd', 
'a', 't', 'a', '!', '!', '!'}; + + // Upsert a row with all datatypes + { + fluss::GenericRow row(17); + row.SetInt32(0, pk_int); + row.SetBool(1, col_boolean); + row.SetInt32(2, col_tinyint); + row.SetInt32(3, col_smallint); + row.SetInt32(4, col_int); + row.SetInt64(5, col_bigint); + row.SetFloat32(6, col_float); + row.SetFloat64(7, col_double); + row.SetString(8, col_char); + row.SetString(9, col_string); + row.SetDecimal(10, col_decimal); + row.SetDate(11, col_date); + row.SetTime(12, col_time); + row.SetTimestampNtz(13, col_timestamp); + row.SetTimestampLtz(14, col_timestamp_ltz); + row.SetBytes(15, col_bytes); + row.SetBytes(16, col_binary); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Lookup the record + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + { + fluss::GenericRow key(17); + key.SetInt32(0, pk_int); + + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + + // Verify all datatypes + EXPECT_EQ(result.GetInt32(0), pk_int) << "pk_int mismatch"; + EXPECT_EQ(result.GetBool(1), col_boolean) << "col_boolean mismatch"; + EXPECT_EQ(result.GetInt32(2), col_tinyint) << "col_tinyint mismatch"; + EXPECT_EQ(result.GetInt32(3), col_smallint) << "col_smallint mismatch"; + EXPECT_EQ(result.GetInt32(4), col_int) << "col_int mismatch"; + EXPECT_EQ(result.GetInt64(5), col_bigint) << "col_bigint mismatch"; + EXPECT_NEAR(result.GetFloat32(6), col_float, 1e-6f) << "col_float mismatch"; + EXPECT_NEAR(result.GetFloat64(7), col_double, 1e-15) << "col_double mismatch"; + EXPECT_EQ(result.GetString(8), col_char) << "col_char mismatch"; + EXPECT_EQ(result.GetString(9), col_string) << "col_string mismatch"; + EXPECT_EQ(result.GetDecimalString(10), col_decimal) << "col_decimal mismatch"; + EXPECT_EQ(result.GetDate(11).days_since_epoch, col_date.days_since_epoch) << "col_date mismatch"; + 
EXPECT_EQ(result.GetTime(12).millis_since_midnight, col_time.millis_since_midnight) << "col_time mismatch"; + EXPECT_EQ(result.GetTimestamp(13).epoch_millis, col_timestamp.epoch_millis) + << "col_timestamp mismatch"; + EXPECT_EQ(result.GetTimestamp(14).epoch_millis, col_timestamp_ltz.epoch_millis) + << "col_timestamp_ltz mismatch"; + + auto [bytes_ptr, bytes_len] = result.GetBytes(15); + EXPECT_EQ(bytes_len, col_bytes.size()) << "col_bytes length mismatch"; + EXPECT_TRUE(std::memcmp(bytes_ptr, col_bytes.data(), bytes_len) == 0) + << "col_bytes mismatch"; + + auto [binary_ptr, binary_len] = result.GetBytes(16); + EXPECT_EQ(binary_len, col_binary.size()) << "col_binary length mismatch"; + EXPECT_TRUE(std::memcmp(binary_ptr, col_binary.data(), binary_len) == 0) + << "col_binary mismatch"; + } + + // Test with null values for nullable columns + { + fluss::GenericRow row_with_nulls(17); + row_with_nulls.SetInt32(0, 2); // pk_int = 2 + for (size_t i = 1; i < 17; ++i) { + row_with_nulls.SetNull(i); + } + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(row_with_nulls, wr)); + ASSERT_OK(wr.Wait()); + } + + // Lookup row with nulls + { + fluss::GenericRow key(17); + key.SetInt32(0, 2); + + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + + EXPECT_EQ(result.GetInt32(0), 2) << "pk_int mismatch"; + for (size_t i = 1; i < 17; ++i) { + EXPECT_TRUE(result.IsNull(i)) << "column " << i << " should be null"; + } + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} diff --git a/bindings/cpp/test/test_log_table.cpp b/bindings/cpp/test/test_log_table.cpp new file mode 100644 index 00000000..c4db3be8 --- /dev/null +++ b/bindings/cpp/test/test_log_table.cpp @@ -0,0 +1,854 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +#include +#include +#include +#include + +#include "test_utils.h" + +class LogTableTest : public ::testing::Test { + protected: + fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); } + + fluss::Connection& connection() { + return fluss_test::FlussTestEnvironment::Instance()->GetConnection(); + } +}; + +TEST_F(LogTableTest, AppendRecordBatchAndScan) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_append_record_batch_and_scan_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("c1", fluss::DataType::Int()) + .AddColumn("c2", fluss::DataType::String()) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + // Create append writer + auto table_append = table.NewAppend(); + fluss::AppendWriter append_writer; + ASSERT_OK(table_append.CreateWriter(append_writer)); + + // Append Arrow record batches + { + auto c1 = arrow::Int32Builder(); + c1.AppendValues({1, 2, 3}).ok(); + auto c2 = arrow::StringBuilder(); + c2.AppendValues({"a1", "a2", "a3"}).ok(); + + auto batch = arrow::RecordBatch::Make( + 
arrow::schema({arrow::field("c1", arrow::int32()), arrow::field("c2", arrow::utf8())}), + 3, {c1.Finish().ValueOrDie(), c2.Finish().ValueOrDie()}); + + ASSERT_OK(append_writer.AppendArrowBatch(batch)); + } + + { + auto c1 = arrow::Int32Builder(); + c1.AppendValues({4, 5, 6}).ok(); + auto c2 = arrow::StringBuilder(); + c2.AppendValues({"a4", "a5", "a6"}).ok(); + + auto batch = arrow::RecordBatch::Make( + arrow::schema({arrow::field("c1", arrow::int32()), arrow::field("c2", arrow::utf8())}), + 3, {c1.Finish().ValueOrDie(), c2.Finish().ValueOrDie()}); + + ASSERT_OK(append_writer.AppendArrowBatch(batch)); + } + + ASSERT_OK(append_writer.Flush()); + + // Create scanner and subscribe + fluss::Table scan_table; + ASSERT_OK(conn.GetTable(table_path, scan_table)); + int32_t num_buckets = scan_table.GetTableInfo().num_buckets; + + auto table_scan = scan_table.NewScan(); + fluss::LogScanner log_scanner; + ASSERT_OK(table_scan.CreateLogScanner(log_scanner)); + + for (int32_t bucket_id = 0; bucket_id < num_buckets; ++bucket_id) { + ASSERT_OK(log_scanner.Subscribe(bucket_id, fluss::EARLIEST_OFFSET)); + } + + // Poll for records + fluss::ScanRecords scan_records; + ASSERT_OK(log_scanner.Poll(10000, scan_records)); + + ASSERT_EQ(scan_records.Size(), 6u) << "Expected 6 records"; + + // Collect and sort by offset + std::vector> records; + for (size_t i = 0; i < scan_records.Size(); ++i) { + auto rec = scan_records[i]; + records.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1))); + } + std::sort(records.begin(), records.end()); + + std::vector expected_c1 = {1, 2, 3, 4, 5, 6}; + std::vector expected_c2 = {"a1", "a2", "a3", "a4", "a5", "a6"}; + + for (size_t i = 0; i < 6; ++i) { + EXPECT_EQ(records[i].first, expected_c1[i]) << "c1 mismatch at row " << i; + EXPECT_EQ(records[i].second, expected_c2[i]) << "c2 mismatch at row " << i; + } + + // Test unsubscribe + ASSERT_OK(log_scanner.Unsubscribe(0)); + + // Verify unsubscribe_partition fails on a non-partitioned table 
+ auto unsub_result = log_scanner.UnsubscribePartition(0, 0); + ASSERT_FALSE(unsub_result.Ok()) + << "unsubscribe_partition should fail on a non-partitioned table"; + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(LogTableTest, ListOffsets) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_list_offsets_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + // Wait for table initialization + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // Earliest offset should be 0 for empty table + std::unordered_map earliest_offsets; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Earliest(), earliest_offsets)); + EXPECT_EQ(earliest_offsets[0], 0) << "Earliest offset should be 0 for bucket 0"; + + // Latest offset should be 0 for empty table + std::unordered_map latest_offsets; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(), latest_offsets)); + EXPECT_EQ(latest_offsets[0], 0) << "Latest offset should be 0 for empty table"; + + auto before_append_ms = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + // Append records + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + auto table_append = table.NewAppend(); + fluss::AppendWriter append_writer; + ASSERT_OK(table_append.CreateWriter(append_writer)); + + { + auto id_builder = arrow::Int32Builder(); + id_builder.AppendValues({1, 2, 3}).ok(); + auto name_builder = arrow::StringBuilder(); + name_builder.AppendValues({"alice", "bob", "charlie"}).ok(); + + auto batch = arrow::RecordBatch::Make( + arrow::schema( + {arrow::field("id", 
arrow::int32()), arrow::field("name", arrow::utf8())}), + 3, {id_builder.Finish().ValueOrDie(), name_builder.Finish().ValueOrDie()}); + + ASSERT_OK(append_writer.AppendArrowBatch(batch)); + } + ASSERT_OK(append_writer.Flush()); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + auto after_append_ms = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + // Latest offset after appending should be 3 + std::unordered_map latest_after; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(), latest_after)); + EXPECT_EQ(latest_after[0], 3) << "Latest offset should be 3 after appending 3 records"; + + // Earliest offset should still be 0 + std::unordered_map earliest_after; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Earliest(), earliest_after)); + EXPECT_EQ(earliest_after[0], 0) << "Earliest offset should still be 0"; + + // Timestamp before append should resolve to offset 0 + std::unordered_map ts_offsets; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Timestamp(before_append_ms), + ts_offsets)); + EXPECT_EQ(ts_offsets[0], 0) + << "Timestamp before append should resolve to offset 0"; + + // Timestamp after append should resolve to offset 3 + std::unordered_map ts_after_offsets; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Timestamp(after_append_ms), + ts_after_offsets)); + EXPECT_EQ(ts_after_offsets[0], 3) + << "Timestamp after append should resolve to offset 3"; + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(LogTableTest, TestProject) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_project_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("col_a", fluss::DataType::Int()) + .AddColumn("col_b", fluss::DataType::String()) + .AddColumn("col_c", fluss::DataType::Int()) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + 
.SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + // Append 3 records + auto table_append = table.NewAppend(); + fluss::AppendWriter append_writer; + ASSERT_OK(table_append.CreateWriter(append_writer)); + + { + auto col_a_builder = arrow::Int32Builder(); + col_a_builder.AppendValues({1, 2, 3}).ok(); + auto col_b_builder = arrow::StringBuilder(); + col_b_builder.AppendValues({"x", "y", "z"}).ok(); + auto col_c_builder = arrow::Int32Builder(); + col_c_builder.AppendValues({10, 20, 30}).ok(); + + auto batch = arrow::RecordBatch::Make( + arrow::schema({arrow::field("col_a", arrow::int32()), + arrow::field("col_b", arrow::utf8()), + arrow::field("col_c", arrow::int32())}), + 3, + {col_a_builder.Finish().ValueOrDie(), col_b_builder.Finish().ValueOrDie(), + col_c_builder.Finish().ValueOrDie()}); + + ASSERT_OK(append_writer.AppendArrowBatch(batch)); + } + ASSERT_OK(append_writer.Flush()); + + // Test project_by_name: select col_b and col_c only + { + fluss::Table proj_table; + ASSERT_OK(conn.GetTable(table_path, proj_table)); + auto scan = proj_table.NewScan(); + scan.ProjectByName({"col_b", "col_c"}); + fluss::LogScanner scanner; + ASSERT_OK(scan.CreateLogScanner(scanner)); + + ASSERT_OK(scanner.Subscribe(0, 0)); + + fluss::ScanRecords records; + ASSERT_OK(scanner.Poll(10000, records)); + + ASSERT_EQ(records.Size(), 3u) << "Should have 3 records with project_by_name"; + + std::vector expected_col_b = {"x", "y", "z"}; + std::vector expected_col_c = {10, 20, 30}; + + // Collect and sort by col_c to get deterministic order + std::vector> collected; + for (size_t i = 0; i < records.Size(); ++i) { + auto rec = records[i]; + collected.emplace_back(std::string(rec.row.GetString(0)), rec.row.GetInt32(1)); + } + std::sort(collected.begin(), collected.end(), + [](const auto& a, const auto& b) { return a.second < 
b.second; }); + + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(collected[i].first, expected_col_b[i]) << "col_b mismatch at index " << i; + EXPECT_EQ(collected[i].second, expected_col_c[i]) << "col_c mismatch at index " << i; + } + } + + // Test project by column indices: select col_b (1) and col_a (0) in that order + { + fluss::Table proj_table; + ASSERT_OK(conn.GetTable(table_path, proj_table)); + auto scan = proj_table.NewScan(); + scan.ProjectByIndex({1, 0}); + fluss::LogScanner scanner; + ASSERT_OK(scan.CreateLogScanner(scanner)); + + ASSERT_OK(scanner.Subscribe(0, 0)); + + fluss::ScanRecords records; + ASSERT_OK(scanner.Poll(10000, records)); + + ASSERT_EQ(records.Size(), 3u); + + std::vector expected_col_b = {"x", "y", "z"}; + std::vector expected_col_a = {1, 2, 3}; + + std::vector> collected; + for (size_t i = 0; i < records.Size(); ++i) { + auto rec = records[i]; + collected.emplace_back(std::string(rec.row.GetString(0)), rec.row.GetInt32(1)); + } + std::sort(collected.begin(), collected.end(), + [](const auto& a, const auto& b) { return a.second < b.second; }); + + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(collected[i].first, expected_col_b[i]) << "col_b mismatch at index " << i; + EXPECT_EQ(collected[i].second, expected_col_a[i]) << "col_a mismatch at index " << i; + } + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(LogTableTest, TestPollBatches) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_poll_batches_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + fluss::Table table; + 
ASSERT_OK(conn.GetTable(table_path, table)); + + auto scan = table.NewScan(); + fluss::LogScanner scanner; + ASSERT_OK(scan.CreateRecordBatchLogScanner(scanner)); + ASSERT_OK(scanner.Subscribe(0, 0)); + + // Test 1: Empty table should return empty result + { + fluss::ArrowRecordBatches batches; + ASSERT_OK(scanner.PollRecordBatch(500, batches)); + ASSERT_TRUE(batches.Empty()); + } + + // Append data + auto table_append = table.NewAppend(); + fluss::AppendWriter writer; + ASSERT_OK(table_append.CreateWriter(writer)); + + auto make_batch = [](std::vector ids, std::vector names) { + auto id_builder = arrow::Int32Builder(); + id_builder.AppendValues(ids).ok(); + auto name_builder = arrow::StringBuilder(); + name_builder.AppendValues(names).ok(); + return arrow::RecordBatch::Make( + arrow::schema( + {arrow::field("id", arrow::int32()), arrow::field("name", arrow::utf8())}), + static_cast(ids.size()), + {id_builder.Finish().ValueOrDie(), name_builder.Finish().ValueOrDie()}); + }; + + ASSERT_OK(writer.AppendArrowBatch(make_batch({1, 2}, {"a", "b"}))); + ASSERT_OK(writer.AppendArrowBatch(make_batch({3, 4}, {"c", "d"}))); + ASSERT_OK(writer.AppendArrowBatch(make_batch({5, 6}, {"e", "f"}))); + ASSERT_OK(writer.Flush()); + + // Extract ids from Arrow batches + auto extract_ids = [](const fluss::ArrowRecordBatches& batches) { + std::vector ids; + for (const auto& batch : batches) { + auto arr = + std::static_pointer_cast(batch->GetArrowRecordBatch()->column(0)); + for (int64_t i = 0; i < arr->length(); ++i) { + ids.push_back(arr->Value(i)); + } + } + return ids; + }; + + // Test 2: Poll until we get all 6 records + std::vector all_ids; + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (all_ids.size() < 6 && std::chrono::steady_clock::now() < deadline) { + fluss::ArrowRecordBatches batches; + ASSERT_OK(scanner.PollRecordBatch(5000, batches)); + auto ids = extract_ids(batches); + all_ids.insert(all_ids.end(), ids.begin(), ids.end()); + } + 
ASSERT_EQ(all_ids, (std::vector{1, 2, 3, 4, 5, 6})); + + // Test 3: Append more and verify offset continuation (no duplicates) + ASSERT_OK(writer.AppendArrowBatch(make_batch({7, 8}, {"g", "h"}))); + ASSERT_OK(writer.Flush()); + + std::vector new_ids; + deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (new_ids.size() < 2 && std::chrono::steady_clock::now() < deadline) { + fluss::ArrowRecordBatches batches; + ASSERT_OK(scanner.PollRecordBatch(5000, batches)); + auto ids = extract_ids(batches); + new_ids.insert(new_ids.end(), ids.begin(), ids.end()); + } + ASSERT_EQ(new_ids, (std::vector{7, 8})); + + // Test 4: Subscribing from mid-offset should truncate batch + { + fluss::Table trunc_table; + ASSERT_OK(conn.GetTable(table_path, trunc_table)); + auto trunc_scan = trunc_table.NewScan(); + fluss::LogScanner trunc_scanner; + ASSERT_OK(trunc_scan.CreateRecordBatchLogScanner(trunc_scanner)); + ASSERT_OK(trunc_scanner.Subscribe(0, 3)); + + std::vector trunc_ids; + deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (trunc_ids.size() < 5 && std::chrono::steady_clock::now() < deadline) { + fluss::ArrowRecordBatches batches; + ASSERT_OK(trunc_scanner.PollRecordBatch(5000, batches)); + auto ids = extract_ids(batches); + trunc_ids.insert(trunc_ids.end(), ids.begin(), ids.end()); + } + ASSERT_EQ(trunc_ids, (std::vector{4, 5, 6, 7, 8})); + } + + // Test 5: Projection should only return requested columns + { + fluss::Table proj_table; + ASSERT_OK(conn.GetTable(table_path, proj_table)); + auto proj_scan = proj_table.NewScan(); + proj_scan.ProjectByName({"id"}); + fluss::LogScanner proj_scanner; + ASSERT_OK(proj_scan.CreateRecordBatchLogScanner(proj_scanner)); + ASSERT_OK(proj_scanner.Subscribe(0, 0)); + + fluss::ArrowRecordBatches proj_batches; + ASSERT_OK(proj_scanner.PollRecordBatch(10000, proj_batches)); + + ASSERT_FALSE(proj_batches.Empty()); + EXPECT_EQ(proj_batches[0]->GetArrowRecordBatch()->num_columns(), 1) + << 
"Projected batch should have 1 column (id), not 2"; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(LogTableTest, AllSupportedDatatypes) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_log_all_datatypes_cpp"); + + // Create a log table with all supported datatypes + auto schema = + fluss::Schema::NewBuilder() + .AddColumn("col_tinyint", fluss::DataType::TinyInt()) + .AddColumn("col_smallint", fluss::DataType::SmallInt()) + .AddColumn("col_int", fluss::DataType::Int()) + .AddColumn("col_bigint", fluss::DataType::BigInt()) + .AddColumn("col_float", fluss::DataType::Float()) + .AddColumn("col_double", fluss::DataType::Double()) + .AddColumn("col_boolean", fluss::DataType::Boolean()) + .AddColumn("col_char", fluss::DataType::Char(10)) + .AddColumn("col_string", fluss::DataType::String()) + .AddColumn("col_decimal", fluss::DataType::Decimal(10, 2)) + .AddColumn("col_date", fluss::DataType::Date()) + .AddColumn("col_time", fluss::DataType::Time()) + .AddColumn("col_timestamp", fluss::DataType::Timestamp()) + .AddColumn("col_timestamp_ltz", fluss::DataType::TimestampLtz()) + .AddColumn("col_bytes", fluss::DataType::Bytes()) + .AddColumn("col_binary", fluss::DataType::Binary(4)) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + size_t field_count = table.GetTableInfo().schema.columns.size(); + + auto table_append = table.NewAppend(); + fluss::AppendWriter append_writer; + ASSERT_OK(table_append.CreateWriter(append_writer)); + + // Test data + int32_t col_tinyint = 127; + int32_t col_smallint = 32767; + int32_t col_int = 2147483647; + int64_t col_bigint = 9223372036854775807LL; + float col_float = 3.14f; + double col_double = 2.718281828459045; + bool 
col_boolean = true; + std::string col_char = "hello"; + std::string col_string = "world of fluss rust client"; + std::string col_decimal = "123.45"; + auto col_date = fluss::Date::FromDays(20476); // 2026-01-23 + auto col_time = fluss::Time::FromMillis(36827000); // 10:13:47 + auto col_timestamp = fluss::Timestamp::FromMillisNanos(1769163227123, 456000); + auto col_timestamp_ltz = fluss::Timestamp::FromMillisNanos(1769163227123, 456000); + std::vector col_bytes = {'b', 'i', 'n', 'a', 'r', 'y', ' ', 'd', 'a', 't', 'a'}; + std::vector col_binary = {0xDE, 0xAD, 0xBE, 0xEF}; + + // Append a row with all datatypes + { + fluss::GenericRow row(field_count); + row.SetInt32(0, col_tinyint); + row.SetInt32(1, col_smallint); + row.SetInt32(2, col_int); + row.SetInt64(3, col_bigint); + row.SetFloat32(4, col_float); + row.SetFloat64(5, col_double); + row.SetBool(6, col_boolean); + row.SetString(7, col_char); + row.SetString(8, col_string); + row.SetDecimal(9, col_decimal); + row.SetDate(10, col_date); + row.SetTime(11, col_time); + row.SetTimestampNtz(12, col_timestamp); + row.SetTimestampLtz(13, col_timestamp_ltz); + row.SetBytes(14, col_bytes); + row.SetBytes(15, col_binary); + ASSERT_OK(append_writer.Append(row)); + } + + // Append a row with null values + { + fluss::GenericRow row_with_nulls(field_count); + for (size_t i = 0; i < field_count; ++i) { + row_with_nulls.SetNull(i); + } + ASSERT_OK(append_writer.Append(row_with_nulls)); + } + + ASSERT_OK(append_writer.Flush()); + + // Scan the records + fluss::Table scan_table; + ASSERT_OK(conn.GetTable(table_path, scan_table)); + auto table_scan = scan_table.NewScan(); + fluss::LogScanner log_scanner; + ASSERT_OK(table_scan.CreateLogScanner(log_scanner)); + ASSERT_OK(log_scanner.Subscribe(0, 0)); + + // Poll until we get 2 records + std::vector all_records; + size_t total_records = 0; + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (total_records < 2 && std::chrono::steady_clock::now() < 
deadline) { + fluss::ScanRecords records; + ASSERT_OK(log_scanner.Poll(5000, records)); + total_records += records.Size(); + if (records.Size() > 0) { + all_records.push_back(std::move(records)); + } + } + ASSERT_EQ(total_records, 2u) << "Expected 2 records"; + + // Verify first record (all values) + auto rec = all_records[0][0]; + auto& row = rec.row; + + EXPECT_EQ(row.GetInt32(0), col_tinyint) << "col_tinyint mismatch"; + EXPECT_EQ(row.GetInt32(1), col_smallint) << "col_smallint mismatch"; + EXPECT_EQ(row.GetInt32(2), col_int) << "col_int mismatch"; + EXPECT_EQ(row.GetInt64(3), col_bigint) << "col_bigint mismatch"; + EXPECT_NEAR(row.GetFloat32(4), col_float, 1e-6f) << "col_float mismatch"; + EXPECT_NEAR(row.GetFloat64(5), col_double, 1e-15) << "col_double mismatch"; + EXPECT_EQ(row.GetBool(6), col_boolean) << "col_boolean mismatch"; + EXPECT_EQ(row.GetString(7), col_char) << "col_char mismatch"; + EXPECT_EQ(row.GetString(8), col_string) << "col_string mismatch"; + EXPECT_EQ(row.GetDecimalString(9), col_decimal) << "col_decimal mismatch"; + EXPECT_EQ(row.GetDate(10).days_since_epoch, col_date.days_since_epoch) << "col_date mismatch"; + EXPECT_EQ(row.GetTime(11).millis_since_midnight, col_time.millis_since_midnight) + << "col_time mismatch"; + EXPECT_EQ(row.GetTimestamp(12).epoch_millis, col_timestamp.epoch_millis) + << "col_timestamp millis mismatch"; + EXPECT_EQ(row.GetTimestamp(12).nano_of_millisecond, col_timestamp.nano_of_millisecond) + << "col_timestamp nanos mismatch"; + EXPECT_EQ(row.GetTimestamp(13).epoch_millis, col_timestamp_ltz.epoch_millis) + << "col_timestamp_ltz millis mismatch"; + EXPECT_EQ(row.GetTimestamp(13).nano_of_millisecond, col_timestamp_ltz.nano_of_millisecond) + << "col_timestamp_ltz nanos mismatch"; + + auto [bytes_ptr, bytes_len] = row.GetBytes(14); + EXPECT_EQ(bytes_len, col_bytes.size()) << "col_bytes length mismatch"; + EXPECT_TRUE(std::memcmp(bytes_ptr, col_bytes.data(), bytes_len) == 0) + << "col_bytes mismatch"; + + auto 
[binary_ptr, binary_len] = row.GetBytes(15); + EXPECT_EQ(binary_len, col_binary.size()) << "col_binary length mismatch"; + EXPECT_TRUE(std::memcmp(binary_ptr, col_binary.data(), binary_len) == 0) + << "col_binary mismatch"; + + // Verify second record (all nulls) + // The second record might be in the same ScanRecords or a different one + fluss::ScanRecord null_rec = (all_records[0].Size() > 1) ? all_records[0][1] : all_records[1][0]; + for (size_t i = 0; i < field_count; ++i) { + EXPECT_TRUE(null_rec.row.IsNull(i)) << "column " << i << " should be null"; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(LogTableTest, PartitionedTableAppendScan) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_partitioned_log_append_cpp"); + + // Create a partitioned log table + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("region", fluss::DataType::String()) + .AddColumn("value", fluss::DataType::BigInt()) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetPartitionKeys({"region"}) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + // Create partitions + fluss_test::CreatePartitions(adm, table_path, "region", {"US", "EU"}); + + // Wait for partitions + std::this_thread::sleep_for(std::chrono::seconds(2)); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + auto table_append = table.NewAppend(); + fluss::AppendWriter append_writer; + ASSERT_OK(table_append.CreateWriter(append_writer)); + + // Append rows + struct TestData { + int32_t id; + std::string region; + int64_t value; + }; + std::vector test_data = {{1, "US", 100}, {2, "US", 200}, {3, "EU", 300}, {4, "EU", 400}}; + + for (const auto& d : test_data) { + fluss::GenericRow row(3); + row.SetInt32(0, d.id); + row.SetString(1, d.region); + row.SetInt64(2, 
d.value);
+        ASSERT_OK(append_writer.Append(row));
+    }
+    ASSERT_OK(append_writer.Flush());
+
+    // Append arrow batches per partition
+    {
+        auto id_builder = arrow::Int32Builder();
+        id_builder.AppendValues({5, 6}).ok();
+        auto region_builder = arrow::StringBuilder();
+        region_builder.AppendValues({"US", "US"}).ok();
+        auto value_builder = arrow::Int64Builder();
+        value_builder.AppendValues({500, 600}).ok();
+
+        auto batch = arrow::RecordBatch::Make(
+            arrow::schema({arrow::field("id", arrow::int32()),
+                           arrow::field("region", arrow::utf8()),
+                           arrow::field("value", arrow::int64())}),
+            2,
+            {id_builder.Finish().ValueOrDie(), region_builder.Finish().ValueOrDie(),
+             value_builder.Finish().ValueOrDie()});
+
+        ASSERT_OK(append_writer.AppendArrowBatch(batch));
+    }
+
+    {
+        auto id_builder = arrow::Int32Builder();
+        id_builder.AppendValues({7, 8}).ok();
+        auto region_builder = arrow::StringBuilder();
+        region_builder.AppendValues({"EU", "EU"}).ok();
+        auto value_builder = arrow::Int64Builder();
+        value_builder.AppendValues({700, 800}).ok();
+
+        auto batch = arrow::RecordBatch::Make(
+            arrow::schema({arrow::field("id", arrow::int32()),
+                           arrow::field("region", arrow::utf8()),
+                           arrow::field("value", arrow::int64())}),
+            2,
+            {id_builder.Finish().ValueOrDie(), region_builder.Finish().ValueOrDie(),
+             value_builder.Finish().ValueOrDie()});
+
+        ASSERT_OK(append_writer.AppendArrowBatch(batch));
+    }
+    ASSERT_OK(append_writer.Flush());
+
+    // Test list partition offsets
+    std::unordered_map<int32_t, int64_t> us_offsets;
+    ASSERT_OK(adm.ListPartitionOffsets(table_path, "US", {0}, fluss::OffsetSpec::Latest(),
+                                       us_offsets));
+    EXPECT_EQ(us_offsets[0], 4) << "US partition should have 4 records";
+
+    std::unordered_map<int32_t, int64_t> eu_offsets;
+    ASSERT_OK(adm.ListPartitionOffsets(table_path, "EU", {0}, fluss::OffsetSpec::Latest(),
+                                       eu_offsets));
+    EXPECT_EQ(eu_offsets[0], 4) << "EU partition should have 4 records";
+
+    // Subscribe to all partitions and scan
+    fluss::Table scan_table;
+
ASSERT_OK(conn.GetTable(table_path, scan_table)); + auto table_scan = scan_table.NewScan(); + fluss::LogScanner log_scanner; + ASSERT_OK(table_scan.CreateLogScanner(log_scanner)); + + std::vector partition_infos; + ASSERT_OK(adm.ListPartitionInfos(table_path, partition_infos)); + + for (const auto& pi : partition_infos) { + ASSERT_OK(log_scanner.SubscribePartitionBuckets(pi.partition_id, 0, 0)); + } + + // Collect all records + using Record = std::tuple; + std::vector collected; + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (collected.size() < 8 && std::chrono::steady_clock::now() < deadline) { + fluss::ScanRecords records; + ASSERT_OK(log_scanner.Poll(500, records)); + for (size_t i = 0; i < records.Size(); ++i) { + auto rec = records[i]; + collected.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1)), + rec.row.GetInt64(2)); + } + } + + ASSERT_EQ(collected.size(), 8u) << "Expected 8 records total"; + std::sort(collected.begin(), collected.end()); + + std::vector expected = {{1, "US", 100}, {2, "US", 200}, {3, "EU", 300}, + {4, "EU", 400}, {5, "US", 500}, {6, "US", 600}, + {7, "EU", 700}, {8, "EU", 800}}; + EXPECT_EQ(collected, expected); + + // Test unsubscribe_partition: unsubscribe EU, should only get US data + { + fluss::Table unsub_table; + ASSERT_OK(conn.GetTable(table_path, unsub_table)); + auto unsub_scan = unsub_table.NewScan(); + fluss::LogScanner unsub_scanner; + ASSERT_OK(unsub_scan.CreateLogScanner(unsub_scanner)); + + int64_t eu_partition_id = -1; + for (const auto& pi : partition_infos) { + ASSERT_OK(unsub_scanner.SubscribePartitionBuckets(pi.partition_id, 0, 0)); + if (pi.partition_name == "EU") { + eu_partition_id = pi.partition_id; + } + } + ASSERT_GE(eu_partition_id, 0) << "EU partition should exist"; + + ASSERT_OK(unsub_scanner.UnsubscribePartition(eu_partition_id, 0)); + + std::vector us_only; + auto unsub_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); + while 
(us_only.size() < 4 && std::chrono::steady_clock::now() < unsub_deadline) { + fluss::ScanRecords records; + ASSERT_OK(unsub_scanner.Poll(300, records)); + for (size_t i = 0; i < records.Size(); ++i) { + auto rec = records[i]; + us_only.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1)), + rec.row.GetInt64(2)); + } + } + + ASSERT_EQ(us_only.size(), 4u) << "Should receive exactly 4 US records"; + for (const auto& [id, region, val] : us_only) { + EXPECT_EQ(region, "US") << "After unsubscribe EU, only US data should be read"; + } + } + + // Test subscribe_partition_buckets (batch subscribe) + { + fluss::Table batch_table; + ASSERT_OK(conn.GetTable(table_path, batch_table)); + auto batch_scan = batch_table.NewScan(); + fluss::LogScanner batch_scanner; + ASSERT_OK(batch_scan.CreateLogScanner(batch_scanner)); + + std::vector subs; + for (const auto& pi : partition_infos) { + subs.push_back({pi.partition_id, 0, 0}); + } + ASSERT_OK(batch_scanner.SubscribePartitionBuckets(subs)); + + std::vector batch_collected; + auto batch_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (batch_collected.size() < 8 && std::chrono::steady_clock::now() < batch_deadline) { + fluss::ScanRecords records; + ASSERT_OK(batch_scanner.Poll(500, records)); + for (size_t i = 0; i < records.Size(); ++i) { + auto rec = records[i]; + batch_collected.emplace_back(rec.row.GetInt32(0), + std::string(rec.row.GetString(1)), + rec.row.GetInt64(2)); + } + } + ASSERT_EQ(batch_collected.size(), 8u); + std::sort(batch_collected.begin(), batch_collected.end()); + EXPECT_EQ(batch_collected, expected); + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} diff --git a/bindings/cpp/test/test_main.cpp b/bindings/cpp/test/test_main.cpp new file mode 100644 index 00000000..8c2e2d96 --- /dev/null +++ b/bindings/cpp/test/test_main.cpp @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include "test_utils.h" + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + + // Register the global test environment (manages the Fluss cluster lifecycle). + ::testing::AddGlobalTestEnvironment(fluss_test::FlussTestEnvironment::Instance()); + + return RUN_ALL_TESTS(); +} diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h new file mode 100644 index 00000000..f045423b --- /dev/null +++ b/bindings/cpp/test/test_utils.h @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#ifdef _WIN32
+#include <winsock2.h>
+#include <ws2tcpip.h>
+#pragma comment(lib, "ws2_32.lib")
+#else
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#endif
+
+#include "fluss.hpp"
+
+// Macro to assert Result is OK and print error message on failure
+#define ASSERT_OK(result) ASSERT_TRUE((result).Ok()) << (result).error_message
+#define EXPECT_OK(result) EXPECT_TRUE((result).Ok()) << (result).error_message
+
+namespace fluss_test {
+
+static constexpr const char* kFlussVersion = "0.7.0";
+static constexpr const char* kNetworkName = "fluss-cpp-test-network";
+static constexpr const char* kZookeeperName = "zookeeper-cpp-test";
+static constexpr const char* kCoordinatorName = "coordinator-server-cpp-test";
+static constexpr const char* kTabletServerName = "tablet-server-cpp-test";
+static constexpr int kCoordinatorPort = 9123;
+static constexpr int kTabletServerPort = 9124;
+
+/// Execute a shell command and return its exit code.
+inline int RunCommand(const std::string& cmd) {
+    return system(cmd.c_str());
+}
+
+/// Wait until a TCP port is accepting connections, or timeout.
+inline bool WaitForPort(const std::string& host, int port, int timeout_seconds = 60) {
+    auto deadline =
+        std::chrono::steady_clock::now() + std::chrono::seconds(timeout_seconds);
+
+    while (std::chrono::steady_clock::now() < deadline) {
+        int sock = socket(AF_INET, SOCK_STREAM, 0);
+        if (sock < 0) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(500));
+            continue;
+        }
+
+        struct sockaddr_in addr {};
+        addr.sin_family = AF_INET;
+        addr.sin_port = htons(static_cast<uint16_t>(port));
+        inet_pton(AF_INET, host.c_str(), &addr.sin_addr);
+
+        int result = connect(sock, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
+#ifdef _WIN32
+        closesocket(sock);
+#else
+        close(sock);
+#endif
+        if (result == 0) {
+            return true;
+        }
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(500));
+    }
+    return false;
+}
+
+/// Manages a Docker-based Fluss cluster for integration testing.
+class FlussTestCluster {
+  public:
+    FlussTestCluster() = default;
+
+    bool Start() {
+        const char* env_servers = std::getenv("FLUSS_BOOTSTRAP_SERVERS");
+        if (env_servers && std::strlen(env_servers) > 0) {
+            bootstrap_servers_ = env_servers;
+            external_cluster_ = true;
+            std::cout << "Using external cluster: " << bootstrap_servers_ << std::endl;
+            return true;
+        }
+
+        std::cout << "Starting Fluss cluster via Docker..."
<< std::endl; + + // Create network + RunCommand(std::string("docker network create ") + kNetworkName + " 2>/dev/null || true"); + + // Start ZooKeeper + std::string zk_cmd = std::string("docker run -d --rm") + + " --name " + kZookeeperName + + " --network " + kNetworkName + + " zookeeper:3.9.2"; + if (RunCommand(zk_cmd) != 0) { + std::cerr << "Failed to start ZooKeeper" << std::endl; + return false; + } + + // Wait for ZooKeeper to be ready before starting Fluss servers + std::this_thread::sleep_for(std::chrono::seconds(5)); + + // Start Coordinator Server + std::string coord_props = + "zookeeper.address: " + std::string(kZookeeperName) + ":2181\\n" + "bind.listeners: INTERNAL://" + std::string(kCoordinatorName) + ":0, CLIENT://" + + std::string(kCoordinatorName) + ":9123\\n" + "advertised.listeners: CLIENT://localhost:9123\\n" + "internal.listener.name: INTERNAL\\n" + "netty.server.num-network-threads: 1\\n" + "netty.server.num-worker-threads: 3"; + + std::string coord_cmd = std::string("docker run -d --rm") + + " --name " + kCoordinatorName + + " --network " + kNetworkName + + " -p 9123:9123" + + " -e FLUSS_PROPERTIES=\"$(printf '" + coord_props + "')\"" + + " fluss/fluss:" + kFlussVersion + + " coordinatorServer"; + if (RunCommand(coord_cmd) != 0) { + std::cerr << "Failed to start Coordinator Server" << std::endl; + Stop(); + return false; + } + + // Wait for coordinator to be ready + if (!WaitForPort("127.0.0.1", kCoordinatorPort)) { + std::cerr << "Coordinator Server did not become ready" << std::endl; + Stop(); + return false; + } + + // Start Tablet Server + std::string ts_props = + "zookeeper.address: " + std::string(kZookeeperName) + ":2181\\n" + "bind.listeners: INTERNAL://" + std::string(kTabletServerName) + ":0, CLIENT://" + + std::string(kTabletServerName) + ":9123\\n" + "advertised.listeners: CLIENT://localhost:" + std::to_string(kTabletServerPort) + "\\n" + "internal.listener.name: INTERNAL\\n" + "tablet-server.id: 0\\n" + 
"netty.server.num-network-threads: 1\\n" + "netty.server.num-worker-threads: 3"; + + std::string ts_cmd = std::string("docker run -d --rm") + + " --name " + kTabletServerName + + " --network " + kNetworkName + + " -p " + std::to_string(kTabletServerPort) + ":9123" + + " -e FLUSS_PROPERTIES=\"$(printf '" + ts_props + "')\"" + + " fluss/fluss:" + kFlussVersion + + " tabletServer"; + if (RunCommand(ts_cmd) != 0) { + std::cerr << "Failed to start Tablet Server" << std::endl; + Stop(); + return false; + } + + // Wait for tablet server to be ready + if (!WaitForPort("127.0.0.1", kTabletServerPort)) { + std::cerr << "Tablet Server did not become ready" << std::endl; + Stop(); + return false; + } + + bootstrap_servers_ = "127.0.0.1:9123"; + std::cout << "Fluss cluster started successfully." << std::endl; + return true; + } + + void Stop() { + if (external_cluster_) return; + + std::cout << "Stopping Fluss cluster..." << std::endl; + RunCommand(std::string("docker stop ") + kTabletServerName + " 2>/dev/null || true"); + RunCommand(std::string("docker stop ") + kCoordinatorName + " 2>/dev/null || true"); + RunCommand(std::string("docker stop ") + kZookeeperName + " 2>/dev/null || true"); + RunCommand(std::string("docker network rm ") + kNetworkName + " 2>/dev/null || true"); + std::cout << "Fluss cluster stopped." << std::endl; + } + + const std::string& GetBootstrapServers() const { return bootstrap_servers_; } + + private: + std::string bootstrap_servers_; + bool external_cluster_{false}; +}; + +/// GoogleTest Environment that manages the Fluss cluster lifecycle. +class FlussTestEnvironment : public ::testing::Environment { + public: + static FlussTestEnvironment* Instance() { + static FlussTestEnvironment* instance = nullptr; + if (!instance) { + instance = new FlussTestEnvironment(); + } + return instance; + } + + void SetUp() override { + if (!cluster_.Start()) { + GTEST_SKIP() << "Failed to start Fluss cluster. 
Skipping integration tests."; + } + + // Retry connection creation until the coordinator is fully initialized. + fluss::Configuration config; + config.bootstrap_servers = cluster_.GetBootstrapServers(); + + auto deadline = + std::chrono::steady_clock::now() + std::chrono::seconds(60); + while (std::chrono::steady_clock::now() < deadline) { + auto result = fluss::Connection::Create(config, connection_); + if (result.Ok()) { + auto admin_result = connection_.GetAdmin(admin_); + if (admin_result.Ok()) { + std::cout << "Connected to Fluss cluster." << std::endl; + return; + } + } + std::cout << "Waiting for Fluss cluster to be ready..." << std::endl; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + GTEST_SKIP() << "Fluss cluster did not become ready within timeout."; + } + + void TearDown() override { + cluster_.Stop(); + } + + fluss::Connection& GetConnection() { return connection_; } + fluss::Admin& GetAdmin() { return admin_; } + const std::string& GetBootstrapServers() { return cluster_.GetBootstrapServers(); } + + private: + FlussTestEnvironment() = default; + + FlussTestCluster cluster_; + fluss::Connection connection_; + fluss::Admin admin_; +}; + +/// Helper: create a table (assert success). Drops existing table first if it exists. +inline void CreateTable(fluss::Admin& admin, const fluss::TablePath& path, + const fluss::TableDescriptor& descriptor) { + admin.DropTable(path, true); // ignore if not exists + auto result = admin.CreateTable(path, descriptor, false); + ASSERT_OK(result); +} + +/// Helper: create partitions for a partitioned table. 
+inline void CreatePartitions(fluss::Admin& admin, const fluss::TablePath& path, + const std::string& partition_column, + const std::vector& values) { + for (const auto& value : values) { + std::unordered_map spec; + spec[partition_column] = value; + auto result = admin.CreatePartition(path, spec, true); + ASSERT_OK(result); + } +} + +/// Helper: poll a log scanner until we collect expected_count records, or timeout. +inline std::vector PollRecords(fluss::LogScanner& scanner, + size_t expected_count, + int timeout_seconds = 10) { + // We need to keep the ScanRecords alive since ScanRecord borrows from them + // Instead, we'll poll and collect data we need + + // Actually, ScanRecord borrows from ScanRecords, which borrows from the inner Rust data. + // We can't return ScanRecords in a vector since they're move-only. Instead let's + // accumulate all ScanRecords objects and return a flattened view. + // For simplicity, we'll return a vector of extracted values directly. + + // Wait - actually the caller needs the ScanRecord to read fields. The problem is + // ScanRecord has a RowView that borrows from ScanRecords. So we need to keep + // the ScanRecords alive while using ScanRecord. + + // For the tests, it's better to collect ScanRecords objects and then use them. + // But since we need a flat vector of ScanRecord, we need a different approach. + + // Let's just return here with a collected count and let the caller handle the polling. + // Actually, the simplest pattern is to poll in a loop and process inline. + // Let me return nothing and let the test do the polling. + + // Hmm, actually we CAN return ScanRecords if we keep them in a vector. + // The issue is that ScanRecord has a RowView borrowing from ScanRecords. + // But if we never move the ScanRecords after creating ScanRecords, the borrow is valid. + + // This is getting complicated. Let me just provide a simpler helper. 
+ std::vector records; + return records; // placeholder - tests will poll inline +} + +/// Helper: get a fresh connection from the test environment. +inline fluss::Connection GetConnection() { + auto* env = FlussTestEnvironment::Instance(); + fluss::Configuration config; + config.bootstrap_servers = env->GetBootstrapServers(); + fluss::Connection conn; + auto result = fluss::Connection::Create(config, conn); + if (!result.Ok()) { + throw std::runtime_error("Failed to create connection: " + result.error_message); + } + return conn; +} + +/// Helper: get a table from a new connection. +inline fluss::Table GetTable(const fluss::TablePath& path) { + auto conn = GetConnection(); + fluss::Table table; + auto result = conn.GetTable(path, table); + if (!result.Ok()) { + throw std::runtime_error("Failed to get table: " + result.error_message); + } + return table; +} + +} // namespace fluss_test From 74ff3667a60e3a922a08318ef69b99c23b2f8769 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Thu, 19 Feb 2026 22:16:39 +0000 Subject: [PATCH 2/8] Add additional buckets for log table IT. 
Add partialUpdateByIndex for Cpp IT --- bindings/cpp/test/test_kv_table.cpp | 90 +++++++++++++++++++++ bindings/cpp/test/test_log_table.cpp | 36 ++++----- bindings/python/test/test_log_table.py | 4 +- crates/fluss/tests/integration/log_table.rs | 59 +++++++------- 4 files changed, 139 insertions(+), 50 deletions(-) diff --git a/bindings/cpp/test/test_kv_table.cpp b/bindings/cpp/test/test_kv_table.cpp index 1bb65e0d..9c4f7a02 100644 --- a/bindings/cpp/test/test_kv_table.cpp +++ b/bindings/cpp/test/test_kv_table.cpp @@ -341,6 +341,96 @@ TEST_F(KvTableTest, PartialUpdate) { ASSERT_OK(adm.DropTable(table_path, false)); } +TEST_F(KvTableTest, PartialUpdateByIndex) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_partial_update_by_index_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("age", fluss::DataType::BigInt()) + .AddColumn("score", fluss::DataType::BigInt()) + .SetPrimaryKeys({"id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + // Insert initial record with all columns + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + { + fluss::GenericRow row(4); + row.SetInt32(0, 1); + row.SetString(1, "Verso"); + row.SetInt64(2, 32); + row.SetInt64(3, 6942); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify initial record + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + { + fluss::GenericRow key(4); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, 
result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt32(0), 1); + EXPECT_EQ(result.GetString(1), "Verso"); + EXPECT_EQ(result.GetInt64(2), 32); + EXPECT_EQ(result.GetInt64(3), 6942); + } + + // Create partial update writer using column indices: 0 (id) and 3 (score) + auto partial_upsert = table.NewUpsert(); + partial_upsert.PartialUpdateByIndex({0, 3}); + fluss::UpsertWriter partial_writer; + ASSERT_OK(partial_upsert.CreateWriter(partial_writer)); + + // Update only the score column (await acknowledgment) + { + fluss::GenericRow partial_row(4); + partial_row.SetInt32(0, 1); + partial_row.SetNull(1); // not in partial update + partial_row.SetNull(2); // not in partial update + partial_row.SetInt64(3, 420); + fluss::WriteResult wr; + ASSERT_OK(partial_writer.Upsert(partial_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify partial update - name and age should remain unchanged + { + fluss::GenericRow key(4); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt32(0), 1) << "id should remain 1"; + EXPECT_EQ(result.GetString(1), "Verso") << "name should remain unchanged"; + EXPECT_EQ(result.GetInt64(2), 32) << "age should remain unchanged"; + EXPECT_EQ(result.GetInt64(3), 420) << "score should be updated to 420"; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + TEST_F(KvTableTest, PartitionedTableUpsertAndLookup) { auto& adm = admin(); auto& conn = connection(); diff --git a/bindings/cpp/test/test_log_table.cpp b/bindings/cpp/test/test_log_table.cpp index c4db3be8..e0b9c4cc 100644 --- a/bindings/cpp/test/test_log_table.cpp +++ b/bindings/cpp/test/test_log_table.cpp @@ -49,6 +49,8 @@ TEST_F(LogTableTest, AppendRecordBatchAndScan) { auto table_descriptor = fluss::TableDescriptor::NewBuilder() .SetSchema(schema) + .SetBucketCount(3) + .SetBucketKeys({"c1"}) .SetProperty("table.replication.factor", "1") .Build(); @@ -91,10 +93,11 @@ TEST_F(LogTableTest, 
AppendRecordBatchAndScan) { ASSERT_OK(append_writer.Flush()); - // Create scanner and subscribe + // Create scanner and subscribe to all 3 buckets fluss::Table scan_table; ASSERT_OK(conn.GetTable(table_path, scan_table)); int32_t num_buckets = scan_table.GetTableInfo().num_buckets; + ASSERT_EQ(num_buckets, 3) << "Table should have 3 buckets"; auto table_scan = scan_table.NewScan(); fluss::LogScanner log_scanner; @@ -104,27 +107,24 @@ TEST_F(LogTableTest, AppendRecordBatchAndScan) { ASSERT_OK(log_scanner.Subscribe(bucket_id, fluss::EARLIEST_OFFSET)); } - // Poll for records - fluss::ScanRecords scan_records; - ASSERT_OK(log_scanner.Poll(10000, scan_records)); - - ASSERT_EQ(scan_records.Size(), 6u) << "Expected 6 records"; - - // Collect and sort by offset + // Poll for records across all buckets std::vector> records; - for (size_t i = 0; i < scan_records.Size(); ++i) { - auto rec = scan_records[i]; - records.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1))); + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (records.size() < 6 && std::chrono::steady_clock::now() < deadline) { + fluss::ScanRecords scan_records; + ASSERT_OK(log_scanner.Poll(500, scan_records)); + for (size_t i = 0; i < scan_records.Size(); ++i) { + auto rec = scan_records[i]; + records.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1))); + } } - std::sort(records.begin(), records.end()); - std::vector expected_c1 = {1, 2, 3, 4, 5, 6}; - std::vector expected_c2 = {"a1", "a2", "a3", "a4", "a5", "a6"}; + ASSERT_EQ(records.size(), 6u) << "Expected 6 records"; + std::sort(records.begin(), records.end()); - for (size_t i = 0; i < 6; ++i) { - EXPECT_EQ(records[i].first, expected_c1[i]) << "c1 mismatch at row " << i; - EXPECT_EQ(records[i].second, expected_c2[i]) << "c2 mismatch at row " << i; - } + std::vector> expected = { + {1, "a1"}, {2, "a2"}, {3, "a3"}, {4, "a4"}, {5, "a5"}, {6, "a6"}}; + EXPECT_EQ(records, expected); // Test 
unsubscribe ASSERT_OK(log_scanner.Unsubscribe(0)); diff --git a/bindings/python/test/test_log_table.py b/bindings/python/test/test_log_table.py index bfa97897..dd1a4d4f 100644 --- a/bindings/python/test/test_log_table.py +++ b/bindings/python/test/test_log_table.py @@ -36,7 +36,9 @@ async def test_append_and_scan(connection, admin): schema = fluss.Schema( pa.schema([pa.field("c1", pa.int32()), pa.field("c2", pa.string())]) ) - table_descriptor = fluss.TableDescriptor(schema) + table_descriptor = fluss.TableDescriptor( + schema, bucket_count=3, bucket_keys=["c1"] + ) await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) table = await connection.get_table(table_path) diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index 76420676..eac72e5c 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -39,7 +39,7 @@ mod table_test { }; use arrow::array::record_batch; use fluss::client::{EARLIEST_OFFSET, FlussTable, TableScan}; - use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath}; + use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; use fluss::record::ScanRecord; use fluss::row::InternalRow; use fluss::rpc::message::OffsetSpec; @@ -79,6 +79,7 @@ mod table_test { .build() .expect("Failed to build schema"), ) + .distributed_by(Some(3), vec!["c1".to_string()]) .build() .expect("Failed to build table"); @@ -127,38 +128,34 @@ mod table_test { .expect("Failed to subscribe with EARLIEST_OFFSET"); } - // Poll for records - let scan_records = log_scanner - .poll(tokio::time::Duration::from_secs(10)) - .await - .expect("Failed to poll records"); - - // Verify the scanned records - let table_bucket = TableBucket::new(table.get_table_info().table_id, 0); - let records = scan_records.records(&table_bucket); - - assert_eq!(records.len(), 6, "Expected 6 records"); - - // Verify record contents match what was 
appended - let expected_c1_values = vec![1, 2, 3, 4, 5, 6]; - let expected_c2_values = vec!["a1", "a2", "a3", "a4", "a5", "a6"]; - - for (i, record) in records.iter().enumerate() { - let row = record.row(); - assert_eq!( - row.get_int(0), - expected_c1_values[i], - "c1 value mismatch at row {}", - i - ); - assert_eq!( - row.get_string(1), - expected_c2_values[i], - "c2 value mismatch at row {}", - i - ); + // Poll for records across all buckets + let mut collected: Vec<(i32, String)> = Vec::new(); + let start_time = std::time::Instant::now(); + while collected.len() < 6 && start_time.elapsed() < Duration::from_secs(10) { + let scan_records = log_scanner + .poll(Duration::from_millis(500)) + .await + .expect("Failed to poll records"); + for rec in scan_records { + let row = rec.row(); + collected.push((row.get_int(0), row.get_string(1).to_string())); + } } + assert_eq!(collected.len(), 6, "Expected 6 records"); + + // Sort and verify record contents + collected.sort(); + let expected: Vec<(i32, String)> = vec![ + (1, "a1".to_string()), + (2, "a2".to_string()), + (3, "a3".to_string()), + (4, "a4".to_string()), + (5, "a5".to_string()), + (6, "a6".to_string()), + ]; + assert_eq!(collected, expected); + // Test unsubscribe: unsubscribe from bucket 0, verify no error log_scanner .unsubscribe(0) From 6b842b617e43415ec6a30a1c3cf5db4fbfacdecc Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Fri, 20 Feb 2026 20:10:01 +0000 Subject: [PATCH 3/8] Add C++ per-bucket scan integration test --- bindings/cpp/test/test_log_table.cpp | 81 +++++++++++++++++++--------- 1 file changed, 55 insertions(+), 26 deletions(-) diff --git a/bindings/cpp/test/test_log_table.cpp b/bindings/cpp/test/test_log_table.cpp index e0b9c4cc..6dbc9b74 100644 --- a/bindings/cpp/test/test_log_table.cpp +++ b/bindings/cpp/test/test_log_table.cpp @@ -113,8 +113,7 @@ TEST_F(LogTableTest, AppendRecordBatchAndScan) { while (records.size() < 6 && std::chrono::steady_clock::now() < deadline) { fluss::ScanRecords 
scan_records; ASSERT_OK(log_scanner.Poll(500, scan_records)); - for (size_t i = 0; i < scan_records.Size(); ++i) { - auto rec = scan_records[i]; + for (auto rec : scan_records) { records.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1))); } } @@ -126,6 +125,45 @@ TEST_F(LogTableTest, AppendRecordBatchAndScan) { {1, "a1"}, {2, "a2"}, {3, "a3"}, {4, "a4"}, {5, "a5"}, {6, "a6"}}; EXPECT_EQ(records, expected); + // Verify per-bucket iteration via BucketView + { + fluss::Table bucket_table; + ASSERT_OK(conn.GetTable(table_path, bucket_table)); + auto bucket_scan = bucket_table.NewScan(); + fluss::LogScanner bucket_scanner; + ASSERT_OK(bucket_scan.CreateLogScanner(bucket_scanner)); + + for (int32_t bid = 0; bid < num_buckets; ++bid) { + ASSERT_OK(bucket_scanner.Subscribe(bid, fluss::EARLIEST_OFFSET)); + } + + std::vector> bucket_records; + auto bucket_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + size_t buckets_with_data = 0; + while (bucket_records.size() < 6 && std::chrono::steady_clock::now() < bucket_deadline) { + fluss::ScanRecords scan_records; + ASSERT_OK(bucket_scanner.Poll(500, scan_records)); + + // Iterate by bucket + for (size_t b = 0; b < scan_records.BucketCount(); ++b) { + auto bucket_view = scan_records.BucketAt(b); + if (!bucket_view.Empty()) { + buckets_with_data++; + } + for (auto rec : bucket_view) { + bucket_records.emplace_back(rec.row.GetInt32(0), + std::string(rec.row.GetString(1))); + } + } + } + + ASSERT_EQ(bucket_records.size(), 6u) << "Expected 6 records via per-bucket iteration"; + EXPECT_GT(buckets_with_data, 1u) << "Records should be distributed across multiple buckets"; + + std::sort(bucket_records.begin(), bucket_records.end()); + EXPECT_EQ(bucket_records, expected); + } + // Test unsubscribe ASSERT_OK(log_scanner.Unsubscribe(0)); @@ -290,15 +328,14 @@ TEST_F(LogTableTest, TestProject) { fluss::ScanRecords records; ASSERT_OK(scanner.Poll(10000, records)); - ASSERT_EQ(records.Size(), 3u) << 
"Should have 3 records with project_by_name"; + ASSERT_EQ(records.Count(), 3u) << "Should have 3 records with project_by_name"; std::vector expected_col_b = {"x", "y", "z"}; std::vector expected_col_c = {10, 20, 30}; // Collect and sort by col_c to get deterministic order std::vector> collected; - for (size_t i = 0; i < records.Size(); ++i) { - auto rec = records[i]; + for (auto rec : records) { collected.emplace_back(std::string(rec.row.GetString(0)), rec.row.GetInt32(1)); } std::sort(collected.begin(), collected.end(), @@ -324,14 +361,13 @@ TEST_F(LogTableTest, TestProject) { fluss::ScanRecords records; ASSERT_OK(scanner.Poll(10000, records)); - ASSERT_EQ(records.Size(), 3u); + ASSERT_EQ(records.Count(), 3u); std::vector expected_col_b = {"x", "y", "z"}; std::vector expected_col_a = {1, 2, 3}; std::vector> collected; - for (size_t i = 0; i < records.Size(); ++i) { - auto rec = records[i]; + for (auto rec : records) { collected.emplace_back(std::string(rec.row.GetString(0)), rec.row.GetInt32(1)); } std::sort(collected.begin(), collected.end(), @@ -585,22 +621,19 @@ TEST_F(LogTableTest, AllSupportedDatatypes) { ASSERT_OK(log_scanner.Subscribe(0, 0)); // Poll until we get 2 records - std::vector all_records; - size_t total_records = 0; + std::vector all_records; auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); - while (total_records < 2 && std::chrono::steady_clock::now() < deadline) { + while (all_records.size() < 2 && std::chrono::steady_clock::now() < deadline) { fluss::ScanRecords records; ASSERT_OK(log_scanner.Poll(5000, records)); - total_records += records.Size(); - if (records.Size() > 0) { - all_records.push_back(std::move(records)); + for (auto rec : records) { + all_records.push_back(rec); } } - ASSERT_EQ(total_records, 2u) << "Expected 2 records"; + ASSERT_EQ(all_records.size(), 2u) << "Expected 2 records"; // Verify first record (all values) - auto rec = all_records[0][0]; - auto& row = rec.row; + auto& row = 
all_records[0].row; EXPECT_EQ(row.GetInt32(0), col_tinyint) << "col_tinyint mismatch"; EXPECT_EQ(row.GetInt32(1), col_smallint) << "col_smallint mismatch"; @@ -635,10 +668,9 @@ TEST_F(LogTableTest, AllSupportedDatatypes) { << "col_binary mismatch"; // Verify second record (all nulls) - // The second record might be in the same ScanRecords or a different one - fluss::ScanRecord null_rec = (all_records[0].Size() > 1) ? all_records[0][1] : all_records[1][0]; + auto& null_row = all_records[1].row; for (size_t i = 0; i < field_count; ++i) { - EXPECT_TRUE(null_rec.row.IsNull(i)) << "column " << i << " should be null"; + EXPECT_TRUE(null_row.IsNull(i)) << "column " << i << " should be null"; } ASSERT_OK(adm.DropTable(table_path, false)); @@ -767,8 +799,7 @@ TEST_F(LogTableTest, PartitionedTableAppendScan) { while (collected.size() < 8 && std::chrono::steady_clock::now() < deadline) { fluss::ScanRecords records; ASSERT_OK(log_scanner.Poll(500, records)); - for (size_t i = 0; i < records.Size(); ++i) { - auto rec = records[i]; + for (auto rec : records) { collected.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1)), rec.row.GetInt64(2)); } @@ -806,8 +837,7 @@ TEST_F(LogTableTest, PartitionedTableAppendScan) { while (us_only.size() < 4 && std::chrono::steady_clock::now() < unsub_deadline) { fluss::ScanRecords records; ASSERT_OK(unsub_scanner.Poll(300, records)); - for (size_t i = 0; i < records.Size(); ++i) { - auto rec = records[i]; + for (auto rec : records) { us_only.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1)), rec.row.GetInt64(2)); } @@ -838,8 +868,7 @@ TEST_F(LogTableTest, PartitionedTableAppendScan) { while (batch_collected.size() < 8 && std::chrono::steady_clock::now() < batch_deadline) { fluss::ScanRecords records; ASSERT_OK(batch_scanner.Poll(500, records)); - for (size_t i = 0; i < records.Size(); ++i) { - auto rec = records[i]; + for (auto rec : records) { batch_collected.emplace_back(rec.row.GetInt32(0), 
std::string(rec.row.GetString(1)), rec.row.GetInt64(2)); From 8fc0bed962554dd5164f90b753658bb17bf639ae Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sat, 21 Feb 2026 09:45:40 +0000 Subject: [PATCH 4/8] Remove unused utils methods --- bindings/cpp/test/test_utils.h | 56 ---------------------------------- 1 file changed, 56 deletions(-) diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h index f045423b..89874fbc 100644 --- a/bindings/cpp/test/test_utils.h +++ b/bindings/cpp/test/test_utils.h @@ -283,60 +283,4 @@ inline void CreatePartitions(fluss::Admin& admin, const fluss::TablePath& path, } } -/// Helper: poll a log scanner until we collect expected_count records, or timeout. -inline std::vector PollRecords(fluss::LogScanner& scanner, - size_t expected_count, - int timeout_seconds = 10) { - // We need to keep the ScanRecords alive since ScanRecord borrows from them - // Instead, we'll poll and collect data we need - - // Actually, ScanRecord borrows from ScanRecords, which borrows from the inner Rust data. - // We can't return ScanRecords in a vector since they're move-only. Instead let's - // accumulate all ScanRecords objects and return a flattened view. - // For simplicity, we'll return a vector of extracted values directly. - - // Wait - actually the caller needs the ScanRecord to read fields. The problem is - // ScanRecord has a RowView that borrows from ScanRecords. So we need to keep - // the ScanRecords alive while using ScanRecord. - - // For the tests, it's better to collect ScanRecords objects and then use them. - // But since we need a flat vector of ScanRecord, we need a different approach. - - // Let's just return here with a collected count and let the caller handle the polling. - // Actually, the simplest pattern is to poll in a loop and process inline. - // Let me return nothing and let the test do the polling. - - // Hmm, actually we CAN return ScanRecords if we keep them in a vector. 
- // The issue is that ScanRecord has a RowView borrowing from ScanRecords. - // But if we never move the ScanRecords after creating ScanRecords, the borrow is valid. - - // This is getting complicated. Let me just provide a simpler helper. - std::vector records; - return records; // placeholder - tests will poll inline -} - -/// Helper: get a fresh connection from the test environment. -inline fluss::Connection GetConnection() { - auto* env = FlussTestEnvironment::Instance(); - fluss::Configuration config; - config.bootstrap_servers = env->GetBootstrapServers(); - fluss::Connection conn; - auto result = fluss::Connection::Create(config, conn); - if (!result.Ok()) { - throw std::runtime_error("Failed to create connection: " + result.error_message); - } - return conn; -} - -/// Helper: get a table from a new connection. -inline fluss::Table GetTable(const fluss::TablePath& path) { - auto conn = GetConnection(); - fluss::Table table; - auto result = conn.GetTable(path, table); - if (!result.Ok()) { - throw std::runtime_error("Failed to get table: " + result.error_message); - } - return table; -} - } // namespace fluss_test From 94563cb8b55c0d9b1b220d34baa82f3ff5a0244d Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sat, 21 Feb 2026 11:46:21 +0000 Subject: [PATCH 5/8] Remove unnecessary python versions run on rust only CI --- .../{ci_bindings_cpp.yml => ci_cpp.yml} | 2 +- .../{ci_bindings_python.yml => ci_python.yml} | 2 +- .github/workflows/ci_rust.yml | 22 +++++-------------- 3 files changed, 7 insertions(+), 19 deletions(-) rename .github/workflows/{ci_bindings_cpp.yml => ci_cpp.yml} (99%) rename .github/workflows/{ci_bindings_python.yml => ci_python.yml} (99%) diff --git a/.github/workflows/ci_bindings_cpp.yml b/.github/workflows/ci_cpp.yml similarity index 99% rename from .github/workflows/ci_bindings_cpp.yml rename to .github/workflows/ci_cpp.yml index 1404d8f3..658ccf0c 100644 --- a/.github/workflows/ci_bindings_cpp.yml +++ b/.github/workflows/ci_cpp.yml @@ 
-15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -name: Bindings C++ +name: C++ Bindings on: push: diff --git a/.github/workflows/ci_bindings_python.yml b/.github/workflows/ci_python.yml similarity index 99% rename from .github/workflows/ci_bindings_python.yml rename to .github/workflows/ci_python.yml index 17005fa1..a586eeb8 100644 --- a/.github/workflows/ci_bindings_python.yml +++ b/.github/workflows/ci_python.yml @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -name: Bindings Python +name: Python Bindings on: push: diff --git a/.github/workflows/ci_rust.yml b/.github/workflows/ci_rust.yml index f4cb7db9..fbfb8858 100644 --- a/.github/workflows/ci_rust.yml +++ b/.github/workflows/ci_rust.yml @@ -46,15 +46,9 @@ jobs: os: - ubuntu-latest - macos-latest - python: ["3.11", "3.12", "3.13"] steps: - uses: actions/checkout@v4 - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python }} - - name: Install protoc run: | if [ "$RUNNER_OS" = "Linux" ]; then @@ -70,10 +64,10 @@ jobs: ~/.cargo/registry ~/.cargo/git target - key: build-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }} + key: build-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} - name: Build - run: cargo build --workspace --all-targets + run: cargo build --workspace --all-targets --exclude fluss_python --exclude fluss-cpp test: timeout-minutes: 60 @@ -83,15 +77,9 @@ jobs: os: - ubuntu-latest - macos-latest - python: ["3.11", "3.12", "3.13"] steps: - uses: actions/checkout@v4 - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python }} - - name: Install protoc run: | if [ "$RUNNER_OS" = "Linux" ]; then @@ -107,10 +95,10 @@ jobs: ~/.cargo/registry ~/.cargo/git target - key: test-${{ runner.os }}-${{ matrix.python }}-${{ hashFiles('**/Cargo.lock') }} + key: test-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} - 
name: Unit Test - run: cargo test --all-targets --workspace + run: cargo test --all-targets --workspace --exclude fluss_python --exclude fluss-cpp env: RUST_LOG: DEBUG RUST_BACKTRACE: full @@ -118,7 +106,7 @@ jobs: - name: Integration Test (Linux only) if: runner.os == 'Linux' run: | - RUST_TEST_THREADS=1 cargo test --features integration_tests --all-targets --workspace -- --nocapture + RUST_TEST_THREADS=1 cargo test --features integration_tests --all-targets --workspace --exclude fluss_python --exclude fluss-cpp -- --nocapture env: RUST_LOG: DEBUG RUST_BACKTRACE: full From ce628df9407a080eeb70d68f592bc35d38fd9c06 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sat, 21 Feb 2026 12:03:02 +0000 Subject: [PATCH 6/8] Remove unnecessary separate build job (integration test job also builds) for rust --- ...docs-check.yml => check_documentation.yml} | 4 +-- ...k.yml => check_license_and_formatting.yml} | 4 +-- .github/workflows/ci_cpp.yml | 4 +-- .github/workflows/ci_python.yml | 4 +-- .github/workflows/ci_rust.yml | 35 +++---------------- 5 files changed, 12 insertions(+), 39 deletions(-) rename .github/workflows/{docs-check.yml => check_documentation.yml} (96%) rename .github/workflows/{ci_check.yml => check_license_and_formatting.yml} (96%) diff --git a/.github/workflows/docs-check.yml b/.github/workflows/check_documentation.yml similarity index 96% rename from .github/workflows/docs-check.yml rename to .github/workflows/check_documentation.yml index 6408c541..70e6a438 100644 --- a/.github/workflows/docs-check.yml +++ b/.github/workflows/check_documentation.yml @@ -17,7 +17,7 @@ ################################################################################ # This workflow is meant for checking broken links in the documentation. 
-name: Check Documentation +name: Documentation Check permissions: contents: read on: @@ -31,7 +31,7 @@ on: - 'website/**' jobs: - test-deploy: + check-documentation: runs-on: ubuntu-latest defaults: run: diff --git a/.github/workflows/ci_check.yml b/.github/workflows/check_license_and_formatting.yml similarity index 96% rename from .github/workflows/ci_check.yml rename to .github/workflows/check_license_and_formatting.yml index 00746d2b..a9603144 100644 --- a/.github/workflows/ci_check.yml +++ b/.github/workflows/check_license_and_formatting.yml @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -name: Check +name: License and Formatting Check on: push: @@ -34,7 +34,7 @@ concurrency: cancel-in-progress: true jobs: - check: + license-and-formatting-check: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 diff --git a/.github/workflows/ci_cpp.yml b/.github/workflows/ci_cpp.yml index 658ccf0c..da151829 100644 --- a/.github/workflows/ci_cpp.yml +++ b/.github/workflows/ci_cpp.yml @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -name: C++ Bindings +name: C++ Build and Tests on: push: @@ -38,7 +38,7 @@ concurrency: cancel-in-progress: true jobs: - cpp-integration-test: + cpp-build-and-tests: timeout-minutes: 60 runs-on: ubuntu-latest steps: diff --git a/.github/workflows/ci_python.yml b/.github/workflows/ci_python.yml index a586eeb8..6f3d28d8 100644 --- a/.github/workflows/ci_python.yml +++ b/.github/workflows/ci_python.yml @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. 
-name: Python Bindings +name: Python Build and Tests on: push: @@ -38,7 +38,7 @@ concurrency: cancel-in-progress: true jobs: - python-integration-test: + python-build-and-tests: timeout-minutes: 60 runs-on: ubuntu-latest strategy: diff --git a/.github/workflows/ci_rust.yml b/.github/workflows/ci_rust.yml index fbfb8858..d1af2aa1 100644 --- a/.github/workflows/ci_rust.yml +++ b/.github/workflows/ci_rust.yml @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -name: Rust +name: Rust Build and Tests on: push: @@ -39,7 +39,8 @@ concurrency: cancel-in-progress: true jobs: - build: + rust-build-and-tests: + timeout-minutes: 60 runs-on: ${{ matrix.os }} strategy: matrix: @@ -64,39 +65,11 @@ jobs: ~/.cargo/registry ~/.cargo/git target - key: build-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} + key: rust-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} - name: Build run: cargo build --workspace --all-targets --exclude fluss_python --exclude fluss-cpp - test: - timeout-minutes: 60 - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: - - ubuntu-latest - - macos-latest - steps: - - uses: actions/checkout@v4 - - - name: Install protoc - run: | - if [ "$RUNNER_OS" = "Linux" ]; then - sudo apt-get update && sudo apt-get install -y protobuf-compiler - elif [ "$RUNNER_OS" = "macOS" ]; then - brew install protobuf - fi - - - name: Rust Cache - uses: actions/cache@v4 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: test-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} - - name: Unit Test run: cargo test --all-targets --workspace --exclude fluss_python --exclude fluss-cpp env: From c090ec5d78166c110ebfec63290c411a8ff58cbe Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sat, 21 Feb 2026 12:08:36 +0000 Subject: [PATCH 7/8] Consistent CI File and Job namings --- .github/workflows/{ci_cpp.yml => build_and_test_cpp.yml} | 2 +- .github/workflows/{ci_python.yml => build_and_test_python.yml} | 2 +- 
.github/workflows/{ci_rust.yml => build_and_test_rust.yml} | 2 +- .github/workflows/check_license_and_formatting.yml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) rename .github/workflows/{ci_cpp.yml => build_and_test_cpp.yml} (99%) rename .github/workflows/{ci_python.yml => build_and_test_python.yml} (98%) rename .github/workflows/{ci_rust.yml => build_and_test_rust.yml} (99%) diff --git a/.github/workflows/ci_cpp.yml b/.github/workflows/build_and_test_cpp.yml similarity index 99% rename from .github/workflows/ci_cpp.yml rename to .github/workflows/build_and_test_cpp.yml index da151829..5cdd14d7 100644 --- a/.github/workflows/ci_cpp.yml +++ b/.github/workflows/build_and_test_cpp.yml @@ -38,7 +38,7 @@ concurrency: cancel-in-progress: true jobs: - cpp-build-and-tests: + build-and-test-cpp: timeout-minutes: 60 runs-on: ubuntu-latest steps: diff --git a/.github/workflows/ci_python.yml b/.github/workflows/build_and_test_python.yml similarity index 98% rename from .github/workflows/ci_python.yml rename to .github/workflows/build_and_test_python.yml index 6f3d28d8..efb5caab 100644 --- a/.github/workflows/ci_python.yml +++ b/.github/workflows/build_and_test_python.yml @@ -38,7 +38,7 @@ concurrency: cancel-in-progress: true jobs: - python-build-and-tests: + build-and-test-python: timeout-minutes: 60 runs-on: ubuntu-latest strategy: diff --git a/.github/workflows/ci_rust.yml b/.github/workflows/build_and_test_rust.yml similarity index 99% rename from .github/workflows/ci_rust.yml rename to .github/workflows/build_and_test_rust.yml index d1af2aa1..c9e05b74 100644 --- a/.github/workflows/ci_rust.yml +++ b/.github/workflows/build_and_test_rust.yml @@ -39,7 +39,7 @@ concurrency: cancel-in-progress: true jobs: - rust-build-and-tests: + build-and-test-rust: timeout-minutes: 60 runs-on: ${{ matrix.os }} strategy: diff --git a/.github/workflows/check_license_and_formatting.yml b/.github/workflows/check_license_and_formatting.yml index a9603144..1b83b749 100644 --- 
a/.github/workflows/check_license_and_formatting.yml +++ b/.github/workflows/check_license_and_formatting.yml @@ -34,7 +34,7 @@ concurrency: cancel-in-progress: true jobs: - license-and-formatting-check: + check-license-and-formatting: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 From bdb8a28705a99a3fb0fda8bfb92cd6355104b615 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sat, 21 Feb 2026 12:44:34 +0000 Subject: [PATCH 8/8] Reduce boilerplate for polling in C++ IT --- bindings/cpp/test/test_log_table.cpp | 80 +++++----------------------- bindings/cpp/test/test_utils.h | 29 ++++++++++ 2 files changed, 43 insertions(+), 66 deletions(-) diff --git a/bindings/cpp/test/test_log_table.cpp b/bindings/cpp/test/test_log_table.cpp index 6dbc9b74..47ab6f25 100644 --- a/bindings/cpp/test/test_log_table.cpp +++ b/bindings/cpp/test/test_log_table.cpp @@ -109,15 +109,9 @@ TEST_F(LogTableTest, AppendRecordBatchAndScan) { // Poll for records across all buckets std::vector> records; - auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); - while (records.size() < 6 && std::chrono::steady_clock::now() < deadline) { - fluss::ScanRecords scan_records; - ASSERT_OK(log_scanner.Poll(500, scan_records)); - for (auto rec : scan_records) { - records.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1))); - } - } - + fluss_test::PollRecords(log_scanner, 6, [](const fluss::ScanRecord& rec) { + return std::make_pair(rec.row.GetInt32(0), std::string(rec.row.GetString(1))); + }, records); ASSERT_EQ(records.size(), 6u) << "Expected 6 records"; std::sort(records.begin(), records.end()); @@ -454,13 +448,7 @@ TEST_F(LogTableTest, TestPollBatches) { // Test 2: Poll until we get all 6 records std::vector all_ids; - auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); - while (all_ids.size() < 6 && std::chrono::steady_clock::now() < deadline) { - fluss::ArrowRecordBatches batches; - ASSERT_OK(scanner.PollRecordBatch(5000, 
batches)); - auto ids = extract_ids(batches); - all_ids.insert(all_ids.end(), ids.begin(), ids.end()); - } + fluss_test::PollRecordBatches(scanner, 6, extract_ids, all_ids); ASSERT_EQ(all_ids, (std::vector{1, 2, 3, 4, 5, 6})); // Test 3: Append more and verify offset continuation (no duplicates) @@ -468,13 +456,7 @@ TEST_F(LogTableTest, TestPollBatches) { ASSERT_OK(writer.Flush()); std::vector new_ids; - deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); - while (new_ids.size() < 2 && std::chrono::steady_clock::now() < deadline) { - fluss::ArrowRecordBatches batches; - ASSERT_OK(scanner.PollRecordBatch(5000, batches)); - auto ids = extract_ids(batches); - new_ids.insert(new_ids.end(), ids.begin(), ids.end()); - } + fluss_test::PollRecordBatches(scanner, 2, extract_ids, new_ids); ASSERT_EQ(new_ids, (std::vector{7, 8})); // Test 4: Subscribing from mid-offset should truncate batch @@ -487,13 +469,7 @@ TEST_F(LogTableTest, TestPollBatches) { ASSERT_OK(trunc_scanner.Subscribe(0, 3)); std::vector trunc_ids; - deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); - while (trunc_ids.size() < 5 && std::chrono::steady_clock::now() < deadline) { - fluss::ArrowRecordBatches batches; - ASSERT_OK(trunc_scanner.PollRecordBatch(5000, batches)); - auto ids = extract_ids(batches); - trunc_ids.insert(trunc_ids.end(), ids.begin(), ids.end()); - } + fluss_test::PollRecordBatches(trunc_scanner, 5, extract_ids, trunc_ids); ASSERT_EQ(trunc_ids, (std::vector{4, 5, 6, 7, 8})); } @@ -622,14 +598,8 @@ TEST_F(LogTableTest, AllSupportedDatatypes) { // Poll until we get 2 records std::vector all_records; - auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); - while (all_records.size() < 2 && std::chrono::steady_clock::now() < deadline) { - fluss::ScanRecords records; - ASSERT_OK(log_scanner.Poll(5000, records)); - for (auto rec : records) { - all_records.push_back(rec); - } - } + fluss_test::PollRecords(log_scanner, 2, + [](const 
fluss::ScanRecord& rec) { return rec; }, all_records); ASSERT_EQ(all_records.size(), 2u) << "Expected 2 records"; // Verify first record (all values) @@ -794,16 +764,11 @@ TEST_F(LogTableTest, PartitionedTableAppendScan) { // Collect all records using Record = std::tuple; + auto extract_record = [](const fluss::ScanRecord& rec) -> Record { + return {rec.row.GetInt32(0), std::string(rec.row.GetString(1)), rec.row.GetInt64(2)}; + }; std::vector collected; - auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); - while (collected.size() < 8 && std::chrono::steady_clock::now() < deadline) { - fluss::ScanRecords records; - ASSERT_OK(log_scanner.Poll(500, records)); - for (auto rec : records) { - collected.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1)), - rec.row.GetInt64(2)); - } - } + fluss_test::PollRecords(log_scanner, 8, extract_record, collected); ASSERT_EQ(collected.size(), 8u) << "Expected 8 records total"; std::sort(collected.begin(), collected.end()); @@ -833,15 +798,7 @@ TEST_F(LogTableTest, PartitionedTableAppendScan) { ASSERT_OK(unsub_scanner.UnsubscribePartition(eu_partition_id, 0)); std::vector us_only; - auto unsub_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - while (us_only.size() < 4 && std::chrono::steady_clock::now() < unsub_deadline) { - fluss::ScanRecords records; - ASSERT_OK(unsub_scanner.Poll(300, records)); - for (auto rec : records) { - us_only.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1)), - rec.row.GetInt64(2)); - } - } + fluss_test::PollRecords(unsub_scanner, 4, extract_record, us_only); ASSERT_EQ(us_only.size(), 4u) << "Should receive exactly 4 US records"; for (const auto& [id, region, val] : us_only) { @@ -864,16 +821,7 @@ TEST_F(LogTableTest, PartitionedTableAppendScan) { ASSERT_OK(batch_scanner.SubscribePartitionBuckets(subs)); std::vector batch_collected; - auto batch_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); - 
while (batch_collected.size() < 8 && std::chrono::steady_clock::now() < batch_deadline) { - fluss::ScanRecords records; - ASSERT_OK(batch_scanner.Poll(500, records)); - for (auto rec : records) { - batch_collected.emplace_back(rec.row.GetInt32(0), - std::string(rec.row.GetString(1)), - rec.row.GetInt64(2)); - } - } + fluss_test::PollRecords(batch_scanner, 8, extract_record, batch_collected); ASSERT_EQ(batch_collected.size(), 8u); std::sort(batch_collected.begin(), batch_collected.end()); EXPECT_EQ(batch_collected, expected); diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h index 89874fbc..bae52377 100644 --- a/bindings/cpp/test/test_utils.h +++ b/bindings/cpp/test/test_utils.h @@ -283,4 +283,33 @@ inline void CreatePartitions(fluss::Admin& admin, const fluss::TablePath& path, } } +/// Poll a LogScanner for ScanRecords until `expected_count` items are collected or timeout. +/// `extract_fn` is called for each ScanRecord and should return a value of type T. +template +void PollRecords(fluss::LogScanner& scanner, size_t expected_count, + ExtractFn extract_fn, std::vector& out) { + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (out.size() < expected_count && std::chrono::steady_clock::now() < deadline) { + fluss::ScanRecords records; + ASSERT_OK(scanner.Poll(1000, records)); + for (auto rec : records) { + out.push_back(extract_fn(rec)); + } + } +} + +/// Poll a LogScanner for ArrowRecordBatches until `expected_count` items are collected or timeout. +/// `extract_fn` is called with the full ArrowRecordBatches and should return a std::vector. 
+template +void PollRecordBatches(fluss::LogScanner& scanner, size_t expected_count, + ExtractFn extract_fn, std::vector& out) { + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (out.size() < expected_count && std::chrono::steady_clock::now() < deadline) { + fluss::ArrowRecordBatches batches; + ASSERT_OK(scanner.PollRecordBatch(1000, batches)); + auto items = extract_fn(batches); + out.insert(out.end(), items.begin(), items.end()); + } +} + } // namespace fluss_test