Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build_and_test_python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ jobs:
uv sync --extra dev
uv run maturin develop

- name: Run Python integration tests
- name: Run Python integration tests (parallel)
working-directory: bindings/python
run: uv run pytest test/ -v
run: uv run pytest test/ -v -n auto
env:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ __pycache__/
*.py[cod]
*$py.class
*.so
*.dylib
*.dSYM/
*.egg-info/
dist/
build/
Expand Down
1 change: 1 addition & 0 deletions bindings/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dev = [
"mypy>=1.17.1",
"pytest>=8.3.5",
"pytest-asyncio>=0.25.3",
"pytest-xdist>=3.5.0",
"ruff>=0.9.10",
"maturin>=1.8.2",
"testcontainers>=4.0.0",
Expand Down
218 changes: 165 additions & 53 deletions bindings/python/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,27 @@
If FLUSS_BOOTSTRAP_SERVERS is set, tests connect to an existing cluster.
Otherwise, a Fluss cluster is started automatically via testcontainers.

The first pytest-xdist worker to run starts the cluster; other workers
detect it via port check and reuse it (matching the C++ test pattern).
Containers are cleaned up after all workers finish via pytest_unconfigure.

Run with:
uv run maturin develop && uv run pytest test/ -v
uv run maturin develop && uv run pytest test/ -v -n auto
"""

import asyncio
import os
import socket
import subprocess
import time

# Disable testcontainers Ryuk reaper for xdist runs — it would kill
# containers when the first worker exits, while others are still running.
# We handle cleanup ourselves in pytest_unconfigure.
# In single-process mode, keep Ryuk as a safety net for hard crashes.
if "PYTEST_XDIST_WORKER" in os.environ:
os.environ.setdefault("TESTCONTAINERS_RYUK_DISABLED", "true")

import pytest
import pytest_asyncio

Expand All @@ -37,47 +50,77 @@
FLUSS_VERSION = "0.9.0-incubating"
BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS")

# Container / network names
NETWORK_NAME = "fluss-python-test-network"
ZOOKEEPER_NAME = "zookeeper-python-test"
COORDINATOR_NAME = "coordinator-server-python-test"
TABLET_SERVER_NAME = "tablet-server-python-test"

# Fixed host ports (must match across workers)
COORDINATOR_PORT = 9123
TABLET_SERVER_PORT = 9124
PLAIN_CLIENT_PORT = 9223
PLAIN_CLIENT_TABLET_PORT = 9224

ALL_PORTS = [COORDINATOR_PORT, TABLET_SERVER_PORT, PLAIN_CLIENT_PORT, PLAIN_CLIENT_TABLET_PORT]


def _wait_for_port(host, port, timeout=60):
"""Wait for a TCP port to become available."""
start = time.time()
while time.time() - start < timeout:
try:
with socket.create_connection((host, port), timeout=1):
return
return True
except (ConnectionRefusedError, TimeoutError, OSError):
time.sleep(1)
raise TimeoutError(f"Port {port} on {host} not available after {timeout}s")
return False


@pytest.fixture(scope="session")
def fluss_cluster():
"""Start a Fluss cluster using testcontainers, or use an existing one."""
if BOOTSTRAP_SERVERS_ENV:
yield (BOOTSTRAP_SERVERS_ENV, BOOTSTRAP_SERVERS_ENV)
def _all_ports_ready(timeout=60):
    """Return True once every cluster listener port is reachable.

    The ports are checked sequentially against a single shared deadline,
    so the whole call never takes longer than ``timeout`` seconds.
    """
    give_up_at = time.time() + timeout
    for p in ALL_PORTS:
        budget = give_up_at - time.time()
        if budget <= 0:
            # Deadline already spent on earlier ports.
            return False
        if not _wait_for_port("localhost", p, timeout=budget):
            return False
    return True


def _run_cmd(cmd):
"""Run a command (list form), return exit code."""
return subprocess.run(cmd, capture_output=True).returncode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if run fails, we will only see error code, but not the error message now, not sure if it will hinder debugging if needed?

Copy link
Contributor Author

@fresh-borzoni fresh-borzoni Mar 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for now it's used for docker network call only, so not much to debug, but valid observation if used for more stuff later 👍



def _start_cluster():
"""Start the Fluss Docker cluster via testcontainers.

If another worker already started the cluster (detected via port check),
reuse it. If container creation fails (name conflict from a racing worker),
wait for the other worker's cluster to become ready.
"""
# Reuse cluster started by another parallel worker or previous run.
if _wait_for_port("localhost", PLAIN_CLIENT_PORT, timeout=1):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i am curious if 1s too short?

Copy link
Contributor Author

@fresh-borzoni fresh-borzoni Mar 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a fast check — we're just quickly checking whether a warm cluster is already running; a usual integration CI/CD run would always have this fast path fail.

For local dev if i wish to quickly iterate on tests, i just comment out cleanup and the next run would reuse existing cluster and run superfast, as the cluster is already warm. So I left this hook if someone wishes to use it as well.

One improvement: we may use some env var and wire it in a such way that we can specify to retain cluster and skip cleanup. But it's followup, the main focus was integration tests.

print("Reusing existing cluster via port check.")
return

from testcontainers.core.container import DockerContainer
from testcontainers.core.network import Network

network = Network()
network.create()
print("Starting Fluss cluster via testcontainers...")

zookeeper = (
DockerContainer("zookeeper:3.9.2")
.with_network(network)
.with_name("zookeeper-python-test")
)
# Create a named network via Docker CLI (idempotent, avoids orphaned
# random-named networks when multiple xdist workers race).
_run_cmd(["docker", "network", "create", NETWORK_NAME])

sasl_jaas = (
"org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required"
' user_admin="admin-secret" user_alice="alice-secret";'
)
coordinator_props = "\n".join([
"zookeeper.address: zookeeper-python-test:2181",
"bind.listeners: INTERNAL://coordinator-server-python-test:0,"
" CLIENT://coordinator-server-python-test:9123,"
" PLAIN_CLIENT://coordinator-server-python-test:9223",
f"zookeeper.address: {ZOOKEEPER_NAME}:2181",
f"bind.listeners: INTERNAL://{COORDINATOR_NAME}:0,"
f" CLIENT://{COORDINATOR_NAME}:9123,"
f" PLAIN_CLIENT://{COORDINATOR_NAME}:9223",
"advertised.listeners: CLIENT://localhost:9123,"
" PLAIN_CLIENT://localhost:9223",
"internal.listener.name: INTERNAL",
Expand All @@ -87,21 +130,11 @@ def fluss_cluster():
"netty.server.num-network-threads: 1",
"netty.server.num-worker-threads: 3",
])
coordinator = (
DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
.with_network(network)
.with_name("coordinator-server-python-test")
.with_bind_ports(9123, 9123)
.with_bind_ports(9223, 9223)
.with_command("coordinatorServer")
.with_env("FLUSS_PROPERTIES", coordinator_props)
)

tablet_props = "\n".join([
"zookeeper.address: zookeeper-python-test:2181",
"bind.listeners: INTERNAL://tablet-server-python-test:0,"
" CLIENT://tablet-server-python-test:9123,"
" PLAIN_CLIENT://tablet-server-python-test:9223",
f"zookeeper.address: {ZOOKEEPER_NAME}:2181",
f"bind.listeners: INTERNAL://{TABLET_SERVER_NAME}:0,"
f" CLIENT://{TABLET_SERVER_NAME}:9123,"
f" PLAIN_CLIENT://{TABLET_SERVER_NAME}:9223",
"advertised.listeners: CLIENT://localhost:9124,"
" PLAIN_CLIENT://localhost:9224",
"internal.listener.name: INTERNAL",
Expand All @@ -112,42 +145,121 @@ def fluss_cluster():
"netty.server.num-network-threads: 1",
"netty.server.num-worker-threads: 3",
])

zookeeper = (
DockerContainer("zookeeper:3.9.2")
.with_kwargs(network=NETWORK_NAME)
.with_name(ZOOKEEPER_NAME)
)
coordinator = (
DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
.with_kwargs(network=NETWORK_NAME)
.with_name(COORDINATOR_NAME)
.with_bind_ports(9123, 9123)
.with_bind_ports(9223, 9223)
.with_command("coordinatorServer")
.with_env("FLUSS_PROPERTIES", coordinator_props)
)
tablet_server = (
DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}")
.with_network(network)
.with_name("tablet-server-python-test")
.with_kwargs(network=NETWORK_NAME)
.with_name(TABLET_SERVER_NAME)
.with_bind_ports(9123, 9124)
.with_bind_ports(9223, 9224)
.with_command("tabletServer")
.with_env("FLUSS_PROPERTIES", tablet_props)
)

zookeeper.start()
coordinator.start()
tablet_server.start()
try:
zookeeper.start()
coordinator.start()
tablet_server.start()
except Exception as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit broad here, maybe consider narrowing to docker specific errors?

btw IIUC, we want to start the Fluss cluster exactly once per test session
according to pytest-xdist: https://pytest-xdist.readthedocs.io/en/latest/how-to.html#making-session-scoped-fixtures-execute-only-once , they recommend to use file lock to coordinate startup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call 👍
file lock approach is cleaner, but it requires more restructuring, so I just used the same approach as in cpp for consistency.

Let's create follow up if someone wishes to improve this and play with XDIST.
I don't expect tests time to change though :)

# Another worker may have started containers with the same names.
# Wait for the cluster to become ready instead of failing.
print(f"Container start failed ({e}), waiting for cluster from another worker...")
if _all_ports_ready():
return
raise

_wait_for_port("localhost", 9123)
_wait_for_port("localhost", 9124)
_wait_for_port("localhost", 9223)
_wait_for_port("localhost", 9224)
# Extra wait for cluster to fully initialize
time.sleep(10)
if not _all_ports_ready():
raise RuntimeError("Cluster listeners did not become ready")

# (plaintext_bootstrap, sasl_bootstrap)
yield ("127.0.0.1:9223", "127.0.0.1:9123")
print("Fluss cluster started successfully.")


def _stop_cluster():
    """Force-remove the Fluss containers, then drop their Docker network."""
    # Tablet server first, then coordinator, then ZooKeeper — reverse of
    # startup order. Errors (e.g. already-removed containers) are ignored.
    for container in (TABLET_SERVER_NAME, COORDINATOR_NAME, ZOOKEEPER_NAME):
        subprocess.run(["docker", "rm", "-f", container], capture_output=True)
    subprocess.run(["docker", "network", "rm", NETWORK_NAME], capture_output=True)


async def _connect_with_retry(bootstrap_servers, timeout=60):
    """Connect to the Fluss cluster with retries until it's fully ready.

    Waits until both the coordinator and at least one tablet server are
    available, matching the Rust wait_for_cluster_ready pattern.

    Args:
        bootstrap_servers: Bootstrap address passed to ``fluss.Config``.
        timeout: Maximum number of seconds to keep retrying (~1/s).

    Returns:
        An open ``fluss.FlussConnection`` once a TabletServer is visible.

    Raises:
        RuntimeError: If no fully-ready cluster appeared within ``timeout``,
            chaining the last observed error into the message.
    """
    config = fluss.Config({"bootstrap.servers": bootstrap_servers})
    start = time.time()
    last_err = None
    while time.time() - start < timeout:
        conn = None
        try:
            conn = await fluss.FlussConnection.create(config)
            admin = await conn.get_admin()
            nodes = await admin.get_server_nodes()
            # Coordinator alone is not enough — require a TabletServer so
            # table reads/writes in tests don't fail on a half-up cluster.
            if any(n.server_type == "TabletServer" for n in nodes):
                return conn
            last_err = RuntimeError("No TabletServer available yet")
        except Exception as e:
            last_err = e
        # Close the half-ready connection before retrying so repeated
        # attempts don't leak sockets.
        if conn is not None:
            conn.close()
        await asyncio.sleep(1)
    raise RuntimeError(
        f"Could not connect to cluster after {timeout}s: {last_err}"
    )

tablet_server.stop()
coordinator.stop()
zookeeper.stop()
network.remove()

def pytest_unconfigure(config):
    """Tear the Docker cluster down once all xdist workers have finished.

    Only the controller process (or the lone process when xdist is not in
    use) performs cleanup; worker processes are recognized by the presence
    of the 'workerinput' attribute and do nothing. When an external cluster
    was supplied via FLUSS_BOOTSTRAP_SERVERS there is nothing to clean up.
    """
    is_worker = hasattr(config, "workerinput")
    if BOOTSTRAP_SERVERS_ENV or is_worker:
        return
    _stop_cluster()


@pytest.fixture(scope="session")
def fluss_cluster():
    """Yield ``(plaintext_bootstrap, sasl_bootstrap)`` for a running cluster.

    Honors FLUSS_BOOTSTRAP_SERVERS to target a pre-existing cluster
    (with FLUSS_SASL_BOOTSTRAP_SERVERS as an optional SASL override);
    otherwise starts — or reuses — the Docker-based test cluster.
    """
    if BOOTSTRAP_SERVERS_ENV:
        sasl_addr = os.environ.get(
            "FLUSS_SASL_BOOTSTRAP_SERVERS", BOOTSTRAP_SERVERS_ENV
        )
        yield (BOOTSTRAP_SERVERS_ENV, sasl_addr)
        return

    _start_cluster()
    # Plaintext listener first, SASL (coordinator CLIENT) listener second.
    yield (
        f"127.0.0.1:{PLAIN_CLIENT_PORT}",
        f"127.0.0.1:{COORDINATOR_PORT}",
    )


@pytest_asyncio.fixture(scope="session")
async def connection(fluss_cluster):
    """Session-scoped connection to the Fluss cluster (plaintext).

    Connects via _connect_with_retry so tests don't start before the
    coordinator and at least one tablet server are reachable; the
    connection is closed when the session ends.
    """
    plaintext_addr, _sasl_addr = fluss_cluster
    conn = await _connect_with_retry(plaintext_addr)
    yield conn
    conn.close()

Expand Down
Loading