Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build_and_test_cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ jobs:
cmake -B build -DFLUSS_ENABLE_TESTING=ON -DCMAKE_BUILD_TYPE=Debug
cmake --build build --parallel

- name: Run C++ integration tests
- name: Run C++ integration tests (parallel)
working-directory: bindings/cpp
run: cd build && ctest --output-on-failure --timeout 300
run: cd build && ctest -j$(nproc) --output-on-failure --timeout 300
env:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
15 changes: 14 additions & 1 deletion bindings/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ if (FLUSS_ENABLE_TESTING)
FetchContent_MakeAvailable(googletest)

enable_testing()
include(GoogleTest)

file(GLOB TEST_SOURCE_FILES "test/*.cpp")
add_executable(fluss_cpp_test ${TEST_SOURCE_FILES})
Expand All @@ -267,5 +268,17 @@ if (FLUSS_ENABLE_TESTING)
${PROJECT_SOURCE_DIR}/test
)

add_test(NAME fluss_cpp_integration_tests COMMAND fluss_cpp_test)
# Individual tests for parallel execution via ctest -j.
gtest_discover_tests(fluss_cpp_test
PROPERTIES
TIMEOUT 120
FIXTURES_REQUIRED fluss_cluster
)

# Cleanup: stop Docker containers after all tests finish.
# Mirrors Python's pytest_unconfigure and Rust's atexit cleanup.
add_test(NAME fluss_cluster_cleanup COMMAND fluss_cpp_test --cleanup)
set_tests_properties(fluss_cluster_cleanup PROPERTIES
FIXTURES_CLEANUP fluss_cluster
)
endif()
10 changes: 10 additions & 0 deletions bindings/cpp/test/test_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@
#include "test_utils.h"

