From d15cfa54aae083f86470689407e6599174a0326b Mon Sep 17 00:00:00 2001 From: Atul Nair Date: Wed, 24 Jun 2026 19:21:04 -0400 Subject: [PATCH 1/7] feat(gpu): add GPU polling loop, metrics, and threshold gates - gpu.py: parse_gpu_metrics, capture_gpu_metrics, _mean, agg_readings, poll_gpu_metrics - VllmJob.is_client_done(): non-raising completion predicate - vllm_single test: poll GPU while client runs, write gpu_poll.log, derive 5 metrics - _shared.py: Peak VRAM / Compute % / BW % columns in results table - test_gpu.py: TestMean, TestAggReadings, TestPollGpuMetrics unit test classes - threshold JSON: gpu.* placeholder SLO entries for all 5 cells - test_vllm_orch_parse: update threshold path + exclude gpu.* from client key guard --- ...vllm-single_llama31-70b_fp8_threshold.json | 35 +- .../unittests/test_vllm_orch_parse.py | 8 +- cvs/lib/inference/vllm_single.py | 16 + cvs/lib/utils/gpu.py | 302 ++++++ cvs/lib/utils/unittests/test_gpu.py | 901 ++++++++++++++++++ cvs/tests/inference/vllm/_shared.py | 6 + cvs/tests/inference/vllm/vllm_single.py | 138 ++- 7 files changed, 1385 insertions(+), 21 deletions(-) create mode 100644 cvs/lib/utils/gpu.py create mode 100644 cvs/lib/utils/unittests/test_gpu.py diff --git a/cvs/input/config_file/inference/vllm_single/mi300x_vllm-single_llama31-70b_fp8_threshold.json b/cvs/input/config_file/inference/vllm_single/mi300x_vllm-single_llama31-70b_fp8_threshold.json index c95c5353..d29d35a2 100644 --- a/cvs/input/config_file/inference/vllm_single/mi300x_vllm-single_llama31-70b_fp8_threshold.json +++ b/cvs/input/config_file/inference/vllm_single/mi300x_vllm-single_llama31-70b_fp8_threshold.json @@ -23,7 +23,12 @@ "client.p95_e2el_ms": { "kind": "max_ms", "value": 1000000 }, "client.p99_e2el_ms": { "kind": "max_ms", "value": 1000000 }, "client.success_rate": { "kind": "min", "value": 0 }, - "client.failed": { "kind": "max", "value": 1000000000 } + "client.failed": { "kind": "max", "value": 1000000000 }, + "gpu.peak_gpu_memory_mb": { 
"kind": "max", "value": 200000 }, + "gpu.model_load_memory_mb": { "kind": "max", "value": 180000 }, + "gpu.model_load_s": { "kind": "max", "value": 300 }, + "gpu.gpu_compute_util_pct": { "kind": "min", "value": 70 }, + "gpu.gpu_bandwidth_util_pct": { "kind": "min", "value": 60 } }, "ISL=8000,OSL=1000,TP=8,CONC=16": { "client.total_token_throughput": { "kind": "min_tok_s", "value": 0 }, @@ -48,7 +53,12 @@ "client.p95_e2el_ms": { "kind": "max_ms", "value": 1000000 }, "client.p99_e2el_ms": { "kind": "max_ms", "value": 1000000 }, "client.success_rate": { "kind": "min", "value": 0 }, - "client.failed": { "kind": "max", "value": 1000000000 } + "client.failed": { "kind": "max", "value": 1000000000 }, + "gpu.peak_gpu_memory_mb": { "kind": "max", "value": 200000 }, + "gpu.model_load_memory_mb": { "kind": "max", "value": 180000 }, + "gpu.model_load_s": { "kind": "max", "value": 300 }, + "gpu.gpu_compute_util_pct": { "kind": "min", "value": 70 }, + "gpu.gpu_bandwidth_util_pct": { "kind": "min", "value": 60 } }, "ISL=1000,OSL=8000,TP=8,CONC=16": { "client.total_token_throughput": { "kind": "min_tok_s", "value": 0 }, @@ -73,7 +83,12 @@ "client.p95_e2el_ms": { "kind": "max_ms", "value": 1000000 }, "client.p99_e2el_ms": { "kind": "max_ms", "value": 1000000 }, "client.success_rate": { "kind": "min", "value": 0 }, - "client.failed": { "kind": "max", "value": 1000000000 } + "client.failed": { "kind": "max", "value": 1000000000 }, + "gpu.peak_gpu_memory_mb": { "kind": "max", "value": 200000 }, + "gpu.model_load_memory_mb": { "kind": "max", "value": 180000 }, + "gpu.model_load_s": { "kind": "max", "value": 300 }, + "gpu.gpu_compute_util_pct": { "kind": "min", "value": 70 }, + "gpu.gpu_bandwidth_util_pct": { "kind": "min", "value": 60 } }, "ISL=1000,OSL=4000,TP=8,CONC=16": { "client.total_token_throughput": { "kind": "min_tok_s", "value": 0 }, @@ -98,7 +113,12 @@ "client.p95_e2el_ms": { "kind": "max_ms", "value": 1000000 }, "client.p99_e2el_ms": { "kind": "max_ms", "value": 1000000 }, 
"client.success_rate": { "kind": "min", "value": 0 }, - "client.failed": { "kind": "max", "value": 1000000000 } + "client.failed": { "kind": "max", "value": 1000000000 }, + "gpu.peak_gpu_memory_mb": { "kind": "max", "value": 200000 }, + "gpu.model_load_memory_mb": { "kind": "max", "value": 180000 }, + "gpu.model_load_s": { "kind": "max", "value": 300 }, + "gpu.gpu_compute_util_pct": { "kind": "min", "value": 70 }, + "gpu.gpu_bandwidth_util_pct": { "kind": "min", "value": 60 } }, "ISL=5000,OSL=1024,TP=8,CONC=16": { "client.total_token_throughput": { "kind": "min_tok_s", "value": 0 }, @@ -123,6 +143,11 @@ "client.p95_e2el_ms": { "kind": "max_ms", "value": 1000000 }, "client.p99_e2el_ms": { "kind": "max_ms", "value": 1000000 }, "client.success_rate": { "kind": "min", "value": 0 }, - "client.failed": { "kind": "max", "value": 1000000000 } + "client.failed": { "kind": "max", "value": 1000000000 }, + "gpu.peak_gpu_memory_mb": { "kind": "max", "value": 200000 }, + "gpu.model_load_memory_mb": { "kind": "max", "value": 180000 }, + "gpu.model_load_s": { "kind": "max", "value": 300 }, + "gpu.gpu_compute_util_pct": { "kind": "min", "value": 70 }, + "gpu.gpu_bandwidth_util_pct": { "kind": "min", "value": 60 } } } diff --git a/cvs/lib/inference/unittests/test_vllm_orch_parse.py b/cvs/lib/inference/unittests/test_vllm_orch_parse.py index 95d6b42e..79a373c1 100644 --- a/cvs/lib/inference/unittests/test_vllm_orch_parse.py +++ b/cvs/lib/inference/unittests/test_vllm_orch_parse.py @@ -20,7 +20,7 @@ _FIXTURES = _HERE / "fixtures" _REPO = _HERE.parents[3] # cvs/lib/inference/unittests -> repo root _SHARED = _REPO / "cvs/tests/inference/vllm/_shared.py" -_THRESHOLD = _REPO / "cvs/input/config_file/inference/vllm_single/w1_llama31_70b_fp8kv/llama31_70b_fp8_threshold.json" +_THRESHOLD = _REPO / "cvs/input/config_file/inference/vllm_single/mi300x_vllm-single_llama31-70b_fp8_threshold.json" # isl/tp used to build the job; must match the fixture's run for the derived # math assertions to be 
def is_client_done(self) -> bool:
    """Report whether the benchmark client has finished, without ever raising.

    Reads the client log through ``self.orch.exec`` and returns True as soon as
    any host's log text matches one of ``self.COMPLETION_RE``,
    ``self.CLIENT_CRASH_RE`` or ``self.CLIENT_LAUNCH_FAIL_RE`` -- i.e. the
    client either completed or failed in a recognised way.

    Returns:
        True if a terminal pattern was found in any host's log; False when no
        pattern matched or when reading/inspecting the log raised.
    """
    try:
        terminal_patterns = (
            self.COMPLETION_RE,
            self.CLIENT_CRASH_RE,
            self.CLIENT_LAUNCH_FAIL_RE,
        )
        # Quote the path the same way run_client() does when it redirects the
        # client output into this file; an unquoted path containing spaces or
        # shell metacharacters would make `cat` read the wrong thing.
        out = self.orch.exec(f"cat {shlex.quote(self.client_log)}")
        # A host may report None/empty output; treat that as "nothing logged yet".
        return any(
            pattern.search(text or "")
            for text in out.values()
            for pattern in terminal_patterns
        )
    except Exception:
        # Deliberate best-effort: this predicate is used for polling, so any
        # failure to read the log is reported as "not done yet".
        return False
from __future__ import annotations

import json
import logging
import pathlib
import time

# Human-readable derived metrics exposed as HTML rows (one row per entry per cell).
# These are computed in vllm_single.py from the raw amd-smi snapshots and stored
# under "gpu."-prefixed keys (e.g. "gpu.peak_gpu_memory_mb") in inf_res_dict.
GPU_METRICS: list[tuple[str, str]] = [
    ("peak_gpu_memory_mb", "MB"),
    ("model_load_memory_mb", "MB"),
    ("model_load_s", "s"),
    ("gpu_bandwidth_util_pct", "%"),
    ("gpu_compute_util_pct", "%"),
]
GPU_METRIC_UNITS: dict[str, str] = dict(GPU_METRICS)

# Raw amd-smi field keys emitted by parse_gpu_metrics(). Not used as test rows.
_RAW_GPU_FIELDS: list[tuple[str, str]] = [
    ("gfx_activity", "%"),
    ("umc_activity", "%"),
    ("mm_activity", "%"),
    ("total_vram", "MB"),
    ("used_vram", "MB"),
    ("free_vram", "MB"),
    ("energy_j", "J"),
]
_RAW_GPU_FIELD_UNITS: dict[str, str] = dict(_RAW_GPU_FIELDS)

# Field groups shared by the per-entry parsers and the aggregator.
_ACTIVITY_FIELDS = ("gfx_activity", "umc_activity", "mm_activity")
_VRAM_FIELDS = ("total_vram", "used_vram", "free_vram")
_ENERGY_KEY = "gpu.energy_j"


def _safe_get(d, *keys, default=None):
    """Navigate nested dicts safely; return default on missing key or 'N/A' value."""
    cur = d
    for key in keys:
        if not isinstance(cur, dict):
            return default
        cur = cur.get(key, default)
        if cur is default:
            return default
    if cur == "N/A":
        return default
    return cur


def _is_number(val) -> bool:
    """True for int/float readings; bool is rejected even though it subclasses int."""
    return isinstance(val, (int, float)) and not isinstance(val, bool)


def _extract_fields(gpu_entry, section: str, fields) -> dict:
    """Read gpu_entry[section][field]["value"] for each field into a "gpu.<field>" dict."""
    return {f"gpu.{field}": _safe_get(gpu_entry, section, field, "value") for field in fields}


def parse_usage(gpu_entry: dict) -> dict:
    """Extract activity metrics from one GPU entry dict.

    Returns: {"gpu.gfx_activity", "gpu.umc_activity", "gpu.mm_activity"}
    A field that is missing or "N/A" maps to None; never raises.
    """
    return _extract_fields(gpu_entry, "usage", _ACTIVITY_FIELDS)


def parse_mem_usage(gpu_entry: dict) -> dict:
    """Extract memory usage metrics from one GPU entry dict.

    Returns: {"gpu.total_vram", "gpu.used_vram", "gpu.free_vram"}
    A field that is missing or "N/A" maps to None; never raises.
    """
    return _extract_fields(gpu_entry, "mem_usage", _VRAM_FIELDS)


def parse_energy(gpu_entry: dict) -> dict:
    """Extract energy consumption from one GPU entry dict.

    Returns: {"gpu.energy_j"}
    Value is float or None; never raises. A value that cannot be converted
    with float() is reported as None instead of propagating the error.
    """
    val = _safe_get(gpu_entry, "energy", "total_energy_consumption", "value")
    if val is not None:
        try:
            val = float(val)
        except (TypeError, ValueError):
            val = None
    return {_ENERGY_KEY: val}


def parse_gpu_metrics(raw: list) -> dict:
    """Aggregate all GPU entries from one host's amd-smi --json output.

    raw: the parsed JSON list (one dict per GPU per host).
    Activity metrics (%) -> averaged across GPUs (only numeric values counted).
    Memory / energy metrics -> summed across GPUs (only numeric values counted).
    Empty/missing/non-list input -> all None.
    Non-numeric readings are skipped rather than raising.
    """
    if not raw or not isinstance(raw, (list, tuple)):
        return {f"gpu.{k}": None for k, _u in _RAW_GPU_FIELDS}

    activity_keys = tuple(f"gpu.{f}" for f in _ACTIVITY_FIELDS)
    vram_keys = tuple(f"gpu.{f}" for f in _VRAM_FIELDS)

    activity_vals: dict[str, list] = {k: [] for k in activity_keys}
    # Sums start as None (not 0) so "no GPU reported this field" stays
    # distinguishable from a genuine zero reading.
    vram_sums: dict[str, int | None] = {k: None for k in vram_keys}
    energy_sum: float | None = None

    for entry in raw:
        usage = parse_usage(entry)
        mem = parse_mem_usage(entry)
        energy = parse_energy(entry)[_ENERGY_KEY]

        for key in activity_keys:
            if _is_number(usage[key]):
                activity_vals[key].append(usage[key])

        for key in vram_keys:
            val = mem[key]
            if _is_number(val):
                vram_sums[key] = val if vram_sums[key] is None else vram_sums[key] + val

        if energy is not None:
            energy_sum = energy if energy_sum is None else energy_sum + energy

    result = {key: _mean(activity_vals[key]) for key in activity_keys}
    result.update(vram_sums)
    result[_ENERGY_KEY] = energy_sum
    return result


