From 267d77fe08c024ef8ff4fe58c08a41455ec90d92 Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Fri, 26 Jun 2026 16:41:07 -0700 Subject: [PATCH] fix(#543): coerce simple_bench CSV numeric columns; drop bad rows instead of crashing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `vdbbench.simple_bench.calculate_statistics()` was crashing after the timed VectorDB benchmark had already completed: TypeError: unsupported operand type(s) for +: 'float' and 'str' at (all_data["timestamp"] + all_data["batch_time_seconds"]).max() The reporter (issue #543) diagnosed the root cause: a small number of rows in two per-process CSVs (`milvus_benchmark_p2.csv`, `milvus_benchmark_p8.csv`) carried a `batch_time_seconds` value of the string `True` instead of a float duration. Once any value in a numeric column is non-numeric, pandas demotes the whole column to object dtype, so the arithmetic on `(timestamp + batch_time_seconds).max()` fails with TypeError. The 60s benchmark completes, ground-truth runs, all 16 worker processes write their CSVs — then the wrapper marks the whole run as failed during the post-run statistics computation. This commit hardens the reader. Writer-side root-causing (why a row occasionally has shifted/missing fields) is left as follow-up; the reader-side defense is enough to recover the benchmark result so long as the malformed rows are a small fraction of the total. Fix: * Tag every loaded row with its source CSV name so the diagnostic can identify affected workers. * Coerce `timestamp`, `batch_size`, `batch_time_seconds`, `avg_query_time_seconds` with `pd.to_numeric(..., errors="coerce")` before any arithmetic. * Identify rows where any required numeric column became NaN, log a clear warning naming the dropped count and affected file(s), then drop those rows. * Continue with the remaining good rows. If every row was bad, fall through to the existing "No valid data found" error rather than raising. 
Tests (`vdb_benchmark/tests/tests/test_issue_543_csv_coercion.py`): * Reproduces the reporter's exact 16-worker scenario (14 clean CSVs, 2 with one malformed row each in the exact shape from the issue body) and asserts `calculate_statistics` returns valid stats. * Parametrized: any one non-numeric value in any of the four numeric columns must not crash. * Edge case: all rows malformed returns the existing error key, not a raise. * Sanity: clean CSVs still produce a complete stats dict (no regression of the happy path). --- .../tests/test_issue_543_csv_coercion.py | 173 ++++++++++++++++++ vdb_benchmark/vdbbench/simple_bench.py | 37 ++++ 2 files changed, 210 insertions(+) create mode 100644 vdb_benchmark/tests/tests/test_issue_543_csv_coercion.py diff --git a/vdb_benchmark/tests/tests/test_issue_543_csv_coercion.py b/vdb_benchmark/tests/tests/test_issue_543_csv_coercion.py new file mode 100644 index 00000000..42f85f9c --- /dev/null +++ b/vdb_benchmark/tests/tests/test_issue_543_csv_coercion.py @@ -0,0 +1,173 @@ +""" +Issue #543: simple_bench.calculate_statistics() must not crash on malformed +per-process CSV rows. + +Reporter scenario: 16 worker processes ran a 60s benchmark, finished writing +their per-process ``milvus_benchmark_p*.csv`` files, and statistics +calculation crashed with:: + + TypeError: unsupported operand type(s) for +: 'float' and 'str' + at (all_data["timestamp"] + all_data["batch_time_seconds"]).max() + +The root cause was that a small number of rows in two of the CSVs had a +``batch_time_seconds`` value of the string ``True`` instead of a float +duration — once any value in that column is non-numeric, pandas falls back +to ``object`` dtype for the whole column and the arithmetic on line 1042 +explodes. 
+ +The fix is reader-side defense in ``calculate_statistics()``: coerce the +numeric columns with ``pd.to_numeric(..., errors='coerce')``, drop rows +that fail to coerce, log a clear diagnostic naming the dropped count and +the source files, and proceed with the valid rows. The benchmark run +should succeed as long as enough good rows remain. +""" + +from __future__ import annotations + +import csv +import sys +from pathlib import Path + +import pytest + +# Allow ``from vdbbench...`` from the source tree without an editable install. +sys.path.insert(0, str(Path(__file__).resolve().parents[3] / "vdb_benchmark")) + +from vdbbench.simple_bench import calculate_statistics, csv_fields # noqa: E402 + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _write_csv(path: Path, rows): + """Write a per-process CSV using the production header order.""" + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "w", encoding="utf-8", newline="") as fh: + writer = csv.DictWriter(fh, fieldnames=csv_fields) + writer.writeheader() + for row in rows: + writer.writerow(row) + + +def _good_row(process_id, batch_id, timestamp, batch_size=100, + batch_time=0.5, avg_query=0.005, success=True): + return { + "process_id": process_id, + "batch_id": batch_id, + "timestamp": timestamp, + "batch_size": batch_size, + "batch_time_seconds": batch_time, + "avg_query_time_seconds": avg_query, + "success": success, + } + + +# Exact shape the reporter observed in milvus_benchmark_p2.csv and p8.csv: +# process_id empty, batch_id=100, avg_query_time_seconds and 'success' +# empty, batch_time_seconds set to the string 'True'. 
+_REPORTER_MALFORMED_ROW = { + "process_id": "", + "batch_id": 100, + "timestamp": 0.43958, + "batch_size": 0.004396, + "batch_time_seconds": True, + "avg_query_time_seconds": "", + "success": "", +} + + +# --------------------------------------------------------------------------- +# Reproducer: the exact shape from issue #543 +# --------------------------------------------------------------------------- + + +def test_calculate_statistics_recovers_from_reporter_malformed_row(tmp_path): + """The reporter's exact CSV shape must not crash and must return stats. + + Two worker CSVs (p2 and p8) each contain one malformed row matching the + issue body, surrounded by good rows. The other workers' CSVs are clean. + """ + # 14 clean CSVs + for pid in [0, 1, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15]: + _write_csv( + tmp_path / f"milvus_benchmark_p{pid}.csv", + [_good_row(pid, b, b * 0.5) for b in range(1, 11)], + ) + + # 2 CSVs with one malformed row each (exact reporter shape) + for pid in [2, 8]: + rows = [_good_row(pid, b, b * 0.5) for b in range(1, 11)] + rows.insert(5, _REPORTER_MALFORMED_ROW) + _write_csv(tmp_path / f"milvus_benchmark_p{pid}.csv", rows) + + # Must not raise + stats = calculate_statistics(str(tmp_path)) + + # No 'error' key — calculation succeeded + assert "error" not in stats, ( + f"calculate_statistics returned an error: {stats!r}" + ) + + # Stats should reflect ~16 * 10 = 160 good rows (2 malformed dropped) + assert stats.get("total_queries", 0) > 0 + assert stats.get("total_time_seconds", 0) > 0 + + +# --------------------------------------------------------------------------- +# Per-malformation coverage — any one bad numeric field should not crash +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("bad_field,bad_value", [ + ("timestamp", "True"), + ("batch_time_seconds", "True"), + ("batch_time_seconds", "not-a-number"), + ("batch_size", "True"), + ("avg_query_time_seconds", "True"), +]) 
+def test_calculate_statistics_drops_rows_with_non_numeric_values( + tmp_path, bad_field, bad_value +): + """A single non-numeric value in any numeric column must not crash.""" + good = [_good_row(0, b, b * 0.5) for b in range(1, 11)] + malformed = _good_row(0, 99, 99.0) + malformed[bad_field] = bad_value + good.insert(5, malformed) + + _write_csv(tmp_path / "milvus_benchmark_p0.csv", good) + + stats = calculate_statistics(str(tmp_path)) + assert "error" not in stats + assert stats.get("total_queries", 0) > 0 + + +# --------------------------------------------------------------------------- +# Edge cases +# --------------------------------------------------------------------------- + + +def test_calculate_statistics_returns_error_when_every_row_is_malformed(tmp_path): + """If no good rows remain, return a clear error rather than crash.""" + bad_rows = [_REPORTER_MALFORMED_ROW for _ in range(10)] + _write_csv(tmp_path / "milvus_benchmark_p0.csv", bad_rows) + + stats = calculate_statistics(str(tmp_path)) + # Should return an error dict, not raise + assert "error" in stats + + +def test_calculate_statistics_unchanged_for_clean_csvs(tmp_path): + """Sanity: clean CSVs still produce a complete stats dict.""" + for pid in range(4): + _write_csv( + tmp_path / f"milvus_benchmark_p{pid}.csv", + [_good_row(pid, b, b * 0.5) for b in range(1, 11)], + ) + + stats = calculate_statistics(str(tmp_path)) + assert "error" not in stats + # Pre-fix code already produced these on clean input. 
+ assert stats["total_queries"] > 0 + assert stats["total_time_seconds"] > 0 diff --git a/vdb_benchmark/vdbbench/simple_bench.py b/vdb_benchmark/vdbbench/simple_bench.py index 39c95600..c524a835 100644 --- a/vdb_benchmark/vdbbench/simple_bench.py +++ b/vdb_benchmark/vdbbench/simple_bench.py @@ -1027,6 +1027,9 @@ def calculate_statistics( try: df = pd.read_csv(file_path) if not df.empty: + # Tag each row with its source file so the post-coerce + # diagnostic can name the affected workers (issue #543). + df["_source_file"] = file_path.name dfs.append(df) except Exception as exc: print(f"Error reading result file {file_path}: {exc}") @@ -1035,6 +1038,40 @@ def calculate_statistics( return {"error": "No valid data found in benchmark result files"} all_data = pd.concat(dfs, ignore_index=True) + + # Issue #543: coerce numeric columns before any arithmetic. A single + # malformed row (e.g. batch_time_seconds=='True') makes pandas fall + # back to object dtype for the whole column, and `timestamp + + # batch_time_seconds` then raises TypeError after the timed work has + # already completed. Drop rows that fail coercion and continue with + # the rest — losing two rows out of a million is preferable to + # losing the entire benchmark result. + _numeric_cols = ( + "timestamp", + "batch_size", + "batch_time_seconds", + "avg_query_time_seconds", + ) + for col in _numeric_cols: + all_data[col] = pd.to_numeric(all_data[col], errors="coerce") + + bad_mask = all_data[list(_numeric_cols)].isna().any(axis=1) + if bad_mask.any(): + bad_count = int(bad_mask.sum()) + bad_files = sorted(all_data.loc[bad_mask, "_source_file"].unique()) + print( + f"Warning: dropping {bad_count} row(s) with non-numeric values " + f"in required columns. Affected file(s): {', '.join(bad_files)}. 
" + "See mlcommons/storage#543.", + flush=True, + ) + all_data = all_data.loc[~bad_mask].reset_index(drop=True) + + all_data = all_data.drop(columns=["_source_file"]) + + if all_data.empty: + return {"error": "No valid data found in benchmark result files"} + all_data.sort_values("timestamp", inplace=True) file_start_time = float(all_data["timestamp"].min())