文件
gprMax/reframe_tests/tests/base_tests.py
2024-12-04 12:06:33 +00:00

438 行
16 KiB
Python

"""ReFrame base classes for GprMax tests
Usage (run all tests):
cd gprMax/reframe_tests
reframe -C configuration/{CONFIG_FILE} -c tests/ -r
"""
import os
from pathlib import Path
from typing import Literal, Optional, Union
import reframe.utility.sanity as sn
import reframe.utility.typecheck as typ
from reframe import RunOnlyRegressionTest, simple_test
from reframe.core.builtins import (
parameter,
performance_function,
require_deps,
required,
run_after,
run_before,
sanity_function,
variable,
)
from reframe.core.exceptions import DependencyError
from reframe.utility import udeps
from reframe_tests.tests.regression_checks import RegressionCheck
from reframe_tests.utilities.deferrable import path_join
GPRMAX_ROOT_DIR = Path(__file__).parent.parent.parent.resolve()
PATH_TO_PYENV = os.path.join(".venv", "bin", "activate")
@simple_test
class CreatePyenvTest(RunOnlyRegressionTest):
"""Create a fresh virtual environment for running the tests.
The test checks for any errors from pip installing gprMax and its
dependencies.
"""
valid_systems = ["generic", "archer2:login"]
valid_prog_environs = ["builtin", "PrgEnv-gnu"]
modules = ["cray-python"]
prerun_cmds = [
"python -m venv --system-site-packages --prompt gprMax .venv",
f"source {PATH_TO_PYENV}",
"CC=cc CXX=CC FC=ftn python -m pip install --upgrade pip",
f"CC=cc CXX=CC FC=ftn python -m pip install -r {os.path.join(GPRMAX_ROOT_DIR, 'requirements.txt')}",
]
executable = f"CC=cc CXX=CC FC=ftn python -m pip install -e {GPRMAX_ROOT_DIR}"
@run_after("init")
def install_system_specific_dependencies(self):
"""Install additional dependencies for specific systems."""
if self.current_system.name == "archer2":
"""
Needed to prevent a pip install error.
dask 2022.2.1 (installed) requires cloudpickle>=1.1.1, which
is not installed and is missed by the pip dependency checks.
Not necessary for gprMax, but any error message is picked up
by the sanity checks.
"""
self.prerun_cmds.insert(3, "CC=cc CXX=CC FC=ftn python -m pip install cloudpickle")
"""
A default pip install of h5py does not have MPI support so
it needs to built as described here:
https://docs.h5py.org/en/stable/mpi.html#building-against-parallel-hdf5
"""
self.modules.append("cray-hdf5-parallel")
self.prerun_cmds.insert(
4, "CC=mpicc HDF5_MPI='ON' python -m pip install --no-binary=h5py h5py"
)
@sanity_function
def check_requirements_installed(self):
"""Check packages were successfully installed.
Check pip is up to date and gprMax dependencies from
requirements.txt were successfully installed. Check gprMax was
installed successfully and no other errors were thrown.
"""
return (
sn.assert_found(
r"(Successfully installed pip)|(Requirement already satisfied: pip.*\n(?!Collecting pip))",
self.stdout,
"Failed to update pip",
)
and sn.assert_found(
r"Successfully installed (?!(gprMax)|(pip))",
self.stdout,
"Failed to install requirements",
)
and sn.assert_found(
r"Successfully installed gprMax", self.stdout, "Failed to install gprMax"
)
and sn.assert_not_found(r"finished with status 'error'", self.stdout)
and sn.assert_not_found(r"(ERROR|error):", self.stderr)
)
class GprMaxBaseTest(RunOnlyRegressionTest):
"""Base class that all GprMax tests should inherit from.
Test functionality can be augmented by using Mixin classes.
Attributes:
model (parameter[str]): ReFrame parameter to specify the model
name(s).
sourcesdir (str): Relative path to the test's src directory.
regression_checks (list[RegressionCheck]): List of regression
checks to perform.
test_dependency (type[GprMaxBaseTest] | None): Optional test
dependency. If specified, regression checks will use
reference files created by the test dependency.
"""
valid_systems = ["archer2:compute"]
valid_prog_environs = ["PrgEnv-gnu"]
modules = ["cray-python"]
num_cpus_per_task = 16
exclusive_access = True
model = parameter()
sourcesdir = required
executable = "time -p python -m gprMax"
regression_checks = variable(typ.List[RegressionCheck], value=[])
# TODO: Make this a ReFrame variable
# Not currently possible as ReFrame does not think an object of type
# reframe.core.meta.RegressionTestMeta is copyable, and so ReFrame
# test classes cannot be specified in a variable.
test_dependency: Optional[type["GprMaxBaseTest"]] = None
# test_dependency = variable(type(None), type, value=None)
def get_test_dependency_variant_name(self, **kwargs) -> Optional[str]:
"""Get unique ReFrame name of the test dependency variant.
By default, filter test dependencies by the model name.
Args:
**kwargs: Additional key-value pairs to filter the parameter
space of the test dependency. The key is the test
parameter name and the value is either a single value or
a unary function that evaluates to True if the parameter
point must be kept, False otherwise.
Returns:
variant_name: Unique name of the test dependency variant.
"""
if self.test_dependency is None:
return None
# Always filter by the model parameter, but allow child classes
# (or mixins) to override how models are filtered.
kwargs.setdefault("model", self.model)
variant_nums = self.test_dependency.get_variant_nums(**kwargs)
if len(variant_nums) < 1:
raise DependencyError(
f"No variant of '{self.test_dependency.__name__}' meets conditions: {kwargs}",
)
return self.test_dependency.variant_name(variant_nums[0])
def get_test_dependency(self) -> Optional["GprMaxBaseTest"]:
"""Get correct ReFrame test case from the test dependency.
Returns:
test_case: ReFrame test case.
"""
variant = self.get_test_dependency_variant_name()
if variant is None:
return None
else:
return self.getdep(variant)
def build_reference_filepath(self, name: Union[str, os.PathLike], suffix: str = ".h5") -> Path:
"""Build path to the specified reference file.
Reference files are saved in directories per test case. If this
test does not specify a test dependency, it will save and manage
its own reference files in its own directory. Otherwise, it will
use reference files saved by its test dependency.
Args:
name: Name of the file.
suffix: File extension. Default ".h5".
Returns:
filepath: Absolute path to the reference file.
"""
target = self.get_test_dependency()
if target is None:
reference_dir = self.short_name
else:
reference_dir = target.short_name
reference_file = Path("regression_checks", reference_dir, name).with_suffix(suffix)
return reference_file.absolute()
# TODO: Change CreatePyenvTest to a fixture instead of a test dependency
@run_after("init")
def inject_dependencies(self):
"""Specify test dependencies.
All tests depend on the Python virtual environment building
correctly and their own test dependency if specified.
"""
self.depends_on("CreatePyenvTest", udeps.by_env)
if self.test_dependency is not None:
variant = self.get_test_dependency_variant_name()
self.depends_on(variant, udeps.by_env)
@require_deps
def get_pyenv_path(self, CreatePyenvTest):
"""Add prerun command to load the built Python environment."""
path_to_pyenv = os.path.join(CreatePyenvTest(part="login").stagedir, PATH_TO_PYENV)
self.prerun_cmds.append(f"source {path_to_pyenv}")
@run_after("init")
def setup_env_vars(self):
"""Set necessary environment variables.
Set OMP_NUM_THREADS environment variable from num_cpus_per_task
and other system specific varaibles.
"""
self.env_vars["OMP_NUM_THREADS"] = self.num_cpus_per_task
if self.current_system.name == "archer2":
# Avoid inheriting slurm memory environment variables from any previous slurm job (i.e. the reframe job)
self.prerun_cmds.append("unset SLURM_MEM_PER_NODE")
self.prerun_cmds.append("unset SLURM_MEM_PER_CPU")
# Set the matplotlib cache to the work filesystem
self.env_vars["MPLCONFIGDIR"] = "${HOME/home/work}/.config/matplotlib"
@run_after("init")
def set_file_paths(self):
"""Set default test input and output files.
These are set in a post-init hook to allow mixins to use them
later in the pipeline.
"""
self.input_file = Path(f"{self.model}.in")
self.output_file = Path(f"{self.model}.h5")
@run_before("run")
def configure_test_run(self):
"""Configure gprMax commandline arguments and files to keep."""
input_file = str(self.input_file)
output_file = str(self.output_file)
self.executable_opts += [
input_file,
"-o",
output_file,
"--log-level",
"10",
"--hide-progress-bars",
]
regression_output_files = [str(r.output_file) for r in self.regression_checks]
self.keep_files += [input_file, output_file, *regression_output_files]
"""
if self.has_receiver_output:
self.postrun_cmds = [
f"python -m reframe_tests.utilities.plotting {self.output_file} {self.reference_file} -m {self.model}"
]
self.keep_files += [self.output_file, f"{self.model}.pdf"]
if self.is_antenna_model:
self.postrun_cmds = [
f"python -m toolboxes.Plotting.plot_antenna_params -save {self.output_file}"
]
antenna_t1_params = f"{self.model}_t1_params.pdf"
antenna_ant_params = f"{self.model}_ant_params.pdf"
self.keep_files += [
antenna_t1_params,
antenna_ant_params,
]
"""
@run_before("run")
def combine_task_outputs(self):
"""Split output from each MPI rank.
If running with multiple MPI ranks, split the output into
seperate files and add postrun commands to combine the files
after the simulation has run.
"""
if self.num_tasks > 1:
stdout = self.stdout.evaluate().split(".")[0]
stderr = self.stderr.evaluate().split(".")[0]
self.prerun_cmds.append(f"mkdir out")
self.prerun_cmds.append(f"mkdir err")
self.job.launcher.options = [
f"--output=out/{stdout}_%t.out",
f"--error=err/{stderr}_%t.err",
]
self.executable_opts += ["--log-all-ranks"]
self.postrun_cmds.append(f"cat out/{stdout}_*.out >> {self.stdout}")
self.postrun_cmds.append(f"cat err/{stderr}_*.err >> {self.stderr}")
def test_simulation_complete(self) -> Literal[True]:
"""Check simulation completed successfully.
Returns:
simulation_completed: Returns True if the simulation
completed, otherwise it fails the test.
Raises:
reframe.core.exceptions.SanityError: If the simulation did
not complete.
"""
return sn.assert_not_found(
r"(?i)error",
self.stderr,
f"An error occured. See '{path_join(self.stagedir, self.stderr)}' for details.",
) and sn.assert_found(
r"=== Simulation completed in ", self.stdout, "Simulation did not complete"
)
def test_reference_files_exist(self) -> Literal[True]:
"""Check all reference files exist and create any missing ones.
Returns:
files_exist: Returns True if all reference files exist,
otherwise it fails the test.
Raises:
reframe.core.exceptions.SanityError: If any reference files
do not exist.
"""
# Store error messages so all references files can be checked
# (and created if necessary) before the test is failed.
error_messages = []
for check in self.regression_checks:
if not check.reference_file_exists():
if self.test_dependency is None and check.create_reference_file():
error_messages.append(
f"Reference file does not exist. Creating... '{check.reference_file}'"
)
elif self.test_dependency is not None:
error_messages.append(
f"ERROR: Test dependency did not create reference file: '{check.reference_file}'"
)
else:
error_messages.append(
f"ERROR: Unable to create reference file: '{check.reference_file}'"
)
return sn.assert_true(len(error_messages) < 1, "\n".join(error_messages))
@sanity_function
def regression_check(self) -> bool:
"""Run sanity checks and regression checks.
Checks will run in the following order:
- Check the simulation completed.
- Check all reference files exist.
- Run all regression checks.
If any of these checks fail, the test will fail and none of the
other later checks will run.
Returns:
test_passed: Returns True if all checks pass.
Raises:
reframe.core.exceptions.SanityError: If any regression
checks fail.
"""
return (
self.test_simulation_complete()
and self.test_reference_files_exist()
and sn.all(sn.map(lambda check: check.run(), self.regression_checks))
)
@performance_function("s", perf_key="run_time")
def extract_run_time(self):
"""Extract total runtime from the last task to complete."""
return sn.extractsingle(
r"real\s+(?P<run_time>\S+)", self.stderr, "run_time", float, self.num_tasks - 1
)
@performance_function("s", perf_key="simulation_time")
def extract_simulation_time(self):
"""Extract simulation time reported by gprMax."""
# sn.extractall throws an error if a group has value None.
# Therefore have to handle the < 1 min, >= 1 min and >= 1 hour cases separately.
timeframe = sn.extractsingle(
r"=== Simulation completed in \S+ (?P<timeframe>hour|minute|second)",
self.stdout,
"timeframe",
)
if timeframe == "hour":
simulation_time = sn.extractall(
r"=== Simulation completed in (?P<hours>\S+) hours?, (?P<minutes>\S+) minutes? and (?P<seconds>\S+) seconds? =*",
self.stdout,
["hours", "minutes", "seconds"],
float,
)
hours = simulation_time[0][0]
minutes = simulation_time[0][1]
seconds = simulation_time[0][2]
elif timeframe == "minute":
hours = 0
simulation_time = sn.extractall(
r"=== Simulation completed in (?P<minutes>\S+) minutes? and (?P<seconds>\S+) seconds? =*",
self.stdout,
["minutes", "seconds"],
float,
)
minutes = simulation_time[0][0]
seconds = simulation_time[0][1]
else:
hours = 0
minutes = 0
seconds = sn.extractsingle(
r"=== Simulation completed in (?P<seconds>\S+) seconds? =*",
self.stdout,
"seconds",
float,
)
return hours * 3600 + minutes * 60 + seconds