Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/mcp/client/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ async def post_writer(endpoint_url: str):
)
response.raise_for_status()
logger.debug(f"Client message sent successfully: {response.status_code}")
except Exception: # pragma: lax no cover
except Exception as exc: # pragma: lax no cover
logger.exception("Error in post_writer")
await read_stream_writer.send(exc)
finally:
await write_stream.aclose()

Expand Down
3 changes: 2 additions & 1 deletion src/mcp/client/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,9 @@ async def handle_request_async():
else:
await handle_request_async()

except Exception: # pragma: lax no cover
except Exception as exc: # pragma: lax no cover
logger.exception("Error in post_writer")
await read_stream_writer.send(exc)
finally:
await read_stream_writer.aclose()
await write_stream.aclose()
Expand Down
106 changes: 77 additions & 29 deletions tests/client/test_stdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,64 @@
tee = shutil.which("tee")


async def _wait_for_file_to_exist(file_path: str, timeout: float = 5.0) -> None:
"""Wait for a file to exist and be non-empty.

Uses condition-based waiting instead of arbitrary sleep to eliminate
race conditions in tests that verify child processes have started
writing to marker files.

Args:
file_path: Path to the file to wait for
timeout: Maximum time to wait in seconds

Raises:
TimeoutError: If file doesn't exist or remains empty after timeout
"""
start_time = time.time()
poll_interval = 0.01 # Poll every 10ms

while time.time() - start_time < timeout:
if os.path.exists(file_path):
size = os.path.getsize(file_path)
if size > 0:
return
await anyio.sleep(poll_interval)

# Check one more time for better error message
if os.path.exists(file_path):
size = os.path.getsize(file_path)
raise TimeoutError(f"File {file_path} exists but is empty after {timeout}s (size={size})")
else:
raise TimeoutError(f"File {file_path} does not exist after {timeout}s")


@pytest.mark.anyio
async def test_wait_for_file_to_exist_timeout():
"""Test _wait_for_file_to_exist raises TimeoutError when file doesn't exist."""
nonexistent_file = "/tmp/nonexistent_file_test_1234567890"

with pytest.raises(TimeoutError, match="does not exist"):
with anyio.fail_after(1.0):
await _wait_for_file_to_exist(nonexistent_file, timeout=0.1)


@pytest.mark.anyio
async def test_wait_for_file_to_exist_empty_file():
"""Test _wait_for_file_to_exist raises TimeoutError when file exists but is empty."""
import tempfile

with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
empty_file = f.name

try:
with pytest.raises(TimeoutError, match="exists but is empty"):
with anyio.fail_after(1.0):
await _wait_for_file_to_exist(empty_file, timeout=0.1)
finally:
os.unlink(empty_file)


@pytest.mark.anyio
@pytest.mark.skipif(tee is None, reason="could not find tee command")
async def test_stdio_context_manager_exiting():
Expand Down Expand Up @@ -296,19 +354,15 @@ async def test_basic_child_process_cleanup(self):
# Start the parent process
proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script])

# Wait for processes to start
await anyio.sleep(0.5)

# Verify parent started
# Wait for parent to start (condition-based)
with anyio.fail_after(5.0):
await _wait_for_file_to_exist(parent_marker)
assert os.path.exists(parent_marker), "Parent process didn't start"

# Verify child is writing
if os.path.exists(marker_file): # pragma: no branch
initial_size = os.path.getsize(marker_file)
await anyio.sleep(0.3)
size_after_wait = os.path.getsize(marker_file)
assert size_after_wait > initial_size, "Child process should be writing"
print(f"Child is writing (file grew from {initial_size} to {size_after_wait} bytes)")
# Wait for child to start writing (condition-based instead of arbitrary 0.5s + 0.3s)
with anyio.fail_after(5.0):
await _wait_for_file_to_exist(marker_file)
print(f"Child is writing (file size: {os.path.getsize(marker_file)} bytes)")

# Terminate using our function
print("Terminating process and children...")
Expand Down Expand Up @@ -398,16 +452,15 @@ async def test_nested_process_tree(self):
# Start the parent process
proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script])

# Let all processes start
await anyio.sleep(1.0)

# Verify all are writing
# Wait for all processes to start (condition-based)
for file_path, name in [(parent_file, "parent"), (child_file, "child"), (grandchild_file, "grandchild")]:
if os.path.exists(file_path): # pragma: no branch
initial_size = os.path.getsize(file_path)
await anyio.sleep(0.3)
new_size = os.path.getsize(file_path)
assert new_size > initial_size, f"{name} process should be writing"
with anyio.fail_after(5.0):
await _wait_for_file_to_exist(file_path)

parent_size = os.path.getsize(parent_file)
child_size = os.path.getsize(child_file)
grandchild_size = os.path.getsize(grandchild_file)
print(f"parent={parent_size}, child={child_size}, grandchild={grandchild_size}")

# Terminate the whole tree
await _terminate_process_tree(proc)
Expand Down Expand Up @@ -477,15 +530,10 @@ def handle_term(sig, frame):
# Start the parent process
proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script])

# Let child start writing
await anyio.sleep(0.5)

# Verify child is writing
if os.path.exists(marker_file): # pragma: no branch
size1 = os.path.getsize(marker_file)
await anyio.sleep(0.3)
size2 = os.path.getsize(marker_file)
assert size2 > size1, "Child should be writing"
# Wait for child to start writing (condition-based)
with anyio.fail_after(5.0):
await _wait_for_file_to_exist(marker_file)
print(f"Child is writing (file size: {os.path.getsize(marker_file)} bytes)")

# Terminate - this will kill the process group even if parent exits first
await _terminate_process_tree(proc)
Expand Down