Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions src/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ def __init__(self,
steps_per_iteration: int,
evaluation_mode: bool = False,
workload_gen: WorkloadGenerator | None = None,
) -> None:
job_arrival_scale: float = 1.0,
jobs_exact_replay: bool = False,
jobs_exact_replay_aggregate: bool = False) -> None:
super().__init__()

self.weights = weights
Expand All @@ -85,6 +87,9 @@ def __init__(self,
self.plot_config = plot_config
self.steps_per_iteration = steps_per_iteration
self.evaluation_mode = evaluation_mode
self.job_arrival_scale = float(job_arrival_scale)
self.jobs_exact_replay = bool(jobs_exact_replay)
self.jobs_exact_replay_aggregate = bool(jobs_exact_replay_aggregate)

self.next_plot_save = self.steps_per_iteration

Expand All @@ -111,16 +116,26 @@ def __init__(self,
self.jobs_sampler.parse_jobs(self.external_jobs, 60)
print(f"Parsed jobs for {len(self.jobs_sampler.jobs)} hours")
print(f"Parsed aggregated jobs for {len(self.jobs_sampler.aggregated_jobs)} hours")
self.jobs_sampler.precalculate_hourly_jobs(CORES_PER_NODE, MAX_NODES_PER_JOB)
print(f"Max jobs per hour: {self.jobs_sampler.max_new_jobs_per_hour}")
print(f"Max job duration: {self.jobs_sampler.max_job_duration}")
print(f"Parsed hourly jobs for {len(self.jobs_sampler.hourly_jobs)} hours")
if self.jobs_exact_replay:
max_raw_jobs = max((len(v) for v in self.jobs_sampler.jobs.values()), default=0)
if self.jobs_exact_replay_aggregate:
print("Jobs replay mode: exact timeline (aggregated per step)")
else:
print("Jobs replay mode: exact timeline (raw jobs per hour)")
print(f"Max raw jobs per hour: {max_raw_jobs}")
else:
self.jobs_sampler.precalculate_hourly_jobs(CORES_PER_NODE, MAX_NODES_PER_JOB)
print("Jobs replay mode: aggregated hourly templates")
print(f"Max jobs per hour: {self.jobs_sampler.max_new_jobs_per_hour}")
print(f"Max job duration: {self.jobs_sampler.max_job_duration}")
print(f"Parsed hourly jobs for {len(self.jobs_sampler.hourly_jobs)} hours")

if self.external_hourly_jobs:
print(f"Loading hourly jobs from {self.external_hourly_jobs}")
hourly_sampler.parse_jobs(self.external_hourly_jobs)
hourly_sampler.precalculate_hourly_templates(CORES_PER_NODE, MAX_NODES_PER_JOB)
print(f"Hourly sampler initialized with 24-hour distributions")
print(f"Job arrival scale: {self.job_arrival_scale:.3f}x")

self.current_step = 0
self.current_episode = 0
Expand Down Expand Up @@ -326,7 +341,10 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool,
self.metrics.current_hour,
self.external_jobs, self.external_hourly_jobs, self.external_durations,
self.workload_gen, self.jobs_sampler if hasattr(self, 'jobs_sampler') else None,
hourly_sampler, durations_sampler, self.np_random
hourly_sampler, durations_sampler, self.np_random,
job_arrival_scale=self.job_arrival_scale,
jobs_exact_replay=self.jobs_exact_replay,
jobs_exact_replay_aggregate=self.jobs_exact_replay_aggregate,
)

# Add new jobs to queue (overflow goes to helper)
Expand Down
25 changes: 23 additions & 2 deletions src/sampler_hourly.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,12 @@ def precalculate_hourly_templates(self, cores_per_node: int, max_nodes_per_job:
f"({tmpl['sub_hour_bins']} sub-hour bins, {tmpl['hourly_jobs']} hourly+ jobs)")
print(f" Total: {total_orig} jobs -> {total_templates} templates")

def sample_aggregated(self, hour_of_day: int, rng: np.random.Generator) -> list[dict[str, int]]:
def sample_aggregated(
self,
hour_of_day: int,
rng: np.random.Generator,
arrival_scale: float = 1.0,
) -> list[dict[str, int]]:
"""
Sample aggregated hourly jobs for a given hour of day.

Expand All @@ -283,6 +288,10 @@ def sample_aggregated(self, hour_of_day: int, rng: np.random.Generator) -> list[
Args:
hour_of_day: Hour of day (0-23)
rng: NumPy random generator
arrival_scale: Multiplier for sampled job count before template scaling.
- 1.0: unchanged
- >1.0: upsample arrivals
- 0.0..1.0: downsample arrivals

Returns:
list: List of job dictionaries with keys: nodes, cores_per_node, duration_hours
Expand All @@ -297,7 +306,19 @@ def sample_aggregated(self, hour_of_day: int, rng: np.random.Generator) -> list[
tmpl = self.hourly_templates[hour_of_day]

# Sample number of jobs for this hour (can be 0)
num_jobs = rng.choice(dist["job_count"])
num_jobs = int(rng.choice(dist["job_count"]))

# Apply arrival scaling directly to the sampled count so true hourly
# statistical sampling can be controlled via --job-arrival-scale.
if num_jobs > 0 and arrival_scale != 1.0:
if arrival_scale <= 0.0:
return []
whole = int(math.floor(arrival_scale))
frac = float(arrival_scale - whole)
scaled_num_jobs = num_jobs * whole
if frac > 0.0:
scaled_num_jobs += int(rng.binomial(num_jobs, frac))
num_jobs = scaled_num_jobs

if num_jobs <= 0 or len(tmpl["templates"]) == 0:
return []
Expand Down
24 changes: 17 additions & 7 deletions src/sampler_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ def convert_to_hourly_jobs(self, aggregated_jobs: list[Job], cores_per_node: int
- List of hourly simulation job dictionaries
"""
hourly_jobs: list[Job] = []
expanded_jobs_count = 0

for agg_job in aggregated_jobs:
# Calculate total compute resources needed
Expand All @@ -340,27 +341,36 @@ def convert_to_hourly_jobs(self, aggregated_jobs: list[Job], cores_per_node: int
'nnodes': equivalent_nodes,
'cores_per_node': cores_per_node_needed,
'duration_hours': 1, # 1 hour
'original_job_count': agg_job['count']
'original_job_count': agg_job['count'],
# This template already represents the full sub-hour bin.
# Do not multiply by original_job_count during replay.
'instances': 1,
}
hourly_jobs.append(hourly_job)
else:
# For longer jobs, keep the original structure but convert to hours
# with appropriate scaling
# with appropriate scaling. Replay must create one instance per
# original job to preserve core-hours and concurrency.
duration_hours = math.ceil(agg_job['duration_minutes'] / 60)
# Keep replay templates within the simulated cluster limits.
nnodes = min(max(agg_job['nnodes'], 1), max_nodes_per_job)
cores_per_node_needed = min(max(agg_job['cores_per_node'], 1), cores_per_node)

hourly_job = {
'nnodes': agg_job['nnodes'],
'cores_per_node': agg_job['cores_per_node'],
'nnodes': nnodes,
'cores_per_node': cores_per_node_needed,
'duration_hours': duration_hours,
'original_job_count': agg_job['count']
'original_job_count': agg_job['count'],
'instances': agg_job['count'],
}
hourly_jobs.append(hourly_job)

if hourly_job['duration_hours'] > self.max_job_duration:
self.max_job_duration = hourly_job['duration_hours']
expanded_jobs_count += hourly_job.get('instances', 1)

if len(hourly_jobs) > self.max_new_jobs_per_hour:
self.max_new_jobs_per_hour = len(hourly_jobs)
if expanded_jobs_count > self.max_new_jobs_per_hour:
self.max_new_jobs_per_hour = expanded_jobs_count

return hourly_jobs

Expand Down
91 changes: 82 additions & 9 deletions src/workload_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import math
from typing import TYPE_CHECKING
import numpy as np
from src.config import (
Expand All @@ -26,6 +27,9 @@ def generate_jobs(
hourly_sampler: HourlySampler,
durations_sampler: DurationsSampler,
np_random: np.random.Generator,
job_arrival_scale: float = 1.0,
jobs_exact_replay: bool = False,
jobs_exact_replay_aggregate: bool = False,
) -> tuple[int, list[int], list[int], list[int]]:
"""
Generate new jobs for the current hour using configured workload source.
Expand All @@ -40,6 +44,13 @@ def generate_jobs(
hourly_sampler: Hourly sampler object
durations_sampler: Durations sampler object
np_random: NumPy random generator
job_arrival_scale: Multiplier for sampled arrivals per step.
- 1.0: unchanged
- >1.0: upsample jobs
- 0.0..1.0: downsample jobs
jobs_exact_replay: If True, replay raw jobs in log order for --jobs mode.
jobs_exact_replay_aggregate: In exact replay mode, aggregate each sampled
raw time-bin into compact hourly-equivalent templates.

Returns:
Tuple of (new_jobs_count, new_jobs_durations, new_jobs_nodes, new_jobs_cores)
Expand All @@ -48,22 +59,55 @@ def generate_jobs(
new_jobs_nodes = []
new_jobs_cores = []
new_jobs_count = 0
arrival_scale_applied_in_source = False

if external_jobs and not workload_gen:
# Use jobs sampler for pattern-based replay
jobs = jobs_sampler.sample_one_hourly(wrap=True)["hourly_jobs"]
if len(jobs) > 0:
for job in jobs:
new_jobs_count += 1
new_jobs_durations.append(job['duration_hours'])
new_jobs_nodes.append(job['nnodes'])
new_jobs_cores.append(job['cores_per_node'])
if jobs_exact_replay:
# Replay jobs exactly as they appear in the parsed timeline (one bin per step).
sampled = jobs_sampler.sample(1, wrap=True)
raw_jobs = next(iter(sampled.values()), [])
if jobs_exact_replay_aggregate and raw_jobs:
aggregated_jobs = jobs_sampler.aggregate_jobs(raw_jobs)
hourly_jobs = jobs_sampler.convert_to_hourly_jobs(
aggregated_jobs, CORES_PER_NODE, MAX_NODES_PER_JOB
)
for job in hourly_jobs:
instances = max(1, int(job.get('instances', 1)))
new_jobs_count += instances
new_jobs_durations.extend([int(job['duration_hours'])] * instances)
new_jobs_nodes.extend([int(job['nnodes'])] * instances)
new_jobs_cores.extend([int(job['cores_per_node'])] * instances)
else:
for job in raw_jobs:
duration_hours = max(1, int(math.ceil(int(job['duration_minutes']) / 60)))
nnodes = min(max(int(job['nnodes']), MIN_NODES_PER_JOB), MAX_NODES_PER_JOB)
cores_per_node = min(max(int(job['cores_per_node']), MIN_CORES_PER_JOB), CORES_PER_NODE)
new_jobs_count += 1
new_jobs_durations.append(duration_hours)
new_jobs_nodes.append(nnodes)
new_jobs_cores.append(cores_per_node)
else:
# Use pre-aggregated hourly templates for pattern-based replay.
sampled = jobs_sampler.sample_one_hourly(wrap=True)
jobs = sampled.get("hourly_jobs", [])
if len(jobs) > 0:
for job in jobs:
instances = max(1, int(job.get('instances', 1)))
new_jobs_count += instances
new_jobs_durations.extend([job['duration_hours']] * instances)
new_jobs_nodes.extend([job['nnodes']] * instances)
new_jobs_cores.extend([job['cores_per_node']] * instances)

elif external_hourly_jobs:
# Use hourly sampler for statistical sampling with aggregated jobs
hour_of_day = (current_hour - 1) % 24

jobs = hourly_sampler.sample_aggregated(hour_of_day, rng=np_random)
jobs = hourly_sampler.sample_aggregated(
hour_of_day,
rng=np_random,
arrival_scale=job_arrival_scale,
)
arrival_scale_applied_in_source = True

if len(jobs) > 0:
for job in jobs:
Expand Down Expand Up @@ -94,4 +138,33 @@ def generate_jobs(
new_jobs_nodes.append(np_random.integers(MIN_NODES_PER_JOB, MAX_NODES_PER_JOB + 1))
new_jobs_cores.append(np_random.integers(MIN_CORES_PER_JOB, CORES_PER_NODE + 1))

# Global arrival scaling for sources that do not apply it internally.
if (not arrival_scale_applied_in_source) and new_jobs_count > 0 and job_arrival_scale != 1.0:
if job_arrival_scale <= 0.0:
return 0, [], [], []

whole = int(np.floor(job_arrival_scale))
frac = float(job_arrival_scale - whole)

scaled_durations: list[int] = []
scaled_nodes: list[int] = []
scaled_cores: list[int] = []

if whole > 0:
scaled_durations.extend(new_jobs_durations * whole)
scaled_nodes.extend(new_jobs_nodes * whole)
scaled_cores.extend(new_jobs_cores * whole)

if frac > 0.0:
for d, n, c in zip(new_jobs_durations, new_jobs_nodes, new_jobs_cores):
if np_random.random() < frac:
scaled_durations.append(d)
scaled_nodes.append(n)
scaled_cores.append(c)

new_jobs_durations = scaled_durations
new_jobs_nodes = scaled_nodes
new_jobs_cores = scaled_cores
new_jobs_count = len(new_jobs_durations)

return new_jobs_count, new_jobs_durations, new_jobs_nodes, new_jobs_cores
24 changes: 23 additions & 1 deletion train.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import re
import glob
import argparse
import sys
import pandas as pd
from src.workloadgen import WorkloadGenerator
from src.workloadgen_cli import add_workloadgen_args, build_workloadgen_config
Expand All @@ -33,6 +34,10 @@ def main():
parser.add_argument('--job-durations', type=str, nargs='?', const="", default="", help='Path to a file containing job duration samples (for use with durations_sampler)')
parser.add_argument('--jobs', type=str, nargs='?', const="", default="", help='Path to a file containing job samples (for use with jobs_sampler)')
parser.add_argument('--hourly-jobs', type=str, nargs='?', const="", default="", help='Path to Slurm log file for hourly statistical sampling (for use with hourly_sampler)')
parser.add_argument('--job-arrival-scale', type=float, default=1.0, help='Scale sampled arrivals per step (1.0 = unchanged).')
parser.add_argument('--jobs-exact-replay', action='store_true', help='For --jobs mode, replay raw jobs in timeline order (no template aggregation).')
parser.add_argument('--jobs-exact-replay-aggregate', action='store_true', help='With --jobs-exact-replay, aggregate each sampled raw time-bin before enqueueing.')
parser.add_argument('--plot-rewards', action='store_true', help='Per step, plot rewards for all possible num_idle_nodes & num_used_nodes (default: False).')
parser.add_argument('--plot-eff-reward', action=argparse.BooleanOptionalAction, default=True, help='Include efficiency reward in the plot (dashed line).')
parser.add_argument('--plot-price-reward', action=argparse.BooleanOptionalAction, default=True, help='Include price reward in the plot (dashed line).')
parser.add_argument('--plot-idle-penalty', action=argparse.BooleanOptionalAction, default=True, help='Include idle penalty in the plot (dashed line).')
Expand Down Expand Up @@ -62,6 +67,20 @@ def main():
parser.add_argument("--print-policy", action="store_true", help="Print structure of the policy network.")

args = parser.parse_args()
if args.job_arrival_scale < 0.0:
parser.error("--job-arrival-scale must be >= 0.0")
if args.jobs_exact_replay and not norm_path(args.jobs):
parser.error("--jobs-exact-replay requires --jobs")
if args.jobs_exact_replay_aggregate and not args.jobs_exact_replay:
parser.error("--jobs-exact-replay-aggregate requires --jobs-exact-replay")
if args.workload_gen and args.job_arrival_scale != 1.0:
print(
"Warning: --job-arrival-scale is not allowed with --workload-gen; "
"resetting it to 1.0. Use workload generator arrival settings instead.",
file=sys.stderr,
)
args.job_arrival_scale = 1.0

prices_file_path = args.prices
job_durations_file_path = args.job_durations
jobs_file_path = args.jobs
Expand Down Expand Up @@ -133,7 +152,10 @@ def main():
plot_config=plot_config,
steps_per_iteration=STEPS_PER_ITERATION,
evaluation_mode=args.evaluate_savings,
workload_gen=workload_gen)
workload_gen=workload_gen,
job_arrival_scale=args.job_arrival_scale,
jobs_exact_replay=args.jobs_exact_replay,
jobs_exact_replay_aggregate=args.jobs_exact_replay_aggregate)
env.reset(seed=args.seed)

# Check if there are any saved models in models_dir
Expand Down
Loading
Loading