From b62c4e2fa5b5ddf17fd6d1c1d400d402eaad58c2 Mon Sep 17 00:00:00 2001 From: Enis Lorenz Date: Thu, 19 Feb 2026 14:25:29 +0100 Subject: [PATCH 1/8] Minor add: analyze_workload, get burst params --- .../analyze_workload_logs.py | 266 +++++++++++++++++- data/workload_statistics/workload_logs.txt | 44 +-- test/test_inspect_workloadgen.py | 6 +- 3 files changed, 291 insertions(+), 25 deletions(-) diff --git a/data/workload_statistics/analyze_workload_logs.py b/data/workload_statistics/analyze_workload_logs.py index 0da95d0..7911c38 100644 --- a/data/workload_statistics/analyze_workload_logs.py +++ b/data/workload_statistics/analyze_workload_logs.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -"""Summarize workload logs per file and estimate burst probabilities. +"""Summarize workload logs per file and estimate burst parameters. For each input file, compute: - arrivals per hour: mean, stddev @@ -12,6 +12,11 @@ - burst probability suggestions for: - wg-burst-small-prob - wg-burst-heavy-prob +- burst boundary suggestions for: + - wg-burst-*-jobs-min/max + - wg-burst-*-duration-min/max + - wg-burst-*-nodes-min/max + - wg-burst-*-cores-min/max The script supports: - whitespace-delimited Slurm-like logs (as in data-internal/allusers-*.log) @@ -23,7 +28,7 @@ import argparse import csv import math -from collections import Counter +from collections import Counter, defaultdict from datetime import datetime, timedelta from pathlib import Path from statistics import fmean, stdev @@ -34,6 +39,7 @@ DURATION_CANDIDATES = ("ElapsedRaw", "elapsed_raw", "ELAPSEDRAW", "duration_seconds", "duration") NODES_CANDIDATES = ("NNodes", "nnodes", "NNODES", "nodes") CORES_CANDIDATES = ("NCPUS", "ncpus", "NCPUs", "cores", "cores_per_node") +TOTAL_CORES_COLUMNS = {"NCPUS", "ncpus", "NCPUs"} def pick_column(columns: Sequence[str], candidates: Sequence[str]) -> str: @@ -129,6 +135,39 @@ def parse_min_max(raw: str) -> Tuple[int, int]: return lo, hi +def parse_quantile_range(raw: str) -> Tuple[float, float]: + parts = [p.strip() for p in str(raw).split(":")] + if len(parts) != 2: + raise argparse.ArgumentTypeError("Expected q_low:q_high in [0,1]") + try: + q_low = float(parts[0]) + q_high = float(parts[1]) + except ValueError as exc: + raise argparse.ArgumentTypeError(f"Invalid quantile pair '{raw}'") from exc + if not (0.0 <= q_low <= 1.0 and 0.0 <= q_high <= 1.0): + raise argparse.ArgumentTypeError("Quantiles must be in [0,1]") + if q_low > q_high: + raise argparse.ArgumentTypeError("Expected q_low <= q_high") + return q_low, q_high + + +def suggest_int_range(values: Sequence[float], q_low: float, q_high: float, minimum: int = 1) -> Tuple[float, float]: + finite_values = [float(v) for v in values if math.isfinite(float(v))] + if not finite_values: + return float("nan"), float("nan") + lo = int(math.floor(quantile(finite_values, q_low))) + hi = int(math.ceil(quantile(finite_values, q_high))) + lo = max(lo, minimum) + hi = max(hi, lo) + return float(lo), float(hi) + + +def fmt_suggested_int(value: float) -> str: + if not math.isfinite(value): + return "nan" + return str(int(round(value))) + + def hourly_axis(arrivals_by_hour: Counter, include_zero_hours: bool) -> List[datetime]: if not arrivals_by_hour: return [] @@ -193,10 +232,17 @@ def summarize_file( assumed_heavy_jobs_min: int, assumed_heavy_jobs_max: int, baseline_quantile: float, + burst_event_quantile: float, + burst_range_quantile_low: float, + burst_range_quantile_high: float, ) -> Dict[str, float]: arrivals_by_hour: Counter = Counter() small_jobs_by_hour: Counter = 
Counter() heavy_jobs_by_hour: Counter = Counter() + small_job_attrs_by_hour = defaultdict(list) + heavy_job_attrs_by_hour = defaultdict(list) + small_job_attrs_all: List[Tuple[float, float, float]] = [] + heavy_job_attrs_all: List[Tuple[float, float, float]] = [] durations_h: List[float] = [] nodes: List[float] = [] cores: List[float] = [] @@ -229,6 +275,24 @@ def summarize_file( "heavy_volume_prob": float("nan"), "suggested_wg_burst_small_prob": float("nan"), "suggested_wg_burst_heavy_prob": float("nan"), + "suggested_wg_burst_small_jobs_min": float("nan"), + "suggested_wg_burst_small_jobs_max": float("nan"), + "suggested_wg_burst_small_duration_min": float("nan"), + "suggested_wg_burst_small_duration_max": float("nan"), + "suggested_wg_burst_small_nodes_min": float("nan"), + "suggested_wg_burst_small_nodes_max": float("nan"), + "suggested_wg_burst_small_cores_min": float("nan"), + "suggested_wg_burst_small_cores_max": float("nan"), + "suggested_wg_burst_heavy_jobs_min": float("nan"), + "suggested_wg_burst_heavy_jobs_max": float("nan"), + "suggested_wg_burst_heavy_duration_min": float("nan"), + "suggested_wg_burst_heavy_duration_max": float("nan"), + "suggested_wg_burst_heavy_nodes_min": float("nan"), + "suggested_wg_burst_heavy_nodes_max": float("nan"), + "suggested_wg_burst_heavy_cores_min": float("nan"), + "suggested_wg_burst_heavy_cores_max": float("nan"), + "small_burst_hours_detected": float("nan"), + "heavy_burst_hours_detected": float("nan"), } # Determine column mapping from the first row, then process all rows (including first). @@ -237,6 +301,7 @@ def summarize_file( duration_col = pick_column(keys, DURATION_CANDIDATES) nodes_col = pick_column(keys, NODES_CANDIDATES) cores_col = pick_column(keys, CORES_CANDIDATES) + cores_is_total = cores_col in TOTAL_CORES_COLUMNS def consume(row: Dict[str, str]) -> None: nonlocal skipped @@ -244,15 +309,27 @@ def consume(row: Dict[str, str]) -> None: hour = parse_submit_hour(row[submit_col]) duration = parse_duration_hours(row[duration_col]) node_count = float(row[nodes_col]) - core_count = float(row[cores_col]) + core_raw = float(row[cores_col]) + if cores_is_total and node_count > 0.0: + core_count = core_raw / node_count + else: + core_count = core_raw except Exception: skipped += 1 return arrivals_by_hour[hour] += 1 - if duration <= small_duration_max and node_count <= small_nodes_max and core_count <= small_cores_max: + is_small = duration <= small_duration_max and node_count <= small_nodes_max and core_count <= small_cores_max + is_heavy = duration >= heavy_duration_min and node_count >= heavy_nodes_min and core_count >= heavy_cores_min + if is_small: small_jobs_by_hour[hour] += 1 - if duration >= heavy_duration_min and node_count >= heavy_nodes_min and core_count >= heavy_cores_min: + attrs = (duration, node_count, core_count) + small_job_attrs_by_hour[hour].append(attrs) + small_job_attrs_all.append(attrs) + if is_heavy: heavy_jobs_by_hour[hour] += 1 + attrs = (duration, node_count, core_count) + heavy_job_attrs_by_hour[hour].append(attrs) + heavy_job_attrs_all.append(attrs) durations_h.append(duration) nodes.append(node_count) cores.append(core_count) @@ -298,8 +375,10 @@ def consume(row: Dict[str, str]) -> None: small_expected_jobs_per_burst = (float(assumed_small_jobs_min) + float(assumed_small_jobs_max)) / 2.0 heavy_expected_jobs_per_burst = (float(assumed_heavy_jobs_min) + float(assumed_heavy_jobs_max)) / 2.0 - small_excess = sum(max(v - small_baseline, 0.0) for v in small_series) - heavy_excess = sum(max(v - heavy_baseline, 0.0) 
for v in heavy_series) + small_excess_series = [max(v - small_baseline, 0.0) for v in small_series] + heavy_excess_series = [max(v - heavy_baseline, 0.0) for v in heavy_series] + small_excess = sum(small_excess_series) + heavy_excess = sum(heavy_excess_series) if n_hours > 0 and small_expected_jobs_per_burst > 0.0: small_volume_prob = min(max(small_excess / (n_hours * small_expected_jobs_per_burst), 0.0), 1.0) @@ -326,6 +405,109 @@ def consume(row: Dict[str, str]) -> None: else: suggested_heavy_prob = float("nan") + positive_small_excess = [v for v in small_excess_series if v > 0.0] + positive_heavy_excess = [v for v in heavy_excess_series if v > 0.0] + + if positive_small_excess: + small_burst_threshold = quantile(positive_small_excess, burst_event_quantile) + small_burst_hours = { + hour + for hour, excess in zip(axis, small_excess_series) + if excess >= small_burst_threshold and excess > 0.0 + } + small_burst_jobs = [ + excess + for hour, excess in zip(axis, small_excess_series) + if hour in small_burst_hours + ] + else: + small_burst_hours = set() + small_burst_jobs = [] + + if positive_heavy_excess: + heavy_burst_threshold = quantile(positive_heavy_excess, burst_event_quantile) + heavy_burst_hours = { + hour + for hour, excess in zip(axis, heavy_excess_series) + if excess >= heavy_burst_threshold and excess > 0.0 + } + heavy_burst_jobs = [ + excess + for hour, excess in zip(axis, heavy_excess_series) + if hour in heavy_burst_hours + ] + else: + heavy_burst_hours = set() + heavy_burst_jobs = [] + + small_burst_attrs = [ + attrs + for hour in small_burst_hours + for attrs in small_job_attrs_by_hour.get(hour, []) + ] + heavy_burst_attrs = [ + attrs + for hour in heavy_burst_hours + for attrs in heavy_job_attrs_by_hour.get(hour, []) + ] + + # Fallback to all matching jobs when no burst hours were isolated. 
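+    # (With a flat arrival profile every hourly excess is zero, no burst hours
+    # are detected, and suggest_int_range would return NaN boundaries; falling
+    # back to all matching jobs keeps the suggested min/max defined.)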
+ if not small_burst_attrs: + small_burst_attrs = small_job_attrs_all + if not heavy_burst_attrs: + heavy_burst_attrs = heavy_job_attrs_all + + small_jobs_min_s, small_jobs_max_s = suggest_int_range( + small_burst_jobs, + burst_range_quantile_low, + burst_range_quantile_high, + minimum=1, + ) + heavy_jobs_min_s, heavy_jobs_max_s = suggest_int_range( + heavy_burst_jobs, + burst_range_quantile_low, + burst_range_quantile_high, + minimum=1, + ) + + small_duration_min_s, small_duration_max_s = suggest_int_range( + [a[0] for a in small_burst_attrs], + burst_range_quantile_low, + burst_range_quantile_high, + minimum=1, + ) + small_nodes_min_s, small_nodes_max_s = suggest_int_range( + [a[1] for a in small_burst_attrs], + burst_range_quantile_low, + burst_range_quantile_high, + minimum=1, + ) + small_cores_min_s, small_cores_max_s = suggest_int_range( + [a[2] for a in small_burst_attrs], + burst_range_quantile_low, + burst_range_quantile_high, + minimum=1, + ) + + heavy_duration_min_s, heavy_duration_max_s = suggest_int_range( + [a[0] for a in heavy_burst_attrs], + burst_range_quantile_low, + burst_range_quantile_high, + minimum=1, + ) + heavy_nodes_min_s, heavy_nodes_max_s = suggest_int_range( + [a[1] for a in heavy_burst_attrs], + burst_range_quantile_low, + burst_range_quantile_high, + minimum=1, + ) + heavy_cores_min_s, heavy_cores_max_s = suggest_int_range( + [a[2] for a in heavy_burst_attrs], + burst_range_quantile_low, + burst_range_quantile_high, + minimum=1, + ) + return { "jobs": len(durations_h), "skipped_rows": skipped, @@ -348,6 +530,24 @@ def consume(row: Dict[str, str]) -> None: "heavy_volume_prob": heavy_volume_prob, "suggested_wg_burst_small_prob": suggested_small_prob, "suggested_wg_burst_heavy_prob": suggested_heavy_prob, + "suggested_wg_burst_small_jobs_min": small_jobs_min_s, + "suggested_wg_burst_small_jobs_max": small_jobs_max_s, + "suggested_wg_burst_small_duration_min": small_duration_min_s, + "suggested_wg_burst_small_duration_max": small_duration_max_s, + "suggested_wg_burst_small_nodes_min": small_nodes_min_s, + "suggested_wg_burst_small_nodes_max": small_nodes_max_s, + "suggested_wg_burst_small_cores_min": small_cores_min_s, + "suggested_wg_burst_small_cores_max": small_cores_max_s, + "suggested_wg_burst_heavy_jobs_min": heavy_jobs_min_s, + "suggested_wg_burst_heavy_jobs_max": heavy_jobs_max_s, + "suggested_wg_burst_heavy_duration_min": heavy_duration_min_s, + "suggested_wg_burst_heavy_duration_max": heavy_duration_max_s, + "suggested_wg_burst_heavy_nodes_min": heavy_nodes_min_s, + "suggested_wg_burst_heavy_nodes_max": heavy_nodes_max_s, + "suggested_wg_burst_heavy_cores_min": heavy_cores_min_s, + "suggested_wg_burst_heavy_cores_max": heavy_cores_max_s, + "small_burst_hours_detected": float(len(small_burst_hours)), + "heavy_burst_hours_detected": float(len(heavy_burst_hours)), } @@ -388,6 +588,18 @@ def main() -> None: default=0.5, help="Quantile used as non-burst baseline for hourly small/heavy counts (default: 0.5).", ) + parser.add_argument( + "--burst-event-quantile", + type=float, + default=0.75, + help="Quantile of positive baseline-adjusted hourly counts used to isolate burst hours (default: 0.75).", + ) + parser.add_argument( + "--burst-range-quantiles", + type=parse_quantile_range, + default=(0.1, 0.9), + help="Quantile range q_low:q_high for suggested burst min/max boundaries (default: 0.1:0.9).", + ) args = parser.parse_args() if args.files: @@ -413,6 +625,9 @@ def main() -> None: assumed_heavy_jobs_min=int(args.assumed_burst_heavy_jobs[0]), 
assumed_heavy_jobs_max=int(args.assumed_burst_heavy_jobs[1]), baseline_quantile=float(args.baseline_quantile), + burst_event_quantile=float(args.burst_event_quantile), + burst_range_quantile_low=float(args.burst_range_quantiles[0]), + burst_range_quantile_high=float(args.burst_range_quantiles[1]), ) print(f"\n=== {path.name} ===") print(f"jobs={stats['jobs']} skipped_rows={stats['skipped_rows']}") @@ -445,10 +660,45 @@ def main() -> None: f"heavy_event={stats['heavy_event_prob']:.4f} " f"heavy_volume={stats['heavy_volume_prob']:.4f}" ) + print( + "burst_hours_detected: " + f"small={fmt_suggested_int(stats['small_burst_hours_detected'])} " + f"heavy={fmt_suggested_int(stats['heavy_burst_hours_detected'])}" + ) + print( + "suggested_small_ranges: " + f"jobs={fmt_suggested_int(stats['suggested_wg_burst_small_jobs_min'])}:{fmt_suggested_int(stats['suggested_wg_burst_small_jobs_max'])} " + f"duration={fmt_suggested_int(stats['suggested_wg_burst_small_duration_min'])}:{fmt_suggested_int(stats['suggested_wg_burst_small_duration_max'])} " + f"nodes={fmt_suggested_int(stats['suggested_wg_burst_small_nodes_min'])}:{fmt_suggested_int(stats['suggested_wg_burst_small_nodes_max'])} " + f"cores={fmt_suggested_int(stats['suggested_wg_burst_small_cores_min'])}:{fmt_suggested_int(stats['suggested_wg_burst_small_cores_max'])}" + ) + print( + "suggested_heavy_ranges: " + f"jobs={fmt_suggested_int(stats['suggested_wg_burst_heavy_jobs_min'])}:{fmt_suggested_int(stats['suggested_wg_burst_heavy_jobs_max'])} " + f"duration={fmt_suggested_int(stats['suggested_wg_burst_heavy_duration_min'])}:{fmt_suggested_int(stats['suggested_wg_burst_heavy_duration_max'])} " + f"nodes={fmt_suggested_int(stats['suggested_wg_burst_heavy_nodes_min'])}:{fmt_suggested_int(stats['suggested_wg_burst_heavy_nodes_max'])} " + f"cores={fmt_suggested_int(stats['suggested_wg_burst_heavy_cores_min'])}:{fmt_suggested_int(stats['suggested_wg_burst_heavy_cores_max'])}" + ) print( "suggested_flags: " f"--wg-burst-small-prob {stats['suggested_wg_burst_small_prob']:.4f} " - f"--wg-burst-heavy-prob {stats['suggested_wg_burst_heavy_prob']:.4f}" + f"--wg-burst-heavy-prob {stats['suggested_wg_burst_heavy_prob']:.4f} " + f"--wg-burst-small-jobs-min {fmt_suggested_int(stats['suggested_wg_burst_small_jobs_min'])} " + f"--wg-burst-small-jobs-max {fmt_suggested_int(stats['suggested_wg_burst_small_jobs_max'])} " + f"--wg-burst-small-duration-min {fmt_suggested_int(stats['suggested_wg_burst_small_duration_min'])} " + f"--wg-burst-small-duration-max {fmt_suggested_int(stats['suggested_wg_burst_small_duration_max'])} " + f"--wg-burst-small-nodes-min {fmt_suggested_int(stats['suggested_wg_burst_small_nodes_min'])} " + f"--wg-burst-small-nodes-max {fmt_suggested_int(stats['suggested_wg_burst_small_nodes_max'])} " + f"--wg-burst-small-cores-min {fmt_suggested_int(stats['suggested_wg_burst_small_cores_min'])} " + f"--wg-burst-small-cores-max {fmt_suggested_int(stats['suggested_wg_burst_small_cores_max'])} " + f"--wg-burst-heavy-jobs-min {fmt_suggested_int(stats['suggested_wg_burst_heavy_jobs_min'])} " + f"--wg-burst-heavy-jobs-max {fmt_suggested_int(stats['suggested_wg_burst_heavy_jobs_max'])} " + f"--wg-burst-heavy-duration-min {fmt_suggested_int(stats['suggested_wg_burst_heavy_duration_min'])} " + f"--wg-burst-heavy-duration-max {fmt_suggested_int(stats['suggested_wg_burst_heavy_duration_max'])} " + f"--wg-burst-heavy-nodes-min {fmt_suggested_int(stats['suggested_wg_burst_heavy_nodes_min'])} " + f"--wg-burst-heavy-nodes-max 
{fmt_suggested_int(stats['suggested_wg_burst_heavy_nodes_max'])} " + f"--wg-burst-heavy-cores-min {fmt_suggested_int(stats['suggested_wg_burst_heavy_cores_min'])} " + f"--wg-burst-heavy-cores-max {fmt_suggested_int(stats['suggested_wg_burst_heavy_cores_max'])}" ) diff --git a/data/workload_statistics/workload_logs.txt b/data/workload_statistics/workload_logs.txt index 230281c..921fa64 100644 --- a/data/workload_statistics/workload_logs.txt +++ b/data/workload_statistics/workload_logs.txt @@ -4,11 +4,14 @@ hours_observed=788 arrivals_per_hour: mean=4.2119 std=77.5734 duration_hours: mean=7.4852 std=29.6084 nodes: mean=1.0054 std=0.1202 -cores: mean=14.9328 std=30.1301 -corr(duration, nodes)=0.007262 corr(duration, cores)=0.628780 +cores: mean=14.8503 std=30.0671 +corr(duration, nodes)=0.007262 corr(duration, cores)=0.629600 burst_baseline_jobs_per_hour: small=0.0000 heavy=0.0000 burst_prob_estimates: small_event=0.0025 small_volume=0.0243 heavy_event=0.0000 heavy_volume=0.0000 -suggested_flags: --wg-burst-small-prob 0.0025 --wg-burst-heavy-prob 0.0000 +burst_hours_detected: small=11 heavy=0 +suggested_small_ranges: jobs=3:767 duration=1:2 nodes=1:1 cores=2:6 +suggested_heavy_ranges: jobs=nan:nan duration=nan:nan nodes=nan:nan cores=nan:nan +suggested_flags: --wg-burst-small-prob 0.0025 --wg-burst-heavy-prob 0.0000 --wg-burst-small-jobs-min 3 --wg-burst-small-jobs-max 767 --wg-burst-small-duration-min 1 --wg-burst-small-duration-max 2 --wg-burst-small-nodes-min 1 --wg-burst-small-nodes-max 1 --wg-burst-small-cores-min 2 --wg-burst-small-cores-max 6 --wg-burst-heavy-jobs-min nan --wg-burst-heavy-jobs-max nan --wg-burst-heavy-duration-min nan --wg-burst-heavy-duration-max nan --wg-burst-heavy-nodes-min nan --wg-burst-heavy-nodes-max nan --wg-burst-heavy-cores-min nan --wg-burst-heavy-cores-max nan === allusers-grid-30.log === jobs=40639 skipped_rows=1 @@ -20,7 +23,10 @@ cores: mean=7.9984 std=0.1122 corr(duration, nodes)=nan corr(duration, cores)=0.026772 burst_baseline_jobs_per_hour: small=0.0000 heavy=0.0000 burst_prob_estimates: small_event=0.0423 small_volume=0.0752 heavy_event=0.0000 heavy_volume=0.0000 -suggested_flags: --wg-burst-small-prob 0.0423 --wg-burst-heavy-prob 0.0000 +burst_hours_detected: small=79 heavy=0 +suggested_small_ranges: jobs=15:303 duration=1:7 nodes=1:1 cores=8:8 +suggested_heavy_ranges: jobs=nan:nan duration=nan:nan nodes=nan:nan cores=nan:nan +suggested_flags: --wg-burst-small-prob 0.0423 --wg-burst-heavy-prob 0.0000 --wg-burst-small-jobs-min 15 --wg-burst-small-jobs-max 303 --wg-burst-small-duration-min 1 --wg-burst-small-duration-max 7 --wg-burst-small-nodes-min 1 --wg-burst-small-nodes-max 1 --wg-burst-small-cores-min 8 --wg-burst-small-cores-max 8 --wg-burst-heavy-jobs-min nan --wg-burst-heavy-jobs-max nan --wg-burst-heavy-duration-min nan --wg-burst-heavy-duration-max nan --wg-burst-heavy-nodes-min nan --wg-burst-heavy-nodes-max nan --wg-burst-heavy-cores-min nan --wg-burst-heavy-cores-max nan === allusers-high_mem-30.log === jobs=237373 skipped_rows=1 @@ -28,11 +34,14 @@ hours_observed=1385 arrivals_per_hour: mean=171.3884 std=611.6586 duration_hours: mean=0.2537 std=1.8726 nodes: mean=1.0010 std=0.1191 -cores: mean=13.6286 std=47.3460 -corr(duration, nodes)=0.405630 corr(duration, cores)=0.113031 +cores: mean=13.5372 std=45.6885 +corr(duration, nodes)=0.405630 corr(duration, cores)=0.041992 burst_baseline_jobs_per_hour: small=129.0000 heavy=0.0000 burst_prob_estimates: small_event=0.1329 small_volume=0.9510 heavy_event=0.0043 heavy_volume=0.0011 
-suggested_flags: --wg-burst-small-prob 0.1329 --wg-burst-heavy-prob 0.0043 +burst_hours_detected: small=47 heavy=2 +suggested_small_ranges: jobs=1407:4689 duration=1:1 nodes=1:1 cores=4:4 +suggested_heavy_ranges: jobs=2:4 duration=168:169 nodes=10:16 cores=64:64 +suggested_flags: --wg-burst-small-prob 0.1329 --wg-burst-heavy-prob 0.0043 --wg-burst-small-jobs-min 1407 --wg-burst-small-jobs-max 4689 --wg-burst-small-duration-min 1 --wg-burst-small-duration-max 1 --wg-burst-small-nodes-min 1 --wg-burst-small-nodes-max 1 --wg-burst-small-cores-min 4 --wg-burst-small-cores-max 4 --wg-burst-heavy-jobs-min 2 --wg-burst-heavy-jobs-max 4 --wg-burst-heavy-duration-min 168 --wg-burst-heavy-duration-max 169 --wg-burst-heavy-nodes-min 10 --wg-burst-heavy-nodes-max 16 --wg-burst-heavy-cores-min 64 --wg-burst-heavy-cores-max 64 === allusers-long-30.log === jobs=706918 skipped_rows=1 @@ -40,11 +49,14 @@ hours_observed=1381 arrivals_per_hour: mean=511.8885 std=1308.5007 duration_hours: mean=3.2965 std=13.0609 nodes: mean=1.0137 std=0.6736 -cores: mean=7.5031 std=54.1584 -corr(duration, nodes)=0.030925 corr(duration, cores)=0.020810 +cores: mean=6.4411 std=10.4744 +corr(duration, nodes)=0.030925 corr(duration, cores)=-0.065966 burst_baseline_jobs_per_hour: small=319.0000 heavy=0.0000 burst_prob_estimates: small_event=0.2274 small_volume=1.0000 heavy_event=0.0210 heavy_volume=0.0072 -suggested_flags: --wg-burst-small-prob 0.2274 --wg-burst-heavy-prob 0.0210 +burst_hours_detected: small=84 heavy=15 +suggested_small_ranges: jobs=1685:6569 duration=1:3 nodes=1:1 cores=2:8 +suggested_heavy_ranges: jobs=2:6 duration=132:169 nodes=5:16 cores=64:96 +suggested_flags: --wg-burst-small-prob 0.2274 --wg-burst-heavy-prob 0.0210 --wg-burst-small-jobs-min 1685 --wg-burst-small-jobs-max 6569 --wg-burst-small-duration-min 1 --wg-burst-small-duration-max 3 --wg-burst-small-nodes-min 1 --wg-burst-small-nodes-max 1 --wg-burst-small-cores-min 2 --wg-burst-small-cores-max 8 --wg-burst-heavy-jobs-min 2 --wg-burst-heavy-jobs-max 6 --wg-burst-heavy-duration-min 132 --wg-burst-heavy-duration-max 169 --wg-burst-heavy-nodes-min 5 --wg-burst-heavy-nodes-max 16 --wg-burst-heavy-cores-min 64 --wg-burst-heavy-cores-max 96 === allusers-main-30.log === jobs=1448404 skipped_rows=1 @@ -52,11 +64,11 @@ hours_observed=752 arrivals_per_hour: mean=1926.0691 std=3618.8276 duration_hours: mean=0.3007 std=0.8709 nodes: mean=1.0007 std=0.1308 -cores: mean=2.9509 std=3.6442 -corr(duration, nodes)=0.008865 corr(duration, cores)=-0.027545 +cores: mean=2.9427 std=3.2604 +corr(duration, nodes)=0.008865 corr(duration, cores)=-0.032620 burst_baseline_jobs_per_hour: small=907.0000 heavy=0.0000 burst_prob_estimates: small_event=0.3697 small_volume=1.0000 heavy_event=0.0000 heavy_volume=0.0000 -suggested_flags: --wg-burst-small-prob 0.3697 --wg-burst-heavy-prob 0.0000 - - -NOTE: skipped_rows when duration = 0 rows appear. Not used in calculation. 
\ No newline at end of file +burst_hours_detected: small=71 heavy=0 +suggested_small_ranges: jobs=6084:13961 duration=1:1 nodes=1:1 cores=2:4 +suggested_heavy_ranges: jobs=nan:nan duration=nan:nan nodes=nan:nan cores=nan:nan +suggested_flags: --wg-burst-small-prob 0.3697 --wg-burst-heavy-prob 0.0000 --wg-burst-small-jobs-min 6084 --wg-burst-small-jobs-max 13961 --wg-burst-small-duration-min 1 --wg-burst-small-duration-max 1 --wg-burst-small-nodes-min 1 --wg-burst-small-nodes-max 1 --wg-burst-small-cores-min 2 --wg-burst-small-cores-max 4 --wg-burst-heavy-jobs-min nan --wg-burst-heavy-jobs-max nan --wg-burst-heavy-duration-min nan --wg-burst-heavy-duration-max nan --wg-burst-heavy-nodes-min nan --wg-burst-heavy-nodes-max nan --wg-burst-heavy-cores-min nan --wg-burst-heavy-cores-max nan \ No newline at end of file diff --git a/test/test_inspect_workloadgen.py b/test/test_inspect_workloadgen.py index 3065c8f..dbe03e4 100644 --- a/test/test_inspect_workloadgen.py +++ b/test/test_inspect_workloadgen.py @@ -3,7 +3,7 @@ python -m test.test_inspect_workloadgen --workload-gen poisson --wg-poisson-lambdas4 200,10,6,24 --wg-max-jobs-hour 1500 --hours 336 --plot --wg-burst-small-prob 0.2 --wg-burst-heavy-prob 0.02 """ -# inspect_workloadgen.py +# test_inspect_workloadgen.py import argparse import hashlib @@ -133,6 +133,8 @@ def main(): cpn = [t[3] for t in all_jobs_triplets] if all_jobs_triplets else [] axs[2, 0].hist(durations, bins=50) + if args.wg_burst_heavy_prob > 0.0: + axs[2, 0].set_yscale("log") axs[2, 0].set_title("Durations (hours)") axs[2, 0].set_xlabel("duration [h]") axs[2, 0].set_ylabel("count") @@ -143,6 +145,8 @@ def main(): axs[2, 1].set_ylabel("count") axs[3, 0].hist(cpn, bins=32) + if args.wg_burst_heavy_prob > 0.0: + axs[3, 0].set_yscale("log") axs[3, 0].set_title("Cores per node (Jobs shape/Volume)") axs[3, 0].set_xlabel("cores/node") axs[3, 0].set_ylabel("count") From cdd0e7d4c25b560005b879dddd6499a49c3b77d3 Mon Sep 17 00:00:00 2001 From: Enis Lorenz Date: Thu, 19 Feb 2026 15:20:06 +0100 Subject: [PATCH 2/8] Add extra metric (baseline) for used nodes/cores and print them in evaluation mode as capacity/occupancy of cluster. 
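
The printed occupancy is the last tracked sample as a share of total
capacity; a minimal sketch of the percentages printed during evaluation
(MAX_NODES and CORES_PER_NODE come from src/config.py, used_nodes/used_cores
are the latest entries of the tracked per-step series):

    node_occupancy_pct = 100.0 * used_nodes / MAX_NODES
    core_occupancy_pct = 100.0 * used_cores / (CORES_PER_NODE * MAX_NODES)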
--- src/baseline.py | 4 +++- src/environment.py | 6 +++++- src/metrics_tracker.py | 11 +++++++---- train.py | 5 ++++- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/baseline.py b/src/baseline.py index f385e43..5bc2cd4 100644 --- a/src/baseline.py +++ b/src/baseline.py @@ -13,6 +13,7 @@ ) from src.metrics_tracker import MetricsTracker from src.reward_calculation import power_cost +from src.config import CORES_PER_NODE def baseline_step( @@ -113,4 +114,5 @@ def baseline_step( baseline_cost_off = power_cost(num_used_nodes, 0, current_price) env_print(f" > baseline_cost_off: €{baseline_cost_off:.4f} | used nodes: {num_used_nodes}, idle nodes: 0") - return baseline_cost, baseline_cost_off, baseline_next_empty_slot, next_job_id + num_used_cores = np.sum((baseline_state['nodes'] > 0) * CORES_PER_NODE) + return baseline_cost, baseline_cost_off, baseline_next_empty_slot, next_job_id, num_used_nodes, num_used_cores diff --git a/src/environment.py b/src/environment.py index 5b61cbc..7e01bee 100644 --- a/src/environment.py +++ b/src/environment.py @@ -433,6 +433,7 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool, self.metrics.episode_running_jobs_counts.append(num_running_jobs) self.metrics.episode_on_nodes.append(num_on_nodes) self.metrics.episode_used_nodes.append(num_used_nodes) + self.metrics.episode_used_cores.append(num_used_cores) self.metrics.episode_job_queue_sizes.append(num_unprocessed_jobs) self.metrics.episode_price_stats.append(current_price) @@ -453,7 +454,7 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool, self.env_print(f"[5] Calculating reward...") # Baseline step - baseline_cost, baseline_cost_off, self.baseline_next_empty_slot, self.next_job_id = baseline_step( + baseline_cost, baseline_cost_off, self.baseline_next_empty_slot, self.next_job_id, baseline_num_used_nodes, baseline_num_used_cores = baseline_step( self.baseline_state, self.baseline_cores_available, self.baseline_running_jobs, current_price, new_jobs_count, new_jobs_durations, new_jobs_nodes, new_jobs_cores, self.baseline_next_empty_slot, self.next_job_id, self.metrics, self.env_print, @@ -465,6 +466,9 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool, self.metrics.episode_baseline_cost += baseline_cost self.metrics.episode_baseline_cost_off += baseline_cost_off + self.metrics.episode_baseline_used_nodes.append(baseline_num_used_nodes) + self.metrics.episode_baseline_used_cores.append(baseline_num_used_cores) + step_reward, step_cost, eff_reward_norm, price_reward, idle_penalty_norm, job_age_penalty_norm = self.reward_calculator.calculate( num_used_nodes, num_idle_nodes, current_price, average_future_price, num_off_nodes, num_launched_jobs, num_node_changes, job_queue_2d, diff --git a/src/metrics_tracker.py b/src/metrics_tracker.py index aa78466..f9391fb 100644 --- a/src/metrics_tracker.py +++ b/src/metrics_tracker.py @@ -79,10 +79,13 @@ def reset_episode_metrics(self) -> None: self.episode_baseline_jobs_rejected_queue_full: int = 0 # Time series data for plotting (episode) - self.episode_on_nodes: list[int] = [] - self.episode_used_nodes: list[int] = [] - self.episode_job_queue_sizes: list[int] = [] - self.episode_price_stats: list[float] = [] + self.episode_on_nodes = [] + self.episode_used_nodes = [] + self.episode_used_cores = [] + self.episode_baseline_used_nodes = [] + self.episode_baseline_used_cores = [] + self.episode_job_queue_sizes = [] + self.episode_price_stats = [] self.episode_eff_rewards: 
list[float] = [] self.episode_price_rewards: list[float] = [] diff --git a/train.py b/train.py index 968e189..feff76a 100644 --- a/train.py +++ b/train.py @@ -13,6 +13,7 @@ import pandas as pd from src.workloadgen import WorkloadGenerator from src.workloadgen_cli import add_workloadgen_args, build_workloadgen_config +from src.config import MAX_NODES, CORES_PER_NODE, EPISODE_HOURS import time @@ -223,7 +224,9 @@ def main(): f"Jobs={env.metrics.jobs_completed}/{env.metrics.jobs_submitted} ({completion_rate:.0f}%), " f"AvgWait={avg_wait:.1f}h, " f"EpisodeMaxQueue={env.metrics.episode_max_queue_size_reached}, Dropped={env.metrics.episode_jobs_dropped}, " - f"MaxQueue={env.metrics.max_queue_size_reached}") + f"MaxQueue={env.metrics.max_queue_size_reached}, " + f"Agent Occupancy (Cores)={env.metrics.episode_used_cores[-1]*100/(CORES_PER_NODE*MAX_NODES)if env.metrics.episode_used_cores else 0 :.2f}%, Baseline Occupancy (Cores)={env.metrics.episode_baseline_used_cores[-1]*100/(CORES_PER_NODE*MAX_NODES) if env.metrics.episode_baseline_used_cores else 0 :.2f}% " + f"Agent Occupancy (Nodes)={env.metrics.episode_used_nodes[-1]*100/MAX_NODES if env.metrics.episode_used_nodes else 0 :.2f}%, Baseline Occupancy (Nodes)={env.metrics.episode_baseline_used_nodes[-1]*100/MAX_NODES if env.metrics.episode_baseline_used_nodes else 0 :.2f}% " ) print(f"\nEvaluation complete! Generated {num_episodes} episodes of cost data.") From 61dbd51c3730ccbee7f15600ab10cca0a82b4ef0 Mon Sep 17 00:00:00 2001 From: Enis Lorenz Date: Fri, 20 Feb 2026 10:25:10 +0100 Subject: [PATCH 3/8] Analysis: add standalone lambda sweep script for occupancy/savings trendlines - add `analyze_lambda_occupancy.py` to run `train.py` in evaluation mode across Poisson lambdas - default run matches the provided training/evaluation flags (12-month evaluation, model/session/workload defaults) - parse per-episode `Agent Occupancy (Nodes)` and cumulative savings from `train.py` output - convert cumulative savings to per-episode savings and compute mean/std per lambda - implement adaptive lambda selection to cover occupancy range with minimal points, with `--lambdas` override - fit 3rd-order polynomial curves for: - lambda vs occupancy - occupancy vs savings - occupancy vs savings_off - export artifacts: `summary.csv`, `summary.json`, `trendlines.png`, plus per-lambda logs - add optional `--echo-train-output` and headless plotting support via matplotlib `Agg` Add: extra plots, showing completion rate vs lambda and effective savings --- analyze_lambda_occupancy.py | 725 ++++++++++++++++++++++++++++++++++++ 1 file changed, 725 insertions(+) create mode 100644 analyze_lambda_occupancy.py diff --git a/analyze_lambda_occupancy.py b/analyze_lambda_occupancy.py new file mode 100644 index 0000000..d78dcce --- /dev/null +++ b/analyze_lambda_occupancy.py @@ -0,0 +1,725 @@ +#!/usr/bin/env python3 +""" +Sweep workload Poisson lambda values and analyze: +1) lambda -> agent occupancy (nodes) +2) occupancy -> savings +3) occupancy -> savings_off +4) lambda -> completion rate +5) occupancy -> effective savings +6) occupancy -> effective savings_off + +For each lambda, this script runs train.py in evaluation mode for one year +(12 months = 24 episodes), parses per-episode metrics from stdout, computes +mean/std, and fits 3rd-order polynomial trend lines. 
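+Savings in train.py output are cumulative across episodes; they are diffed
+back to per-episode values before mean/std are computed.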
+
+FAST DEBUG MODE: python analyze_lambda_occupancy.py --eval-months 1 --lambdas 1200,2000,3500 --no-plot-dashboard
+"""
+
+from __future__ import annotations
+
+import argparse
+import csv
+import json
+import math
+import re
+import shlex
+import subprocess
+import sys
+from dataclasses import asdict, dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Callable
+
+import matplotlib
+matplotlib.use("Agg")
+import matplotlib.pyplot as plt
+import numpy as np
+
+
+EPISODE_RE = re.compile(
+    r"Episode\s+(?P<episode>\d+):.*?"
+    r"Savings=€(?P<savings>-?[\d,]+(?:\.\d+)?)\/€(?P<savings_off>-?[\d,]+(?:\.\d+)?),.*?"
+    r"Jobs=[\d,]+\/[\d,]+\s+\((?P<completion_rate>-?[\d.]+)%\),\s*"
+    r"AvgWait=(?P<avg_wait>-?[\d.]+)h,.*?"
+    r"Agent Occupancy \(Nodes\)=\s*(?P<occupancy>-?[\d.]+)%",
+    re.MULTILINE,
+)
+
+WAIT_SUMMARY_RE = re.compile(
+    r"=== JOB PROCESSING METRICS ===.*?"
+    r"Agent:.*?Average Wait Time:\s*(?P<agent_wait>-?[\d.]+)\s*hours.*?"
+    r"Baseline:.*?Average Wait Time:\s*(?P<baseline_wait>-?[\d.]+)\s*hours",
+    re.DOTALL,
+)
+
+
+@dataclass
+class LambdaRunStats:
+    lambda_value: int
+    episodes: int
+    occupancy_mean: float
+    occupancy_std: float
+    savings_mean: float
+    savings_std: float
+    savings_off_mean: float
+    savings_off_std: float
+    completion_rate_mean: float
+    completion_rate_std: float
+    agent_avg_wait_hours: float
+    baseline_avg_wait_hours: float
+    wait_delta_hours: float
+    effective_savings_mean: float
+    effective_savings_std: float
+    effective_savings_off_mean: float
+    effective_savings_off_std: float
+    annual_total_savings: float
+    annual_total_savings_off: float
+    command: list[str]
+    command_str: str
+    occupancy_samples: list[float] = field(default_factory=list)
+    savings_samples: list[float] = field(default_factory=list)
+    savings_off_samples: list[float] = field(default_factory=list)
+    completion_rate_samples: list[float] = field(default_factory=list)
+    effective_savings_samples: list[float] = field(default_factory=list)
+    effective_savings_off_samples: list[float] = field(default_factory=list)
+
+
+def _to_float(raw: str) -> float:
+    return float(raw.replace(",", ""))
+
+
+def _diff_cumulative(values: list[float]) -> np.ndarray:
+    arr = np.asarray(values, dtype=float)
+    if arr.size == 0:
+        return arr
+    return np.diff(np.concatenate(([0.0], arr)))
+
+
+def parse_episode_metrics(stdout: str) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
+    occupancy = []
+    cumulative_savings = []
+    cumulative_savings_off = []
+    completion_rate = []
+    avg_wait = []
+
+    for match in EPISODE_RE.finditer(stdout):
+        occupancy.append(_to_float(match.group("occupancy")))
+        cumulative_savings.append(_to_float(match.group("savings")))
+        cumulative_savings_off.append(_to_float(match.group("savings_off")))
+        completion_rate.append(_to_float(match.group("completion_rate")))
+        avg_wait.append(_to_float(match.group("avg_wait")))
+
+    if not occupancy:
+        raise RuntimeError(
+            "Could not parse episode metrics from train.py output. "
+            "Expected lines like 'Episode X: ... Savings=€.../€..., Agent Occupancy (Nodes)=...%'."
+ ) + + episode_savings = _diff_cumulative(cumulative_savings) + episode_savings_off = _diff_cumulative(cumulative_savings_off) + return ( + np.asarray(occupancy, dtype=float), + episode_savings, + episode_savings_off, + np.asarray(completion_rate, dtype=float), + np.asarray(avg_wait, dtype=float), + ) + + +def parse_wait_summary(stdout: str) -> tuple[float | None, float | None]: + match = WAIT_SUMMARY_RE.search(stdout) + if not match: + return None, None + return _to_float(match.group("agent_wait")), _to_float(match.group("baseline_wait")) + + +def safe_divide(numer: np.ndarray, denom: float) -> np.ndarray: + if abs(denom) < 1e-12: + return np.full_like(numer, np.nan, dtype=float) + return numer / denom + + +def finite_mean_std(values: np.ndarray) -> tuple[float, float]: + finite = np.isfinite(values) + if not np.any(finite): + return float("nan"), float("nan") + vals = values[finite] + return float(np.mean(vals)), float(np.std(vals)) + + +def make_run_stats( + lambda_value: int, + command: list[str], + occupancy: np.ndarray, + savings: np.ndarray, + savings_off: np.ndarray, + completion_rate: np.ndarray, + agent_avg_wait_hours: float, + baseline_avg_wait_hours: float, +) -> LambdaRunStats: + wait_delta_hours = agent_avg_wait_hours - baseline_avg_wait_hours + effective_savings = safe_divide(savings * completion_rate, wait_delta_hours) + effective_savings_off = safe_divide(savings_off * completion_rate, wait_delta_hours) + effective_savings_mean, effective_savings_std = finite_mean_std(effective_savings) + effective_savings_off_mean, effective_savings_off_std = finite_mean_std(effective_savings_off) + return LambdaRunStats( + lambda_value=lambda_value, + episodes=int(occupancy.size), + occupancy_mean=float(np.mean(occupancy)), + occupancy_std=float(np.std(occupancy)), + savings_mean=float(np.mean(savings)), + savings_std=float(np.std(savings)), + savings_off_mean=float(np.mean(savings_off)), + savings_off_std=float(np.std(savings_off)), + completion_rate_mean=float(np.mean(completion_rate)), + completion_rate_std=float(np.std(completion_rate)), + agent_avg_wait_hours=float(agent_avg_wait_hours), + baseline_avg_wait_hours=float(baseline_avg_wait_hours), + wait_delta_hours=float(wait_delta_hours), + effective_savings_mean=effective_savings_mean, + effective_savings_std=effective_savings_std, + effective_savings_off_mean=effective_savings_off_mean, + effective_savings_off_std=effective_savings_off_std, + annual_total_savings=float(np.sum(savings)), + annual_total_savings_off=float(np.sum(savings_off)), + command=command, + command_str=shlex.join(command), + occupancy_samples=occupancy.tolist(), + savings_samples=savings.tolist(), + savings_off_samples=savings_off.tolist(), + completion_rate_samples=completion_rate.tolist(), + effective_savings_samples=effective_savings.tolist(), + effective_savings_off_samples=effective_savings_off.tolist(), + ) + + +def polyfit_curve(x: np.ndarray, y: np.ndarray, max_degree: int = 3) -> tuple[np.ndarray | None, int]: + finite = np.isfinite(x) & np.isfinite(y) + xf = x[finite] + yf = y[finite] + if xf.size < 2: + return None, 0 + degree = min(max_degree, xf.size - 1) + coeffs = np.polyfit(xf, yf, degree) + return coeffs, degree + + +def unique_ints_sorted(values: list[int]) -> list[int]: + return sorted({int(v) for v in values}) + + +def geometric_int_space(low: int, high: int, n: int) -> list[int]: + if n <= 1 or low == high: + return [int(low)] + vals = np.geomspace(low, high, n) + return unique_ints_sorted([int(round(v)) for v in vals]) + + +def 
clamp_int(value: int, low: int, high: int) -> int: + return max(low, min(high, int(value))) + + +def build_train_command(args: argparse.Namespace, lambda_value: int) -> list[str]: + cmd = [ + sys.executable, + "./train.py", + "--prices", + args.prices, + "--session", + args.session, + "--efficiency-weight", + str(args.efficiency_weight), + "--price-weight", + str(args.price_weight), + "--idle-weight", + str(args.idle_weight), + "--job-age-weight", + str(args.job_age_weight), + "--drop-weight", + str(args.drop_weight), + "--evaluate-savings", + "--eval-months", + str(args.eval_months), + "--model", + str(args.model), + "--workload-gen", + "poisson", + "--wg-poisson-lambdas4", + f"{lambda_value},{args.wg_duration_lambda},{args.wg_nodes_lambda},{args.wg_cores_lambda}", + "--wg-max-jobs-hour", + str(args.wg_max_jobs_hour), + "--wg-burst-small-prob", + str(args.wg_burst_small_prob), + "--wg-burst-heavy-prob", + str(args.wg_burst_heavy_prob), + ] + if args.carry_over_state: + cmd.append("--carry-over-state") + if args.plot_dashboard: + cmd.append("--plot-dashboard") + if args.dashboard_hours is not None: + cmd.extend(["--dashboard-hours", str(args.dashboard_hours)]) + return cmd + + +def run_lambda_eval(args: argparse.Namespace, project_root: Path, lambda_value: int) -> tuple[LambdaRunStats, str]: + command = build_train_command(args, lambda_value) + print(f"[run] lambda={lambda_value}: {shlex.join(command)}") + completed = subprocess.run( + command, + cwd=str(project_root), + capture_output=True, + text=True, + check=False, + ) + + combined_output = (completed.stdout or "") + ("\n" + completed.stderr if completed.stderr else "") + if args.echo_train_output: + print(combined_output) + if completed.returncode != 0: + raise RuntimeError( + f"train.py failed for lambda={lambda_value} with code {completed.returncode}.\n" + f"Last output lines:\n{os_tail(combined_output, lines=40)}" + ) + + occupancy, savings, savings_off, completion_rate, avg_wait = parse_episode_metrics(combined_output) + agent_wait_summary, baseline_wait_summary = parse_wait_summary(combined_output) + if agent_wait_summary is None or baseline_wait_summary is None: + print(f"[warn] lambda={lambda_value}: could not parse run-level wait summary; effective savings may be NaN.") + agent_avg_wait_hours = float(np.mean(avg_wait)) + baseline_avg_wait_hours = float(np.mean(avg_wait)) + else: + agent_avg_wait_hours = float(agent_wait_summary) + baseline_avg_wait_hours = float(baseline_wait_summary) + stats = make_run_stats( + lambda_value, + command, + occupancy, + savings, + savings_off, + completion_rate, + agent_avg_wait_hours, + baseline_avg_wait_hours, + ) + print( + f"[ok ] lambda={lambda_value}: " + f"occupancy={stats.occupancy_mean:.2f}%±{stats.occupancy_std:.2f}, " + f"completion={stats.completion_rate_mean:.2f}%±{stats.completion_rate_std:.2f}, " + f"savings={stats.savings_mean:.0f}±{stats.savings_std:.0f}, " + f"savings_off={stats.savings_off_mean:.0f}±{stats.savings_off_std:.0f}, " + f"wait_delta={stats.wait_delta_hours:.3f}h" + ) + return stats, combined_output + + +def os_tail(text: str, lines: int = 20) -> str: + parts = text.rstrip().splitlines() + if not parts: + return "" + return "\n".join(parts[-lines:]) + + +def select_lambda_schedule( + args: argparse.Namespace, + evaluate: Callable[[int], LambdaRunStats], +) -> list[int]: + if args.lambdas: + explicit = unique_ints_sorted( + [clamp_int(v, args.min_lambda, args.max_lambda) for v in parse_int_list(args.lambdas)] + ) + for lam in explicit: + evaluate(lam) + return 
explicit + + ref = clamp_int(args.reference_lambda, args.min_lambda, args.max_lambda) + evaluate(ref) + + # Probe low side. + low = ref + for _ in range(args.bracket_steps): + occ = evaluate(low).occupancy_mean + if occ <= args.target_occupancy_min + args.occupancy_tolerance: + break + nxt = clamp_int(int(round(low / args.bracket_factor)), args.min_lambda, args.max_lambda) + if nxt == low: + break + low = nxt + evaluate(low) + + # Probe high side. + high = ref + for _ in range(args.bracket_steps): + occ = evaluate(high).occupancy_mean + if occ >= args.target_occupancy_max - args.occupancy_tolerance: + break + nxt = clamp_int(int(round(high * args.bracket_factor)), args.min_lambda, args.max_lambda) + if nxt == high: + break + high = nxt + evaluate(high) + + # Fill remaining points log-spaced across discovered range. + discovered = unique_ints_sorted(list(evaluate.cache_keys())) + low_bound = min(discovered) + high_bound = max(discovered) + target_points = max(args.num_points, len(discovered)) + + for lam in geometric_int_space(low_bound, high_bound, target_points): + evaluate(lam) + + # If geometric spacing collapsed to fewer unique integers, fill largest gaps. + while len(evaluate.cache_keys()) < target_points: + ordered = unique_ints_sorted(list(evaluate.cache_keys())) + if len(ordered) < 2: + break + gaps = sorted( + ((ordered[i + 1] - ordered[i], ordered[i], ordered[i + 1]) for i in range(len(ordered) - 1)), + reverse=True, + ) + inserted = False + for _, a, b in gaps: + cand = int(round(math.sqrt(a * b))) + if cand in (a, b): + cand = (a + b) // 2 + cand = clamp_int(cand, args.min_lambda, args.max_lambda) + if cand not in evaluate.cache_keys(): + evaluate(cand) + inserted = True + break + if not inserted: + break + + return unique_ints_sorted(list(evaluate.cache_keys())) + + +def write_summary_csv(path: Path, stats_by_lambda: list[LambdaRunStats]) -> None: + fieldnames = [ + "lambda", + "episodes", + "occupancy_mean_pct", + "occupancy_std_pct", + "completion_rate_mean_pct", + "completion_rate_std_pct", + "agent_avg_wait_hours", + "baseline_avg_wait_hours", + "wait_delta_hours", + "savings_mean_eur", + "savings_std_eur", + "savings_off_mean_eur", + "savings_off_std_eur", + "effective_savings_mean", + "effective_savings_std", + "effective_savings_off_mean", + "effective_savings_off_std", + "annual_total_savings_eur", + "annual_total_savings_off_eur", + ] + with path.open("w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + for s in sorted(stats_by_lambda, key=lambda x: x.lambda_value): + writer.writerow( + { + "lambda": s.lambda_value, + "episodes": s.episodes, + "occupancy_mean_pct": f"{s.occupancy_mean:.6f}", + "occupancy_std_pct": f"{s.occupancy_std:.6f}", + "completion_rate_mean_pct": f"{s.completion_rate_mean:.6f}", + "completion_rate_std_pct": f"{s.completion_rate_std:.6f}", + "agent_avg_wait_hours": f"{s.agent_avg_wait_hours:.6f}", + "baseline_avg_wait_hours": f"{s.baseline_avg_wait_hours:.6f}", + "wait_delta_hours": f"{s.wait_delta_hours:.6f}", + "savings_mean_eur": f"{s.savings_mean:.6f}", + "savings_std_eur": f"{s.savings_std:.6f}", + "savings_off_mean_eur": f"{s.savings_off_mean:.6f}", + "savings_off_std_eur": f"{s.savings_off_std:.6f}", + "effective_savings_mean": f"{s.effective_savings_mean:.6f}", + "effective_savings_std": f"{s.effective_savings_std:.6f}", + "effective_savings_off_mean": f"{s.effective_savings_off_mean:.6f}", + "effective_savings_off_std": f"{s.effective_savings_off_std:.6f}", + "annual_total_savings_eur": 
f"{s.annual_total_savings:.6f}", + "annual_total_savings_off_eur": f"{s.annual_total_savings_off:.6f}", + } + ) + + +def make_plot(path: Path, stats_by_lambda: list[LambdaRunStats]) -> None: + ordered = sorted(stats_by_lambda, key=lambda x: x.lambda_value) + + lambdas = np.array([s.lambda_value for s in ordered], dtype=float) + occ_mean = np.array([s.occupancy_mean for s in ordered], dtype=float) + occ_std = np.array([s.occupancy_std for s in ordered], dtype=float) + sav_mean = np.array([s.savings_mean for s in ordered], dtype=float) + sav_std = np.array([s.savings_std for s in ordered], dtype=float) + sav_off_mean = np.array([s.savings_off_mean for s in ordered], dtype=float) + sav_off_std = np.array([s.savings_off_std for s in ordered], dtype=float) + completion_mean = np.array([s.completion_rate_mean for s in ordered], dtype=float) + completion_std = np.array([s.completion_rate_std for s in ordered], dtype=float) + eff_sav_mean = np.array([s.effective_savings_mean for s in ordered], dtype=float) + eff_sav_std = np.array([s.effective_savings_std for s in ordered], dtype=float) + eff_sav_off_mean = np.array([s.effective_savings_off_mean for s in ordered], dtype=float) + eff_sav_off_std = np.array([s.effective_savings_off_std for s in ordered], dtype=float) + + fig, axes = plt.subplots(2, 3, figsize=(20, 12), constrained_layout=True) + ax00, ax01, ax02 = axes[0] + ax10, ax11, ax12 = axes[1] + + # Panel 1: lambda vs occupancy. + ax00.errorbar( + lambdas, + occ_mean, + yerr=occ_std, + fmt="o", + capsize=3, + color="tab:blue", + ecolor="tab:blue", + label="mean +/- std", + ) + coeffs, deg = polyfit_curve(lambdas, occ_mean, max_degree=3) + if coeffs is not None: + x_fit = np.linspace(float(np.min(lambdas)), float(np.max(lambdas)), 250) + ax00.plot(x_fit, np.polyval(coeffs, x_fit), color="tab:orange", lw=2, label=f"poly deg {deg}") + ax00.set_title("Lambda vs Occupancy/Episode") + ax00.set_xlabel("Poisson lambda (arrivals)") + ax00.set_ylabel("Agent Occupancy (Nodes, %) / Episode") + ax00.grid(alpha=0.3) + ax00.legend() + + # Panel 2: occupancy vs savings. + ax01.errorbar( + occ_mean, + sav_mean, + xerr=occ_std, + yerr=sav_std, + fmt="o", + capsize=3, + color="tab:green", + ecolor="tab:green", + label="mean +/- std", + ) + coeffs, deg = polyfit_curve(occ_mean, sav_mean, max_degree=3) + if coeffs is not None: + x_fit = np.linspace(float(np.min(occ_mean)), float(np.max(occ_mean)), 250) + ax01.plot(x_fit, np.polyval(coeffs, x_fit), color="tab:red", lw=2, label=f"poly deg {deg}") + ax01.set_title("Occupancy/Episode vs Savings/Episode") + ax01.set_xlabel("Agent Occupancy (Nodes, %) / Episode") + ax01.set_ylabel("Savings vs Baseline (EUR / Episode)") + ax01.grid(alpha=0.3) + ax01.legend() + + # Panel 3: occupancy vs savings_off. + ax02.errorbar( + occ_mean, + sav_off_mean, + xerr=occ_std, + yerr=sav_off_std, + fmt="o", + capsize=3, + color="tab:purple", + ecolor="tab:purple", + label="mean +/- std", + ) + coeffs, deg = polyfit_curve(occ_mean, sav_off_mean, max_degree=3) + if coeffs is not None: + x_fit = np.linspace(float(np.min(occ_mean)), float(np.max(occ_mean)), 250) + ax02.plot(x_fit, np.polyval(coeffs, x_fit), color="tab:brown", lw=2, label=f"poly deg {deg}") + ax02.set_title("Occupancy vs Savings_off/Episode") + ax02.set_xlabel("Agent Occupancy (Nodes, %) / Episode") + ax02.set_ylabel("Savings vs Baseline_off (EUR / Episode)") + ax02.grid(alpha=0.3) + ax02.legend() + + # Panel 4: lambda vs completion rate. 
+ ax10.errorbar( + lambdas, + completion_mean, + yerr=completion_std, + fmt="o", + capsize=3, + color="tab:cyan", + ecolor="tab:cyan", + label="mean +/- std", + ) + coeffs, deg = polyfit_curve(lambdas, completion_mean, max_degree=3) + if coeffs is not None: + x_fit = np.linspace(float(np.min(lambdas)), float(np.max(lambdas)), 250) + ax10.plot(x_fit, np.polyval(coeffs, x_fit), color="tab:gray", lw=2, label=f"poly deg {deg}") + ax10.set_title("Lambda vs Agent Completion Rate") + ax10.set_xlabel("Poisson lambda (arrivals)") + ax10.set_ylabel("Completion Rate (%)") + ax10.grid(alpha=0.3) + ax10.legend() + + # Panel 5: occupancy vs effective savings. + ax11.errorbar( + occ_mean, + eff_sav_mean, + xerr=occ_std, + yerr=eff_sav_std, + fmt="o", + capsize=3, + color="tab:olive", + ecolor="tab:olive", + label="mean +/- std", + ) + coeffs, deg = polyfit_curve(occ_mean, eff_sav_mean, max_degree=3) + if coeffs is not None: + finite = np.isfinite(occ_mean) & np.isfinite(eff_sav_mean) + x_fit = np.linspace(float(np.min(occ_mean[finite])), float(np.max(occ_mean[finite])), 250) + ax11.plot(x_fit, np.polyval(coeffs, x_fit), color="tab:red", lw=2, label=f"poly deg {deg}") + ax11.set_title("Occupancy vs Effective Savings") + ax11.set_xlabel("Agent Occupancy (Nodes, %) / Episode") + ax11.set_ylabel("effective_savings") + ax11.grid(alpha=0.3) + ax11.legend() + + # Panel 6: occupancy vs effective savings_off. + ax12.errorbar( + occ_mean, + eff_sav_off_mean, + xerr=occ_std, + yerr=eff_sav_off_std, + fmt="o", + capsize=3, + color="tab:pink", + ecolor="tab:pink", + label="mean +/- std", + ) + coeffs, deg = polyfit_curve(occ_mean, eff_sav_off_mean, max_degree=3) + if coeffs is not None: + finite = np.isfinite(occ_mean) & np.isfinite(eff_sav_off_mean) + x_fit = np.linspace(float(np.min(occ_mean[finite])), float(np.max(occ_mean[finite])), 250) + ax12.plot(x_fit, np.polyval(coeffs, x_fit), color="tab:brown", lw=2, label=f"poly deg {deg}") + ax12.set_title("Occupancy vs Effective Savings_off") + ax12.set_xlabel("Agent Occupancy (Nodes, %) / Episode") + ax12.set_ylabel("effective_savings_off") + ax12.grid(alpha=0.3) + ax12.legend() + + fig.savefig(path, dpi=220) + plt.close(fig) + + +def parse_int_list(raw: str) -> list[int]: + return [int(part.strip()) for part in raw.split(",") if part.strip()] + + +def build_arg_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Sweep Poisson lambdas and fit occupancy/savings trend lines." + ) + + # Core train.py params (default values mirror the command from the request). 
+ parser.add_argument("--prices", default="./data/prices_2023.csv") + parser.add_argument("--session", default="20260216_fix") + parser.add_argument("--efficiency-weight", type=float, default=0.6) + parser.add_argument("--price-weight", type=float, default=0.1) + parser.add_argument("--idle-weight", type=float, default=0.1) + parser.add_argument("--job-age-weight", type=float, default=0.2) + parser.add_argument("--drop-weight", type=float, default=0.0) + parser.add_argument("--eval-months", type=int, default=12) + parser.add_argument("--model", type=int, default=1000000) + + parser.add_argument("--carry-over-state", action=argparse.BooleanOptionalAction, default=True) + parser.add_argument("--plot-dashboard", action=argparse.BooleanOptionalAction, default=True) + parser.add_argument("--dashboard-hours", type=int, default=None) + + parser.add_argument("--wg-duration-lambda", type=float, default=0.3) + parser.add_argument("--wg-nodes-lambda", type=float, default=1.0) + parser.add_argument("--wg-cores-lambda", type=float, default=3.0) + parser.add_argument("--wg-max-jobs-hour", type=int, default=70000) + parser.add_argument("--wg-burst-small-prob", type=float, default=0.1329) + parser.add_argument("--wg-burst-heavy-prob", type=float, default=0.0043) + + # Lambda sweep controls. + parser.add_argument("--lambdas", type=str, default="", help="Explicit comma-separated lambda list. If set, adaptive selection is disabled.") + parser.add_argument("--reference-lambda", type=int, default=2000) + parser.add_argument("--min-lambda", type=int, default=100) + parser.add_argument("--max-lambda", type=int, default=12000) + parser.add_argument("--num-points", type=int, default=7, help="Target number of lambda points for adaptive sweep.") + parser.add_argument("--bracket-steps", type=int, default=2, help="Low/high probing steps from reference lambda.") + parser.add_argument("--bracket-factor", type=float, default=2.0, help="Factor for high/low probing.") + parser.add_argument("--target-occupancy-min", type=float, default=20.0) + parser.add_argument("--target-occupancy-max", type=float, default=100.0) + parser.add_argument("--occupancy-tolerance", type=float, default=2.0) + + parser.add_argument("--out-dir", type=str, default="") + parser.add_argument("--save-logs", action=argparse.BooleanOptionalAction, default=True) + parser.add_argument("--echo-train-output", action=argparse.BooleanOptionalAction, default=False) + return parser + + +def main() -> None: + parser = build_arg_parser() + args = parser.parse_args() + + if args.num_points < 2: + parser.error("--num-points must be >= 2") + if args.min_lambda <= 0: + parser.error("--min-lambda must be > 0") + if args.max_lambda < args.min_lambda: + parser.error("--max-lambda must be >= --min-lambda") + if args.bracket_factor <= 1.0: + parser.error("--bracket-factor must be > 1") + + project_root = Path(__file__).resolve().parent + train_py = project_root / "train.py" + if not train_py.exists(): + raise FileNotFoundError(f"Could not find train.py at: {train_py}") + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + if args.out_dir: + out_dir = Path(args.out_dir).expanduser().resolve() + else: + out_dir = project_root / "analysis" / f"lambda_occupancy_sweep_{timestamp}" + out_dir.mkdir(parents=True, exist_ok=True) + logs_dir = out_dir / "logs" + if args.save_logs: + logs_dir.mkdir(parents=True, exist_ok=True) + + cache: dict[int, LambdaRunStats] = {} + + def evaluate(lam: int) -> LambdaRunStats: + lam = clamp_int(lam, args.min_lambda, args.max_lambda) + if lam 
in cache: + return cache[lam] + stats, raw_output = run_lambda_eval(args, project_root, lam) + cache[lam] = stats + if args.save_logs: + log_path = logs_dir / f"lambda_{lam}.log" + log_path.write_text(raw_output) + return stats + + # Attach cache reader for select_lambda_schedule(). + evaluate.cache_keys = cache.keys # type: ignore[attr-defined] + + selected_lambdas = select_lambda_schedule(args, evaluate) + stats_ordered = [cache[l] for l in sorted(selected_lambdas)] + + csv_path = out_dir / "summary.csv" + json_path = out_dir / "summary.json" + plot_path = out_dir / "trendlines.png" + + write_summary_csv(csv_path, stats_ordered) + with json_path.open("w") as f: + json.dump( + { + "created_at": datetime.now().isoformat(), + "selected_lambdas": selected_lambdas, + "args": vars(args), + "results": [asdict(s) for s in stats_ordered], + }, + f, + indent=2, + ) + make_plot(plot_path, stats_ordered) + + print("\nSweep complete.") + print(f" Lambdas: {selected_lambdas}") + print(f" CSV: {csv_path}") + print(f" JSON: {json_path}") + print(f" Plot: {plot_path}") + + +if __name__ == "__main__": + main() From 4e10bb9ef806735cfcf4768fb5dc56914f6f3ab9 Mon Sep 17 00:00:00 2001 From: Enis Lorenz Date: Fri, 20 Feb 2026 16:20:48 +0100 Subject: [PATCH 4/8] Add: QoL Plotting now color-coded Add: Fit now toggled through parser --- analyze_lambda_occupancy.py | 196 ++++++++++++++++++++---------------- 1 file changed, 108 insertions(+), 88 deletions(-) diff --git a/analyze_lambda_occupancy.py b/analyze_lambda_occupancy.py index d78dcce..a407887 100644 --- a/analyze_lambda_occupancy.py +++ b/analyze_lambda_occupancy.py @@ -158,8 +158,8 @@ def make_run_stats( baseline_avg_wait_hours: float, ) -> LambdaRunStats: wait_delta_hours = agent_avg_wait_hours - baseline_avg_wait_hours - effective_savings = safe_divide(savings * completion_rate, wait_delta_hours) - effective_savings_off = safe_divide(savings_off * completion_rate, wait_delta_hours) + effective_savings = safe_divide(savings * (completion_rate/100)**2, wait_delta_hours+1) + effective_savings_off = safe_divide(savings_off * (completion_rate/100)**2, wait_delta_hours+1) effective_savings_mean, effective_savings_std = finite_mean_std(effective_savings) effective_savings_off_mean, effective_savings_off_std = finite_mean_std(effective_savings_off) return LambdaRunStats( @@ -323,6 +323,18 @@ def select_lambda_schedule( args: argparse.Namespace, evaluate: Callable[[int], LambdaRunStats], ) -> list[int]: + """ + Adaptive lambda selection when --lambdas is not provided: + 1) Start at reference lambda (clamped to min/max). + 2) Probe downward (divide by bracket_factor) until occupancy reaches + target_occupancy_min (+ tolerance) or bracket_steps is exhausted. + 3) Probe upward (multiply by bracket_factor) until occupancy reaches + target_occupancy_max (- tolerance) or bracket_steps is exhausted. + 4) Fill remaining points with log spacing between discovered low/high. + 5) If log spacing collapses due to integer rounding, insert candidates + into the largest gaps until num_points is reached or no new values + can be inserted. 
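+
+    Example with the parser defaults (reference 2000, bracket factor 2.0,
+    two bracket steps): the sweep evaluates 2000, probes 1000 and 500 on the
+    low side and 4000 and 8000 on the high side (stopping early once an
+    occupancy target is reached), then log-fills between the discovered
+    bounds until num_points lambdas are covered.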
+ """ if args.lambdas: explicit = unique_ints_sorted( [clamp_int(v, args.min_lambda, args.max_lambda) for v in parse_int_list(args.lambdas)] @@ -443,8 +455,10 @@ def write_summary_csv(path: Path, stats_by_lambda: list[LambdaRunStats]) -> None ) -def make_plot(path: Path, stats_by_lambda: list[LambdaRunStats]) -> None: +def make_plot(path: Path, stats_by_lambda: list[LambdaRunStats], fit: bool = False) -> None: ordered = sorted(stats_by_lambda, key=lambda x: x.lambda_value) + if not ordered: + return lambdas = np.array([s.lambda_value for s in ordered], dtype=float) occ_mean = np.array([s.occupancy_mean for s in ordered], dtype=float) @@ -460,141 +474,146 @@ def make_plot(path: Path, stats_by_lambda: list[LambdaRunStats]) -> None: eff_sav_off_mean = np.array([s.effective_savings_off_mean for s in ordered], dtype=float) eff_sav_off_std = np.array([s.effective_savings_off_std for s in ordered], dtype=float) + lam_min = float(np.min(lambdas)) + lam_max = float(np.max(lambdas)) + if lam_max <= lam_min: + lam_max = lam_min + 1.0 + norm = matplotlib.colors.Normalize(vmin=lam_min, vmax=lam_max) + cmap = plt.get_cmap("turbo") + point_colors = cmap(norm(lambdas)) + + def _error_at(arr: np.ndarray | None, idx: int) -> float | None: + if arr is None: + return None + v = float(arr[idx]) + return v if np.isfinite(v) else None + + def plot_colored_points( + ax: plt.Axes, + x: np.ndarray, + y: np.ndarray, + xerr: np.ndarray | None = None, + yerr: np.ndarray | None = None, + ) -> None: + for i, (xi, yi, c) in enumerate(zip(x, y, point_colors)): + if not (np.isfinite(xi) and np.isfinite(yi)): + continue + ax.errorbar( + float(xi), + float(yi), + xerr=_error_at(xerr, i), + yerr=_error_at(yerr, i), + fmt="o", + markersize=6, + capsize=3, + color=c, + ecolor=c, + elinewidth=1.2, + alpha=0.95, + ) + fig, axes = plt.subplots(2, 3, figsize=(20, 12), constrained_layout=True) ax00, ax01, ax02 = axes[0] ax10, ax11, ax12 = axes[1] # Panel 1: lambda vs occupancy. - ax00.errorbar( - lambdas, - occ_mean, - yerr=occ_std, - fmt="o", - capsize=3, - color="tab:blue", - ecolor="tab:blue", - label="mean +/- std", - ) - coeffs, deg = polyfit_curve(lambdas, occ_mean, max_degree=3) + plot_colored_points(ax00, lambdas, occ_mean, yerr=occ_std) + coeffs, deg = None, None + if fit: + coeffs, deg = polyfit_curve(lambdas, occ_mean, max_degree=3) if coeffs is not None: x_fit = np.linspace(float(np.min(lambdas)), float(np.max(lambdas)), 250) - ax00.plot(x_fit, np.polyval(coeffs, x_fit), color="tab:orange", lw=2, label=f"poly deg {deg}") + ax00.plot(x_fit, np.polyval(coeffs, x_fit), color="black", lw=2, label=f"poly deg {deg}") ax00.set_title("Lambda vs Occupancy/Episode") ax00.set_xlabel("Poisson lambda (arrivals)") ax00.set_ylabel("Agent Occupancy (Nodes, %) / Episode") ax00.grid(alpha=0.3) - ax00.legend() + if coeffs is not None: + ax00.legend() # Panel 2: occupancy vs savings. 
- ax01.errorbar( - occ_mean, - sav_mean, - xerr=occ_std, - yerr=sav_std, - fmt="o", - capsize=3, - color="tab:green", - ecolor="tab:green", - label="mean +/- std", - ) - coeffs, deg = polyfit_curve(occ_mean, sav_mean, max_degree=3) + plot_colored_points(ax01, occ_mean, sav_mean, xerr=occ_std, yerr=sav_std) + coeffs, deg = None, None + if fit: + coeffs, deg = polyfit_curve(occ_mean, sav_mean, max_degree=3) if coeffs is not None: - x_fit = np.linspace(float(np.min(occ_mean)), float(np.max(occ_mean)), 250) - ax01.plot(x_fit, np.polyval(coeffs, x_fit), color="tab:red", lw=2, label=f"poly deg {deg}") + finite = np.isfinite(occ_mean) & np.isfinite(sav_mean) + x_fit = np.linspace(float(np.min(occ_mean[finite])), float(np.max(occ_mean[finite])), 250) + ax01.plot(x_fit, np.polyval(coeffs, x_fit), color="black", lw=2, label=f"poly deg {deg}") ax01.set_title("Occupancy/Episode vs Savings/Episode") ax01.set_xlabel("Agent Occupancy (Nodes, %) / Episode") ax01.set_ylabel("Savings vs Baseline (EUR / Episode)") ax01.grid(alpha=0.3) - ax01.legend() + if coeffs is not None: + ax01.legend() # Panel 3: occupancy vs savings_off. - ax02.errorbar( - occ_mean, - sav_off_mean, - xerr=occ_std, - yerr=sav_off_std, - fmt="o", - capsize=3, - color="tab:purple", - ecolor="tab:purple", - label="mean +/- std", - ) - coeffs, deg = polyfit_curve(occ_mean, sav_off_mean, max_degree=3) + plot_colored_points(ax02, occ_mean, sav_off_mean, xerr=occ_std, yerr=sav_off_std) + coeffs, deg = None, None + if fit: + coeffs, deg = polyfit_curve(occ_mean, sav_off_mean, max_degree=3) if coeffs is not None: - x_fit = np.linspace(float(np.min(occ_mean)), float(np.max(occ_mean)), 250) - ax02.plot(x_fit, np.polyval(coeffs, x_fit), color="tab:brown", lw=2, label=f"poly deg {deg}") + finite = np.isfinite(occ_mean) & np.isfinite(sav_off_mean) + x_fit = np.linspace(float(np.min(occ_mean[finite])), float(np.max(occ_mean[finite])), 250) + ax02.plot(x_fit, np.polyval(coeffs, x_fit), color="black", lw=2, label=f"poly deg {deg}") ax02.set_title("Occupancy vs Savings_off/Episode") ax02.set_xlabel("Agent Occupancy (Nodes, %) / Episode") ax02.set_ylabel("Savings vs Baseline_off (EUR / Episode)") ax02.grid(alpha=0.3) - ax02.legend() + if coeffs is not None: + ax02.legend() # Panel 4: lambda vs completion rate. - ax10.errorbar( - lambdas, - completion_mean, - yerr=completion_std, - fmt="o", - capsize=3, - color="tab:cyan", - ecolor="tab:cyan", - label="mean +/- std", - ) - coeffs, deg = polyfit_curve(lambdas, completion_mean, max_degree=3) + plot_colored_points(ax10, lambdas, completion_mean, yerr=completion_std) + coeffs, deg = None, None + if fit: + coeffs, deg = polyfit_curve(lambdas, completion_mean, max_degree=3) if coeffs is not None: x_fit = np.linspace(float(np.min(lambdas)), float(np.max(lambdas)), 250) - ax10.plot(x_fit, np.polyval(coeffs, x_fit), color="tab:gray", lw=2, label=f"poly deg {deg}") + ax10.plot(x_fit, np.polyval(coeffs, x_fit), color="black", lw=2, label=f"poly deg {deg}") ax10.set_title("Lambda vs Agent Completion Rate") ax10.set_xlabel("Poisson lambda (arrivals)") ax10.set_ylabel("Completion Rate (%)") ax10.grid(alpha=0.3) - ax10.legend() + if coeffs is not None: + ax10.legend() # Panel 5: occupancy vs effective savings. 
- ax11.errorbar( - occ_mean, - eff_sav_mean, - xerr=occ_std, - yerr=eff_sav_std, - fmt="o", - capsize=3, - color="tab:olive", - ecolor="tab:olive", - label="mean +/- std", - ) - coeffs, deg = polyfit_curve(occ_mean, eff_sav_mean, max_degree=3) + plot_colored_points(ax11, occ_mean, eff_sav_mean, xerr=occ_std, yerr=eff_sav_std) + coeffs, deg = None, None + if fit: + coeffs, deg = polyfit_curve(occ_mean, eff_sav_mean, max_degree=3) if coeffs is not None: finite = np.isfinite(occ_mean) & np.isfinite(eff_sav_mean) x_fit = np.linspace(float(np.min(occ_mean[finite])), float(np.max(occ_mean[finite])), 250) - ax11.plot(x_fit, np.polyval(coeffs, x_fit), color="tab:red", lw=2, label=f"poly deg {deg}") + ax11.plot(x_fit, np.polyval(coeffs, x_fit), color="black", lw=2, label=f"poly deg {deg}") ax11.set_title("Occupancy vs Effective Savings") ax11.set_xlabel("Agent Occupancy (Nodes, %) / Episode") ax11.set_ylabel("effective_savings") ax11.grid(alpha=0.3) - ax11.legend() + if coeffs is not None: + ax11.legend() # Panel 6: occupancy vs effective savings_off. - ax12.errorbar( - occ_mean, - eff_sav_off_mean, - xerr=occ_std, - yerr=eff_sav_off_std, - fmt="o", - capsize=3, - color="tab:pink", - ecolor="tab:pink", - label="mean +/- std", - ) - coeffs, deg = polyfit_curve(occ_mean, eff_sav_off_mean, max_degree=3) + plot_colored_points(ax12, occ_mean, eff_sav_off_mean, xerr=occ_std, yerr=eff_sav_off_std) + coeffs, deg = None, None + if fit: + coeffs, deg = polyfit_curve(occ_mean, eff_sav_off_mean, max_degree=3) if coeffs is not None: finite = np.isfinite(occ_mean) & np.isfinite(eff_sav_off_mean) x_fit = np.linspace(float(np.min(occ_mean[finite])), float(np.max(occ_mean[finite])), 250) - ax12.plot(x_fit, np.polyval(coeffs, x_fit), color="tab:brown", lw=2, label=f"poly deg {deg}") + ax12.plot(x_fit, np.polyval(coeffs, x_fit), color="black", lw=2, label=f"poly deg {deg}") ax12.set_title("Occupancy vs Effective Savings_off") ax12.set_xlabel("Agent Occupancy (Nodes, %) / Episode") ax12.set_ylabel("effective_savings_off") ax12.grid(alpha=0.3) - ax12.legend() + if coeffs is not None: + ax12.legend() + + sm = plt.cm.ScalarMappable(norm=norm, cmap=cmap) + sm.set_array([]) + cbar = fig.colorbar(sm, ax=axes.ravel().tolist(), pad=0.02) + cbar.set_label("Poisson lambda (point color)") fig.savefig(path, dpi=220) plt.close(fig) @@ -646,6 +665,7 @@ def build_arg_parser() -> argparse.ArgumentParser: parser.add_argument("--out-dir", type=str, default="") parser.add_argument("--save-logs", action=argparse.BooleanOptionalAction, default=True) parser.add_argument("--echo-train-output", action=argparse.BooleanOptionalAction, default=False) + parser.add_argument("--fit", action="store_true", default=False, help="Enable polynomial fitting of datasets") return parser @@ -712,7 +732,7 @@ def evaluate(lam: int) -> LambdaRunStats: f, indent=2, ) - make_plot(plot_path, stats_ordered) + make_plot(plot_path, stats_ordered, fit=args.fit) print("\nSweep complete.") print(f" Lambdas: {selected_lambdas}") From 692a1002eb21f6120665ac225bca926be1154566 Mon Sep 17 00:00:00 2001 From: enlorenz <103032392+enlorenz@users.noreply.github.com> Date: Tue, 3 Mar 2026 12:20:45 +0100 Subject: [PATCH 5/8] Update train.py Co-authored-by: Alexey Rybalchenko --- train.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/train.py b/train.py index feff76a..71d86b0 100644 --- a/train.py +++ b/train.py @@ -225,7 +225,7 @@ def main(): f"AvgWait={avg_wait:.1f}h, " f"EpisodeMaxQueue={env.metrics.episode_max_queue_size_reached}, 
Dropped={env.metrics.episode_jobs_dropped}, " f"MaxQueue={env.metrics.max_queue_size_reached}, " - f"Agent Occupancy (Cores)={env.metrics.episode_used_cores[-1]*100/(CORES_PER_NODE*MAX_NODES)if env.metrics.episode_used_cores else 0 :.2f}%, Baseline Occupancy (Cores)={env.metrics.episode_baseline_used_cores[-1]*100/(CORES_PER_NODE*MAX_NODES) if env.metrics.episode_baseline_used_cores else 0 :.2f}% " + f"Agent Occupancy (Cores)={env.metrics.episode_used_cores[-1]*100/(CORES_PER_NODE*MAX_NODES) if env.metrics.episode_used_cores else 0 :.2f}%, Baseline Occupancy (Cores)={env.metrics.episode_baseline_used_cores[-1]*100/(CORES_PER_NODE*MAX_NODES) if env.metrics.episode_baseline_used_cores else 0 :.2f}%, " f"Agent Occupancy (Nodes)={env.metrics.episode_used_nodes[-1]*100/MAX_NODES if env.metrics.episode_used_nodes else 0 :.2f}%, Baseline Occupancy (Nodes)={env.metrics.episode_baseline_used_nodes[-1]*100/MAX_NODES if env.metrics.episode_baseline_used_nodes else 0 :.2f}% " ) print(f"\nEvaluation complete! Generated {num_episodes} episodes of cost data.") From fa72e4b75229784d0a1f6e8f8d1fc38cab0b0e69 Mon Sep 17 00:00:00 2001 From: enlorenz <103032392+enlorenz@users.noreply.github.com> Date: Tue, 3 Mar 2026 12:27:28 +0100 Subject: [PATCH 6/8] Update src/baseline.py Co-authored-by: Alexey Rybalchenko --- src/baseline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/baseline.py b/src/baseline.py index 5bc2cd4..097f91c 100644 --- a/src/baseline.py +++ b/src/baseline.py @@ -114,5 +114,5 @@ def baseline_step( baseline_cost_off = power_cost(num_used_nodes, 0, current_price) env_print(f" > baseline_cost_off: €{baseline_cost_off:.4f} | used nodes: {num_used_nodes}, idle nodes: 0") - num_used_cores = np.sum((baseline_state['nodes'] > 0) * CORES_PER_NODE) + num_used_cores = num_on_nodes * CORES_PER_NODE - np.sum(baseline_cores_available) return baseline_cost, baseline_cost_off, baseline_next_empty_slot, next_job_id, num_used_nodes, num_used_cores From 1cf81f2f65db6064bac1b5e67167ba594bae54b3 Mon Sep 17 00:00:00 2001 From: enlorenz <103032392+enlorenz@users.noreply.github.com> Date: Tue, 3 Mar 2026 12:30:37 +0100 Subject: [PATCH 7/8] Update analyze_lambda_occupancy.py Co-authored-by: Alexey Rybalchenko --- analyze_lambda_occupancy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/analyze_lambda_occupancy.py b/analyze_lambda_occupancy.py index a407887..5fad95f 100644 --- a/analyze_lambda_occupancy.py +++ b/analyze_lambda_occupancy.py @@ -630,7 +630,7 @@ def build_arg_parser() -> argparse.ArgumentParser: # Core train.py params (default values mirror the command from the request). 
 parser.add_argument("--prices", default="./data/prices_2023.csv")
-    parser.add_argument("--session", default="20260216_fix")
+    parser.add_argument("--session", default="")
     parser.add_argument("--efficiency-weight", type=float, default=0.6)
     parser.add_argument("--price-weight", type=float, default=0.1)
     parser.add_argument("--idle-weight", type=float, default=0.1)

From fd6442febdb4f123a5dfe47aeddf79a85199b0ba Mon Sep 17 00:00:00 2001
From: Enis Lorenz
Date: Fri, 20 Feb 2026 16:20:48 +0100
Subject: [PATCH 8/8] Add: type annotations for episode metric lists in
 metrics_tracker
---
 src/metrics_tracker.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/metrics_tracker.py b/src/metrics_tracker.py
index f9391fb..1f5f0c9 100644
--- a/src/metrics_tracker.py
+++ b/src/metrics_tracker.py
@@ -79,13 +79,13 @@ def reset_episode_metrics(self) -> None:
         self.episode_baseline_jobs_rejected_queue_full: int = 0

         # Time series data for plotting (episode)
-        self.episode_on_nodes = []
-        self.episode_used_nodes = []
-        self.episode_used_cores = []
-        self.episode_baseline_used_nodes = []
-        self.episode_baseline_used_cores = []
-        self.episode_job_queue_sizes = []
-        self.episode_price_stats = []
+        self.episode_on_nodes: list[int] = []
+        self.episode_used_nodes: list[int] = []
+        self.episode_used_cores: list[int] = []
+        self.episode_baseline_used_nodes: list[int] = []
+        self.episode_baseline_used_cores: list[int] = []
+        self.episode_job_queue_sizes: list[int] = []
+        self.episode_price_stats: list[float] = []
         self.episode_eff_rewards: list[float] = []
         self.episode_price_rewards: list[float] = []
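
The log-spaced fill described in select_lambda_schedule() (docstring
step 4) amounts to geometric spacing between the discovered bracket
ends, de-duplicated after integer rounding. A minimal standalone sketch
of just that step, assuming numpy; the helper name log_spaced_ints is
illustrative and not part of the patches above:

    import numpy as np

    def log_spaced_ints(lo: int, hi: int, n: int) -> list[int]:
        # Geometrically spaced candidates between the bracket ends,
        # de-duplicated after rounding to integers.
        pts = np.geomspace(lo, hi, n)
        return sorted({int(round(p)) for p in pts})

    # log_spaced_ints(500, 8000, 7) -> [500, 794, 1260, 2000, 3175, 5040, 8000]
    # log_spaced_ints(100, 105, 7) yields only 6 distinct values; this is
    # the rounding collapse that docstring step 5 repairs by inserting
    # candidates into the largest remaining gaps.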