Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 173 additions & 0 deletions vdb_benchmark/tests/tests/test_issue_543_csv_coercion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""
Issue #543: simple_bench.calculate_statistics() must not crash on malformed
per-process CSV rows.
Reporter scenario: 16 worker processes ran a 60s benchmark, finished writing
their per-process ``milvus_benchmark_p*.csv`` files, and statistics
calculation crashed with::
TypeError: unsupported operand type(s) for +: 'float' and 'str'
at (all_data["timestamp"] + all_data["batch_time_seconds"]).max()
The root cause was that a small number of rows in two of the CSVs had a
``batch_time_seconds`` value of the string ``True`` instead of a float
duration — once any value in that column is non-numeric, pandas falls back
to ``object`` dtype for the whole column and the arithmetic on line 1042
explodes.
The fix is reader-side defense in ``calculate_statistics()``: coerce the
numeric columns with ``pd.to_numeric(..., errors='coerce')``, drop rows
that fail to coerce, log a clear diagnostic naming the dropped count and
the source files, and proceed with the valid rows. The benchmark run
should succeed as long as enough good rows remain.
"""

from __future__ import annotations

import csv
import sys
from pathlib import Path

import pytest

# Allow ``from vdbbench...`` from the source tree without an editable install.
sys.path.insert(0, str(Path(__file__).resolve().parents[3] / "vdb_benchmark"))

from vdbbench.simple_bench import calculate_statistics, csv_fields # noqa: E402


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _write_csv(path: Path, rows):
"""Write a per-process CSV using the production header order."""
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w", encoding="utf-8", newline="") as fh:
writer = csv.DictWriter(fh, fieldnames=csv_fields)
writer.writeheader()
for row in rows:
writer.writerow(row)


def _good_row(process_id, batch_id, timestamp, batch_size=100,
batch_time=0.5, avg_query=0.005, success=True):
return {
"process_id": process_id,
"batch_id": batch_id,
"timestamp": timestamp,
"batch_size": batch_size,
"batch_time_seconds": batch_time,
"avg_query_time_seconds": avg_query,
"success": success,
}


# Exact shape the reporter observed in milvus_benchmark_p2.csv and p8.csv:
# process_id empty, batch_id=100, the trailing two numeric fields and
# 'success' empty, batch_time_seconds set to the string 'True'.
_REPORTER_MALFORMED_ROW = {
"process_id": "",
"batch_id": 100,
"timestamp": 0.43958,
"batch_size": 0.004396,
"batch_time_seconds": True,
"avg_query_time_seconds": "",
"success": "",
}


# ---------------------------------------------------------------------------
# Reproducer: the exact shape from issue #543
# ---------------------------------------------------------------------------


def test_calculate_statistics_recovers_from_reporter_malformed_row(tmp_path):
"""The reporter's exact CSV shape must not crash and must return stats.
Two worker CSVs (p2 and p8) each contain one malformed row matching the
issue body, surrounded by good rows. The other workers' CSVs are clean.
"""
# 14 clean CSVs
for pid in [0, 1, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15]:
_write_csv(
tmp_path / f"milvus_benchmark_p{pid}.csv",
[_good_row(pid, b, b * 0.5) for b in range(1, 11)],
)

# 2 CSVs with one malformed row each (exact reporter shape)
for pid in [2, 8]:
rows = [_good_row(pid, b, b * 0.5) for b in range(1, 11)]
rows.insert(5, _REPORTER_MALFORMED_ROW)
_write_csv(tmp_path / f"milvus_benchmark_p{pid}.csv", rows)

# Must not raise
stats = calculate_statistics(str(tmp_path))

# No 'error' key — calculation succeeded
assert "error" not in stats, (
f"calculate_statistics returned an error: {stats!r}"
)

# Stats should reflect ~16 * 10 = 160 good rows (2 malformed dropped)
assert stats.get("total_queries", 0) > 0
assert stats.get("total_time_seconds", 0) > 0


# ---------------------------------------------------------------------------
# Per-malformation coverage — any one bad numeric field should not crash
# ---------------------------------------------------------------------------


@pytest.mark.parametrize("bad_field,bad_value", [
("timestamp", "True"),
("batch_time_seconds", "True"),
("batch_time_seconds", "not-a-number"),
("batch_size", "True"),
("avg_query_time_seconds", "True"),
])
def test_calculate_statistics_drops_rows_with_non_numeric_values(
tmp_path, bad_field, bad_value
):
"""A single non-numeric value in any numeric column must not crash."""
good = [_good_row(0, b, b * 0.5) for b in range(1, 11)]
malformed = _good_row(0, 99, 99.0)
malformed[bad_field] = bad_value
good.insert(5, malformed)

_write_csv(tmp_path / "milvus_benchmark_p0.csv", good)

stats = calculate_statistics(str(tmp_path))
assert "error" not in stats
assert stats.get("total_queries", 0) > 0


# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------


def test_calculate_statistics_returns_error_when_every_row_is_malformed(tmp_path):
"""If no good rows remain, return a clear error rather than crash."""
bad_rows = [_REPORTER_MALFORMED_ROW for _ in range(10)]
_write_csv(tmp_path / "milvus_benchmark_p0.csv", bad_rows)

stats = calculate_statistics(str(tmp_path))
# Should return an error dict, not raise
assert "error" in stats


def test_calculate_statistics_unchanged_for_clean_csvs(tmp_path):
"""Sanity: clean CSVs still produce a complete stats dict."""
for pid in range(4):
_write_csv(
tmp_path / f"milvus_benchmark_p{pid}.csv",
[_good_row(pid, b, b * 0.5) for b in range(1, 11)],
)

stats = calculate_statistics(str(tmp_path))
assert "error" not in stats
# Pre-fix code already produced these on clean input.
assert stats["total_queries"] > 0
assert stats["total_time_seconds"] > 0
37 changes: 37 additions & 0 deletions vdb_benchmark/vdbbench/simple_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,9 @@ def calculate_statistics(
try:
df = pd.read_csv(file_path)
if not df.empty:
# Tag each row with its source file so the post-coerce
# diagnostic can name the affected workers (issue #543).
df["_source_file"] = file_path.name
dfs.append(df)
except Exception as exc:
print(f"Error reading result file {file_path}: {exc}")
Expand All @@ -1035,6 +1038,40 @@ def calculate_statistics(
return {"error": "No valid data found in benchmark result files"}

all_data = pd.concat(dfs, ignore_index=True)

# Issue #543: coerce numeric columns before any arithmetic. A single
# malformed row (e.g. batch_time_seconds=='True') makes pandas fall
# back to object dtype for the whole column, and `timestamp +
# batch_time_seconds` then raises TypeError after the timed work has
# already completed. Drop rows that fail coercion and continue with
# the rest — losing two rows out of a million is preferable to
# losing the entire benchmark result.
_numeric_cols = (
"timestamp",
"batch_size",
"batch_time_seconds",
"avg_query_time_seconds",
)
for col in _numeric_cols:
all_data[col] = pd.to_numeric(all_data[col], errors="coerce")

bad_mask = all_data[list(_numeric_cols)].isna().any(axis=1)
if bad_mask.any():
bad_count = int(bad_mask.sum())
bad_files = sorted(all_data.loc[bad_mask, "_source_file"].unique())
print(
f"Warning: dropping {bad_count} row(s) with non-numeric values "
f"in required columns. Affected file(s): {', '.join(bad_files)}. "
"See mlcommons/storage#543.",
flush=True,
)
all_data = all_data.loc[~bad_mask].reset_index(drop=True)

all_data = all_data.drop(columns=["_source_file"])

if all_data.empty:
return {"error": "No valid data found in benchmark result files"}

all_data.sort_values("timestamp", inplace=True)

file_start_time = float(all_data["timestamp"].min())
Expand Down
Loading