199 changes: 157 additions & 42 deletions mlperf_logging/result_summarizer/result_summarizer.py
@@ -7,18 +7,22 @@
import os
import re
import sys
import traceback
import itertools
import pandas as pd
import yaml
import numpy as np
import hashlib
import math
import operator
import uuid as uuidlib
import copy

from ..compliance_checker import mlp_compliance
from ..compliance_checker.mlp_compliance import usage_choices, rule_choices
from ..compliance_checker.mlp_parser import parse_file

from ..rcp_checker import rcp_checker
from ..benchmark_meta import get_allowed_benchmarks, get_result_file_counts


@@ -263,12 +267,23 @@ def _get_weak_scaling_metric_schema():
}


def _get_empty_summary(usage, ruleset, weak_scaling=False):
def _get_strong_scaling_metric_schema():
return {
'time_to_train': float,
'Energy': float,
'GBS': float,
'epochs': float,
'RCP': str,
'rcp_scaling_factor': float,
}


def _get_empty_summary(usage, ruleset, weak_scaling=False, detailed=False):
return Summary(
_get_column_schema(usage, ruleset, weak_scaling=weak_scaling).keys())
_get_column_schema(usage, ruleset, weak_scaling=weak_scaling, detailed=detailed).keys())


def _get_column_schema(usage, ruleset, weak_scaling=False):
def _get_column_schema(usage, ruleset, weak_scaling=False, detailed=False):
schema = {
'division': str,
'availability': str,
@@ -289,9 +304,17 @@ def _get_column_schema(usage, ruleset, weak_scaling=False):
for metric, dtype in _get_weak_scaling_metric_schema().items():
schema['{}:{}'.format(benchmark, metric)] = dtype
else:
schema.update(
{b: float
for b in get_allowed_benchmarks(usage, ruleset)})
if detailed:
benchmarks = get_allowed_benchmarks(usage, ruleset)
for benchmark in benchmarks:
for metric, dtype in _get_strong_scaling_metric_schema().items():
schema['{}:{}'.format(benchmark, metric)] = dtype
else:
schema.update(
{
b: float for b in get_allowed_benchmarks(usage, ruleset)
}
)
schema.update({'details_url': str, 'code_url': str})
return schema
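With detailed=True, the schema no longer maps each benchmark to a single float; it carries one column per (benchmark, metric) pair taken from _get_strong_scaling_metric_schema. A minimal sketch of the resulting column names, using 'bert' purely as a stand-in for an entry of get_allowed_benchmarks(usage, ruleset):

# Illustration only: 'bert' is a placeholder benchmark name.
strong_metrics = {
    'time_to_train': float,
    'Energy': float,
    'GBS': float,
    'epochs': float,
    'RCP': str,
    'rcp_scaling_factor': float,
}
detailed_columns = ['{}:{}'.format('bert', metric) for metric in strong_metrics]
# ['bert:time_to_train', 'bert:Energy', 'bert:GBS',
#  'bert:epochs', 'bert:RCP', 'bert:rcp_scaling_factor']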

@@ -404,8 +427,8 @@ def _compute_strong_score_standalone(
power_score = olympic_avg
power_score *= scaling_factor
if return_full_scores:
return scores_track, power_scores_track, score, power_score
return score, power_score
return scores_track, power_scores_track, score, power_score, scaling_factor
return score, power_score, scaling_factor


def _compute_weak_score_standalone(benchmark, system, has_power, benchmark_folder, usage, ruleset, desc = {"submitter": None}):
@@ -474,31 +497,106 @@ def _compute_weak_score_standalone(benchmark, system, has_power, benchmark_folde



def _compute_strong_scaling_scores(desc, system_folder, usage, ruleset):
def _compute_strong_scaling_scores(desc, system_folder, usage, ruleset, division, rcp_bypass=False):
# Collect scores for benchmarks.
benchmark_scores = {}
benchmark_power_scores = {}
has_power = None
detailed_benchmark_scores = {}
benchmark_folder_parent = os.path.join(
system_folder, 'strong') if usage == 'hpc' else system_folder
if not os.path.isdir(benchmark_folder_parent):
return benchmark_scores, benchmark_power_scores
return benchmark_scores, {}
for benchmark_folder in _get_sub_folders(benchmark_folder_parent):
folder_parts = benchmark_folder.split('/')
# Check if this benchmark has power results
has_power = _has_power(benchmark_folder)
benchmark = _benchmark_alias(folder_parts[-1])
system = folder_parts[-3] if usage == 'hpc' else folder_parts[-2]
# Read scores from result files.
score, power_score = _compute_strong_score_standalone(benchmark, system, has_power, benchmark_folder, usage, ruleset, desc)
# Compute base perf/power scores
score, power_score, rcp_scaling_factor = _compute_strong_score_standalone(
benchmark, system, has_power, benchmark_folder, usage, ruleset, desc
)

# RCP/GBS/Epochs additions for closed division
benchmark_gbs = None
benchmark_epochs = None
benchmark_rcp = None
if division == 'closed':
pattern = '{folder}/result_*.txt'.format(folder=benchmark_folder)
result_files = glob.glob(pattern, recursive=True)
try:
# RCP check
verbose = False
bert_train_samples = False
rcp_pass, rcp_msg, _ = rcp_checker.check_directory(
benchmark_folder,
usage,
ruleset,
verbose,
bert_train_samples,
rcp_file=None,
rcp_pass='pruned_rcps',
rcp_bypass=rcp_bypass,
set_scaling=True,
)
if not rcp_pass:
print(
'ERROR: RCP Test Failed on {}/{}/{} with message: {}.'.format(
desc['submitter'], system, benchmark, rcp_msg
)
)
if rcp_msg == 'RCP found':
benchmark_rcp = 'Fail'
elif rcp_msg == 'RCP Interpolation':
benchmark_rcp = 'Interp. Fail'
elif 'Missing' in rcp_msg:
benchmark_rcp = 'Missing'
elif rcp_msg == 'Cannot find any RCPs':
benchmark_rcp = 'No RCP'
else:
benchmark_rcp = 'Unknown state'
else:
benchmark_rcp = 'Pass'

# GBS and epochs
benchmark_gbs, subm_epochs, _ = rcp_checker.get_submission_epochs(
result_files, ruleset, bert_train_samples=False
)
subm_epochs.sort()
samples_rejected = 1
if len(subm_epochs) >= 2 * samples_rejected + 1:
benchmark_epochs = float(
np.mean(
subm_epochs[
samples_rejected : len(subm_epochs) - samples_rejected
]
)
)
except Exception as e:
print(
f"WARNING: RCP/GBS computation failed for {benchmark_folder}: {e}"
)
traceback.print_exc()

# Map into metric-suffixed keys for schema
detailed_bechmark_scores[f"{benchmark}:rcp_scaling_factor"] = float(
rcp_scaling_factor
)
if score is not None:
benchmark_scores[benchmark] = score
detailed_bechmark_scores[f"{benchmark}:time_to_train"] = score
if benchmark_gbs is not None:
detailed_bechmark_scores[f"{benchmark}:GBS"] = float(benchmark_gbs)
if benchmark_epochs is not None:
detailed_bechmark_scores[f"{benchmark}:samples_to_converge"] = float(benchmark_epochs)
if benchmark_rcp is not None:
detailed_bechmark_scores[f"{benchmark}:RCP"] = benchmark_rcp
if power_score is not None:
benchmark_power_scores[benchmark] = power_score
_fill_empty_benchmark_scores(benchmark_scores, usage, ruleset)
if len(benchmark_power_scores) > 0:
_fill_empty_benchmark_scores(benchmark_power_scores, usage, ruleset)
return benchmark_scores, benchmark_power_scores
detailed_bechmark_scores[f"{benchmark}:Energy"] = power_score
benchmark_scores[f"{benchmark}"] = float(
rcp_scaling_factor
)
_fill_empty_benchmark_scores(benchmark_scores, usage, ruleset, detailed=False)
_fill_empty_benchmark_scores(detailed_benchmark_scores, usage, ruleset, detailed=True)
return benchmark_scores, detailed_benchmark_scores
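The epoch figure stored per benchmark is an olympic-style mean: the per-run epoch counts from result_*.txt are sorted, the lowest and highest samples_rejected entries are dropped, and the rest are averaged; with fewer than 2 * samples_rejected + 1 runs the value is left unset. A self-contained sketch of that rule (not the package function itself):

import numpy as np

def olympic_mean(epochs, samples_rejected=1):
    # Drop the lowest and highest `samples_rejected` values, then average the rest.
    epochs = sorted(epochs)
    if len(epochs) < 2 * samples_rejected + 1:
        return None  # too few runs to trim both ends
    return float(np.mean(epochs[samples_rejected:len(epochs) - samples_rejected]))

# e.g. five per-run epoch counts parsed from result_*.txt
olympic_mean([2.0, 3.0, 3.0, 3.5, 9.0])  # -> 3.1666...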


def _compute_weak_scaling_scores(desc, system_folder, usage, ruleset):
@@ -693,6 +791,7 @@ def _fill_empty_benchmark_scores(
usage,
ruleset,
weak_scaling=False,
detailed=False,
):
for benchmark in get_allowed_benchmarks(usage, ruleset):
if weak_scaling:
@@ -702,8 +801,19 @@
benchmark_scores[k] = None

else:
if benchmark not in benchmark_scores:
benchmark_scores[benchmark] = None
if detailed:
strong_schema = _get_strong_scaling_metric_schema()
for metric, dtype in strong_schema.items():
k = '{}:{}'.format(benchmark, metric)
if dtype is str:
if k not in benchmark_scores or benchmark_scores[k] is None:
benchmark_scores[k] = ''
else:
if k not in benchmark_scores:
benchmark_scores[k] = None
else:
if benchmark not in benchmark_scores:
benchmark_scores[benchmark] = None

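For benchmarks a submission does not include, the detailed fill distinguishes by dtype: string-typed metrics such as RCP are set to an empty string (presumably so the str-typed columns do not end up holding literal 'None' after the astype cast), while float-typed metrics stay None and surface as NaN. A standalone sketch of that rule, with 'bert' again as an illustrative benchmark name:

strong_metrics = {'time_to_train': float, 'Energy': float, 'GBS': float,
                  'epochs': float, 'RCP': str, 'rcp_scaling_factor': float}
scores = {'bert:time_to_train': 12.3}  # only one metric was populated
for metric, dtype in strong_metrics.items():
    key = 'bert:{}'.format(metric)
    if dtype is str:
        if scores.get(key) is None:
            scores[key] = ''    # empty string rather than None for str columns
    elif key not in scores:
        scores[key] = None      # becomes NaN once the DataFrame is built
# scores['bert:RCP'] == ''; scores['bert:GBS'] is None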

def _get_id_from_sysinfo(summary):
@@ -841,7 +951,7 @@ def summarize_results(folder, usage, ruleset, csv_file=None, **kwargs):
weak_scaling_summary = _get_empty_summary(usage,
ruleset,
weak_scaling=True)
power_summary = _get_empty_summary(usage, ruleset)
detailed_strong_scaling_summary = _get_empty_summary(usage, ruleset, detailed=True)
power_weak_scaling_summary = _get_empty_summary(usage, ruleset, weak_scaling=True)
for system_folder in _get_sub_folders(results_folder):
folder_parts = system_folder.split('/')
@@ -924,8 +1034,8 @@ def _check_and_update_system_specs(desc_keys, column_name, query=None):
continue

# Compute the scores.
strong_scaling_scores, power_scores = _compute_strong_scaling_scores(
desc, system_folder, usage, ruleset)
strong_scaling_scores, detailed_strong_scaling_scores = _compute_strong_scaling_scores(
desc, system_folder, usage, ruleset, system_specs["division"], rcp_bypass=False)
if usage == 'hpc':
weak_scaling_scores, power_scores_weak_scaling = _compute_weak_scaling_scores(
desc, system_folder, usage, ruleset)
@@ -950,17 +1060,18 @@ def _check_and_update_system_specs(desc_keys, column_name, query=None):
urls.items(),
):
weak_scaling_summary.push(column_name, value)
if len(power_scores) > 0:
if len(detailed_strong_scaling_scores) > 0:
for column_name, value in itertools.chain(
system_specs.items(),
power_scores.items(),
detailed_strong_scaling_scores.items(),
urls.items(),
):
power_summary.push(column_name, value)
if column_name in strong_scaling_scores:
power_summary.push(column_name, strong_scaling_scores[column_name])
else:
power_summary.push(column_name, value)
merged = (
detailed_strong_scaling_scores[column_name]
if column_name in detailed_strong_scaling_scores
else value
)
detailed_strong_scaling_summary.push(column_name, merged)
if usage == 'hpc' and len(power_scores_weak_scaling) > 0:
for column_name, value in itertools.chain(
system_specs.items(),
@@ -975,13 +1086,13 @@ def _check_and_update_system_specs(desc_keys, column_name, query=None):
if len(weak_scaling_summary) > 0:
weak_scaling_summary = weak_scaling_summary.to_dataframe().sort_values(
_get_sort_by_column_names()).reset_index(drop=True)
if len(power_summary) > 0:
power_summary = power_summary.to_dataframe().sort_values(
if len(detailed_strong_scaling_summary) > 0:
detailed_strong_scaling_summary = detailed_strong_scaling_summary.to_dataframe().sort_values(
_get_sort_by_column_names()).reset_index(drop=True)
if len(power_weak_scaling_summary) > 0:
power_weak_scaling_summary = power_weak_scaling_summary.to_dataframe().sort_values(
_get_sort_by_column_names()).reset_index(drop=True)
return strong_scaling_summary, weak_scaling_summary, power_summary, power_weak_scaling_summary
return strong_scaling_summary, weak_scaling_summary, detailed_strong_scaling_summary, power_weak_scaling_summary



@@ -1039,23 +1150,23 @@ def main():

strong_scaling_summaries = []
weak_scaling_summaries = []
power_summaries = []
detailed_strong_scaling_summaries = []
power_weak_scaling_summaries = []

def _update_summaries(folder):
if args.usage == "Training":
config_path = os.path.join(os.path.dirname(__file__), "config.yaml")
with open(config_path, "r") as f:
config = yaml.safe_load(f)
strong_scaling_summary, weak_scaling_summary, power_summary, power_weak_scaling_summary = summarize_results(
strong_scaling_summary, weak_scaling_summary, detailed_strong_scaling_summary, power_weak_scaling_summary = summarize_results(
folder,
args.usage,
args.ruleset,
availability = config["availability"],
generate_private_ids = args.generate_private_ids,
)
else:
strong_scaling_summary, weak_scaling_summary, power_summary, power_weak_scaling_summary = summarize_results(
strong_scaling_summary, weak_scaling_summary, detailed_strong_scaling_summary, power_weak_scaling_summary = summarize_results(
folder,
args.usage,
args.ruleset,
@@ -1064,8 +1175,8 @@ def _update_summaries(folder):
strong_scaling_summaries.append(strong_scaling_summary)
if len(weak_scaling_summary) > 0:
weak_scaling_summaries.append(weak_scaling_summary)
if len(power_summary) > 0:
power_summaries.append(power_summary)
if len(detailed_strong_scaling_summary) > 0:
detailed_strong_scaling_summaries.append(detailed_strong_scaling_summary)
if len(power_weak_scaling_summary) > 0:
power_weak_scaling_summaries.append(power_weak_scaling_summary)

@@ -1180,13 +1291,14 @@ def _summaries_to_xlsx(summaries: pd.DataFrame, path, version):

writer.save()
# Print and write back results.
def _print_and_write(summaries, weak_scaling=False, mode='w', power = False):
def _print_and_write(summaries, weak_scaling=False, mode='w', power = False, detailed = False):
if len(summaries) > 0:
summaries = pd.concat(summaries).astype(
_get_column_schema(
args.usage,
args.ruleset,
weak_scaling=weak_scaling,
detailed=detailed
)
)
if weak_scaling:
@@ -1208,6 +1320,9 @@ def _print_and_write(summaries, weak_scaling=False, mode='w', power = False):
specs_and_notes = [c for c in summaries.columns if c not in benchmarks]
csv = csv.replace(".csv", "_power.csv")
summaries.groupby(specs_and_notes).apply(lambda x: agg_columns_fn(x, benchmarks)).to_csv(csv, mode=mode)
elif detailed:
csv = csv.replace(".csv", "_detailed.csv")
summaries.to_csv(csv, index=False, mode=mode)
else:
summaries.to_csv(csv, index=False, mode=mode)
json_path = "summary.json" if args.csv is None else f"""{csv.replace(".csv", ".json")}"""
@@ -1224,7 +1339,7 @@ def _print_and_write(summaries, weak_scaling=False, mode='w', power = False):
None, 'display.max_colwidth', None):
_print_and_write(strong_scaling_summaries)
_print_and_write(weak_scaling_summaries, weak_scaling=True, mode='a')
_print_and_write(power_summaries, mode='a', power=True)
_print_and_write(detailed_strong_scaling_summaries, mode='a', detailed=True)
_print_and_write(power_weak_scaling_summaries, weak_scaling=True, mode='a', power=True)
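A single invocation can therefore emit several files next to the main CSV: the strong- and weak-scaling rows go to the CSV itself (write, then append), the detailed strong-scaling rows to a _detailed variant, the weak-scaling power rows to a _power variant, and each branch derives a JSON path the same way. A hypothetical illustration of the naming, assuming args.csv is "summary.csv":

csv = "summary.csv"
csv.replace(".csv", "_detailed.csv")  # "summary_detailed.csv"  (detailed=True branch)
csv.replace(".csv", "_power.csv")     # "summary_power.csv"     (power=True branch)
csv.replace(".csv", ".json")          # "summary.json"          (JSON path per branch)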