def _try_parse(text: str) -> list:
    """Parse JSON text; return [] on empty/None/invalid JSON or non-list result.

    Accepts both bare-list format and the {"gpu_data": [...]} envelope that
    amd-smi metric --json emits on ROCm 6.x nodes.
    """
    if not text:
        return []
    try:
        parsed = json.loads(text)
    except (json.JSONDecodeError, ValueError, TypeError):
        return []
    if isinstance(parsed, dict):
        parsed = parsed.get("gpu_data", [])
    if not isinstance(parsed, list):
        return []
    return parsed


def capture_gpu_metrics(orch) -> dict:
    """One amd-smi exec via orch.exec(). Returns flat {gpu.* metrics} dict.

    orch: ContainerOrchestrator (has .exec(cmd) -> {host: str}).
    Entries from every host are pooled before aggregation. Degrades gracefully
    on empty/unparseable JSON per host (returns all-None dict when nothing
    parses). Exceptions raised by orch.exec itself are NOT caught here;
    poll_gpu_metrics relies on them to count a failed poll.
    """
    out = orch.exec("sudo amd-smi metric --json")
    all_entries = []
    for text in (out or {}).values():
        all_entries.extend(_try_parse(text))
    return parse_gpu_metrics(all_entries)


def _mean(values: list) -> "float | None":
    """Arithmetic mean of the non-None values, or None when there are none."""
    vals = [v for v in values if v is not None]
    return sum(vals) / len(vals) if vals else None


def agg_readings(readings: list) -> dict:
    """Aggregate poll readings into derived metrics.

    Returns dict with peak_gpu_memory_mb (max of gpu.used_vram),
    gpu_compute_util_pct (mean of gpu.gfx_activity) and
    gpu_bandwidth_util_pct (mean of gpu.umc_activity).
    Any metric is None if no valid readings exist for it.

    Readings are raw snapshot dicts from capture_gpu_metrics (keys use gpu.* prefix).
    """

    def column(key):
        values = (r.get(key) for r in readings or [])
        return [v for v in values if v is not None]

    used_vrams = column("gpu.used_vram")
    return {
        "peak_gpu_memory_mb": max(used_vrams) if used_vrams else None,
        "gpu_compute_util_pct": _mean(column("gpu.gfx_activity")),
        "gpu_bandwidth_util_pct": _mean(column("gpu.umc_activity")),
    }


def _fmt(value, spec: str) -> str:
    """Format value with spec, or "-" when the value is None."""
    return format(value, spec) if value is not None else "-"


def poll_gpu_metrics(
    orch,
    is_done_fn,
    poll_interval_s: float = 15,
    label: str = "poll",
    log_path=None,
    max_consecutive_failures: int = 3,
    model_load_s=None,
    model_load_memory_mb=None,
    max_duration_s=None,
) -> list:
    """Poll GPU metrics while a vLLM client is running.

    Calls capture_gpu_metrics repeatedly until one of:
      * is_done_fn() returns a truthy value,
      * max_consecutive_failures polls in a row raised (from the capture OR
        from is_done_fn),
      * max_duration_s seconds have elapsed (only when it is not None).

    Args:
        orch: object passed straight to capture_gpu_metrics.
        is_done_fn: zero-argument callable; checked once after each successful capture.
        poll_interval_s: pause between polls; negative values are treated as 0.
        label: tag embedded in every per-poll log line.
        log_path: if given, per-poll lines + summary are written to this file.
        max_consecutive_failures: failure streak that stops polling early.
        model_load_s / model_load_memory_mb: only echoed into the summary.
        max_duration_s: optional wall-clock budget; None (default) means unbounded.

    Returns:
        List of raw snapshot dicts (polls whose capture failed are excluded).
        Exceptions from the capture, is_done_fn and the log write are caught.
    """
    log = logging.getLogger(__name__)
    readings: list = []
    log_lines: list = []
    poll_n = 0
    consecutive_failures = 0
    interval = max(0, poll_interval_s)
    deadline = None if max_duration_s is None else time.monotonic() + max_duration_s

    while True:
        poll_n += 1
        done = False
        try:
            snap = capture_gpu_metrics(orch)
            readings.append(snap)
            done = is_done_fn()
            # Reset the streak only once the WHOLE poll succeeded. Resetting
            # before is_done_fn() let a persistently failing is_done_fn keep
            # the streak at 1 forever, so the loop never terminated.
            consecutive_failures = 0
            used = snap.get("gpu.used_vram")
            gfx = snap.get("gpu.gfx_activity")
            umc = snap.get("gpu.umc_activity")
            mm = snap.get("gpu.mm_activity")
            done_tag = " [done]" if done else ""
            log_lines.append(
                f"[gpu {label} {poll_n}/?] used_vram={used} MB gfx={gfx}% umc={umc}% mm={mm}%{done_tag}"
            )
        except Exception as exc:
            consecutive_failures += 1
            log_lines.append(
                f"[gpu {label} {poll_n}/?] FAILED"
                f" [{consecutive_failures}/{max_consecutive_failures} consecutive]:"
                f" {type(exc).__name__}: {exc} (skipped)"
            )
            if consecutive_failures >= max_consecutive_failures:
                log.warning(
                    "poll_gpu_metrics: %d consecutive failures, stopping early",
                    consecutive_failures,
                )
                break

        if done:
            break

        pause = interval
        if deadline is not None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                log.warning(
                    "poll_gpu_metrics: max_duration_s=%s reached, stopping early",
                    max_duration_s,
                )
                log_lines.append(f"[gpu {label}] max_duration_s={max_duration_s} reached, stopping")
                break
            # Never sleep past the budget.
            pause = min(interval, remaining)
        time.sleep(pause)

    # Build summary
    agg = agg_readings(readings)
    n_failed = poll_n - len(readings)
    failed_note = f" ({n_failed} failed, excluded)" if n_failed else ""
    peak_s = _fmt(agg.get("peak_gpu_memory_mb"), ".0f")
    compute_s = _fmt(agg.get("gpu_compute_util_pct"), ".1f")
    bw_s = _fmt(agg.get("gpu_bandwidth_util_pct"), ".1f")
    ml_mem = _fmt(model_load_memory_mb, ".0f")
    ml_s = _fmt(model_load_s, ".1f")

    log_lines.extend(
        [
            "",
            "--- summary ---",
            f"samples: {poll_n}{failed_note}",
            f"peak_gpu_memory_mb: {peak_s} MB",
            f"model_load_memory_mb: {ml_mem} MB",
            f"model_load_s: {ml_s} s",
            f"gpu_compute_util_pct: {compute_s} %",
            f"gpu_bandwidth_util_pct: {bw_s} %",
        ]
    )

    if log_path is not None:
        try:
            pathlib.Path(log_path).write_text("\n".join(log_lines) + "\n")
        except Exception as exc:
            # The log file is a diagnostic aid; losing it must not lose the readings.
            log.warning("poll_gpu_metrics: failed to write log %s: %s", log_path, exc)

    log.info(
        "poll_gpu_metrics: %d readings (%d failed) | peak_vram=%s MB compute=%s%% bw=%s%%",
        len(readings),
        n_failed,
        peak_s,
        compute_s,
        bw_s,
    )
    return readings