int main(int argc, char** argv) {
// --cleanup: stop Docker containers and exit (used by ctest FIXTURES_CLEANUP).
for (int i = 1; i < argc; ++i) {
if (std::string(argv[i]) == "--cleanup") {
const char* env = std::getenv("FLUSS_BOOTSTRAP_SERVERS");
if (env && std::strlen(env) > 0) return 0;
fluss_test::FlussTestCluster::StopAll();
return 0;
}
}

::testing::InitGoogleTest(&argc, argv);

// Register the global test environment (manages the Fluss cluster lifecycle).
Expand Down
102 changes: 64 additions & 38 deletions bindings/cpp/test/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,29 +126,42 @@ class FlussTestCluster {
const char* env_servers = std::getenv("FLUSS_BOOTSTRAP_SERVERS");
if (env_servers && std::strlen(env_servers) > 0) {
bootstrap_servers_ = env_servers;
const char* env_sasl = std::getenv("FLUSS_SASL_BOOTSTRAP_SERVERS");
if (env_sasl && std::strlen(env_sasl) > 0) {
sasl_bootstrap_servers_ = env_sasl;
}
external_cluster_ = true;
std::cout << "Using external cluster: " << bootstrap_servers_ << std::endl;
return true;
}

// Reuse cluster started by another parallel test process or previous run.
if (WaitForPort("127.0.0.1", kPlainClientPort, /*timeout_seconds=*/1)) {
SetBootstrapServers();
external_cluster_ = true;
return true;
}

std::cout << "Starting Fluss cluster via Docker..." << std::endl;

// Create network
// Remove stopped (not running) containers from previous runs.
RunCommand(std::string("docker rm ") + kTabletServerName + " 2>/dev/null || true");
RunCommand(std::string("docker rm ") + kCoordinatorName + " 2>/dev/null || true");
RunCommand(std::string("docker rm ") + kZookeeperName + " 2>/dev/null || true");
RunCommand(std::string("docker network rm ") + kNetworkName + " 2>/dev/null || true");

RunCommand(std::string("docker network create ") + kNetworkName + " 2>/dev/null || true");

// Start ZooKeeper
std::string zk_cmd = std::string("docker run -d --rm") + " --name " + kZookeeperName +
" --network " + kNetworkName + " zookeeper:3.9.2";
if (RunCommand(zk_cmd) != 0) {
std::cerr << "Failed to start ZooKeeper" << std::endl;
return false;
return WaitForCluster();
}

// Wait for ZooKeeper to be ready before starting Fluss servers
// Wait for ZooKeeper to be ready
std::this_thread::sleep_for(std::chrono::seconds(5));

// Start Coordinator Server (dual listeners: CLIENT=SASL on 9123, PLAIN_CLIENT=plaintext on
// 9223)
// Coordinator Server (dual listeners: SASL on 9123, plaintext on 9223)
std::string sasl_jaas =
"org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required"
" user_admin=\"admin-secret\" user_alice=\"alice-secret\";";
Expand All @@ -171,19 +184,15 @@ class FlussTestCluster {
std::string coord_cmd = DockerRunCmd(kCoordinatorName, coord_props,
{"9123:9123", "9223:9223"}, "coordinatorServer");
if (RunCommand(coord_cmd) != 0) {
std::cerr << "Failed to start Coordinator Server" << std::endl;
Stop();
return false;
return WaitForCluster();
}

// Wait for coordinator to be ready
if (!WaitForPort("127.0.0.1", kCoordinatorPort)) {
std::cerr << "Coordinator Server did not become ready" << std::endl;
Stop();
return false;
}

// Start Tablet Server (dual listeners: CLIENT=SASL on 9123, PLAIN_CLIENT=plaintext on 9223)
// Tablet Server (dual listeners: SASL on 9124, plaintext on 9224)
std::string ts = std::string(kTabletServerName);
std::string ts_props = JoinProps({
"zookeeper.address: " + zk + ":2181",
Expand All @@ -205,43 +214,33 @@ class FlussTestCluster {
std::to_string(kPlainClientTabletPort) + ":9223"},
"tabletServer");
if (RunCommand(ts_cmd) != 0) {
std::cerr << "Failed to start Tablet Server" << std::endl;
Stop();
return false;
return WaitForCluster();
}

// Wait for tablet server to be ready
if (!WaitForPort("127.0.0.1", kTabletServerPort)) {
std::cerr << "Tablet Server did not become ready" << std::endl;
Stop();
return false;
}

// Wait for plaintext listeners
if (!WaitForPort("127.0.0.1", kPlainClientPort)) {
std::cerr << "Coordinator plaintext listener did not become ready" << std::endl;
Stop();
return false;
}
if (!WaitForPort("127.0.0.1", kPlainClientTabletPort)) {
std::cerr << "Tablet Server plaintext listener did not become ready" << std::endl;
Stop();
if (!WaitForPort("127.0.0.1", kTabletServerPort) ||
!WaitForPort("127.0.0.1", kPlainClientPort) ||
!WaitForPort("127.0.0.1", kPlainClientTabletPort)) {
std::cerr << "Cluster listeners did not become ready" << std::endl;
return false;
}

bootstrap_servers_ = "127.0.0.1:" + std::to_string(kPlainClientPort);
sasl_bootstrap_servers_ = "127.0.0.1:" + std::to_string(kCoordinatorPort);
SetBootstrapServers();
std::cout << "Fluss cluster started successfully." << std::endl;
return true;
}

void Stop() {
if (external_cluster_) return;
StopAll();
}

/// Unconditionally stop and remove all cluster containers and the network.
/// Used by the --cleanup flag from ctest FIXTURES_CLEANUP.
static void StopAll() {
std::cout << "Stopping Fluss cluster..." << std::endl;
RunCommand(std::string("docker stop ") + kTabletServerName + " 2>/dev/null || true");
RunCommand(std::string("docker stop ") + kCoordinatorName + " 2>/dev/null || true");
RunCommand(std::string("docker stop ") + kZookeeperName + " 2>/dev/null || true");
RunCommand(std::string("docker rm -f ") + kTabletServerName + " 2>/dev/null || true");
RunCommand(std::string("docker rm -f ") + kCoordinatorName + " 2>/dev/null || true");
RunCommand(std::string("docker rm -f ") + kZookeeperName + " 2>/dev/null || true");
RunCommand(std::string("docker network rm ") + kNetworkName + " 2>/dev/null || true");
std::cout << "Fluss cluster stopped." << std::endl;
}
Expand All @@ -250,6 +249,32 @@ class FlussTestCluster {
const std::string& GetSaslBootstrapServers() const { return sasl_bootstrap_servers_; }

private:
void SetBootstrapServers() {
bootstrap_servers_ = "127.0.0.1:" + std::to_string(kPlainClientPort);
sasl_bootstrap_servers_ = "127.0.0.1:" + std::to_string(kCoordinatorPort);
}

/// Wait for a cluster being started by another process.
/// Fails fast if no containers exist (real Docker failure vs race).
bool WaitForCluster() {
if (RunCommand(std::string("docker inspect ") + kZookeeperName + " >/dev/null 2>&1") != 0) {
std::cerr << "Failed to start cluster (docker error)" << std::endl;
return false;
}
std::cout << "Waiting for cluster started by another process..." << std::endl;
if (!WaitForPort("127.0.0.1", kPlainClientPort) ||
!WaitForPort("127.0.0.1", kPlainClientTabletPort) ||
!WaitForPort("127.0.0.1", kCoordinatorPort) ||
!WaitForPort("127.0.0.1", kTabletServerPort)) {
std::cerr << "Cluster did not become ready" << std::endl;
return false;
}
SetBootstrapServers();
external_cluster_ = true;
std::cout << "Cluster ready." << std::endl;
return true;
}

std::string bootstrap_servers_;
std::string sasl_bootstrap_servers_;
bool external_cluster_{false};
Expand Down Expand Up @@ -291,7 +316,8 @@ class FlussTestEnvironment : public ::testing::Environment {
GTEST_SKIP() << "Fluss cluster did not become ready within timeout.";
}

void TearDown() override { cluster_.Stop(); }
// Cluster stays alive for parallel processes and subsequent runs.
void TearDown() override {}

fluss::Connection& GetConnection() { return connection_; }
fluss::Admin& GetAdmin() { return admin_; }
Expand Down
Loading