Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,021 changes: 1,021 additions & 0 deletions analyze_arrivalscale_occupancy.py

Large diffs are not rendered by default.

554 changes: 434 additions & 120 deletions analyze_lambda_occupancy.py

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions src/analysis_naming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from __future__ import annotations


def _format_slug_number(value: float) -> str:
    """Render *value* as a filesystem-safe slug token.

    The number is printed with six decimals, trailing zeros and a dangling
    decimal point are trimmed, then '-' becomes 'm' (minus) and '.' becomes
    'p' (point) so the token is safe inside directory names.
    """
    text = f"{float(value):.6f}"
    text = text.rstrip("0").rstrip(".")
    return text.replace("-", "m").replace(".", "p")


def build_weight_slug(
    efficiency_weight: float,
    price_weight: float,
    idle_weight: float,
    job_age_weight: float,
) -> str:
    """Build the 'e*_p*_i*_ja*' weight token used in analysis names.

    Each weight is rendered via _format_slug_number and prefixed with its
    short tag; the four tagged values are joined with underscores.
    """
    tagged = (
        ("e", efficiency_weight),
        ("p", price_weight),
        ("i", idle_weight),
        ("ja", job_age_weight),
    )
    return "_".join(f"{tag}{_format_slug_number(weight)}" for tag, weight in tagged)


def build_analysis_dir_name(
    prefix: str,
    timestamp: str,
    model: int | None,
    efficiency_weight: float,
    price_weight: float,
    idle_weight: float,
    job_age_weight: float,
) -> str:
    """Compose an analysis directory name.

    Layout: '<prefix>[_m<model>]_<weight-slug>_<timestamp>'. The model
    component is omitted when *model* is None.
    """
    slug = build_weight_slug(
        efficiency_weight=efficiency_weight,
        price_weight=price_weight,
        idle_weight=idle_weight,
        job_age_weight=job_age_weight,
    )
    model_parts = [] if model is None else [f"m{int(model)}"]
    return "_".join([prefix, *model_parts, slug, timestamp])
11 changes: 11 additions & 0 deletions src/arrival_scale.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from __future__ import annotations

import math


def validate_job_arrival_scale(job_arrival_scale: float) -> float:
    """Coerce *job_arrival_scale* to float and validate it.

    Returns:
        The arrival scale as a plain float.

    Raises:
        ValueError: if the value is NaN, infinite, or negative.
    """
    normalized = float(job_arrival_scale)
    if math.isnan(normalized) or math.isinf(normalized) or normalized < 0.0:
        raise ValueError("--job-arrival-scale must be finite and >= 0.0")
    return normalized
24 changes: 20 additions & 4 deletions src/baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
age_backlog_queue,
)
from src.metrics_tracker import MetricsTracker
from src.reward_calculation import power_cost
from src.reward_calculation import power_cost, power_consumption_mwh
from src.config import CORES_PER_NODE


Expand All @@ -30,7 +30,7 @@ def baseline_step(
metrics: MetricsTracker,
env_print: Callable[..., None],
baseline_backlog_queue: deque,
) -> tuple[float, float, int, int]:
) -> tuple[float, float, float, float, int, int, int, int]:
"""
Run one step of the baseline simulation for comparison.

Expand All @@ -52,7 +52,12 @@ def baseline_step(
baseline_backlog_queue: Overflow queue for jobs that don't fit in the main queue

Returns:
Tuple of (baseline_cost, baseline_cost_off, updated baseline_next_empty_slot, updated next_job_id)
Tuple of (
baseline_cost, baseline_cost_off,
baseline_power_mwh, baseline_power_off_mwh,
updated baseline_next_empty_slot, updated next_job_id,
baseline_num_used_nodes, baseline_num_used_cores
)
"""
job_queue_2d = baseline_state['job_queue'].reshape(-1, 4)

