diff --git a/.github/workflows/build_and_test_cpp.yml b/.github/workflows/build_and_test_cpp.yml index 5cdd14d7..1931983d 100644 --- a/.github/workflows/build_and_test_cpp.yml +++ b/.github/workflows/build_and_test_cpp.yml @@ -70,9 +70,9 @@ jobs: cmake -B build -DFLUSS_ENABLE_TESTING=ON -DCMAKE_BUILD_TYPE=Debug cmake --build build --parallel - - name: Run C++ integration tests + - name: Run C++ integration tests (parallel) working-directory: bindings/cpp - run: cd build && ctest --output-on-failure --timeout 300 + run: cd build && ctest -j$(nproc) --output-on-failure --timeout 300 env: RUST_LOG: DEBUG RUST_BACKTRACE: full diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt index 0cedf682..ac936116 100644 --- a/bindings/cpp/CMakeLists.txt +++ b/bindings/cpp/CMakeLists.txt @@ -256,6 +256,7 @@ if (FLUSS_ENABLE_TESTING) FetchContent_MakeAvailable(googletest) enable_testing() + include(GoogleTest) file(GLOB TEST_SOURCE_FILES "test/*.cpp") add_executable(fluss_cpp_test ${TEST_SOURCE_FILES}) @@ -267,5 +268,17 @@ if (FLUSS_ENABLE_TESTING) ${PROJECT_SOURCE_DIR}/test ) - add_test(NAME fluss_cpp_integration_tests COMMAND fluss_cpp_test) + # Individual tests for parallel execution via ctest -j. + gtest_discover_tests(fluss_cpp_test + PROPERTIES + TIMEOUT 120 + FIXTURES_REQUIRED fluss_cluster + ) + + # Cleanup: stop Docker containers after all tests finish. + # Mirrors Python's pytest_unconfigure and Rust's atexit cleanup. + add_test(NAME fluss_cluster_cleanup COMMAND fluss_cpp_test --cleanup) + set_tests_properties(fluss_cluster_cleanup PROPERTIES + FIXTURES_CLEANUP fluss_cluster + ) endif() diff --git a/bindings/cpp/test/test_main.cpp b/bindings/cpp/test/test_main.cpp index 8c2e2d96..7b132d2c 100644 --- a/bindings/cpp/test/test_main.cpp +++ b/bindings/cpp/test/test_main.cpp @@ -22,6 +22,16 @@ #include "test_utils.h" int main(int argc, char** argv) { + // --cleanup: stop Docker containers and exit (used by ctest FIXTURES_CLEANUP). + for (int i = 1; i < argc; ++i) { + if (std::string(argv[i]) == "--cleanup") { + const char* env = std::getenv("FLUSS_BOOTSTRAP_SERVERS"); + if (env && std::strlen(env) > 0) return 0; + fluss_test::FlussTestCluster::StopAll(); + return 0; + } + } + ::testing::InitGoogleTest(&argc, argv); // Register the global test environment (manages the Fluss cluster lifecycle). diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h index 05c32cf2..f5b49716 100644 --- a/bindings/cpp/test/test_utils.h +++ b/bindings/cpp/test/test_utils.h @@ -126,29 +126,42 @@ class FlussTestCluster { const char* env_servers = std::getenv("FLUSS_BOOTSTRAP_SERVERS"); if (env_servers && std::strlen(env_servers) > 0) { bootstrap_servers_ = env_servers; + const char* env_sasl = std::getenv("FLUSS_SASL_BOOTSTRAP_SERVERS"); + if (env_sasl && std::strlen(env_sasl) > 0) { + sasl_bootstrap_servers_ = env_sasl; + } external_cluster_ = true; std::cout << "Using external cluster: " << bootstrap_servers_ << std::endl; return true; } + // Reuse cluster started by another parallel test process or previous run. + if (WaitForPort("127.0.0.1", kPlainClientPort, /*timeout_seconds=*/1)) { + SetBootstrapServers(); + external_cluster_ = true; + return true; + } + std::cout << "Starting Fluss cluster via Docker..." << std::endl; - // Create network + // Remove stopped (not running) containers from previous runs. + RunCommand(std::string("docker rm ") + kTabletServerName + " 2>/dev/null || true"); + RunCommand(std::string("docker rm ") + kCoordinatorName + " 2>/dev/null || true"); + RunCommand(std::string("docker rm ") + kZookeeperName + " 2>/dev/null || true"); + RunCommand(std::string("docker network rm ") + kNetworkName + " 2>/dev/null || true"); + RunCommand(std::string("docker network create ") + kNetworkName + " 2>/dev/null || true"); - // Start ZooKeeper std::string zk_cmd = std::string("docker run -d --rm") + " --name " + kZookeeperName + " --network " + kNetworkName + " zookeeper:3.9.2"; if (RunCommand(zk_cmd) != 0) { - std::cerr << "Failed to start ZooKeeper" << std::endl; - return false; + return WaitForCluster(); } - // Wait for ZooKeeper to be ready before starting Fluss servers + // Wait for ZooKeeper to be ready std::this_thread::sleep_for(std::chrono::seconds(5)); - // Start Coordinator Server (dual listeners: CLIENT=SASL on 9123, PLAIN_CLIENT=plaintext on - // 9223) + // Coordinator Server (dual listeners: SASL on 9123, plaintext on 9223) std::string sasl_jaas = "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required" " user_admin=\"admin-secret\" user_alice=\"alice-secret\";"; @@ -171,19 +184,15 @@ class FlussTestCluster { std::string coord_cmd = DockerRunCmd(kCoordinatorName, coord_props, {"9123:9123", "9223:9223"}, "coordinatorServer"); if (RunCommand(coord_cmd) != 0) { - std::cerr << "Failed to start Coordinator Server" << std::endl; - Stop(); - return false; + return WaitForCluster(); } - // Wait for coordinator to be ready if (!WaitForPort("127.0.0.1", kCoordinatorPort)) { std::cerr << "Coordinator Server did not become ready" << std::endl; - Stop(); return false; } - // Start Tablet Server (dual listeners: CLIENT=SASL on 9123, PLAIN_CLIENT=plaintext on 9223) + // Tablet Server (dual listeners: SASL on 9124, plaintext on 9224) std::string ts = std::string(kTabletServerName); std::string ts_props = JoinProps({ "zookeeper.address: " + zk + ":2181", @@ -205,43 +214,33 @@ class FlussTestCluster { std::to_string(kPlainClientTabletPort) + ":9223"}, "tabletServer"); if (RunCommand(ts_cmd) != 0) { - std::cerr << "Failed to start Tablet Server" << std::endl; - Stop(); - return false; + return WaitForCluster(); } - // Wait for tablet server to be ready - if (!WaitForPort("127.0.0.1", kTabletServerPort)) { - std::cerr << "Tablet Server did not become ready" << std::endl; - Stop(); - return false; - } - - // Wait for plaintext listeners - if (!WaitForPort("127.0.0.1", kPlainClientPort)) { - std::cerr << "Coordinator plaintext listener did not become ready" << std::endl; - Stop(); - return false; - } - if (!WaitForPort("127.0.0.1", kPlainClientTabletPort)) { - std::cerr << "Tablet Server plaintext listener did not become ready" << std::endl; - Stop(); + if (!WaitForPort("127.0.0.1", kTabletServerPort) || + !WaitForPort("127.0.0.1", kPlainClientPort) || + !WaitForPort("127.0.0.1", kPlainClientTabletPort)) { + std::cerr << "Cluster listeners did not become ready" << std::endl; return false; } - bootstrap_servers_ = "127.0.0.1:" + std::to_string(kPlainClientPort); - sasl_bootstrap_servers_ = "127.0.0.1:" + std::to_string(kCoordinatorPort); + SetBootstrapServers(); std::cout << "Fluss cluster started successfully." << std::endl; return true; } void Stop() { if (external_cluster_) return; + StopAll(); + } + /// Unconditionally stop and remove all cluster containers and the network. + /// Used by the --cleanup flag from ctest FIXTURES_CLEANUP. + static void StopAll() { std::cout << "Stopping Fluss cluster..." << std::endl; - RunCommand(std::string("docker stop ") + kTabletServerName + " 2>/dev/null || true"); - RunCommand(std::string("docker stop ") + kCoordinatorName + " 2>/dev/null || true"); - RunCommand(std::string("docker stop ") + kZookeeperName + " 2>/dev/null || true"); + RunCommand(std::string("docker rm -f ") + kTabletServerName + " 2>/dev/null || true"); + RunCommand(std::string("docker rm -f ") + kCoordinatorName + " 2>/dev/null || true"); + RunCommand(std::string("docker rm -f ") + kZookeeperName + " 2>/dev/null || true"); RunCommand(std::string("docker network rm ") + kNetworkName + " 2>/dev/null || true"); std::cout << "Fluss cluster stopped." << std::endl; } @@ -250,6 +249,32 @@ class FlussTestCluster { const std::string& GetSaslBootstrapServers() const { return sasl_bootstrap_servers_; } private: + void SetBootstrapServers() { + bootstrap_servers_ = "127.0.0.1:" + std::to_string(kPlainClientPort); + sasl_bootstrap_servers_ = "127.0.0.1:" + std::to_string(kCoordinatorPort); + } + + /// Wait for a cluster being started by another process. + /// Fails fast if no containers exist (real Docker failure vs race). + bool WaitForCluster() { + if (RunCommand(std::string("docker inspect ") + kZookeeperName + " >/dev/null 2>&1") != 0) { + std::cerr << "Failed to start cluster (docker error)" << std::endl; + return false; + } + std::cout << "Waiting for cluster started by another process..." << std::endl; + if (!WaitForPort("127.0.0.1", kPlainClientPort) || + !WaitForPort("127.0.0.1", kPlainClientTabletPort) || + !WaitForPort("127.0.0.1", kCoordinatorPort) || + !WaitForPort("127.0.0.1", kTabletServerPort)) { + std::cerr << "Cluster did not become ready" << std::endl; + return false; + } + SetBootstrapServers(); + external_cluster_ = true; + std::cout << "Cluster ready." << std::endl; + return true; + } + std::string bootstrap_servers_; std::string sasl_bootstrap_servers_; bool external_cluster_{false}; @@ -291,7 +316,8 @@ class FlussTestEnvironment : public ::testing::Environment { GTEST_SKIP() << "Fluss cluster did not become ready within timeout."; } - void TearDown() override { cluster_.Stop(); } + // Cluster stays alive for parallel processes and subsequent runs. + void TearDown() override {} fluss::Connection& GetConnection() { return connection_; } fluss::Admin& GetAdmin() { return admin_; }