diff --git a/src/environment.py b/src/environment.py index 37b0b6e..2dc1da9 100644 --- a/src/environment.py +++ b/src/environment.py @@ -72,7 +72,9 @@ def __init__(self, steps_per_iteration: int, evaluation_mode: bool = False, workload_gen: WorkloadGenerator | None = None, - ) -> None: + job_arrival_scale: float = 1.0, + jobs_exact_replay: bool = False, + jobs_exact_replay_aggregate: bool = False) -> None: super().__init__() self.weights = weights @@ -85,6 +87,9 @@ def __init__(self, self.plot_config = plot_config self.steps_per_iteration = steps_per_iteration self.evaluation_mode = evaluation_mode + self.job_arrival_scale = float(job_arrival_scale) + self.jobs_exact_replay = bool(jobs_exact_replay) + self.jobs_exact_replay_aggregate = bool(jobs_exact_replay_aggregate) self.next_plot_save = self.steps_per_iteration @@ -111,16 +116,26 @@ def __init__(self, self.jobs_sampler.parse_jobs(self.external_jobs, 60) print(f"Parsed jobs for {len(self.jobs_sampler.jobs)} hours") print(f"Parsed aggregated jobs for {len(self.jobs_sampler.aggregated_jobs)} hours") - self.jobs_sampler.precalculate_hourly_jobs(CORES_PER_NODE, MAX_NODES_PER_JOB) - print(f"Max jobs per hour: {self.jobs_sampler.max_new_jobs_per_hour}") - print(f"Max job duration: {self.jobs_sampler.max_job_duration}") - print(f"Parsed hourly jobs for {len(self.jobs_sampler.hourly_jobs)} hours") + if self.jobs_exact_replay: + max_raw_jobs = max((len(v) for v in self.jobs_sampler.jobs.values()), default=0) + if self.jobs_exact_replay_aggregate: + print("Jobs replay mode: exact timeline (aggregated per step)") + else: + print("Jobs replay mode: exact timeline (raw jobs per hour)") + print(f"Max raw jobs per hour: {max_raw_jobs}") + else: + self.jobs_sampler.precalculate_hourly_jobs(CORES_PER_NODE, MAX_NODES_PER_JOB) + print("Jobs replay mode: aggregated hourly templates") + print(f"Max jobs per hour: {self.jobs_sampler.max_new_jobs_per_hour}") + print(f"Max job duration: {self.jobs_sampler.max_job_duration}") + print(f"Parsed hourly jobs for {len(self.jobs_sampler.hourly_jobs)} hours") if self.external_hourly_jobs: print(f"Loading hourly jobs from {self.external_hourly_jobs}") hourly_sampler.parse_jobs(self.external_hourly_jobs) hourly_sampler.precalculate_hourly_templates(CORES_PER_NODE, MAX_NODES_PER_JOB) print(f"Hourly sampler initialized with 24-hour distributions") + print(f"Job arrival scale: {self.job_arrival_scale:.3f}x") self.current_step = 0 self.current_episode = 0 @@ -326,7 +341,10 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool, self.metrics.current_hour, self.external_jobs, self.external_hourly_jobs, self.external_durations, self.workload_gen, self.jobs_sampler if hasattr(self, 'jobs_sampler') else None, - hourly_sampler, durations_sampler, self.np_random + hourly_sampler, durations_sampler, self.np_random, + job_arrival_scale=self.job_arrival_scale, + jobs_exact_replay=self.jobs_exact_replay, + jobs_exact_replay_aggregate=self.jobs_exact_replay_aggregate, ) # Add new jobs to queue (overflow goes to helper) diff --git a/src/sampler_hourly.py b/src/sampler_hourly.py index e863f39..07a236e 100644 --- a/src/sampler_hourly.py +++ b/src/sampler_hourly.py @@ -273,7 +273,12 @@ def precalculate_hourly_templates(self, cores_per_node: int, max_nodes_per_job: f"({tmpl['sub_hour_bins']} sub-hour bins, {tmpl['hourly_jobs']} hourly+ jobs)") print(f" Total: {total_orig} jobs -> {total_templates} templates") - def sample_aggregated(self, hour_of_day: int, rng: np.random.Generator) -> list[dict[str, int]]: + def sample_aggregated( + self, + hour_of_day: int, + rng: np.random.Generator, + arrival_scale: float = 1.0, + ) -> list[dict[str, int]]: """ Sample aggregated hourly jobs for a given hour of day. @@ -283,6 +288,10 @@ def sample_aggregated(self, hour_of_day: int, rng: np.random.Generator) -> list[ Args: hour_of_day: Hour of day (0-23) rng: NumPy random generator + arrival_scale: Multiplier for sampled job count before template scaling. + - 1.0: unchanged + - >1.0: upsample arrivals + - 0.0..1.0: downsample arrivals Returns: list: List of job dictionaries with keys: nodes, cores_per_node, duration_hours @@ -297,7 +306,19 @@ def sample_aggregated(self, hour_of_day: int, rng: np.random.Generator) -> list[ tmpl = self.hourly_templates[hour_of_day] # Sample number of jobs for this hour (can be 0) - num_jobs = rng.choice(dist["job_count"]) + num_jobs = int(rng.choice(dist["job_count"])) + + # Apply arrival scaling directly to the sampled count so true hourly + # statistical sampling can be controlled via --job-arrival-scale. + if num_jobs > 0 and arrival_scale != 1.0: + if arrival_scale <= 0.0: + return [] + whole = int(math.floor(arrival_scale)) + frac = float(arrival_scale - whole) + scaled_num_jobs = num_jobs * whole + if frac > 0.0: + scaled_num_jobs += int(rng.binomial(num_jobs, frac)) + num_jobs = scaled_num_jobs if num_jobs <= 0 or len(tmpl["templates"]) == 0: return [] diff --git a/src/sampler_jobs.py b/src/sampler_jobs.py index 1a89daf..5b2d18a 100644 --- a/src/sampler_jobs.py +++ b/src/sampler_jobs.py @@ -315,6 +315,7 @@ def convert_to_hourly_jobs(self, aggregated_jobs: list[Job], cores_per_node: int - List of hourly simulation job dictionaries """ hourly_jobs: list[Job] = [] + expanded_jobs_count = 0 for agg_job in aggregated_jobs: # Calculate total compute resources needed @@ -340,27 +341,36 @@ def convert_to_hourly_jobs(self, aggregated_jobs: list[Job], cores_per_node: int 'nnodes': equivalent_nodes, 'cores_per_node': cores_per_node_needed, 'duration_hours': 1, # 1 hour - 'original_job_count': agg_job['count'] + 'original_job_count': agg_job['count'], + # This template already represents the full sub-hour bin. + # Do not multiply by original_job_count during replay. + 'instances': 1, } hourly_jobs.append(hourly_job) else: # For longer jobs, keep the original structure but convert to hours - # with appropriate scaling + # with appropriate scaling. Replay must create one instance per + # original job to preserve core-hours and concurrency. duration_hours = math.ceil(agg_job['duration_minutes'] / 60) + # Keep replay templates within the simulated cluster limits. + nnodes = min(max(agg_job['nnodes'], 1), max_nodes_per_job) + cores_per_node_needed = min(max(agg_job['cores_per_node'], 1), cores_per_node) hourly_job = { - 'nnodes': agg_job['nnodes'], - 'cores_per_node': agg_job['cores_per_node'], + 'nnodes': nnodes, + 'cores_per_node': cores_per_node_needed, 'duration_hours': duration_hours, - 'original_job_count': agg_job['count'] + 'original_job_count': agg_job['count'], + 'instances': agg_job['count'], } hourly_jobs.append(hourly_job) if hourly_job['duration_hours'] > self.max_job_duration: self.max_job_duration = hourly_job['duration_hours'] + expanded_jobs_count += hourly_job.get('instances', 1) - if len(hourly_jobs) > self.max_new_jobs_per_hour: - self.max_new_jobs_per_hour = len(hourly_jobs) + if expanded_jobs_count > self.max_new_jobs_per_hour: + self.max_new_jobs_per_hour = expanded_jobs_count return hourly_jobs diff --git a/src/workload_generator.py b/src/workload_generator.py index 2928c5a..fb948be 100644 --- a/src/workload_generator.py +++ b/src/workload_generator.py @@ -2,6 +2,7 @@ from __future__ import annotations +import math from typing import TYPE_CHECKING import numpy as np from src.config import ( @@ -26,6 +27,9 @@ def generate_jobs( hourly_sampler: HourlySampler, durations_sampler: DurationsSampler, np_random: np.random.Generator, + job_arrival_scale: float = 1.0, + jobs_exact_replay: bool = False, + jobs_exact_replay_aggregate: bool = False, ) -> tuple[int, list[int], list[int], list[int]]: """ Generate new jobs for the current hour using configured workload source. @@ -40,6 +44,13 @@ def generate_jobs( hourly_sampler: Hourly sampler object durations_sampler: Durations sampler object np_random: NumPy random generator + job_arrival_scale: Multiplier for sampled arrivals per step. + - 1.0: unchanged + - >1.0: upsample jobs + - 0.0..1.0: downsample jobs + jobs_exact_replay: If True, replay raw jobs in log order for --jobs mode. + jobs_exact_replay_aggregate: In exact replay mode, aggregate each sampled + raw time-bin into compact hourly-equivalent templates. Returns: Tuple of (new_jobs_count, new_jobs_durations, new_jobs_nodes, new_jobs_cores) @@ -48,22 +59,55 @@ def generate_jobs( new_jobs_nodes = [] new_jobs_cores = [] new_jobs_count = 0 + arrival_scale_applied_in_source = False if external_jobs and not workload_gen: - # Use jobs sampler for pattern-based replay - jobs = jobs_sampler.sample_one_hourly(wrap=True)["hourly_jobs"] - if len(jobs) > 0: - for job in jobs: - new_jobs_count += 1 - new_jobs_durations.append(job['duration_hours']) - new_jobs_nodes.append(job['nnodes']) - new_jobs_cores.append(job['cores_per_node']) + if jobs_exact_replay: + # Replay jobs exactly as they appear in the parsed timeline (one bin per step). + sampled = jobs_sampler.sample(1, wrap=True) + raw_jobs = next(iter(sampled.values()), []) + if jobs_exact_replay_aggregate and raw_jobs: + aggregated_jobs = jobs_sampler.aggregate_jobs(raw_jobs) + hourly_jobs = jobs_sampler.convert_to_hourly_jobs( + aggregated_jobs, CORES_PER_NODE, MAX_NODES_PER_JOB + ) + for job in hourly_jobs: + instances = max(1, int(job.get('instances', 1))) + new_jobs_count += instances + new_jobs_durations.extend([int(job['duration_hours'])] * instances) + new_jobs_nodes.extend([int(job['nnodes'])] * instances) + new_jobs_cores.extend([int(job['cores_per_node'])] * instances) + else: + for job in raw_jobs: + duration_hours = max(1, int(math.ceil(int(job['duration_minutes']) / 60))) + nnodes = min(max(int(job['nnodes']), MIN_NODES_PER_JOB), MAX_NODES_PER_JOB) + cores_per_node = min(max(int(job['cores_per_node']), MIN_CORES_PER_JOB), CORES_PER_NODE) + new_jobs_count += 1 + new_jobs_durations.append(duration_hours) + new_jobs_nodes.append(nnodes) + new_jobs_cores.append(cores_per_node) + else: + # Use pre-aggregated hourly templates for pattern-based replay. + sampled = jobs_sampler.sample_one_hourly(wrap=True) + jobs = sampled.get("hourly_jobs", []) + if len(jobs) > 0: + for job in jobs: + instances = max(1, int(job.get('instances', 1))) + new_jobs_count += instances + new_jobs_durations.extend([job['duration_hours']] * instances) + new_jobs_nodes.extend([job['nnodes']] * instances) + new_jobs_cores.extend([job['cores_per_node']] * instances) elif external_hourly_jobs: # Use hourly sampler for statistical sampling with aggregated jobs hour_of_day = (current_hour - 1) % 24 - jobs = hourly_sampler.sample_aggregated(hour_of_day, rng=np_random) + jobs = hourly_sampler.sample_aggregated( + hour_of_day, + rng=np_random, + arrival_scale=job_arrival_scale, + ) + arrival_scale_applied_in_source = True if len(jobs) > 0: for job in jobs: @@ -94,4 +138,33 @@ def generate_jobs( new_jobs_nodes.append(np_random.integers(MIN_NODES_PER_JOB, MAX_NODES_PER_JOB + 1)) new_jobs_cores.append(np_random.integers(MIN_CORES_PER_JOB, CORES_PER_NODE + 1)) + # Global arrival scaling for sources that do not apply it internally. + if (not arrival_scale_applied_in_source) and new_jobs_count > 0 and job_arrival_scale != 1.0: + if job_arrival_scale <= 0.0: + return 0, [], [], [] + + whole = int(np.floor(job_arrival_scale)) + frac = float(job_arrival_scale - whole) + + scaled_durations: list[int] = [] + scaled_nodes: list[int] = [] + scaled_cores: list[int] = [] + + if whole > 0: + scaled_durations.extend(new_jobs_durations * whole) + scaled_nodes.extend(new_jobs_nodes * whole) + scaled_cores.extend(new_jobs_cores * whole) + + if frac > 0.0: + for d, n, c in zip(new_jobs_durations, new_jobs_nodes, new_jobs_cores): + if np_random.random() < frac: + scaled_durations.append(d) + scaled_nodes.append(n) + scaled_cores.append(c) + + new_jobs_durations = scaled_durations + new_jobs_nodes = scaled_nodes + new_jobs_cores = scaled_cores + new_jobs_count = len(new_jobs_durations) + return new_jobs_count, new_jobs_durations, new_jobs_nodes, new_jobs_cores diff --git a/train.py b/train.py index 19124f7..b11ee0c 100644 --- a/train.py +++ b/train.py @@ -9,6 +9,7 @@ import re import glob import argparse +import sys import pandas as pd from src.workloadgen import WorkloadGenerator from src.workloadgen_cli import add_workloadgen_args, build_workloadgen_config @@ -33,6 +34,10 @@ def main(): parser.add_argument('--job-durations', type=str, nargs='?', const="", default="", help='Path to a file containing job duration samples (for use with durations_sampler)') parser.add_argument('--jobs', type=str, nargs='?', const="", default="", help='Path to a file containing job samples (for use with jobs_sampler)') parser.add_argument('--hourly-jobs', type=str, nargs='?', const="", default="", help='Path to Slurm log file for hourly statistical sampling (for use with hourly_sampler)') + parser.add_argument('--job-arrival-scale', type=float, default=1.0, help='Scale sampled arrivals per step (1.0 = unchanged).') + parser.add_argument('--jobs-exact-replay', action='store_true', help='For --jobs mode, replay raw jobs in timeline order (no template aggregation).') + parser.add_argument('--jobs-exact-replay-aggregate', action='store_true', help='With --jobs-exact-replay, aggregate each sampled raw time-bin before enqueueing.') + parser.add_argument('--plot-rewards', action='store_true', help='Per step, plot rewards for all possible num_idle_nodes & num_used_nodes (default: False).') parser.add_argument('--plot-eff-reward', action=argparse.BooleanOptionalAction, default=True, help='Include efficiency reward in the plot (dashed line).') parser.add_argument('--plot-price-reward', action=argparse.BooleanOptionalAction, default=True, help='Include price reward in the plot (dashed line).') parser.add_argument('--plot-idle-penalty', action=argparse.BooleanOptionalAction, default=True, help='Include idle penalty in the plot (dashed line).') @@ -62,6 +67,20 @@ def main(): parser.add_argument("--print-policy", action="store_true", help="Print structure of the policy network.") args = parser.parse_args() + if args.job_arrival_scale < 0.0: + parser.error("--job-arrival-scale must be >= 0.0") + if args.jobs_exact_replay and not norm_path(args.jobs): + parser.error("--jobs-exact-replay requires --jobs") + if args.jobs_exact_replay_aggregate and not args.jobs_exact_replay: + parser.error("--jobs-exact-replay-aggregate requires --jobs-exact-replay") + if args.workload_gen and args.job_arrival_scale != 1.0: + print( + "Warning: --job-arrival-scale is not allowed with --workload-gen; " + "resetting it to 1.0. Use workload generator arrival settings instead.", + file=sys.stderr, + ) + args.job_arrival_scale = 1.0 + prices_file_path = args.prices job_durations_file_path = args.job_durations jobs_file_path = args.jobs @@ -133,7 +152,10 @@ def main(): plot_config=plot_config, steps_per_iteration=STEPS_PER_ITERATION, evaluation_mode=args.evaluate_savings, - workload_gen=workload_gen) + workload_gen=workload_gen, + job_arrival_scale=args.job_arrival_scale, + jobs_exact_replay=args.jobs_exact_replay, + jobs_exact_replay_aggregate=args.jobs_exact_replay_aggregate) env.reset(seed=args.seed) # Check if there are any saved models in models_dir diff --git a/train_iter.py b/train_iter.py index 6e4d55d..d91ce2d 100644 --- a/train_iter.py +++ b/train_iter.py @@ -7,6 +7,11 @@ import time from src.workloadgen_cli import add_workloadgen_args, build_workloadgen_cli_args + +def norm_path(x): + return None if (x is None or str(x).strip() == "") else x + + def generate_weight_combinations(step=0.1, fixed_weights=None): weights = np.linspace(0, 1, num=int(1/step) + 1, endpoint=True) combinations = [] @@ -103,6 +108,9 @@ def build_command( job_durations, jobs, hourly_jobs, + job_arrival_scale, + jobs_exact_replay, + jobs_exact_replay_aggregate, plot_dashboard=False, dashboard_hours=24 * 14, seed=None, @@ -123,8 +131,13 @@ def build_command( "--job-durations", f"{job_durations}", "--jobs", f"{jobs}", "--hourly-jobs", f"{hourly_jobs}", + "--job-arrival-scale", f"{job_arrival_scale}", "--session", f"{session}" ] + if jobs_exact_replay: + command += ["--jobs-exact-replay"] + if jobs_exact_replay_aggregate: + command += ["--jobs-exact-replay-aggregate"] if plot_dashboard: command += ["--plot-dashboard", "--dashboard-hours", str(dashboard_hours)] if seed is not None: @@ -137,7 +150,7 @@ def build_command( def run_all_parallel(combinations, max_parallel, iter_limit_per_step, session, prices, - job_durations, jobs, hourly_jobs, plot_dashboard, dashboard_hours, + job_durations, jobs, hourly_jobs, job_arrival_scale, jobs_exact_replay, jobs_exact_replay_aggregate, plot_dashboard, dashboard_hours, seed, evaluate_savings, eval_months, workloadgen_args): active = [] # list of (proc, label) current_env = os.environ.copy() @@ -165,7 +178,7 @@ def run_all_parallel(combinations, max_parallel, iter_limit_per_step, session, p command = build_command( efficiency_weight, price_weight, idle_weight, job_age_weight, drop_weight, - iter_limit_per_step, session, prices, job_durations, jobs, hourly_jobs, + iter_limit_per_step, session, prices, job_durations, jobs, hourly_jobs, job_arrival_scale, jobs_exact_replay, jobs_exact_replay_aggregate, plot_dashboard, dashboard_hours, seed, evaluate_savings, eval_months, workloadgen_args, @@ -211,6 +224,9 @@ def main(): parser.add_argument('--job-durations', type=str, nargs='?', const="", default="", help='Path to a file containing job duration samples (for use with duration_sampler)') parser.add_argument('--jobs', type=str, nargs='?', const="", default="", help='Path to a file containing jobs samples (for use with jobs_sampler)') parser.add_argument('--hourly-jobs', type=str, nargs='?', const="", default="", help='Path to Slurm log file for hourly statistical sampling (for use with hourly_sampler)') + parser.add_argument('--job-arrival-scale', type=float, default=1.0, help='Scale sampled arrivals per step (forwarded to train.py).') + parser.add_argument('--jobs-exact-replay', action='store_true', help='Forward to train.py: replay raw jobs in timeline order for --jobs mode.') + parser.add_argument('--jobs-exact-replay-aggregate', action='store_true', help='Forward to train.py: aggregate per-step raw jobs in exact replay mode.') parser.add_argument("--fix-weights", type=str, help="Comma-separated list of weights to fix (efficiency,price,idle,job-age,drop)") parser.add_argument("--fix-values", type=str, help="Comma-separated list of values for fixed weights") parser.add_argument("--iter-limit-per-step", type=int, help="Max number of training iterations per step (1 iteration = {TIMESTEPS} steps)") @@ -228,6 +244,14 @@ def main(): if args.parallel < 1: parser.error("--parallel must be at least 1") + if args.job_arrival_scale < 0.0: + parser.error("--job-arrival-scale must be >= 0.0") + if args.jobs_exact_replay and not norm_path(args.jobs): + parser.error("--jobs-exact-replay requires --jobs") + if args.jobs_exact_replay_aggregate and not args.jobs_exact_replay: + parser.error("--jobs-exact-replay-aggregate requires --jobs-exact-replay") + if args.workload_gen and args.job_arrival_scale != 1.0: + parser.error("--job-arrival-scale is not supported with --workload-gen. Use workload generator arrival settings instead.") try: fixed_weights = parse_fixed_weights(args.fix_weights, args.fix_values) @@ -256,6 +280,9 @@ def main(): job_durations=args.job_durations, jobs=args.jobs, hourly_jobs=args.hourly_jobs, + job_arrival_scale=args.job_arrival_scale, + jobs_exact_replay=args.jobs_exact_replay, + jobs_exact_replay_aggregate=args.jobs_exact_replay_aggregate, plot_dashboard=args.plot_dashboard, dashboard_hours=args.dashboard_hours, seed=args.seed,