Expand Down Expand Up @@ -113,6 +118,17 @@ def baseline_step(
env_print(f" > baseline_cost: €{baseline_cost:.4f} | used nodes: {num_used_nodes}, idle nodes: {num_idle_nodes}")
baseline_cost_off = power_cost(num_used_nodes, 0, current_price)
env_print(f" > baseline_cost_off: €{baseline_cost_off:.4f} | used nodes: {num_used_nodes}, idle nodes: 0")
baseline_power_mwh = power_consumption_mwh(num_used_nodes, num_idle_nodes)
baseline_power_off_mwh = power_consumption_mwh(num_used_nodes, 0)

num_used_cores = num_on_nodes * CORES_PER_NODE - np.sum(baseline_cores_available)
return baseline_cost, baseline_cost_off, baseline_next_empty_slot, next_job_id, num_used_nodes, num_used_cores
return (
baseline_cost,
baseline_cost_off,
baseline_power_mwh,
baseline_power_off_mwh,
baseline_next_empty_slot,
next_job_id,
num_used_nodes,
num_used_cores,
)
11 changes: 9 additions & 2 deletions src/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
assign_jobs_to_available_nodes, fill_queue_from_backlog, age_backlog_queue
)
from src.node_management import adjust_nodes
from src.reward_calculation import RewardCalculator
from src.reward_calculation import RewardCalculator, power_consumption_mwh
from src.baseline import baseline_step
from src.workload_generator import generate_jobs
from src.metrics_tracker import MetricsTracker
Expand Down Expand Up @@ -452,7 +452,7 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool,
self.env_print(f"[5] Calculating reward...")

