diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 4b18e12..9703c02 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -73,5 +73,8 @@ jobs:
       - name: Run jobs aggregated sampler test
         run: python -m test.test_sampler_jobs_aggregated --file-path data/allusers-gpu-30.log
 
+      - name: Run jobs replay modes test
+        run: python -m test.test_jobs_replay_modes
+
       - name: Run workload generator inspection test
         run: python -m test.test_inspect_workloadgen --workload-gen poisson --wg-poisson-lambdas4 200,10,6,24 --wg-max-jobs-hour 1500 --hours 336 --plot --wg-burst-small-prob 0.2 --wg-burst-heavy-prob 0.02
diff --git a/src/environment.py b/src/environment.py
index 268d7a0..8ed817f 100644
--- a/src/environment.py
+++ b/src/environment.py
@@ -73,8 +73,7 @@ def __init__(self,
                  evaluation_mode: bool = False,
                  workload_gen: WorkloadGenerator | None = None,
                  job_arrival_scale: float = 1.0,
-                 jobs_exact_replay: bool = False,
-                 jobs_exact_replay_aggregate: bool = False) -> None:
+                 jobs_exact_replay: bool = False) -> None:
         super().__init__()
 
         self.weights = weights
@@ -89,7 +88,6 @@ def __init__(self,
         self.evaluation_mode = evaluation_mode
         self.job_arrival_scale = float(job_arrival_scale)
         self.jobs_exact_replay = bool(jobs_exact_replay)
-        self.jobs_exact_replay_aggregate = bool(jobs_exact_replay_aggregate)
 
         self.next_plot_save = self.steps_per_iteration
 
@@ -118,10 +116,7 @@ def __init__(self,
             print(f"Parsed aggregated jobs for {len(self.jobs_sampler.aggregated_jobs)} hours")
             if self.jobs_exact_replay:
                 max_raw_jobs = max((len(v) for v in self.jobs_sampler.jobs.values()), default=0)
-                if self.jobs_exact_replay_aggregate:
-                    print("Jobs replay mode: exact timeline (aggregated per step)")
-                else:
-                    print("Jobs replay mode: exact timeline (raw jobs per hour)")
+                print("Jobs replay mode: exact timeline (raw jobs per hour)")
                 print(f"Max raw jobs per hour: {max_raw_jobs}")
             else:
                 self.jobs_sampler.precalculate_hourly_jobs(CORES_PER_NODE, MAX_NODES_PER_JOB)
@@ -350,7 +345,6 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool,
             hourly_sampler, durations_sampler, self.np_random,
             job_arrival_scale=self.job_arrival_scale,
             jobs_exact_replay=self.jobs_exact_replay,
-            jobs_exact_replay_aggregate=self.jobs_exact_replay_aggregate,
         )
 
         # Add new jobs to queue (overflow goes to helper)
diff --git a/src/workload_generator.py b/src/workload_generator.py
index 26b496b..e6d0e12 100644
--- a/src/workload_generator.py
+++ b/src/workload_generator.py
@@ -30,7 +30,6 @@ def generate_jobs(
     np_random: np.random.Generator,
     job_arrival_scale: float = 1.0,
     jobs_exact_replay: bool = False,
-    jobs_exact_replay_aggregate: bool = False,
 ) -> tuple[int, list[int], list[int], list[int]]:
     """
     Generate new jobs for the current hour using configured workload source.
@@ -50,9 +49,6 @@ def generate_jobs(
             - >1.0: upsample jobs
            - 0.0..1.0: downsample jobs
         jobs_exact_replay: If True, replay raw jobs in log order for --jobs mode.
-        jobs_exact_replay_aggregate: In exact replay mode, aggregate each sampled
-            raw time-bin into compact hourly-equivalent templates.
-
     Returns:
         Tuple of (new_jobs_count, new_jobs_durations, new_jobs_nodes, new_jobs_cores)
     """
@@ -68,26 +64,14 @@ def generate_jobs(
         # Replay jobs exactly as they appear in the parsed timeline (one bin per step).
         sampled = jobs_sampler.sample(1, wrap=True)
         raw_jobs = next(iter(sampled.values()), [])
-        if jobs_exact_replay_aggregate and raw_jobs:
-            aggregated_jobs = jobs_sampler.aggregate_jobs(raw_jobs)
-            hourly_jobs = jobs_sampler.convert_to_hourly_jobs(
-                aggregated_jobs, CORES_PER_NODE, MAX_NODES_PER_JOB
-            )
-            for job in hourly_jobs:
-                instances = max(1, int(job.get('instances', 1)))
-                new_jobs_count += instances
-                new_jobs_durations.extend([int(job['duration_hours'])] * instances)
-                new_jobs_nodes.extend([int(job['nnodes'])] * instances)
-                new_jobs_cores.extend([int(job['cores_per_node'])] * instances)
-        else:
-            for job in raw_jobs:
-                duration_hours = max(1, int(math.ceil(int(job['duration_minutes']) / 60)))
-                nnodes = min(max(int(job['nnodes']), MIN_NODES_PER_JOB), MAX_NODES_PER_JOB)
-                cores_per_node = min(max(int(job['cores_per_node']), MIN_CORES_PER_JOB), CORES_PER_NODE)
-                new_jobs_count += 1
-                new_jobs_durations.append(duration_hours)
-                new_jobs_nodes.append(nnodes)
-                new_jobs_cores.append(cores_per_node)
+        for job in raw_jobs:
+            duration_hours = max(1, int(math.ceil(int(job['duration_minutes']) / 60)))
+            nnodes = min(max(int(job['nnodes']), MIN_NODES_PER_JOB), MAX_NODES_PER_JOB)
+            cores_per_node = min(max(int(job['cores_per_node']), MIN_CORES_PER_JOB), CORES_PER_NODE)
+            new_jobs_count += 1
+            new_jobs_durations.append(duration_hours)
+            new_jobs_nodes.append(nnodes)
+            new_jobs_cores.append(cores_per_node)
     else:
         # Use pre-aggregated hourly templates for pattern-based replay.
         sampled = jobs_sampler.sample_one_hourly(wrap=True)
diff --git a/test/run_all.py b/test/run_all.py
index 57ac853..758a7de 100644
--- a/test/run_all.py
+++ b/test/run_all.py
@@ -20,6 +20,7 @@
     ["python", "-m", "test.test_sampler_hourly_aggregated", "--file-path", "data/allusers-gpu-30.log"],
     ["python", "-m", "test.test_sampler_jobs", "--file-path", "data/allusers-gpu-30.log"],
     ["python", "-m", "test.test_sampler_jobs_aggregated", "--file-path", "data/allusers-gpu-30.log"],
+    ["python", "-m", "test.test_jobs_replay_modes"],
     ["python", "-m", "test.test_inspect_workloadgen", "--workload-gen", "poisson", "--wg-poisson-lambdas4", "200,10,6,24", "--wg-max-jobs-hour", "1500", "--hours", "336", "--plot", "--wg-burst-small-prob", "0.2", "--wg-burst-heavy-prob", "0.02"],
 ]
 
diff --git a/test/test_jobs_replay_modes.py b/test/test_jobs_replay_modes.py
new file mode 100644
index 0000000..304a8f9
--- /dev/null
+++ b/test/test_jobs_replay_modes.py
@@ -0,0 +1,219 @@
+#!/usr/bin/env python3
+"""Tests for --jobs replay modes: raw exact replay vs aggregated template replay.
+
+Synthetic log has two hourly bins with known jobs:
+  Bin 1 (00:xx):
+    - 2x (2 nodes, 16 NCPUS=8/node, 30 min) sub-hour, identical → merge in template mode
+    - 1x (1 node, 8 NCPUS=8/node, 120 min) 2-hour job
+  Bin 2 (01:xx):
+    - 1x (4 nodes, 32 NCPUS=8/node, 60 min) 1-hour job
+    - 1x (1 node, 96 NCPUS=96/node, 30 min) sub-hour job
+
+Raw replay: bin 1 → 3 jobs, bin 2 → 2 jobs; cores preserved as-parsed.
+Template replay: bin 1 → 2 job instances (two sub-hour jobs collapsed into one
+  core-hour-equivalent entry), bin 2 → 2 job instances.
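+
+Worked core-hour check for bin 1: each sub-hour job is 2 nodes * 8 cores/node * 0.5 h
+= 8 core-hours and the 2-hour job is 1 node * 8 cores/node * 2 h = 16 core-hours, so
+the bin totals 32 core-hours. test_template_replay_preserves_core_hours_bin1 asserts
+that template replay stays within 10% of that total; the exact shape of the merged
+template entry is not asserted, only the instance count and the core-hour total.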
+"""
+
+import math
+import sys
+import tempfile
+import textwrap
+
+import numpy as np
+
+from src.sampler_jobs import DurationSampler
+from src.workload_generator import generate_jobs
+from src.config import CORES_PER_NODE, MAX_NODES_PER_JOB
+
+LOG_CONTENT = textwrap.dedent("""\
+    JobID User Partition Submit Start End ElapsedRaw NCPUS NNodes\x20
+    ------------------------------ --------- ---------- ------------------- ------------------- ------------------- ---------- ---------- --------\x20
+    1 user01 gpu 2024-01-01T00:01:00 2024-01-01T00:01:00 2024-01-01T00:31:00 1800 16 2
+    2 user01 gpu 2024-01-01T00:02:00 2024-01-01T00:02:00 2024-01-01T00:32:00 1800 16 2
+    3 user01 gpu 2024-01-01T00:10:00 2024-01-01T00:10:00 2024-01-01T02:10:00 7200 8 1
+    4 user01 gpu 2024-01-01T01:05:00 2024-01-01T01:05:00 2024-01-01T02:05:00 3600 32 4
+    5 user01 gpu 2024-01-01T01:10:00 2024-01-01T01:10:00 2024-01-01T01:40:00 1800 96 1
+""")
+
+# Expected raw jobs after clamping/rounding applied by generate_jobs.
+# duration = max(1, ceil(elapsed_seconds / 3600))
+# cores_per_node = ncpus // nnodes, then clamped to [1, CORES_PER_NODE]
+EXPECTED_RAW_BIN1 = [
+    {"duration_hours": 1, "nnodes": 2, "cores_per_node": 8},
+    {"duration_hours": 1, "nnodes": 2, "cores_per_node": 8},
+    {"duration_hours": 2, "nnodes": 1, "cores_per_node": 8},
+]
+EXPECTED_RAW_BIN2 = [
+    {"duration_hours": 1, "nnodes": 4, "cores_per_node": 8},
+    {"duration_hours": 1, "nnodes": 1, "cores_per_node": 96},
+]
+
+
+def _make_sampler(log_path: str, template: bool) -> DurationSampler:
+    s = DurationSampler()
+    s.parse_jobs(log_path, 60)
+    if template:
+        s.precalculate_hourly_jobs(CORES_PER_NODE, MAX_NODES_PER_JOB)
+    return s
+
+
+def _call_generate(sampler, hour, exact_replay):
+    rng = np.random.default_rng(0)
+    return generate_jobs(
+        current_hour=hour,
+        external_jobs="synthetic",  # non-empty triggers --jobs path
+        external_hourly_jobs=None,
+        external_durations=None,
+        workload_gen=None,
+        jobs_sampler=sampler,
+        hourly_sampler=None,
+        durations_sampler=None,
+        np_random=rng,
+        jobs_exact_replay=exact_replay,
+    )
+
+
+def test_raw_replay_job_counts():
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
+        f.write(LOG_CONTENT)
+        path = f.name
+
+    sampler = _make_sampler(path, template=False)
+
+    count1, _, _, _ = _call_generate(sampler, hour=1, exact_replay=True)
+    assert count1 == 3, f"bin 1 raw: expected 3 jobs, got {count1}"
+
+    count2, _, _, _ = _call_generate(sampler, hour=2, exact_replay=True)
+    assert count2 == 2, f"bin 2 raw: expected 2 jobs, got {count2}"
+
+    print(f"PASS: raw replay job counts ({count1}, {count2})")
+
+
+def test_raw_replay_cores_populated():
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
+        f.write(LOG_CONTENT)
+        path = f.name
+
+    sampler = _make_sampler(path, template=False)
+
+    for hour in (1, 2):
+        _, _, _, cores = _call_generate(sampler, hour=hour, exact_replay=True)
+        assert all(c > 0 for c in cores), \
+            f"bin {hour} raw: cores_per_node contains zeros: {cores}"
+
+    print("PASS: raw replay cores populated")
+
+
+def test_raw_replay_job_attributes():
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
+        f.write(LOG_CONTENT)
+        path = f.name
+
+    sampler = _make_sampler(path, template=False)
+
+    _, dur1, nodes1, cores1 = _call_generate(sampler, hour=1, exact_replay=True)
+    actual1 = sorted(zip(dur1, nodes1, cores1))
+    expected1 = sorted(
+        (j["duration_hours"], j["nnodes"], j["cores_per_node"]) for j in EXPECTED_RAW_BIN1
+    )
+    assert actual1 == expected1, f"bin 1 raw jobs mismatch:\n got: {actual1}\n expected: {expected1}"
+
+    _, dur2, nodes2, cores2 = _call_generate(sampler, hour=2, exact_replay=True)
+    actual2 = sorted(zip(dur2, nodes2, cores2))
+    expected2 = sorted(
+        (j["duration_hours"], j["nnodes"], j["cores_per_node"]) for j in EXPECTED_RAW_BIN2
+    )
+    assert actual2 == expected2, f"bin 2 raw jobs mismatch:\n got: {actual2}\n expected: {expected2}"
+
+    print("PASS: raw replay job attributes match expected")
+
+
+def test_template_replay_cores_populated():
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
+        f.write(LOG_CONTENT)
+        path = f.name
+
+    sampler = _make_sampler(path, template=True)
+
+    for hour in (1, 2):
+        count, durations, nodes, cores = _call_generate(sampler, hour=hour, exact_replay=False)
+        assert count > 0, f"bin {hour} template: expected jobs, got 0"
+        assert all(c > 0 for c in cores), \
+            f"bin {hour} template: cores_per_node contains zeros: {cores}"
+
+    print("PASS: template replay cores populated")
+
+
+def test_template_replay_bin1_collapses_subhour_pair():
+    """Two identical sub-hour jobs in bin 1 should collapse to one template entry."""
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
+        f.write(LOG_CONTENT)
+        path = f.name
+
+    sampler = _make_sampler(path, template=True)
+
+    count, durations, nodes, cores = _call_generate(sampler, hour=1, exact_replay=False)
+
+    # Raw has 3 jobs; template collapses the two identical sub-hour jobs into one
+    # core-hour-equivalent entry + keeps the 2h job = 2 job instances.
+    assert count == 2, (
+        f"bin 1 template: expected 2 job instances after sub-hour collapse, got {count}\n"
+        f" durations={durations}, nodes={nodes}, cores={cores}"
+    )
+    print(f"PASS: template replay bin 1 collapses sub-hour pair → {count} instances")
+
+
+def test_template_replay_preserves_core_hours_bin1():
+    """Template mode should preserve approximate total core-hours from the raw log."""
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
+        f.write(LOG_CONTENT)
+        path = f.name
+
+    # Raw core-hours for bin 1:
+    #   job1: 2 nodes * 8 cores/node * (30/60)h = 8 core-hours
+    #   job2: same = 8 core-hours
+    #   job3: 1 node * 8 cores/node * (120/60)h = 16 core-hours
+    #   Total = 32 core-hours
+    raw_core_hours_bin1 = 8 + 8 + 16
+
+    sampler = _make_sampler(path, template=True)
+    count, durations, nodes, cores = _call_generate(sampler, hour=1, exact_replay=False)
+
+    template_core_hours = sum(
+        n * c * d for d, n, c in zip(durations, nodes, cores)
+    )
+    # Allow small rounding due to ceil operations, but should be within 10%
+    assert abs(template_core_hours - raw_core_hours_bin1) / raw_core_hours_bin1 < 0.1, (
+        f"bin 1 template core-hours {template_core_hours:.1f} deviates "
+        f"more than 10% from raw {raw_core_hours_bin1}"
+    )
+    print(f"PASS: template replay bin 1 core-hours {template_core_hours:.1f} ≈ raw {raw_core_hours_bin1}")
+
+
+def main():
+    tests = [
+        test_raw_replay_job_counts,
+        test_raw_replay_cores_populated,
+        test_raw_replay_job_attributes,
+        test_template_replay_cores_populated,
+        test_template_replay_bin1_collapses_subhour_pair,
+        test_template_replay_preserves_core_hours_bin1,
+    ]
+    failed = []
+    for t in tests:
+        try:
+            t()
+        except AssertionError as e:
+            print(f"FAIL: {t.__name__}: {e}")
+            failed.append(t.__name__)
+        except Exception as e:
+            print(f"ERROR: {t.__name__}: {e}")
+            failed.append(t.__name__)
+
+    print(f"\n{len(tests) - len(failed)}/{len(tests)} passed")
+    if failed:
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/train.py b/train.py
index ce6d993..cf476dc 100644
--- a/train.py
+++ b/train.py
@@ -52,13 +52,12 @@ def main():
     parser.add_argument('--hourly-jobs', type=str, nargs='?', const="", default="", help='Path to Slurm log file for hourly statistical sampling (for use with hourly_sampler)')
     parser.add_argument('--job-arrival-scale', type=float, default=1.0, help='Scale sampled arrivals per step (1.0 = unchanged).')
     parser.add_argument('--jobs-exact-replay', action='store_true', help='For --jobs mode, replay raw jobs in timeline order (no template aggregation).')
-    parser.add_argument('--jobs-exact-replay-aggregate', action='store_true', help='With --jobs-exact-replay, aggregate each sampled raw time-bin before enqueueing.')
     parser.add_argument('--plot-rewards', action='store_true', help='Per step, plot rewards for all possible num_idle_nodes & num_used_nodes (default: False).')
-    parser.add_argument('--plot-eff-reward', action=argparse.BooleanOptionalAction, default=True, help='Include efficiency reward in the plot (dashed line).')
-    parser.add_argument('--plot-price-reward', action=argparse.BooleanOptionalAction, default=True, help='Include price reward in the plot (dashed line).')
-    parser.add_argument('--plot-idle-penalty', action=argparse.BooleanOptionalAction, default=True, help='Include idle penalty in the plot (dashed line).')
-    parser.add_argument('--plot-job-age-penalty', action=argparse.BooleanOptionalAction, default=True, help='Include job age penalty in the plot (dashed line).')
-    parser.add_argument('--plot-total-reward', action=argparse.BooleanOptionalAction, default=True, help='Include total reward per step in the dashboard (raw values).')
+    parser.add_argument('--plot-eff-reward', action=argparse.BooleanOptionalAction, default=False, help='Include efficiency reward in the plot (dashed line).')
+    parser.add_argument('--plot-price-reward', action=argparse.BooleanOptionalAction, default=False, help='Include price reward in the plot (dashed line).')
+    parser.add_argument('--plot-idle-penalty', action=argparse.BooleanOptionalAction, default=False, help='Include idle penalty in the plot (dashed line).')
+    parser.add_argument('--plot-job-age-penalty', action=argparse.BooleanOptionalAction, default=False, help='Include job age penalty in the plot (dashed line).')
+    parser.add_argument('--plot-total-reward', action=argparse.BooleanOptionalAction, default=False, help='Include total reward per step in the dashboard (raw values).')
     parser.add_argument('--plot-price', action=argparse.BooleanOptionalAction, default=True, help='Plot electricity price.')
     parser.add_argument('--plot-online-nodes', action=argparse.BooleanOptionalAction, default=True, help='Plot online nodes.')
     parser.add_argument('--plot-used-nodes', action=argparse.BooleanOptionalAction, default=True, help='Plot used nodes.')
@@ -91,8 +90,6 @@ def main():
         parser.error(str(exc))
     if args.jobs_exact_replay and not norm_path(args.jobs):
         parser.error("--jobs-exact-replay requires --jobs")
-    if args.jobs_exact_replay_aggregate and not args.jobs_exact_replay:
-        parser.error("--jobs-exact-replay-aggregate requires --jobs-exact-replay")
     if args.workload_gen and args.job_arrival_scale != 1.0:
         print(
             "Warning: --job-arrival-scale is not allowed with --workload-gen; "
@@ -177,8 +174,7 @@ def main():
                       evaluation_mode=args.evaluate_savings,
                       workload_gen=workload_gen,
                       job_arrival_scale=args.job_arrival_scale,
-                      jobs_exact_replay=args.jobs_exact_replay,
-                      jobs_exact_replay_aggregate=args.jobs_exact_replay_aggregate)
+                      jobs_exact_replay=args.jobs_exact_replay)
     env.session_dir = session_root
     env.plots_dir = plots_dir
     env.reset(seed=args.seed)
diff --git a/train_iter.py b/train_iter.py
index 70c011e..1c728c2 100644
--- a/train_iter.py
+++ b/train_iter.py
@@ -112,7 +112,6 @@ def build_command(
     hourly_jobs,
     job_arrival_scale,
     jobs_exact_replay,
-    jobs_exact_replay_aggregate,
     plot_dashboard=False,
     dashboard_hours=24 * 14,
     seed=None,
@@ -139,8 +138,6 @@ def build_command(
     ]
     if jobs_exact_replay:
         command += ["--jobs-exact-replay"]
-    if jobs_exact_replay_aggregate:
-        command += ["--jobs-exact-replay-aggregate"]
     if plot_dashboard:
         command += ["--plot-dashboard", "--dashboard-hours", str(dashboard_hours)]
     if seed is not None:
@@ -365,7 +362,7 @@ def _reap(proc, label, fh, elapsed):
 
 def run_all_parallel(combinations, max_parallel, iter_limit_per_step, session, prices,
                      job_durations, jobs, hourly_jobs, job_arrival_scale, jobs_exact_replay,
-                     jobs_exact_replay_aggregate, plot_dashboard, dashboard_hours,
+                     plot_dashboard, dashboard_hours,
                      seeds, seed_sweep, evaluate_savings, eval_months, workloadgen_args,
                      no_tui=False):
     multi_seed = len(seeds) > 1
@@ -381,7 +378,7 @@ def launch(combo, seed):
         command = build_command(
             efficiency_weight, price_weight, idle_weight, job_age_weight, drop_weight,
             iter_limit_per_step, session, prices, job_durations, jobs, hourly_jobs,
-            job_arrival_scale, jobs_exact_replay, jobs_exact_replay_aggregate,
+            job_arrival_scale, jobs_exact_replay,
             plot_dashboard, dashboard_hours, seed, seed_sweep,
             evaluate_savings, eval_months, workloadgen_args,
         )
@@ -432,7 +429,6 @@ def main():
     parser.add_argument('--hourly-jobs', type=str, nargs='?', const="", default="", help='Path to Slurm log file for hourly statistical sampling (for use with hourly_sampler)')
     parser.add_argument('--job-arrival-scale', type=float, default=1.0, help='Scale sampled arrivals per step (forwarded to train.py).')
     parser.add_argument('--jobs-exact-replay', action='store_true', help='Forward to train.py: replay raw jobs in timeline order for --jobs mode.')
-    parser.add_argument('--jobs-exact-replay-aggregate', action='store_true', help='Forward to train.py: aggregate per-step raw jobs in exact replay mode.')
     parser.add_argument("--fix-weights", type=str, help="Comma-separated list of weights to fix (efficiency,price,idle,job-age,drop)")
     parser.add_argument("--fix-values", type=str, help="Comma-separated list of values for fixed weights")
     parser.add_argument("--iter-limit-per-step", type=int, help="Max number of training iterations per step (1 iteration = {TIMESTEPS} steps)")
@@ -458,8 +454,6 @@ def main():
         parser.error(str(exc))
     if args.jobs_exact_replay and not norm_path(args.jobs):
         parser.error("--jobs-exact-replay requires --jobs")
-    if args.jobs_exact_replay_aggregate and not args.jobs_exact_replay:
-        parser.error("--jobs-exact-replay-aggregate requires --jobs-exact-replay")
     if args.workload_gen and args.job_arrival_scale != 1.0:
         parser.error("--job-arrival-scale is not supported with --workload-gen. Use workload generator arrival settings instead.")
 
@@ -502,7 +496,6 @@ def main():
         hourly_jobs=args.hourly_jobs,
         job_arrival_scale=args.job_arrival_scale,
         jobs_exact_replay=args.jobs_exact_replay,
-        jobs_exact_replay_aggregate=args.jobs_exact_replay_aggregate,
         plot_dashboard=args.plot_dashboard,
         dashboard_hours=args.dashboard_hours,
         seeds=seeds,