+ +Black-box tests authored from the behavioral spec only (impl-blind). The module +contains pure parsers for `amd-smi metric --json` output: no I/O, no hardware, +pure dict transformations. + +Contract under test (from spec): + parse_usage(gpu_entry) -> {"gpu.gfx_activity", "gpu.umc_activity", + "gpu.mm_activity"}; int|None each. Degrades to + None for any missing key or "N/A" value; never raises. + parse_mem_usage(gpu_entry) -> {"gpu.total_vram", "gpu.used_vram", + "gpu.free_vram"}; int|None each. Degrades; never raises. + parse_energy(gpu_entry) -> {"gpu.energy_j"}; float|None. Degrades; never raises. + parse_gpu_metrics(raw) -> single dict with all 7 gpu.* keys. + activity fields averaged across GPUs; + vram + energy_j summed across GPUs. + [] -> all 7 keys present, all None. Never raises. + GPU_METRICS / GPU_METRIC_UNITS: every metric short_name has a matching unit; + parse_gpu_metrics([full]) emits "gpu." for every k. + +Framework: unittest.TestCase + self.subTest + unittest.mock (no pytest). +''' + +import unittest +from unittest.mock import MagicMock, patch + +from cvs.lib.utils.gpu import ( + GPU_METRICS, + GPU_METRIC_UNITS, + _RAW_GPU_FIELDS, + _RAW_GPU_FIELD_UNITS, + _mean, + agg_readings, + poll_gpu_metrics, + capture_gpu_metrics, + parse_usage, + parse_mem_usage, + parse_energy, + parse_gpu_metrics, +) + +# --------------------------------------------------------------------------- +# Shared fixtures — amd-smi JSON schema (one GPU entry) +# --------------------------------------------------------------------------- + +# The seven spec'd metrics, each as the bare "gpu." key produced by +# the parsers / aggregator. 
+ACTIVITY_KEYS = ["gpu.gfx_activity", "gpu.umc_activity", "gpu.mm_activity"] +VRAM_KEYS = ["gpu.total_vram", "gpu.used_vram", "gpu.free_vram"] +ENERGY_KEY = "gpu.energy_j" +ALL_KEYS = ACTIVITY_KEYS + VRAM_KEYS + [ENERGY_KEY] + + +def _full_gpu_entry(gfx=30, umc=20, mm=10, total=196608, used=4096, free=192512, energy=12345.5): + """A complete amd-smi entry for one GPU with all seven fields present.""" + return { + "usage": { + "gfx_activity": {"value": gfx}, + "umc_activity": {"value": umc}, + "mm_activity": {"value": mm}, + }, + "mem_usage": { + "total_vram": {"value": total}, + "used_vram": {"value": used}, + "free_vram": {"value": free}, + }, + "energy": { + "total_energy_consumption": {"value": energy}, + }, + } + + +# --------------------------------------------------------------------------- +# parse_usage — pure function (dict -> dict) +# --------------------------------------------------------------------------- + + +class TestParseUsage(unittest.TestCase): + """parse_usage extracts ["usage"]; degrades to None; never raises.""" + + def test_full_entry_extracts_all_three(self): + out = parse_usage(_full_gpu_entry(gfx=55, umc=44, mm=33)) + self.assertEqual( + out, + { + "gpu.gfx_activity": 55, + "gpu.umc_activity": 44, + "gpu.mm_activity": 33, + }, + ) + + def test_returns_exactly_the_three_activity_keys(self): + out = parse_usage(_full_gpu_entry()) + self.assertEqual(set(out.keys()), set(ACTIVITY_KEYS)) + + def test_value_types_are_int(self): + out = parse_usage(_full_gpu_entry(gfx=1, umc=2, mm=3)) + for k in ACTIVITY_KEYS: + with self.subTest(key=k): + self.assertIsInstance(out[k], int) + + def test_degradation_table(self): + """Each degraded-input shape maps every activity field to None. + + Boundary classes: empty dict, missing "usage", "N/A" string value. 
+ """ + na = "N/A" + cases = [ + # (description, gpu_entry) + ("empty entry", {}), + ("missing usage key", {"mem_usage": {}}), + ( + "all fields N/A", + { + "usage": { + "gfx_activity": {"value": na}, + "umc_activity": {"value": na}, + "mm_activity": {"value": na}, + } + }, + ), + ] + expected = {k: None for k in ACTIVITY_KEYS} + for desc, entry in cases: + with self.subTest(case=desc): + self.assertEqual(parse_usage(entry), expected) + + def test_partial_entry_degrades_only_missing_field(self): + """One field missing/N/A -> None for that field; others extracted.""" + entry = { + "usage": { + "gfx_activity": {"value": 77}, + "umc_activity": {"value": "N/A"}, + # mm_activity entirely absent + } + } + out = parse_usage(entry) + self.assertEqual(out["gpu.gfx_activity"], 77) + self.assertIsNone(out["gpu.umc_activity"]) + self.assertIsNone(out["gpu.mm_activity"]) + + def test_zero_values_not_coerced_to_none(self): + """0 is a valid reading (fully idle GPU); must not degrade to None.""" + out = parse_usage(_full_gpu_entry(gfx=0, umc=0, mm=0)) + self.assertEqual(out["gpu.gfx_activity"], 0) + self.assertEqual(out["gpu.umc_activity"], 0) + self.assertEqual(out["gpu.mm_activity"], 0) + + def test_never_raises_on_malformed_shapes(self): + """Contract: degrades, never raises. 
Always returns all three keys as None.""" + malformed = [ + {}, + {"usage": {}}, + {"usage": {"gfx_activity": {}}}, + ] + for entry in malformed: + with self.subTest(entry=entry): + out = parse_usage(entry) + self.assertEqual(set(out.keys()), set(ACTIVITY_KEYS)) + for k in ACTIVITY_KEYS: + self.assertIsNone(out[k]) + + +# --------------------------------------------------------------------------- +# parse_mem_usage — pure function (dict -> dict) +# --------------------------------------------------------------------------- + + +class TestParseMemUsage(unittest.TestCase): + """parse_mem_usage extracts ["mem_usage"]; degrades; never raises.""" + + def test_full_entry_extracts_all_three(self): + out = parse_mem_usage(_full_gpu_entry(total=196608, used=4096, free=192512)) + self.assertEqual( + out, + { + "gpu.total_vram": 196608, + "gpu.used_vram": 4096, + "gpu.free_vram": 192512, + }, + ) + + def test_returns_exactly_the_three_vram_keys(self): + out = parse_mem_usage(_full_gpu_entry()) + self.assertEqual(set(out.keys()), set(VRAM_KEYS)) + + def test_value_types_are_int(self): + out = parse_mem_usage(_full_gpu_entry(total=10, used=3, free=7)) + for k in VRAM_KEYS: + with self.subTest(key=k): + self.assertIsInstance(out[k], int) + + def test_degradation_table(self): + na = "N/A" + cases = [ + ("empty entry", {}), + ("missing mem_usage", {"usage": {}}), + ( + "all N/A", + { + "mem_usage": { + "total_vram": {"value": na}, + "used_vram": {"value": na}, + "free_vram": {"value": na}, + } + }, + ), + ] + expected = {k: None for k in VRAM_KEYS} + for desc, entry in cases: + with self.subTest(case=desc): + self.assertEqual(parse_mem_usage(entry), expected) + + def test_partial_entry_degrades_only_missing_field(self): + entry = { + "mem_usage": { + "total_vram": {"value": 1000}, + "used_vram": {"value": "N/A"}, + # free_vram absent + } + } + out = parse_mem_usage(entry) + self.assertEqual(out["gpu.total_vram"], 1000) + self.assertIsNone(out["gpu.used_vram"]) + 
self.assertIsNone(out["gpu.free_vram"]) + + def test_zero_values_not_coerced_to_none(self): + """0 is a valid reading (idle GPU); must not degrade to None.""" + out = parse_mem_usage(_full_gpu_entry(total=0, used=0, free=0)) + self.assertEqual(out["gpu.total_vram"], 0) + self.assertEqual(out["gpu.used_vram"], 0) + self.assertEqual(out["gpu.free_vram"], 0) + + def test_never_raises_on_malformed_shapes(self): + malformed = [ + {}, + {"mem_usage": {}}, + {"mem_usage": {"used_vram": {}}}, + ] + for entry in malformed: + with self.subTest(entry=entry): + out = parse_mem_usage(entry) + self.assertEqual(set(out.keys()), set(VRAM_KEYS)) + for k in VRAM_KEYS: + self.assertIsNone(out[k]) + + +# --------------------------------------------------------------------------- +# parse_energy — pure function (dict -> dict) +# --------------------------------------------------------------------------- + + +class TestParseEnergy(unittest.TestCase): + """parse_energy extracts total_energy_consumption; degrades; never raises.""" + + def test_full_entry_extracts_energy(self): + out = parse_energy(_full_gpu_entry(energy=99999.25)) + self.assertEqual(out, {"gpu.energy_j": 99999.25}) + + def test_returns_exactly_the_energy_key(self): + out = parse_energy(_full_gpu_entry()) + self.assertEqual(set(out.keys()), {ENERGY_KEY}) + + def test_value_type_is_float(self): + out = parse_energy(_full_gpu_entry(energy=1.5)) + self.assertIsInstance(out[ENERGY_KEY], float) + + def test_degradation_table(self): + na = "N/A" + cases = [ + ("empty entry", {}), + ("missing energy", {"usage": {}}), + ("missing total_energy_consumption", {"energy": {}}), + ( + "N/A value", + {"energy": {"total_energy_consumption": {"value": na}}}, + ), + ] + for desc, entry in cases: + with self.subTest(case=desc): + self.assertEqual(parse_energy(entry), {ENERGY_KEY: None}) + + def test_never_raises_on_malformed_shapes(self): + malformed = [ + {}, + {"energy": {}}, + {"energy": {"total_energy_consumption": {}}}, + ] + for entry 
in malformed: + with self.subTest(entry=entry): + out = parse_energy(entry) + self.assertEqual(set(out.keys()), {ENERGY_KEY}) + self.assertIsNone(out[ENERGY_KEY]) + + def test_zero_energy_not_coerced_to_none(self): + """0.0 is a valid reading (GPU powered but idle); must not degrade to None.""" + out = parse_energy(_full_gpu_entry(energy=0.0)) + self.assertEqual(out[ENERGY_KEY], 0.0) + self.assertIsInstance(out[ENERGY_KEY], float) + + def test_int_energy_coerced_to_float(self): + """parse_energy must return float even when the raw value is a Python int.""" + out = parse_energy(_full_gpu_entry(energy=100)) + self.assertIsInstance(out[ENERGY_KEY], float) + + +# --------------------------------------------------------------------------- +# parse_gpu_metrics — pure aggregator (list -> dict) +# --------------------------------------------------------------------------- + + +class TestParseGpuMetrics(unittest.TestCase): + """Aggregates per-GPU entries: activity averaged, vram + energy summed.""" + + # --- key-presence contract --- + + def test_all_seven_keys_present_for_full_entry(self): + out = parse_gpu_metrics([_full_gpu_entry()]) + self.assertIsInstance(out, dict) + self.assertEqual(set(out.keys()), set(ALL_KEYS)) + + def test_empty_list_yields_all_keys_none(self): + """[] -> all 7 keys present, every value None. 
Never raises.""" + out = parse_gpu_metrics([]) + self.assertEqual(set(out.keys()), set(ALL_KEYS)) + for k in ALL_KEYS: + with self.subTest(key=k): + self.assertIsNone(out[k]) + + # --- single-GPU identity invariant --- + + def test_single_gpu_equals_that_gpus_values(self): + """Single GPU: averaged/summed result equals that GPU's values exactly.""" + entry = _full_gpu_entry(gfx=30, umc=20, mm=10, total=196608, used=4096, free=192512, energy=500.0) + out = parse_gpu_metrics([entry]) + self.assertEqual(out["gpu.gfx_activity"], 30) + self.assertEqual(out["gpu.umc_activity"], 20) + self.assertEqual(out["gpu.mm_activity"], 10) + self.assertEqual(out["gpu.total_vram"], 196608) + self.assertEqual(out["gpu.used_vram"], 4096) + self.assertEqual(out["gpu.free_vram"], 192512) + self.assertEqual(out["gpu.energy_j"], 500.0) + + # --- aggregation semantics: average vs sum --- + + def test_activity_fields_averaged_across_gpus(self): + """gfx/umc/mm averaged. Odd-sum pair verifies true division, not floor.""" + g0 = _full_gpu_entry(gfx=10, umc=40, mm=60) + g1 = _full_gpu_entry(gfx=21, umc=80, mm=20) + out = parse_gpu_metrics([g0, g1]) + self.assertEqual(out["gpu.gfx_activity"], 15.5) # (10+21)/2 — not 15 + self.assertEqual(out["gpu.umc_activity"], 60) # (40+80)/2 + self.assertEqual(out["gpu.mm_activity"], 40) # (60+20)/2 + + def test_vram_and_energy_summed_across_gpus(self): + """total/used/free_vram and energy_j summed across GPUs.""" + g0 = _full_gpu_entry(total=100, used=30, free=70, energy=1.5) + g1 = _full_gpu_entry(total=200, used=50, free=150, energy=2.5) + out = parse_gpu_metrics([g0, g1]) + self.assertEqual(out["gpu.total_vram"], 300) + self.assertEqual(out["gpu.used_vram"], 80) + self.assertEqual(out["gpu.free_vram"], 220) + self.assertEqual(out["gpu.energy_j"], 4.0) + + def test_activity_aggregation_is_average_not_sum(self): + """Guards against an impl that sums activity instead of averaging: + two equal nonzero GPUs must yield the per-GPU value, not double it.""" + g = 
_full_gpu_entry(gfx=50, umc=50, mm=50) + out = parse_gpu_metrics([g, _full_gpu_entry(gfx=50, umc=50, mm=50)]) + self.assertEqual(out["gpu.gfx_activity"], 50) + self.assertNotEqual(out["gpu.gfx_activity"], 100) + + def test_vram_aggregation_is_sum_not_average(self): + """Guards against an impl that averages vram/energy instead of summing: + two equal GPUs must total double, not stay equal.""" + g0 = _full_gpu_entry(total=100, used=40, free=60, energy=10.0) + g1 = _full_gpu_entry(total=100, used=40, free=60, energy=10.0) + out = parse_gpu_metrics([g0, g1]) + self.assertEqual(out["gpu.total_vram"], 200) + self.assertEqual(out["gpu.energy_j"], 20.0) + self.assertNotEqual(out["gpu.total_vram"], 100) + + # --- partial-entry aggregation --- + + def test_partial_entry_field_excluded_others_aggregated(self): + """A field missing on one GPU -> aggregate the remaining GPUs for it; + other fields still aggregate across all GPUs that have them.""" + g0 = _full_gpu_entry(gfx=20, total=100, used=40, free=60, energy=5.0) + # g1 has no usage block at all -> gfx None for g1 + g1 = { + "mem_usage": { + "total_vram": {"value": 200}, + "used_vram": {"value": 60}, + "free_vram": {"value": 140}, + }, + "energy": {"total_energy_consumption": {"value": 7.0}}, + } + out = parse_gpu_metrics([g0, g1]) + # activity only present on g0 -> aggregate is just g0's values + self.assertEqual(out["gpu.gfx_activity"], 20) + self.assertEqual(out["gpu.umc_activity"], 20) # g0 fixture default + self.assertEqual(out["gpu.mm_activity"], 10) # g0 fixture default + # vram present on both -> summed + self.assertEqual(out["gpu.total_vram"], 300) + self.assertEqual(out["gpu.used_vram"], 100) + self.assertEqual(out["gpu.free_vram"], 200) + # energy present on both -> summed + self.assertEqual(out["gpu.energy_j"], 12.0) + + def test_field_absent_on_all_gpus_yields_none(self): + """If no GPU supplies a field, the aggregate for that field is None, + while present fields still aggregate.""" + no_energy = { + "usage": 
{ + "gfx_activity": {"value": 10}, + "umc_activity": {"value": 10}, + "mm_activity": {"value": 10}, + }, + "mem_usage": { + "total_vram": {"value": 100}, + "used_vram": {"value": 50}, + "free_vram": {"value": 50}, + }, + } + out = parse_gpu_metrics([no_energy, dict(no_energy)]) + self.assertIsNone(out["gpu.energy_j"]) + self.assertEqual(out["gpu.gfx_activity"], 10) + self.assertEqual(out["gpu.total_vram"], 200) + + def test_single_gpu_aggregated_field_types(self): + """Activity and vram fields from a single full entry must be int (or float for energy).""" + out = parse_gpu_metrics([_full_gpu_entry(gfx=10, umc=20, mm=30, total=1000, used=200, free=800, energy=5.0)]) + for k in ACTIVITY_KEYS: + with self.subTest(key=k): + self.assertIsInstance(out[k], (int, float)) + for k in VRAM_KEYS: + with self.subTest(key=k): + self.assertIsInstance(out[k], int) + self.assertIsInstance(out[ENERGY_KEY], float) + + def test_partial_vram_none_excluded_from_sum(self): + """GPU with no mem_usage block: its vram fields are None and excluded; + only the GPU that has vram contributes to the sum.""" + g0 = _full_gpu_entry(total=100, used=40, free=60, energy=2.0) + g1 = { + "usage": {"gfx_activity": {"value": 10}, "umc_activity": {"value": 10}, "mm_activity": {"value": 10}}, + "energy": {"total_energy_consumption": {"value": 3.0}}, + } + out = parse_gpu_metrics([g0, g1]) + self.assertEqual(out["gpu.total_vram"], 100) + self.assertEqual(out["gpu.used_vram"], 40) + self.assertEqual(out["gpu.free_vram"], 60) + self.assertEqual(out["gpu.energy_j"], 5.0) + + def test_partial_energy_none_excluded_from_sum(self): + """GPU with no energy block: its energy is None and excluded; + only the GPU that has energy contributes to the sum.""" + g0 = _full_gpu_entry(energy=500.0) + g1 = { + "usage": {"gfx_activity": {"value": 5}, "umc_activity": {"value": 5}, "mm_activity": {"value": 5}}, + "mem_usage": {"total_vram": {"value": 50}, "used_vram": {"value": 10}, "free_vram": {"value": 40}}, + } + out = 
parse_gpu_metrics([g0, g1]) + self.assertEqual(out["gpu.energy_j"], 500.0) + + def test_zero_vram_not_excluded_from_aggregation(self): + """total_vram=0 is valid; a falsy-zero aggregation bug (if val: acc += val) + would skip 0 and return None instead of 0. Single-GPU with all-zero VRAM.""" + out = parse_gpu_metrics([_full_gpu_entry(total=0, used=0, free=0)]) + self.assertEqual(out["gpu.total_vram"], 0) + self.assertEqual(out["gpu.used_vram"], 0) + self.assertEqual(out["gpu.free_vram"], 0) + + def test_zero_energy_not_excluded_from_aggregation(self): + """energy=0.0 is valid; a falsy-zero aggregation bug (if energy: skip) would + incorrectly exclude it. Two GPUs each with energy=0.0 must sum to 0.0.""" + g0 = _full_gpu_entry(energy=0.0) + g1 = _full_gpu_entry(energy=0.0) + out = parse_gpu_metrics([g0, g1]) + self.assertEqual(out["gpu.energy_j"], 0.0) + self.assertIsInstance(out["gpu.energy_j"], float) + + def test_zero_activity_not_excluded_from_average(self): + """gfx_activity=0 is valid (GPU idle). A falsy-zero bug in the aggregator + would exclude it from the average, giving wrong denominator+numerator.""" + g0 = _full_gpu_entry(gfx=0) + g1 = _full_gpu_entry(gfx=20) + out = parse_gpu_metrics([g0, g1]) + self.assertEqual(out["gpu.gfx_activity"], 10.0) # (0+20)/2, not 20/1=20 + + def test_partial_none_activity_averaging_three_gpus(self): + """With N=3 where one has no activity, mean of non-None values is: + (30 + 60) / 2 = 45.0 — not sum=90, not divide-by-3=30.""" + g_no_usage = { + "mem_usage": {"total_vram": {"value": 50}, "used_vram": {"value": 20}, "free_vram": {"value": 30}}, + "energy": {"total_energy_consumption": {"value": 1.0}}, + } + g0 = _full_gpu_entry(gfx=30) + g1 = _full_gpu_entry(gfx=60) + out = parse_gpu_metrics([g0, g_no_usage, g1]) + self.assertEqual(out["gpu.gfx_activity"], 45.0) + + def test_activity_averaging_three_full_gpus(self): + """N=3 averaging: (10+20+30)/3=20.0. 
Guards against hardcoded denominator=2.""" + g0 = _full_gpu_entry(gfx=10, umc=0, mm=5) + g1 = _full_gpu_entry(gfx=20, umc=60, mm=5) + g2 = _full_gpu_entry(gfx=30, umc=120, mm=5) + out = parse_gpu_metrics([g0, g1, g2]) + self.assertEqual(out["gpu.gfx_activity"], 20.0) # (10+20+30)/3 + self.assertEqual(out["gpu.umc_activity"], 60.0) # (0+60+120)/3 + self.assertEqual(out["gpu.mm_activity"], 5.0) # (5+5+5)/3 + + def test_vram_and_energy_summed_three_gpus(self): + """N=3 sum: guards against loop body that caps at 2 entries or re-inits acc.""" + g0 = _full_gpu_entry(total=100, used=10, free=90, energy=1.0) + g1 = _full_gpu_entry(total=200, used=20, free=180, energy=2.0) + g2 = _full_gpu_entry(total=300, used=30, free=270, energy=3.0) + out = parse_gpu_metrics([g0, g1, g2]) + self.assertEqual(out["gpu.total_vram"], 600) + self.assertEqual(out["gpu.used_vram"], 60) + self.assertEqual(out["gpu.free_vram"], 540) + self.assertEqual(out["gpu.energy_j"], 6.0) + + def test_never_raises_on_list_of_empty_entries(self): + """Contract: never raises. All-empty entries -> all keys present, None.""" + out = parse_gpu_metrics([{}, {}, {}]) + self.assertEqual(set(out.keys()), set(ALL_KEYS)) + for k in ALL_KEYS: + with self.subTest(key=k): + self.assertIsNone(out[k]) + + +# --------------------------------------------------------------------------- +# capture_gpu_metrics — I/O subsystem (orch-delegating, not a pure parser) +# Classification: integration boundary; tested only at the mock seam. +# Contract: calls orch to run amd-smi, passes the JSON list to parse_gpu_metrics, +# returns whatever parse_gpu_metrics returns. Never raises on malformed output. +# --------------------------------------------------------------------------- + + +class TestCaptureGpuMetrics(unittest.TestCase): + """capture_gpu_metrics delegates to parse_gpu_metrics and wraps the orch call. 
+ + The function requires a live ContainerOrchestrator to invoke amd-smi, so + unit tests mock the orch dependency and verify delegation semantics only. + They never assert on hardware-specific values. + """ + + def _make_orch(self, raw_gpu_list): + """Return a mock orchestrator whose exec result decodes to raw_gpu_list. + + The real ContainerOrchestrator.exec(cmd) returns {host: str}; we mock + the same shape so tests are grounded in the actual interface contract. + """ + import json + + orch = MagicMock() + orch.exec.return_value = {"node0": json.dumps(raw_gpu_list)} + return orch + + def test_happy_path_key_set_matches_all_keys(self): + """Given a valid amd-smi JSON list, capture_gpu_metrics returns all 7 keys, + delegates to parse_gpu_metrics, and passes the parsed values through.""" + orch = self._make_orch([_full_gpu_entry()]) + with patch("cvs.lib.utils.gpu.parse_gpu_metrics", wraps=parse_gpu_metrics) as mock_parse: + out = capture_gpu_metrics(orch) + self.assertIsInstance(out, dict) + self.assertEqual(set(out.keys()), set(ALL_KEYS)) + mock_parse.assert_called_once_with([_full_gpu_entry()]) + # Pin the exact command string sent to amd-smi. + orch.exec.assert_called_once_with("sudo amd-smi metric --json") + # Verify parse result is actually returned, not silently discarded. + self.assertEqual(out["gpu.gfx_activity"], 30) + self.assertIsNotNone(out["gpu.total_vram"]) + + def test_multi_host_entries_aggregated_together(self): + """All hosts' GPU entries must be pooled before aggregation. + + A mutant that reads only the first host's data would yield gfx=10 + (average of one entry), not 15.0 (average across both hosts' entries). 
+ """ + import json + + orch = MagicMock() + orch.exec.return_value = { + "node0": json.dumps([_full_gpu_entry(gfx=10)]), + "node1": json.dumps([_full_gpu_entry(gfx=20)]), + } + out = capture_gpu_metrics(orch) + self.assertEqual(set(out.keys()), set(ALL_KEYS)) + self.assertAlmostEqual(out["gpu.gfx_activity"], 15.0) + + def test_no_raise_on_empty_gpu_list(self): + """Empty GPU list -> all 7 keys, all None. Must not raise.""" + orch = self._make_orch([]) + out = capture_gpu_metrics(orch) + self.assertEqual(set(out.keys()), set(ALL_KEYS)) + for k in ALL_KEYS: + with self.subTest(key=k): + self.assertIsNone(out[k]) + + def test_no_raise_on_malformed_orch_output(self): + """If orch returns non-JSON text, capture_gpu_metrics degrades; never raises.""" + orch = MagicMock() + orch.exec.return_value = {"node0": "not valid json at all"} + try: + out = capture_gpu_metrics(orch) + except Exception as exc: # noqa: BLE001 + self.fail(f"capture_gpu_metrics raised unexpectedly: {exc!r}") + else: + self.assertEqual(set(out.keys()), set(ALL_KEYS)) + for k in ALL_KEYS: + with self.subTest(key=k): + self.assertIsNone(out[k]) + + def test_no_raise_on_valid_json_wrong_type(self): + """Valid JSON that decodes to a non-list (dict, null, scalar, string) + must degrade gracefully — never raises, returns all-None.""" + import json + + non_list_values = [{}, None, 42, "string"] + for val in non_list_values: + with self.subTest(decoded_type=type(val).__name__): + orch = MagicMock() + orch.exec.return_value = {"node0": json.dumps(val)} + try: + out = capture_gpu_metrics(orch) + except Exception as exc: # noqa: BLE001 + self.fail(f"capture_gpu_metrics raised on decoded {val!r}: {exc!r}") + else: + self.assertEqual(set(out.keys()), set(ALL_KEYS)) + for k in ALL_KEYS: + self.assertIsNone(out[k]) + + +# --------------------------------------------------------------------------- +# GPU_METRICS / GPU_METRIC_UNITS — module constants (invariants) +# 
--------------------------------------------------------------------------- + + +class TestGpuMetricsConstants(unittest.TestCase): + """Invariants tying GPU_METRICS, GPU_METRIC_UNITS, _RAW_GPU_FIELDS, and parser output keys.""" + + # Human-readable derived metrics that appear as HTML rows in the test report. + EXPECTED_DERIVED_NAMES = { + "peak_gpu_memory_mb", + "model_load_memory_mb", + "model_load_s", + "gpu_bandwidth_util_pct", + "gpu_compute_util_pct", + } + + # Raw amd-smi parser output fields (internal; not surfaced as HTML rows). + EXPECTED_RAW_NAMES = { + "gfx_activity", + "umc_activity", + "mm_activity", + "total_vram", + "used_vram", + "free_vram", + "energy_j", + } + + # --- GPU_METRICS (derived, human-readable) --- + + def test_gpu_metrics_covers_all_five_derived_names(self): + short_names = {short for short, _unit in GPU_METRICS} + self.assertEqual(short_names, self.EXPECTED_DERIVED_NAMES) + + def test_gpu_metrics_entries_are_name_unit_pairs(self): + for entry in GPU_METRICS: + with self.subTest(entry=entry): + self.assertEqual(len(entry), 2) + short, unit = entry + self.assertIsInstance(short, str) + self.assertIsInstance(unit, str) + + def test_every_derived_metric_has_matching_unit(self): + """Invariant: every short_name in GPU_METRICS has a key in GPU_METRIC_UNITS.""" + for short, unit in GPU_METRICS: + with self.subTest(short=short): + self.assertIn(short, GPU_METRIC_UNITS) + self.assertEqual(GPU_METRIC_UNITS[short], unit) + + def test_units_dict_derived_from_metrics(self): + """GPU_METRIC_UNITS is exactly the dict form of GPU_METRICS (no extras).""" + self.assertEqual(GPU_METRIC_UNITS, dict(GPU_METRICS)) + + def test_derived_unit_strings_match_spec(self): + """Unit strings pinned to spec values.""" + EXPECTED_UNITS = { + "peak_gpu_memory_mb": "MB", + "model_load_memory_mb": "MB", + "model_load_s": "s", + "gpu_bandwidth_util_pct": "%", + "gpu_compute_util_pct": "%", + } + self.assertEqual(GPU_METRIC_UNITS, EXPECTED_UNITS) + + # --- _RAW_GPU_FIELDS 
(amd-smi parser output) --- + + def test_raw_fields_covers_all_seven_amd_smi_fields(self): + raw_names = {short for short, _unit in _RAW_GPU_FIELDS} + self.assertEqual(raw_names, self.EXPECTED_RAW_NAMES) + + def test_raw_field_units_derived_from_raw_fields(self): + """_RAW_GPU_FIELD_UNITS is exactly the dict form of _RAW_GPU_FIELDS.""" + self.assertEqual(_RAW_GPU_FIELD_UNITS, dict(_RAW_GPU_FIELDS)) + + def test_raw_unit_strings_match_spec(self): + EXPECTED_RAW_UNITS = { + "gfx_activity": "%", + "umc_activity": "%", + "mm_activity": "%", + "total_vram": "MB", + "used_vram": "MB", + "free_vram": "MB", + "energy_j": "J", + } + self.assertEqual(_RAW_GPU_FIELD_UNITS, EXPECTED_RAW_UNITS) + + def test_parse_gpu_metrics_emits_key_for_every_raw_field(self): + """parse_gpu_metrics([full]) produces "gpu.<k>" for every k in _RAW_GPU_FIELDS.""" + self.assertGreater(len(_RAW_GPU_FIELDS), 0, "_RAW_GPU_FIELDS must not be empty") + out = parse_gpu_metrics([_full_gpu_entry()]) + for short, _unit in _RAW_GPU_FIELDS: + with self.subTest(metric=short): + self.assertIn(f"gpu.{short}", out) + + def test_derived_metrics_not_emitted_by_parser(self): + """GPU_METRICS (derived) are computed in vllm_single, not by the parser. 
+ parse_gpu_metrics must NOT emit keys for derived short names.""" + out = parse_gpu_metrics([_full_gpu_entry()]) + for short, _unit in GPU_METRICS: + with self.subTest(metric=short): + self.assertNotIn(f"gpu.{short}", out) + + +class TestMean(unittest.TestCase): + def test_empty(self): + self.assertIsNone(_mean([])) + + def test_all_none(self): + self.assertIsNone(_mean([None, None])) + + def test_normal(self): + self.assertAlmostEqual(_mean([1.0, 3.0]), 2.0) + + def test_skips_none(self): + self.assertAlmostEqual(_mean([None, 4.0, None, 2.0]), 3.0) + + +class TestAggReadings(unittest.TestCase): + def test_empty(self): + result = agg_readings([]) + self.assertIsNone(result["peak_gpu_memory_mb"]) + self.assertIsNone(result["gpu_compute_util_pct"]) + self.assertIsNone(result["gpu_bandwidth_util_pct"]) + + def test_all_none_values(self): + readings = [{"gpu.used_vram": None, "gpu.gfx_activity": None, "gpu.umc_activity": None}] + result = agg_readings(readings) + self.assertIsNone(result["peak_gpu_memory_mb"]) + + def test_normal(self): + readings = [ + {"gpu.used_vram": 1000, "gpu.gfx_activity": 80.0, "gpu.umc_activity": 60.0}, + {"gpu.used_vram": 2000, "gpu.gfx_activity": 90.0, "gpu.umc_activity": 70.0}, + ] + result = agg_readings(readings) + self.assertEqual(result["peak_gpu_memory_mb"], 2000) + self.assertAlmostEqual(result["gpu_compute_util_pct"], 85.0) + self.assertAlmostEqual(result["gpu_bandwidth_util_pct"], 65.0) + + +class TestPollGpuMetrics(unittest.TestCase): + def _make_orch(self): + return unittest.mock.MagicMock() + + def test_happy_path_stops_when_done(self): + orch = self._make_orch() + snap = { + "gpu.used_vram": 1000, + "gpu.gfx_activity": 80.0, + "gpu.umc_activity": 60.0, + "gpu.mm_activity": 1.0, + "gpu.free_vram": 5000, + "gpu.total_vram": 6000, + "gpu.energy_j": 100.0, + } + call_count = [0] + + def is_done(): + call_count[0] += 1 + return call_count[0] >= 2 # done after 2nd poll + + with ( + 
unittest.mock.patch("cvs.lib.utils.gpu.capture_gpu_metrics", return_value=snap), + unittest.mock.patch("time.sleep"), + ): + readings = poll_gpu_metrics(orch, is_done_fn=is_done, poll_interval_s=0) + + self.assertEqual(len(readings), 2) + + def test_node_death_stops_after_max_consecutive_failures(self): + orch = self._make_orch() + + def is_done(): + return False + + with ( + unittest.mock.patch("cvs.lib.utils.gpu.capture_gpu_metrics", side_effect=RuntimeError("SSH timeout")), + unittest.mock.patch("time.sleep"), + ): + readings = poll_gpu_metrics( + orch, + is_done_fn=is_done, + poll_interval_s=0, + max_consecutive_failures=3, + ) + + self.assertEqual(readings, []) + + def test_writes_log_file(self): + import tempfile + import os + + orch = self._make_orch() + snap = { + "gpu.used_vram": 1000, + "gpu.gfx_activity": 80.0, + "gpu.umc_activity": 60.0, + "gpu.mm_activity": 1.0, + "gpu.free_vram": 5000, + "gpu.total_vram": 6000, + "gpu.energy_j": 100.0, + } + done_calls = [0] + + def is_done(): + done_calls[0] += 1 + return done_calls[0] >= 1 + + with tempfile.NamedTemporaryFile(delete=False, suffix=".log") as f: + log_path = f.name + try: + with ( + unittest.mock.patch("cvs.lib.utils.gpu.capture_gpu_metrics", return_value=snap), + unittest.mock.patch("time.sleep"), + ): + poll_gpu_metrics(orch, is_done_fn=is_done, poll_interval_s=0, log_path=log_path) + content = open(log_path).read() + self.assertIn("summary", content) + finally: + os.unlink(log_path) + + def test_failure_then_recovery_resets_counter(self): + orch = self._make_orch() + snap = { + "gpu.used_vram": 1000, + "gpu.gfx_activity": 80.0, + "gpu.umc_activity": 60.0, + "gpu.mm_activity": 1.0, + "gpu.free_vram": 5000, + "gpu.total_vram": 6000, + "gpu.energy_j": 100.0, + } + call_seq = [RuntimeError("fail"), RuntimeError("fail"), snap, snap] + call_iter = iter(call_seq) + done_calls = [0] + + def capture(*a, **kw): + v = next(call_iter) + if isinstance(v, Exception): + raise v + return v + + def is_done(): + 
done_calls[0] += 1 + return done_calls[0] >= 2 + + with ( + unittest.mock.patch("cvs.lib.utils.gpu.capture_gpu_metrics", side_effect=capture), + unittest.mock.patch("time.sleep"), + ): + readings = poll_gpu_metrics( + orch, + is_done_fn=is_done, + poll_interval_s=0, + max_consecutive_failures=3, + ) + + self.assertEqual(len(readings), 2) + + +if __name__ == "__main__": + unittest.main() diff --git a/cvs/tests/inference/vllm/_shared.py b/cvs/tests/inference/vllm/_shared.py index c8d1d062..a1576988 100644 --- a/cvs/tests/inference/vllm/_shared.py +++ b/cvs/tests/inference/vllm/_shared.py @@ -44,6 +44,9 @@ def test_print_results_table(inf_res_dict): "P95 TPOT (ms)", "P99 ITL (ms)", "Goodput (req/s)", + "Peak VRAM (MB)", + "Compute %", + "BW %", ] rows = [] for key, host_dict in inf_res_dict.items(): @@ -66,6 +69,9 @@ def test_print_results_table(inf_res_dict): _cell(m, "client.p95_tpot_ms"), _cell(m, "client.p99_itl_ms"), _cell(m, "client.goodput"), + _cell(m, "gpu.peak_gpu_memory_mb"), + _cell(m, "gpu.gpu_compute_util_pct"), + _cell(m, "gpu.gpu_bandwidth_util_pct"), ] ) log.info("\n" + tabulate(rows, headers=headers, tablefmt="github")) diff --git a/cvs/tests/inference/vllm/vllm_single.py b/cvs/tests/inference/vllm/vllm_single.py index fd33dccc..3b45626f 100644 --- a/cvs/tests/inference/vllm/vllm_single.py +++ b/cvs/tests/inference/vllm/vllm_single.py @@ -17,6 +17,13 @@ from cvs.lib.utils.verdict import evaluate_all from cvs.lib.inference.utils.vllm_parsing import CLIENT_METRICS as _METRICS, CLIENT_METRIC_UNITS as _METRIC_UNITS from cvs.lib.inference.vllm_single import VllmJob +from cvs.lib.utils.gpu import ( + capture_gpu_metrics, + poll_gpu_metrics, + agg_readings, + GPU_METRICS as _GPU_METRICS, + GPU_METRIC_UNITS as _GPU_METRIC_UNITS, +) import importlib.util as _ilu import pathlib as _pl @@ -86,6 +93,14 @@ def pytest_generate_tests(metafunc): metric_cases.append((combo, c, short)) metric_ids.append(cid + "-" + short) 
metafunc.parametrize("seq_combo,concurrency,metric", metric_cases, ids=metric_ids) + elif "gpu_metric" in metafunc.fixturenames and cases: + gpu_metric_cases = [] + gpu_metric_ids = [] + for (combo, c), cid in zip(cases, ids): + for short, _unit in _GPU_METRICS: + gpu_metric_cases.append((combo, c, short)) + gpu_metric_ids.append(cid + "-gpu." + short) + metafunc.parametrize("seq_combo,concurrency,gpu_metric", gpu_metric_cases, ids=gpu_metric_ids) elif "seq_combo" in metafunc.fixturenames and "concurrency" in metafunc.fixturenames and cases: metafunc.parametrize("seq_combo,concurrency", cases, ids=ids) @@ -202,7 +217,18 @@ def test_model_fetch(orch, variant_config, lifecycle, request): pytest.fail(f"no model bytes under {models_dir} after fetch") -def test_vllm_inference(orch, variant_config, hf_token, seq_combo, concurrency, inf_res_dict, lifecycle, request): +def _snap(orch, label): + """Capture GPU metrics; return {} and log a warning on any failure.""" + try: + return capture_gpu_metrics(orch) + except Exception as exc: # noqa: BLE001 + log.warning("GPU snapshot %r failed (ignored): %s", label, exc) + return {} + + +def test_vllm_inference( + orch, variant_config, hf_token, seq_combo, concurrency, inf_res_dict, gpu_metrics_snap, lifecycle, request +): if lifecycle.failed: pytest.skip("a prior lifecycle stage failed") isl = seq_combo["isl"] @@ -219,6 +245,15 @@ def test_vllm_inference(orch, variant_config, hf_token, seq_combo, concurrency, client_poll_count=int(variant_config.params.client_poll_count), ) + cell_key = ( + variant_config.model.id, + variant_config.gpu_arch, + isl, + osl, + seq_combo.get("name", "default"), + concurrency, + ) + # A failure mid-sweep flips lifecycle.failed so the remaining cells skip # cleanly (instead of each re-failing) AND the orch leak-guard finalizer # still tears the container down. 
The explicit teardown row may not run on @@ -226,29 +261,67 @@ def test_vllm_inference(orch, variant_config, hf_token, seq_combo, concurrency, try: job.stop_server() job.build_server_cmd() + + # Preload snapshot: baseline VRAM before the model is loaded into GPU HBM. + gpu_metrics_snap[(cell_key, "preload")] = _snap(orch, "preload") + t = time.monotonic() job.start_server() job.wait_ready() - lifecycle.record(request.node.nodeid, "server_ready", time.monotonic() - t) + model_load_s = time.monotonic() - t + lifecycle.record(request.node.nodeid, "server_ready", model_load_s) + + # Post-load snapshot: weights are resident, KV cache not yet allocated. + gpu_metrics_snap[(cell_key, "loaded")] = _snap(orch, "loaded") + + # Background the client, then poll GPU while it runs. job.run_client() + + # Poll GPU while client runs + _gpu_log = _pl.Path(job.out_dir) / "gpu_poll.log" + _model_load_mb = ( + (gpu_metrics_snap.get((cell_key, "loaded"), {}).get("gpu.used_vram") or 0) + - (gpu_metrics_snap.get((cell_key, "preload"), {}).get("gpu.used_vram") or 0) + ) or None + _model_load_s = model_load_s + _poll_readings = poll_gpu_metrics( + orch, + is_done_fn=job.is_client_done, + poll_interval_s=15, + label="poll", + log_path=str(_gpu_log), + model_load_s=_model_load_s, + model_load_memory_mb=_model_load_mb, + ) + _agg = agg_readings(_poll_readings) + job.wait_client_complete() + results = job.parse_results() except Exception: lifecycle.failed = True raise - key = ( - variant_config.model.id, - variant_config.gpu_arch, - isl, - osl, - seq_combo.get("name", "default"), - concurrency, - ) - inf_res_dict[key] = results - # Verdict is no longer asserted here: each metric is its own test (test_metric, - # one HTML row per metric per cell). This test only runs the benchmark and - # records the cell's results into the module-scoped inf_res_dict. + # Compute the five human-readable derived metrics from the poll aggregation. 
+ gpu_actuals = { + # Peak VRAM = max used_vram across poll readings. + "gpu.peak_gpu_memory_mb": _agg.get("peak_gpu_memory_mb"), + # Model load memory = VRAM increase from before start_server to after wait_ready. + "gpu.model_load_memory_mb": _model_load_mb, + # Model load wall-clock time. + "gpu.model_load_s": _model_load_s, + # Memory bandwidth utilisation (UMC activity) averaged over poll readings. + "gpu.gpu_bandwidth_util_pct": _agg.get("gpu_bandwidth_util_pct"), + # Compute utilisation (GFX activity) averaged over poll readings. + "gpu.gpu_compute_util_pct": _agg.get("gpu_compute_util_pct"), + } + for _host, client_m in results.items(): + client_m.update(gpu_actuals) + + inf_res_dict[cell_key] = results + # Verdict is no longer asserted here: each metric is its own test (test_metric / + # test_gpu_metric, one HTML row per metric per cell). This test only runs the + # benchmark and records the cell's results into the module-scoped inf_res_dict. def test_metric(seq_combo, concurrency, metric, inf_res_dict, variant_config, lifecycle, request): @@ -298,6 +371,43 @@ def test_metric(seq_combo, concurrency, metric, inf_res_dict, variant_config, li evaluate_all(actuals, {full: spec}) +def test_gpu_metric(seq_combo, concurrency, gpu_metric, inf_res_dict, variant_config, lifecycle, request): + """One pytest test (= one HTML row) per GPU metric per cell. + + Mirrors test_metric exactly: reads a single cached metric from the + module-scoped inf_res_dict and surfaces it as its own pass/fail row. + GPU metrics are merged into the actuals dict by test_vllm_inference. 
+ """ + if lifecycle.failed: + pytest.skip("a prior lifecycle stage failed") + isl = seq_combo["isl"] + osl = seq_combo["osl"] + key = ( + variant_config.model.id, + variant_config.gpu_arch, + isl, + osl, + seq_combo.get("name", "default"), + concurrency, + ) + if key not in inf_res_dict: + pytest.skip(f"no recorded results for cell {key!r}") + _host, actuals = next(iter(inf_res_dict[key].items())) + full = f"gpu.{gpu_metric}" + value = actuals.get(full) + unit = _GPU_METRIC_UNITS.get(gpu_metric, "-") + request.node.user_properties.append(("metric_value", value)) + request.node.user_properties.append(("metric_unit", unit)) + + if not variant_config.enforce_thresholds: + return + cell = variant_config.cell_key(isl, osl, concurrency) + spec = (variant_config.thresholds.get(cell) or {}).get(full) + if spec is None: + return + evaluate_all(actuals, {full: spec}) + + def test_teardown(orch, lifecycle, request): """Final stage: explicit container teardown, timed, asserting it is gone. From c555c510b3366fb68f909a9988813520814f942b Mon Sep 17 00:00:00 2001 From: Atul Nair Date: Wed, 24 Jun 2026 20:42:53 -0400 Subject: [PATCH 2/7] fix(vllm_single): add missing gpu_metrics_snap module-scope fixture The fixture was referenced in test_vllm_inference's parameter list but never defined, causing a setup Error before any inference ran. --- cvs/tests/inference/vllm/conftest.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cvs/tests/inference/vllm/conftest.py b/cvs/tests/inference/vllm/conftest.py index 01c6dd71..be4c3dbb 100644 --- a/cvs/tests/inference/vllm/conftest.py +++ b/cvs/tests/inference/vllm/conftest.py @@ -122,6 +122,11 @@ def inf_res_dict(): return {} +@pytest.fixture(scope="module") +def gpu_metrics_snap(): + return {} + + def pytest_collection_modifyitems(items): """Pin the lifecycle order explicitly instead of relying on definition order. 
From b8ad9006fd0de03c1f130b77582dd9d26282cbf4 Mon Sep 17 00:00:00 2001 From: Atul Nair Date: Wed, 24 Jun 2026 21:07:10 -0400 Subject: [PATCH 3/7] fix(gpu): use exec_on_head for amd-smi; mkdir gpu_poll.log parent dir MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit amd-smi is a host-side tool — running it via orch.exec() sends it into the container where it doesn't exist. Switch capture_gpu_metrics to orch.exec_on_head() so the command runs on the bare-metal node. Also ensure the out_dir exists before poll_gpu_metrics attempts to write gpu_poll.log, since the directory is created lazily by the job setup. Update unit test mocks from exec to exec_on_head to match. --- cvs/lib/utils/gpu.py | 7 ++++--- cvs/lib/utils/unittests/test_gpu.py | 20 +++++++++++--------- cvs/tests/inference/vllm/vllm_single.py | 1 + 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/cvs/lib/utils/gpu.py b/cvs/lib/utils/gpu.py index fe5d135d..d9d3411c 100644 --- a/cvs/lib/utils/gpu.py +++ b/cvs/lib/utils/gpu.py @@ -165,12 +165,13 @@ def _try_parse(text: str) -> list: def capture_gpu_metrics(orch) -> dict: - """One amd-smi exec via orch.exec(). Returns flat {gpu.* metrics} dict. + """One amd-smi exec on the host node. Returns flat {gpu.* metrics} dict. - orch: ContainerOrchestrator (has .exec(cmd) -> {host: str}). + orch: ContainerOrchestrator (has .exec_on_head(cmd) -> {host: str}). + amd-smi is a host-side tool and must run outside the container. Degrades gracefully on empty/unparseable JSON per host (returns all-None dict). 
""" - out = orch.exec("sudo amd-smi metric --json") + out = orch.exec_on_head("amd-smi metric --json") all_entries = [] for _host, text in out.items(): all_entries.extend(_try_parse(text)) diff --git a/cvs/lib/utils/unittests/test_gpu.py b/cvs/lib/utils/unittests/test_gpu.py index ea31a9aa..e8c28955 100644 --- a/cvs/lib/utils/unittests/test_gpu.py +++ b/cvs/lib/utils/unittests/test_gpu.py @@ -551,15 +551,17 @@ class TestCaptureGpuMetrics(unittest.TestCase): """ def _make_orch(self, raw_gpu_list): - """Return a mock orchestrator whose exec result decodes to raw_gpu_list. + """Return a mock orchestrator whose exec_on_head result decodes to raw_gpu_list. - The real ContainerOrchestrator.exec(cmd) returns {host: str}; we mock - the same shape so tests are grounded in the actual interface contract. + amd-smi is a host-side tool; capture_gpu_metrics uses exec_on_head so + the command runs on the bare-metal node, not inside the container. + The real ContainerOrchestrator.exec_on_head(cmd) returns {host: str}; + we mock the same shape so tests are grounded in the actual interface contract. """ import json orch = MagicMock() - orch.exec.return_value = {"node0": json.dumps(raw_gpu_list)} + orch.exec_on_head.return_value = {"node0": json.dumps(raw_gpu_list)} return orch def test_happy_path_key_set_matches_all_keys(self): @@ -571,8 +573,8 @@ def test_happy_path_key_set_matches_all_keys(self): self.assertIsInstance(out, dict) self.assertEqual(set(out.keys()), set(ALL_KEYS)) mock_parse.assert_called_once_with([_full_gpu_entry()]) - # Pin the exact command string sent to amd-smi. - orch.exec.assert_called_once_with("sudo amd-smi metric --json") + # Pin the exact command string sent to amd-smi (host-side, no sudo needed). + orch.exec_on_head.assert_called_once_with("amd-smi metric --json") # Verify parse result is actually returned, not silently discarded. 
self.assertEqual(out["gpu.gfx_activity"], 30) self.assertIsNotNone(out["gpu.total_vram"]) @@ -586,7 +588,7 @@ def test_multi_host_entries_aggregated_together(self): import json orch = MagicMock() - orch.exec.return_value = { + orch.exec_on_head.return_value = { "node0": json.dumps([_full_gpu_entry(gfx=10)]), "node1": json.dumps([_full_gpu_entry(gfx=20)]), } @@ -606,7 +608,7 @@ def test_no_raise_on_empty_gpu_list(self): def test_no_raise_on_malformed_orch_output(self): """If orch returns non-JSON text, capture_gpu_metrics degrades; never raises.""" orch = MagicMock() - orch.exec.return_value = {"node0": "not valid json at all"} + orch.exec_on_head.return_value = {"node0": "not valid json at all"} try: out = capture_gpu_metrics(orch) except Exception as exc: # noqa: BLE001 @@ -626,7 +628,7 @@ def test_no_raise_on_valid_json_wrong_type(self): for val in non_list_values: with self.subTest(decoded_type=type(val).__name__): orch = MagicMock() - orch.exec.return_value = {"node0": json.dumps(val)} + orch.exec_on_head.return_value = {"node0": json.dumps(val)} try: out = capture_gpu_metrics(orch) except Exception as exc: # noqa: BLE001 diff --git a/cvs/tests/inference/vllm/vllm_single.py b/cvs/tests/inference/vllm/vllm_single.py index 3b45626f..010f646a 100644 --- a/cvs/tests/inference/vllm/vllm_single.py +++ b/cvs/tests/inference/vllm/vllm_single.py @@ -279,6 +279,7 @@ def test_vllm_inference( # Poll GPU while client runs _gpu_log = _pl.Path(job.out_dir) / "gpu_poll.log" + _gpu_log.parent.mkdir(parents=True, exist_ok=True) _model_load_mb = ( (gpu_metrics_snap.get((cell_key, "loaded"), {}).get("gpu.used_vram") or 0) - (gpu_metrics_snap.get((cell_key, "preload"), {}).get("gpu.used_vram") or 0) From d08063fb312a2eaba000710ea9fde100ccb58d69 Mon Sep 17 00:00:00 2001 From: Atul Nair Date: Wed, 24 Jun 2026 21:17:47 -0400 Subject: [PATCH 4/7] fix(vllm_single): write gpu_poll.log to tmp then copy to node via exec_on_head out_dir is an NFS path on the node, not mounted on the devbox. 
Write the log to a local tempdir, then base64-encode it and push it to the node via exec_on_head so it lands in the bundle. --- cvs/tests/inference/vllm/vllm_single.py | 26 +++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/cvs/tests/inference/vllm/vllm_single.py b/cvs/tests/inference/vllm/vllm_single.py index 010f646a..cb19c453 100644 --- a/cvs/tests/inference/vllm/vllm_single.py +++ b/cvs/tests/inference/vllm/vllm_single.py @@ -277,9 +277,13 @@ def test_vllm_inference( # Background the client, then poll GPU while it runs. job.run_client() - # Poll GPU while client runs - _gpu_log = _pl.Path(job.out_dir) / "gpu_poll.log" - _gpu_log.parent.mkdir(parents=True, exist_ok=True) + # Poll GPU while client runs. + # gpu_poll.log is written locally then copied into the remote out_dir + # (which is an NFS path on the node, not mounted on the devbox). + import tempfile as _tempfile + + _gpu_log_local = _pl.Path(_tempfile.mkdtemp()) / "gpu_poll.log" + _gpu_log_remote = f"{job.out_dir}/gpu_poll.log" _model_load_mb = ( (gpu_metrics_snap.get((cell_key, "loaded"), {}).get("gpu.used_vram") or 0) - (gpu_metrics_snap.get((cell_key, "preload"), {}).get("gpu.used_vram") or 0) @@ -290,10 +294,24 @@ def test_vllm_inference( is_done_fn=job.is_client_done, poll_interval_s=15, label="poll", - log_path=str(_gpu_log), + log_path=str(_gpu_log_local), model_load_s=_model_load_s, model_load_memory_mb=_model_load_mb, ) + # Copy log into the node's out_dir (NFS) so it lands in the bundle. 
+ if _gpu_log_local.exists(): + try: + import base64 as _b64 + + _enc = _b64.b64encode(_gpu_log_local.read_bytes()).decode() + orch.exec_on_head( + f"mkdir -p {shlex.quote(job.out_dir)} && " + f"printf '%s' {shlex.quote(_enc)} | base64 -d > {shlex.quote(_gpu_log_remote)}" + ) + except Exception as _e: + import logging as _logging + + _logging.getLogger(__name__).warning("gpu_poll.log upload failed: %s", _e) _agg = agg_readings(_poll_readings) job.wait_client_complete() From 717284f30deac601b43099c64a0809539a413902 Mon Sep 17 00:00:00 2001 From: Atul Nair Date: Thu, 25 Jun 2026 15:26:10 -0400 Subject: [PATCH 5/7] fix(gpu): move deferred imports to module level; fix test_gpu_metric rank Move import time/logging/pathlib from inside poll_gpu_metrics body to module top-level. Add test_gpu_metric at rank 4 in conftest sort table so it runs before test_teardown, not after. --- cvs/lib/utils/gpu.py | 8 +++----- cvs/tests/inference/vllm/conftest.py | 1 + 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/cvs/lib/utils/gpu.py b/cvs/lib/utils/gpu.py index d9d3411c..22463a24 100644 --- a/cvs/lib/utils/gpu.py +++ b/cvs/lib/utils/gpu.py @@ -5,6 +5,9 @@ from __future__ import annotations import json +import logging +import pathlib +import time # Human-readable derived metrics exposed as HTML rows (one row per entry per cell). # These are computed in vllm_single.py from the raw amd-smi snapshots and stored @@ -217,9 +220,6 @@ def poll_gpu_metrics( Returns list of raw snapshot dicts (failed polls excluded). Never raises. Writes per-poll lines + summary to log_path if given. 
""" - import time - import logging - log = logging.getLogger(__name__) readings: list = [] log_lines: list = [] @@ -286,8 +286,6 @@ def poll_gpu_metrics( if log_path is not None: try: - import pathlib - pathlib.Path(log_path).write_text("\n".join(log_lines) + "\n") except Exception as exc: log.warning("poll_gpu_metrics: failed to write log %s: %s", log_path, exc) diff --git a/cvs/tests/inference/vllm/conftest.py b/cvs/tests/inference/vllm/conftest.py index be4c3dbb..447f157a 100644 --- a/cvs/tests/inference/vllm/conftest.py +++ b/cvs/tests/inference/vllm/conftest.py @@ -142,6 +142,7 @@ def pytest_collection_modifyitems(items): "test_model_fetch": 2, "test_vllm_inference": 3, "test_metric": 4, + "test_gpu_metric": 4, "test_print_results_table": 5, "test_teardown": 6, } From a9e09bf3f4be22f8856a262a064a1dbce7f371f5 Mon Sep 17 00:00:00 2001 From: Atul Nair Date: Thu, 25 Jun 2026 15:26:18 -0400 Subject: [PATCH 6/7] docs(gpu): add gpu.py section to AGENTS.md and integration guide Add gpu.py API reference to cvs/lib/utils/AGENTS.md: public symbols, poll_gpu_metrics parameter table, 5-metric derivation table, required conftest fixtures (gpu_metrics_snap), two wiring patterns (sync poll / threaded poll), pytest_generate_tests parametrize branch, collection sort rank table, and gotchas (threshold key prefix, capture can raise, or-None semantics, full actuals for evaluate_all, GATED_METRICS). Add cvs/lib/utils/docs/gpu-metrics.md: user-facing integration guide covering the 5 derived metrics, polling lifecycle, 5-step integration walkthrough, gpu_poll.log format, failure/None handling table, and cross-references to ADDING_A_SUITE.md and threshold-kinds.md. 
--- cvs/lib/utils/AGENTS.md | 192 ++++++++++++++++++ cvs/lib/utils/docs/gpu-metrics.md | 316 ++++++++++++++++++++++++++++++ 2 files changed, 508 insertions(+) create mode 100644 cvs/lib/utils/docs/gpu-metrics.md diff --git a/cvs/lib/utils/AGENTS.md b/cvs/lib/utils/AGENTS.md index dfb8955b..12baf93d 100644 --- a/cvs/lib/utils/AGENTS.md +++ b/cvs/lib/utils/AGENTS.md @@ -3,6 +3,9 @@ **Boundary**: if every CVS suite (inference, training, ...) needs it, it belongs here. Inference-only symbols belong in `cvs/lib/inference/utils/`; single-framework symbols in `cvs/lib//utils/`. +> **New in this boundary**: `gpu.py` — GPU metrics polling, usable by any inference suite. +> See `docs/gpu-metrics.md` for the integration guide. + --- ## Files @@ -176,3 +179,192 @@ or substitution. After calling `substitute_config`, attach `thresholds`, then bu - **`container.model_dump()` is the orchestrator contract** — serialises to `{lifetime, name, image, runtime: {name, args}}` that `OrchestratorConfig.from_configs` consumes; do not reshape the dict before passing it. + +--- + +### `gpu.py` + +GPU metrics polling library. No side-effects at import time; safe to import in any suite. + +**When to use**: add GPU utilisation rows to an inference suite's HTML report. +Do not copy-paste this logic — import it. + +#### Public API + +| Symbol | Kind | Purpose | +|---|---|---| +| `GPU_METRICS` | `list[tuple[str, str]]` | 5 derived metric keys + units, in display order. Iterate to register `test_gpu_metric` parametrize IDs and threshold keys. | +| `GPU_METRIC_UNITS` | `dict[str, str]` | `{key: unit}` convenience dict built from `GPU_METRICS`. | +| `capture_gpu_metrics(orch)` | function | One `amd-smi metric --json` exec on the head node. Returns `{gpu.*: value_or_None}` raw snapshot. | +| `agg_readings(readings)` | function | Aggregates a list of raw snapshots → `{peak_gpu_memory_mb, gpu_compute_util_pct, gpu_bandwidth_util_pct}`. 
| +| `poll_gpu_metrics(orch, is_done_fn, ...)` | function | Polling loop. Returns list of raw snapshots. Never raises. | + +#### `poll_gpu_metrics` parameters + +| Parameter | Default | Notes | +|---|---|---| +| `orch` | — | `ContainerOrchestrator`; must have `.exec_on_head(cmd)` | +| `is_done_fn` | — | Callable returning `bool`; polling stops when it returns `True` | +| `poll_interval_s` | `15` | Seconds between polls | +| `label` | `"poll"` | Log-line prefix tag | +| `log_path` | `None` | If given, writes `gpu_poll.log` to this path | +| `max_consecutive_failures` | `3` | Stops early after this many back-to-back `amd-smi` failures | +| `model_load_s` | `None` | Passed through into the summary block of `gpu_poll.log` | +| `model_load_memory_mb` | `None` | Passed through into the summary block of `gpu_poll.log` | + +`poll_gpu_metrics` returns the raw readings list. The caller computes the 5 derived +metrics by combining `agg_readings(readings)` with the separately-measured +`model_load_s` and `model_load_memory_mb` scalars. + +#### The 5 derived metrics and how they are computed + +| Key | Source | Aggregation | +|---|---|---| +| `peak_gpu_memory_mb` | `agg_readings` | `max(used_vram)` over polls, each poll summed across GPUs | +| `model_load_memory_mb` | caller-measured | `post_load_snap["gpu.used_vram"] - pre_load_snap["gpu.used_vram"]` | +| `model_load_s` | caller-measured | wall-clock elapsed while server starts | +| `gpu_bandwidth_util_pct` | `agg_readings` | `mean(umc_activity)` over polls, each poll averaged across GPUs | +| `gpu_compute_util_pct` | `agg_readings` | `mean(gfx_activity)` over polls, each poll averaged across GPUs | + +Store as `inf_res_dict[f"gpu.{key}"]` so `test_gpu_metric` can retrieve them. 
+ +#### Required conftest fixtures + +Both fixtures must be module-scoped alongside `inf_res_dict`: + +```python +@pytest.fixture(scope="module") +def inf_res_dict(): + return {} + +@pytest.fixture(scope="module") +def gpu_metrics_snap(): + return {} # stores pre/post-load snapshots keyed by (cell_key, "preload"/"loaded") +``` + +`test_vllm_inference` accepts `gpu_metrics_snap` as a function argument to store +intermediate snapshots. Omitting it causes a collection error even if the test never +uses the fixture body. + +#### Wiring pattern + +Two valid patterns depending on how your client is invoked: + +**Pattern A — client is backgrounded by caller (synchronous poll):** + +```python +from cvs.lib.utils.gpu import GPU_METRICS, GPU_METRIC_UNITS, capture_gpu_metrics, poll_gpu_metrics, agg_readings +import time + +# --- inside test__inference --- +# Wrap capture_gpu_metrics to degrade gracefully if amd-smi is unavailable at snapshot time +def _snap(): + try: + return capture_gpu_metrics(orch) + except Exception: + return {} + +pre_snap = _snap() +t0 = time.monotonic() +# ... start server (returns immediately; use is_done_fn to signal client completion) ... +post_snap = _snap() +load_s = time.monotonic() - t0 +load_mb = ((post_snap.get("gpu.used_vram") or 0) - (pre_snap.get("gpu.used_vram") or 0)) or None + +poll_readings = poll_gpu_metrics( + orch, + is_done_fn=, # e.g. job.is_client_done + log_path=f"{log_dir}/gpu_poll.log", + model_load_s=load_s, + model_load_memory_mb=load_mb, +) +``` + +**Pattern B — client runs synchronously in main thread (poll in a thread):** + +```python +import threading + +done_flag = threading.Event() +poll_readings = [] +def _poll(): + poll_readings.extend(poll_gpu_metrics( + orch, done_flag.is_set, + log_path=f"{log_dir}/gpu_poll.log", + model_load_s=load_s, + model_load_memory_mb=load_mb, + )) +poll_thread = threading.Thread(target=_poll, daemon=True) +poll_thread.start() +# ... run client synchronously ... 
+done_flag.set() +poll_thread.join() +``` + +**After polling (both patterns):** + +```python +agg = agg_readings(poll_readings) +inf_res_dict["gpu.peak_gpu_memory_mb"] = agg.get("peak_gpu_memory_mb") +inf_res_dict["gpu.model_load_memory_mb"] = load_mb +inf_res_dict["gpu.model_load_s"] = load_s +inf_res_dict["gpu.gpu_bandwidth_util_pct"] = agg.get("gpu_bandwidth_util_pct") +inf_res_dict["gpu.gpu_compute_util_pct"] = agg.get("gpu_compute_util_pct") +``` + +#### Ordering in `pytest_collection_modifyitems` and `pytest_generate_tests` + +**Collection sort** — `test_gpu_metric` must share rank with `test_metric`. Omitting +it from the rank dict defaults to 99, which runs it after `test_teardown`. + +```python +rank = { + ... + "test_metric": 4, + "test_gpu_metric": 4, # must be present + "test_print_results_table": 5, + "test_teardown": 6, +} +``` + +**Parametrize** — `test_gpu_metric` must be parametrized via `pytest_generate_tests`, +not via a `@pytest.mark.parametrize` decorator. Add an `elif` branch that produces one +instance per entry in `GPU_METRICS`. The fixture parameter name is `gpu_metric` +(singular): + +```python +def pytest_generate_tests(metafunc): + ... + elif "gpu_metric" in metafunc.fixturenames: + metafunc.parametrize( + "gpu_metric", + [k for k, _ in GPU_METRICS], + ids=[k for k, _ in GPU_METRICS], + ) +``` + +Without this branch, `test_gpu_metric` collects zero instances and produces no HTML rows. + +#### Gotchas + +- **`amd-smi` must run on the host, not inside the container.** Always use + `orch.exec_on_head(...)`, never `orch.exec_in_container(...)`. +- **`capture_gpu_metrics` can raise**; only `poll_gpu_metrics` guarantees never-raises. + Wrap one-shot snapshot calls in a `try/except` that returns `{}` (see `_snap()` above). +- **`model_load_memory_mb` should be `None` when VRAM data is unavailable**, not `0`. + Use `... or None` after the subtraction so a missing-data case is skipped by + `test_gpu_metric` rather than gated as a zero value. 
+- **`agg_readings` only returns 3 of the 5 metrics.** `model_load_memory_mb` and + `model_load_s` come from the caller's timing and snapshot code, not from the poll loop. +- **All poll readings use raw `gpu.*` keys** (e.g. `gpu.used_vram`), not derived metric + keys (e.g. `peak_gpu_memory_mb`). Do not pass raw snapshots to `evaluate_all`. +- **Threshold JSON keys use the `gpu.` prefix** (`"gpu.peak_gpu_memory_mb"`, not + `"peak_gpu_memory_mb"`). Entries without the prefix never match and silently produce + record-only rows even when `enforce_thresholds=True`. +- **Pass the full cell actuals dict to `evaluate_all`**, not just the single metric's + value. `min_ratio` specs look up a reference metric from `actuals`; passing a + single-key dict causes a reference-resolution failure. +- **`GATED_METRICS` coverage check**: if your `VariantConfig` validates that every + gated metric has a threshold entry, add all five `gpu.*` keys to your `GATED_METRICS` + set. Omitting them causes a silent green PASS with no assertions under + `enforce_thresholds=True` when specs are missing. diff --git a/cvs/lib/utils/docs/gpu-metrics.md b/cvs/lib/utils/docs/gpu-metrics.md new file mode 100644 index 00000000..970f242e --- /dev/null +++ b/cvs/lib/utils/docs/gpu-metrics.md @@ -0,0 +1,316 @@ +# GPU Metrics Polling — Integration Guide + +`cvs/lib/utils/gpu.py` is a shared library that any CVS inference suite can use to +collect GPU utilisation data during a benchmark run and surface it as rows in the +HTML report. This document explains what the library measures, how to wire it into a +new or existing suite, and how to configure thresholds. + +> **Prerequisite**: this guide assumes you have completed (or are familiar with) +> the steps in `cvs/lib/inference/ADDING_A_SUITE.md`. Concepts like `cell_key`, +> `GATED_METRICS`, and `inf_res_dict` structure are defined there. 
+ +--- + +## What it measures + +Five derived metrics are produced per run: + +| Metric key | Unit | Description | +|---|---|---| +| `gpu.peak_gpu_memory_mb` | MB | Highest VRAM used across all GPUs at any single poll during inference. Each poll sums VRAM across all GPUs on the node; this value is the max of those sums. | +| `gpu.model_load_memory_mb` | MB | VRAM delta between a snapshot taken before model load and one taken after. Represents the memory cost of loading the model weights. | +| `gpu.model_load_s` | s | Wall-clock time from server start to the post-load snapshot. | +| `gpu.gpu_bandwidth_util_pct` | % | Mean UMC (unified memory controller) activity across all GPUs, averaged over all polls taken during inference. | +| `gpu.gpu_compute_util_pct` | % | Mean GFX (shader/compute) activity across all GPUs, averaged over all polls taken during inference. | + +Each metric appears as its own row in the HTML report, with value, unit, and a +pass/fail result if a threshold is configured. + +--- + +## How polling works + +1. **Pre-load snapshot** — `capture_gpu_metrics(orch)` is called before the server + starts. Records baseline VRAM. +2. **Server start + post-load snapshot** — after the server is ready, + `capture_gpu_metrics(orch)` is called again. The VRAM delta and elapsed time give + `model_load_memory_mb` and `model_load_s`. +3. **Client phase polling** — `poll_gpu_metrics(...)` is called (either synchronously + with a backgrounded client, or from a thread with a synchronous client) and calls + `amd-smi metric --json` on the head node every `poll_interval_s` seconds + (default 15 s) until `is_done_fn()` returns `True`. +4. **Aggregation** — after the client completes, `agg_readings(readings)` reduces the + poll list to `peak_gpu_memory_mb`, `gpu_compute_util_pct`, and + `gpu_bandwidth_util_pct`. +5. **Results stored** — all five derived metrics are written into `inf_res_dict` under + `gpu.` so `test_gpu_metric` can read them. 
+ +`amd-smi` runs on the host node, not inside the container, via +`orch.exec_on_head("amd-smi metric --json")`. This is intentional — `amd-smi` is a +host-side tool and is not available inside the benchmark container. + +--- + +## Integrating into a suite + +### 1. Add the GPU polling block to `test_<suite>_inference` + +The function signature must include `gpu_metrics_snap` (see Step 3). Wrap +`capture_gpu_metrics` in a helper that degrades gracefully if `amd-smi` is unavailable +at snapshot time — unlike `poll_gpu_metrics`, it can raise. + +**Pattern A — client is backgrounded by the framework (synchronous poll):** + +```python +import time +from cvs.lib.utils.gpu import GPU_METRICS, GPU_METRIC_UNITS, agg_readings, capture_gpu_metrics, poll_gpu_metrics + +def test_<suite>_inference(orch, variant_config, inf_res_dict, gpu_metrics_snap, ...): + + def _snap(): + try: + return capture_gpu_metrics(orch) + except Exception: + return {} + + pre_snap = _snap() + t0 = time.monotonic() + # ... start server (returns immediately; framework backgrounds the client) ... + post_snap = _snap() + load_s = time.monotonic() - t0 + load_mb = ((post_snap.get("gpu.used_vram") or 0) - (pre_snap.get("gpu.used_vram") or 0)) or None + + poll_readings = poll_gpu_metrics( + orch, + is_done_fn=<is_done_callable>, # e.g.
job.is_client_done + log_path=f"{variant_config.paths.log_dir}/gpu_poll.log", + model_load_s=load_s, + model_load_memory_mb=load_mb, + ) + + agg = agg_readings(poll_readings) + inf_res_dict["gpu.peak_gpu_memory_mb"] = agg.get("peak_gpu_memory_mb") + inf_res_dict["gpu.model_load_memory_mb"] = load_mb + inf_res_dict["gpu.model_load_s"] = load_s + inf_res_dict["gpu.gpu_bandwidth_util_pct"] = agg.get("gpu_bandwidth_util_pct") + inf_res_dict["gpu.gpu_compute_util_pct"] = agg.get("gpu_compute_util_pct") +``` + +**Pattern B — client runs synchronously in the main thread (thread the poll):** + +```python +import threading + + done_flag = threading.Event() + poll_readings = [] + def _poll(): + poll_readings.extend(poll_gpu_metrics( + orch, done_flag.is_set, + log_path=f"{variant_config.paths.log_dir}/gpu_poll.log", + model_load_s=load_s, + model_load_memory_mb=load_mb, + )) + poll_thread = threading.Thread(target=_poll, daemon=True) + poll_thread.start() + # ... run client synchronously ... + done_flag.set() + poll_thread.join() + # then aggregate as in Pattern A +``` + +### 2. Add `test_gpu_metric` + +`test_gpu_metric` is parametrized via `pytest_generate_tests` (see Step 4), not via a +`@pytest.mark.parametrize` decorator. The fixture parameter name is `gpu_metric` +(singular, matching the `pytest_generate_tests` branch). 
+ +Pass the **full** per-cell actuals dict to `evaluate_all` — not just the single metric +— so that `min_ratio` threshold specs can resolve their reference metric: + +```python +from cvs.lib.utils.gpu import GPU_METRIC_UNITS +from cvs.lib.utils.verdict import ThresholdViolation, evaluate_all + +def test_gpu_metric(gpu_metric, inf_res_dict, variant_config, request): + val = inf_res_dict.get(gpu_metric) + unit = GPU_METRIC_UNITS.get(gpu_metric, "") + + request.node.user_properties.append(("metric_value", val)) + request.node.user_properties.append(("metric_unit", unit)) + + if val is None: + pytest.skip(f"{gpu_metric}: no value recorded (amd-smi unavailable or polling failed)") + + if not variant_config.enforce_thresholds: + return + + cell = variant_config.cell_key(isl, osl, concurrency) # same key used for test_metric + spec = (variant_config.thresholds.get(cell) or {}).get(gpu_metric) + if spec is None: + return # no spec → record-only + + # Pass full cell actuals so min_ratio specs can resolve their reference metric + cell_actuals = {k: inf_res_dict.get(k) for k in inf_res_dict} + try: + evaluate_all(cell_actuals, {gpu_metric: spec}) + except ThresholdViolation as exc: + pytest.fail(str(exc)) +``` + +### 3. Add `gpu_metrics_snap` fixture to `conftest.py` + +```python +@pytest.fixture(scope="module") +def gpu_metrics_snap(): + return {} +``` + +This module-scoped fixture is requested by the inference test in Step 1 (not by +`test_gpu_metric`), which can use it to carry pre-/post-load snapshots between phases. + +### 4. 
Register `test_gpu_metric` in `pytest_collection_modifyitems` and `pytest_generate_tests` + +**Collection sort** — add `test_gpu_metric` at rank 4 alongside `test_metric`: + +```python +rank = { + "test_launch_container": 0, + "test_setup_sshd": 1, + "test_model_fetch": 2, + "test__inference": 3, + "test_metric": 4, + "test_gpu_metric": 4, # must be present; omitting → rank 99 → runs after teardown + "test_print_results_table": 5, + "test_teardown": 6, +} +``` + +**Parametrize** — add an `elif` branch to `pytest_generate_tests` in the test module. +The fixture name is `gpu_metric` (singular): + +```python +from cvs.lib.utils.gpu import GPU_METRICS + +def pytest_generate_tests(metafunc): + if "metric" in metafunc.fixturenames: + # ... your existing metric parametrize branch ... + elif "gpu_metric" in metafunc.fixturenames: + metafunc.parametrize( + "gpu_metric", + [k for k, _ in GPU_METRICS], + ids=[k for k, _ in GPU_METRICS], + ) +``` + +Without this branch `test_gpu_metric` collects zero instances and produces no HTML rows. + +### 5. Add threshold entries and update `GATED_METRICS` + +**Threshold JSON** — threshold keys use the `gpu.` prefix. For each sweep cell: + +```json +"isl1000_osl1000_conc16": { + "client.total_token_throughput": { "kind": "min_tok_s", "value": 1000 }, + "gpu.peak_gpu_memory_mb": { "kind": "max", "value": 200000 }, + "gpu.model_load_memory_mb": { "kind": "max", "value": 150000 }, + "gpu.model_load_s": { "kind": "max", "value": 300 }, + "gpu.gpu_bandwidth_util_pct": { "kind": "min", "value": 10 }, + "gpu.gpu_compute_util_pct": { "kind": "min", "value": 5 } +} +``` + +**`GATED_METRICS`** — if your `VariantConfig` subclass validates that every gated +metric has a threshold entry (the two-axis coverage check in `ADDING_A_SUITE.md` +Step 2), add all five `gpu.*` keys to your `GATED_METRICS` set: + +```python +GATED_METRICS = { + "client.total_token_throughput", + ... 
+ "gpu.peak_gpu_memory_mb", + "gpu.model_load_memory_mb", + "gpu.model_load_s", + "gpu.gpu_bandwidth_util_pct", + "gpu.gpu_compute_util_pct", +} +``` + +Omitting them causes a silent green PASS with no assertions when `enforce_thresholds=True` +and the spec is missing. + +**First run / characterisation** — set `enforce_thresholds: false` in the suite config. +All five metrics will be collected and surfaced as HTML rows but will never cause a +test failure. Use the reported values to populate your threshold JSON, then flip +`enforce_thresholds` to `true`. + +See `docs/threshold-kinds.md` for the full threshold kind reference (`min`, `max`, +`max_ms`, `within`, `min_tok_s`, `min_ratio`). + +--- + +## The `gpu_poll.log` file + +Every run writes a `gpu_poll.log` to the suite's `log_dir`. It contains one line per +poll and a summary block: + +``` +[gpu poll 1/?] used_vram=131072 MB gfx=87% umc=74% mm=0% +[gpu poll 2/?] used_vram=132864 MB gfx=91% umc=78% mm=0% +... +[gpu poll 12/?] used_vram=132480 MB gfx=89% umc=76% mm=0% [done] + +--- summary --- +samples: 12 +peak_gpu_memory_mb: 132864 MB +model_load_memory_mb: 127418 MB +model_load_s: 148.3 s +gpu_compute_util_pct: 89.2 % +gpu_bandwidth_util_pct: 76.1 % +``` + +A poll that fails (e.g. `amd-smi` exits non-zero or returns unparseable JSON) is +logged with a `FAILED [N/max consecutive]` tag and excluded from aggregation. After +`max_consecutive_failures` (default 3) consecutive failures the loop stops early and +logs a warning. + +--- + +## Failure handling and None values + +The library never raises from `poll_gpu_metrics`. 
Every metric can be `None`: + +| Situation | Result | +|---|---| +| `amd-smi` fails or returns unparseable JSON | snapshot excluded from aggregation; metric may be `None` if all polls fail | +| GPU reports `"N/A"` for a field | that field is `None` in the snapshot | +| Zero valid polls | all three `agg_readings` outputs are `None` | +| Caller passes `model_load_memory_mb=None` | stored as `None`; `test_gpu_metric` should `pytest.skip` rather than fail | + +`test_gpu_metric` should always check for `None` before evaluating thresholds. +`pytest.skip` (not `pytest.fail`) is the correct response when a metric is `None` — +the metric was unavailable for this run, not a regression. + +--- + +## Gotchas + +- **`model_load_memory_mb` should be `None` when VRAM data is unavailable, not `0`.** + Use `... or None` after the subtraction (as shown in Step 1). A zero stored as `0` + gets gated against thresholds and displayed as `"0"` in the report; `None` causes + `test_gpu_metric` to skip instead. +- **`capture_gpu_metrics` can raise; `poll_gpu_metrics` never does.** Always wrap + one-shot snapshot calls in a `try/except` that returns `{}` on failure. +- **`agg_readings` returns 3 keys, not 5.** `model_load_memory_mb` and `model_load_s` + are measured by the caller and stored separately. Do not look for them in + `agg_readings` output. +- **Raw snapshot keys differ from derived metric keys.** The poll loop returns dicts + with keys like `gpu.used_vram`; the stored/threshold-gated keys use names like + `gpu.peak_gpu_memory_mb`. Do not pass raw snapshots to `evaluate_all`. +- **Threshold JSON keys use the `gpu.` prefix** (`"gpu.peak_gpu_memory_mb"`, not + `"peak_gpu_memory_mb"`). A missing prefix means the spec is never found and the + metric silently operates as record-only even when `enforce_thresholds=True`. +- **`amd-smi` runs on the host, not in the container.** If your orchestrator does not + support `exec_on_head`, GPU polling is not available for your suite. 
+- **Pass the full cell actuals dict to `evaluate_all`.** `min_ratio` threshold specs + need to resolve a reference metric from `actuals`. Passing only the single metric's + value causes a reference-resolution failure. From 81313f42e06970903acd23ea9e733e57e6d8385e Mon Sep 17 00:00:00 2001 From: Atul Nair Date: Thu, 25 Jun 2026 16:37:44 -0400 Subject: [PATCH 7/7] fix(vllm_single): write gpu_poll.log to local report dir so it lands in zip bundle Previously the log was written to a tempfile then uploaded to the NFS out_dir; because the zip plugin only bundles the local html report directory, the log never appeared in the run archive. Now it is written directly into the _test_html_dir folder (e.g. vllm_single_html/) so every run archive contains the poll log alongside the per-test HTML files. The NFS upload is kept for cluster-side access. Update gpu-metrics.md integration guide to match the correct log_path pattern and to describe where the log lands. --- cvs/lib/utils/docs/gpu-metrics.md | 21 +++++++++++++++++---- cvs/tests/inference/vllm/vllm_single.py | 20 ++++++++++++++------ 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/cvs/lib/utils/docs/gpu-metrics.md b/cvs/lib/utils/docs/gpu-metrics.md index 970f242e..f4398195 100644 --- a/cvs/lib/utils/docs/gpu-metrics.md +++ b/cvs/lib/utils/docs/gpu-metrics.md @@ -62,10 +62,11 @@ at snapshot time — unlike `poll_gpu_metrics`, it can raise. 
**Pattern A — client is backgrounded by the framework (synchronous poll):** ```python +import pathlib import time from cvs.lib.utils.gpu import GPU_METRICS, GPU_METRIC_UNITS, agg_readings, capture_gpu_metrics, poll_gpu_metrics -def test__inference(orch, variant_config, inf_res_dict, gpu_metrics_snap, ...): +def test__inference(orch, variant_config, inf_res_dict, gpu_metrics_snap, request, ...): def _snap(): try: @@ -80,10 +81,18 @@ def test__inference(orch, variant_config, inf_res_dict, gpu_metrics_s load_s = time.monotonic() - t0 load_mb = ((post_snap.get("gpu.used_vram") or 0) - (pre_snap.get("gpu.used_vram") or 0)) or None + # Write the log into the local report dir so it lands in the zip bundle. + _htmlpath = getattr(request.config.option, "htmlpath", None) + _html_dir = getattr(request.config, "_test_html_dir", "test_html") + _gpu_log = ( + pathlib.Path(_htmlpath).parent / _html_dir / "gpu_poll.log" + if _htmlpath else None + ) + poll_readings = poll_gpu_metrics( orch, is_done_fn=, # e.g. job.is_client_done - log_path=f"{variant_config.paths.log_dir}/gpu_poll.log", + log_path=str(_gpu_log) if _gpu_log else None, model_load_s=load_s, model_load_memory_mb=load_mb, ) @@ -250,8 +259,12 @@ See `docs/threshold-kinds.md` for the full threshold kind reference (`min`, `max ## The `gpu_poll.log` file -Every run writes a `gpu_poll.log` to the suite's `log_dir`. It contains one line per -poll and a summary block: +When pytest runs with `--html`, `gpu_poll.log` is written into the local HTML report +directory (the same folder as the per-test HTML files, e.g. `vllm_single_html/`), so the +zip bundle of that directory includes it. Without `--html`, the Pattern A snippet passes +`log_path=None` and no log is written. `vllm_single` also copies it to its NFS `out_dir`. + +The file contains one line per poll and a summary block: ``` [gpu poll 1/?] 
used_vram=131072 MB gfx=87% umc=74% mm=0% diff --git a/cvs/tests/inference/vllm/vllm_single.py b/cvs/tests/inference/vllm/vllm_single.py index cb19c453..0264be65 100644 --- a/cvs/tests/inference/vllm/vllm_single.py +++ b/cvs/tests/inference/vllm/vllm_single.py @@ -278,11 +278,19 @@ def test_vllm_inference( job.run_client() # Poll GPU while client runs. - # gpu_poll.log is written locally then copied into the remote out_dir - # (which is an NFS path on the node, not mounted on the devbox). - import tempfile as _tempfile - - _gpu_log_local = _pl.Path(_tempfile.mkdtemp()) / "gpu_poll.log" + # Write gpu_poll.log into the local HTML report dir so it lands in the + # zip bundle. Fall back to a tempfile when --html is not passed. + _htmlpath = getattr(request.config.option, "htmlpath", None) + _html_dir_name = getattr(request.config, "_test_html_dir", "test_html") + if _htmlpath: + _gpu_log_local = ( + _pl.Path(_htmlpath).parent + / _html_dir_name + / f"gpu_poll_isl{isl}_osl{osl}_conc{concurrency}.log" + ) + else: + import tempfile as _tempfile + _gpu_log_local = _pl.Path(_tempfile.mkdtemp()) / "gpu_poll.log" _gpu_log_remote = f"{job.out_dir}/gpu_poll.log" _model_load_mb = ( (gpu_metrics_snap.get((cell_key, "loaded"), {}).get("gpu.used_vram") or 0) @@ -298,7 +306,7 @@ def test_vllm_inference( model_load_s=_model_load_s, model_load_memory_mb=_model_load_mb, ) - # Copy log into the node's out_dir (NFS) so it lands in the bundle. + # Also copy the log into the node's out_dir (NFS) for cluster-side access. if _gpu_log_local.exists(): try: import base64 as _b64