# Baseline step
baseline_cost, baseline_cost_off, self.baseline_next_empty_slot, self.next_job_id, baseline_num_used_nodes, baseline_num_used_cores = baseline_step(
baseline_cost, baseline_cost_off, baseline_power_mwh, baseline_power_off_mwh, self.baseline_next_empty_slot, self.next_job_id, baseline_num_used_nodes, baseline_num_used_cores = baseline_step(
self.baseline_state, self.baseline_cores_available, self.baseline_running_jobs,
current_price, new_jobs_count, new_jobs_durations, new_jobs_nodes, new_jobs_cores,
self.baseline_next_empty_slot, self.next_job_id, self.metrics, self.env_print,
Expand All @@ -463,6 +463,10 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool,
self.metrics.baseline_cost_off += baseline_cost_off
self.metrics.episode_baseline_cost += baseline_cost
self.metrics.episode_baseline_cost_off += baseline_cost_off
self.metrics.baseline_power_consumption_mwh += baseline_power_mwh
self.metrics.baseline_power_consumption_off_mwh += baseline_power_off_mwh
self.metrics.episode_baseline_power_consumption_mwh += baseline_power_mwh
self.metrics.episode_baseline_power_consumption_off_mwh += baseline_power_off_mwh

self.metrics.episode_baseline_used_nodes.append(baseline_num_used_nodes)
self.metrics.episode_baseline_used_cores.append(baseline_num_used_cores)
Expand All @@ -476,6 +480,9 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool,
self.metrics.episode_reward += step_reward
self.metrics.total_cost += step_cost
self.metrics.episode_total_cost += step_cost
step_power_mwh = power_consumption_mwh(num_used_nodes, num_idle_nodes)
self.metrics.total_power_consumption_mwh += step_power_mwh
self.metrics.episode_total_power_consumption_mwh += step_power_mwh

# Store normalized reward components for plotting
self.metrics.eff_rewards.append(eff_reward_norm * 100)
Expand Down
67 changes: 67 additions & 0 deletions src/evaluation_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from __future__ import annotations

import math
from typing import Mapping, Sequence


def _fmt_optional(value: float | int | None, precision: int = 2, thousands: bool = False) -> str:
if value is None:
return "n/a"

numeric_value = float(value)
if math.isnan(numeric_value):
return "n/a"

return f"{numeric_value:,.{precision}f}" if thousands else f"{numeric_value:.{precision}f}"


def mean_occupancy_pct(values: Sequence[int], capacity: int) -> float:
    """Mean of *values* as a percentage of *capacity*.

    Returns 0.0 when *values* is empty or *capacity* is not positive.
    """
    if capacity <= 0 or not values:
        return 0.0
    return 100.0 * sum(values) / (len(values) * capacity)


def build_episode_summary_line(
    episode_number: int,
    episode_data: Mapping[str, float | int],
    timeline_max_queue: int,
    agent_occupancy_cores_pct: float,
    baseline_occupancy_cores_pct: float,
    agent_occupancy_nodes_pct: float,
    baseline_occupancy_nodes_pct: float,
) -> str:
    """Format one episode's evaluation metrics as a single summary line.

    Pulls cost, power, price, per-job and queue metrics out of
    *episode_data* (keyed as produced by the metrics tracker) and appends
    the occupancy percentages computed by the caller. Optional metrics
    (cost-per-1k-jobs, dropped-per-saved-euro) are rendered through
    _fmt_optional, which maps None/NaN to 'n/a'.
    """
    return (
        f" Episode {episode_number}: "
        f"Agent Cost=€{float(episode_data['agent_cost']):.0f}, "
        f"Baseline Cost=€{float(episode_data['baseline_cost']):.0f} | "
        f"Baseline Off=€{float(episode_data['baseline_cost_off']):.0f}, "
        f"Savings=€{float(episode_data['savings_vs_baseline']):.0f}/"
        f"€{float(episode_data['savings_vs_baseline_off']):.0f}, "
        f"Power={float(episode_data['agent_power_consumption_mwh']):.1f}/"
        f"{float(episode_data['baseline_power_consumption_mwh']):.1f}/"
        f"{float(episode_data['baseline_power_consumption_off_mwh']):.1f} MWh "
        f"(agent/base/base_off), "
        f"MeanPrice={float(episode_data['agent_mean_price']):.2f}/"
        f"{float(episode_data['baseline_mean_price']):.2f}/"
        f"{float(episode_data['baseline_off_mean_price']):.2f} €/MWh "
        f"(agent/base/base_off), "
        f"CostPer1kCompleted="
        f"{_fmt_optional(episode_data['agent_cost_per_1000_completed_jobs'], 1, thousands=True)}/"
        f"{_fmt_optional(episode_data['baseline_cost_per_1000_completed_jobs'], 1, thousands=True)}/"
        f"{_fmt_optional(episode_data['baseline_off_cost_per_1000_completed_jobs'], 1, thousands=True)} "
        f"€/1k (agent/base/base_off), "
        f"DroppedPerSavedEuro="
        f"{_fmt_optional(episode_data['agent_dropped_jobs_per_saved_euro'], 6)}/"
        f"{_fmt_optional(episode_data['agent_dropped_jobs_per_saved_euro_off'], 6)} "
        f"jobs/€ (vs base/base_off), "
        f"Jobs={int(episode_data['jobs_completed'])}/{int(episode_data['jobs_submitted'])} "
        f"({float(episode_data['completion_rate']):.0f}%), "
        f"AvgWait={float(episode_data['avg_wait_time']):.1f}h, "
        f"EpisodeMaxQueue={int(episode_data['max_queue_size'])}, "
        f"Dropped={int(episode_data['jobs_dropped'])}, "
        f"TimelineMaxQueue={timeline_max_queue}, "
        f"Agent Occupancy (Cores)={agent_occupancy_cores_pct:.2f}%, "
        f"Baseline Occupancy (Cores)={baseline_occupancy_cores_pct:.2f}%, "
        f"Agent Occupancy (Nodes)={agent_occupancy_nodes_pct:.2f}%, "
        f"Baseline Occupancy (Nodes)={baseline_occupancy_nodes_pct:.2f}% "
    )
58 changes: 56 additions & 2 deletions src/metrics_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@
class MetricsTracker:
"""Tracks metrics throughout training episodes."""

@staticmethod
def _effective_mean_price(total_cost: float, total_power_mwh: float) -> float:
"""Effective mean price in €/MWh, weighted by consumed energy."""
return (total_cost / total_power_mwh) if total_power_mwh > 0.0 else 0.0

@staticmethod
def _safe_ratio(numerator: float, denominator: float) -> float:
"""Safe division; returns NaN when denominator is not positive."""
return (numerator / denominator) if denominator > 0.0 else float("nan")

def __init__(self) -> None:
"""Initialize all metric counters."""
self.reset_timeline_metrics()
Expand All @@ -21,6 +31,9 @@ def reset_timeline_metrics(self) -> None:
self.total_cost: float = 0.0
self.baseline_cost: float = 0.0
self.baseline_cost_off: float = 0.0
self.total_power_consumption_mwh: float = 0.0
self.baseline_power_consumption_mwh: float = 0.0
self.baseline_power_consumption_off_mwh: float = 0.0

# Agent job metrics (cumulative across episodes)
self.jobs_submitted: int = 0
Expand Down Expand Up @@ -59,6 +72,9 @@ def reset_episode_metrics(self) -> None:
self.episode_total_cost: float = 0.0
self.episode_baseline_cost: float = 0.0
self.episode_baseline_cost_off: float = 0.0
self.episode_total_power_consumption_mwh: float = 0.0
self.episode_baseline_power_consumption_mwh: float = 0.0
self.episode_baseline_power_consumption_off_mwh: float = 0.0

# Agent job metrics (episode)
self.episode_jobs_submitted: int = 0
Expand Down Expand Up @@ -138,16 +154,54 @@ def record_episode_completion(self, current_episode: int) -> dict[str, float | i
if self.episode_baseline_jobs_submitted
else 0.0
)
agent_mean_price: float = self._effective_mean_price(
self.episode_total_cost, self.episode_total_power_consumption_mwh
)
baseline_mean_price: float = self._effective_mean_price(
self.episode_baseline_cost, self.episode_baseline_power_consumption_mwh
)
baseline_off_mean_price: float = self._effective_mean_price(
self.episode_baseline_cost_off, self.episode_baseline_power_consumption_off_mwh
)
agent_cost_per_1000_completed: float = self._safe_ratio(
self.episode_total_cost * 1000.0, float(self.episode_jobs_completed)
)
baseline_cost_per_1000_completed: float = self._safe_ratio(
self.episode_baseline_cost * 1000.0, float(self.episode_baseline_jobs_completed)
)
# baseline_off is a cost variant of baseline scheduling, so it uses the same completed-job count.
baseline_off_cost_per_1000_completed: float = self._safe_ratio(
self.episode_baseline_cost_off * 1000.0, float(self.episode_baseline_jobs_completed)
)
savings_vs_baseline: float = self.episode_baseline_cost - self.episode_total_cost
savings_vs_baseline_off: float = self.episode_baseline_cost_off - self.episode_total_cost
dropped_jobs_per_saved_euro: float = self._safe_ratio(
float(self.episode_jobs_dropped), savings_vs_baseline
) if savings_vs_baseline > 0.0 else float("nan")
dropped_jobs_per_saved_euro_off: float = self._safe_ratio(
float(self.episode_jobs_dropped), savings_vs_baseline_off
) if savings_vs_baseline_off > 0.0 else float("nan")

episode_data: dict[str, float | int] = {
'episode': current_episode,
'agent_cost': self.episode_total_cost,
'baseline_cost': self.episode_baseline_cost,
'baseline_cost_off': self.episode_baseline_cost_off,
'savings_vs_baseline': self.episode_baseline_cost - self.episode_total_cost,
'savings_vs_baseline_off': self.episode_baseline_cost_off - self.episode_total_cost,
'agent_power_consumption_mwh': self.episode_total_power_consumption_mwh,
'baseline_power_consumption_mwh': self.episode_baseline_power_consumption_mwh,
'baseline_power_consumption_off_mwh': self.episode_baseline_power_consumption_off_mwh,
'agent_mean_price': agent_mean_price,
'baseline_mean_price': baseline_mean_price,
'baseline_off_mean_price': baseline_off_mean_price,
'savings_vs_baseline': savings_vs_baseline,
'savings_vs_baseline_off': savings_vs_baseline_off,
'savings_pct_baseline': ((self.episode_baseline_cost - self.episode_total_cost) / self.episode_baseline_cost) * 100 if self.episode_baseline_cost > 0 else 0.0,
'savings_pct_baseline_off': ((self.episode_baseline_cost_off - self.episode_total_cost) / self.episode_baseline_cost_off) * 100 if self.episode_baseline_cost_off > 0 else 0.0,
'agent_cost_per_1000_completed_jobs': agent_cost_per_1000_completed,
'baseline_cost_per_1000_completed_jobs': baseline_cost_per_1000_completed,
'baseline_off_cost_per_1000_completed_jobs': baseline_off_cost_per_1000_completed,
'agent_dropped_jobs_per_saved_euro': dropped_jobs_per_saved_euro,
'agent_dropped_jobs_per_saved_euro_off': dropped_jobs_per_saved_euro_off,
'total_reward': self.episode_reward,
# Agent job metrics
'jobs_submitted': self.episode_jobs_submitted,
Expand Down
16 changes: 16 additions & 0 deletions src/reward_calculation.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ def power_cost(num_used_nodes: int, num_idle_nodes: int, current_price: float) -
return total_cost


def power_consumption_mwh(num_used_nodes: int, num_idle_nodes: int) -> float:
    """Return the energy drawn during one environment step, in MWh.

    One environment step spans one hour, so average MW and MWh per step
    are numerically the same quantity.

    Args:
        num_used_nodes: Nodes currently running jobs.
        num_idle_nodes: Nodes powered on but without work.

    Returns:
        Energy consumption in MWh for this step.
    """
    idle_energy = COST_IDLE_MW * num_idle_nodes
    used_energy = COST_USED_MW * num_used_nodes
    return idle_energy + used_energy


class RewardCalculator:

"""Calculates rewards with pre-computed normalization bounds."""
Expand Down
2 changes: 2 additions & 0 deletions src/sampler_hourly.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any

import numpy as np
from src.arrival_scale import validate_job_arrival_scale


class HourlySampler:
Expand Down Expand Up @@ -300,6 +301,7 @@ def sample_aggregated(
if not self.aggregation_initialized:
raise RuntimeError("Aggregation not initialized. Call precalculate_hourly_templates() first.")

arrival_scale = validate_job_arrival_scale(arrival_scale)
hour_of_day = hour_of_day % 24

dist = self.hour_distributions[hour_of_day]
Expand Down
2 changes: 2 additions & 0 deletions src/workload_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import math
from typing import TYPE_CHECKING
import numpy as np
from src.arrival_scale import validate_job_arrival_scale
from src.config import (
MAX_NEW_JOBS_PER_HOUR, MAX_JOB_DURATION, MIN_NODES_PER_JOB,
MAX_NODES_PER_JOB, MIN_CORES_PER_JOB, CORES_PER_NODE
Expand Down Expand Up @@ -55,6 +56,7 @@ def generate_jobs(
Returns:
Tuple of (new_jobs_count, new_jobs_durations, new_jobs_nodes, new_jobs_cores)
"""
job_arrival_scale = validate_job_arrival_scale(job_arrival_scale)
new_jobs_durations = []
new_jobs_nodes = []
new_jobs_cores = []
Expand Down
Loading
Loading