Work on integrating new MPIExecutor (Tobias).

这个提交包含在:
Craig Warren
2020-03-09 17:35:41 +00:00
父节点 1e4237c2f5
当前提交 516660c2d2
共有 9 个文件被更改,包括 160 次插入182 次删除

查看文件

@@ -32,6 +32,7 @@ import gprMax.config as config
from .exceptions import CmdInputError
from .waveforms import Waveform
from .utilities import round_value
from .utilities import set_omp_threads
logger = logging.getLogger(__name__)

查看文件

@@ -49,8 +49,10 @@ model_num = 0
def get_model_config():
"""Return ModelConfig instace for specific model."""
return model_configs[model_num]
if sim_config.args.mpi:
return model_configs
else:
return model_configs[model_num]
class ModelConfig:
"""Configuration parameters for a model.
@@ -169,7 +171,7 @@ class ModelConfig:
class SimulationConfig:
"""Configuration parameters for a standard simulation.
"""Configuration parameters for a simulation.
N.B. A simulation can consist of multiple models.
"""
@@ -181,6 +183,9 @@ class SimulationConfig:
self.args = args
if args.mpi and args.geometry_fixed:
raise GeneralError('The geometry fixed option cannot be used with MPI.')
# General settings for the simulation
# inputfilepath: path to inputfile location
# outputfilepath: path to outputfile location
@@ -189,14 +194,16 @@ class SimulationConfig:
# subgrid: whether the simulation uses sub-grids
# precision: data type for electromagnetic field output (single/double)
self.general = {'log_level': logging.WARNING,
'progressbars': True,
'cpu': True,
self.general = {'cpu': True,
'cuda': False,
'opencl': False,
'subgrid': False,
'precision': 'single'}
# Progress bars on stdoout or not - switch off progressbars
# when > basic logging level is used
self.general['progressbars'] = False if logging.root.level > 20 else True
self.em_consts = {'c': c, # Speed of light in free space (m/s)
'e0': e0, # Permittivity of free space (F/m)
'm0': m0, # Permeability of free space (H/m)
@@ -252,12 +259,12 @@ class SimulationConfig:
self._set_model_start_end()
self._set_single_model()
def set_model_gpu(self):
"""Specify single GPU object for model.
Uses first GPU deviceID if list of deviceID given."""
def set_model_gpu(self, deviceID=0):
"""Specify GPU object for model. Defaults to first GPU deviceID in
list of deviceID given.
"""
for gpu in self.cuda['gpus']:
if gpu.deviceID == self.args.gpu[0]:
if gpu.deviceID == self.args.gpu[deviceID]:
return gpu
def _set_precision(self):
@@ -301,11 +308,7 @@ class SimulationConfig:
def _set_model_start_end(self):
"""Set range for number of models to run (internally 0 index)."""
if self.args.task:
# Job array feeds args.n number of single tasks
modelstart = self.args.task - 1
modelend = self.args.task
elif self.args.restart:
if self.args.restart:
modelstart = self.args.restart - 1
modelend = modelstart + self.args.n - 1
else:
@@ -323,21 +326,3 @@ class SimulationConfig:
# API/CLI
else:
self.input_file_path = Path(self.args.inputfile)
class SimulationConfigMPI(SimulationConfig):
"""Configuration parameters for a MPI simulation.
N.B. A simulation can consist of multiple models.
"""
def __init__(self, args):
super().__init__(args)
def _set_model_start_end(self):
# Set range for number of models to run
self.model_start = self.args.restart if self.args.restart else 1
self.model_end = self.model_start + self.args.n
def set_model_gpu(self):
"""Leave list of GPU object(s) as multi-object list."""
pass

查看文件

@@ -1,42 +0,0 @@
# Copyright (C) 2015-2020: The University of Edinburgh
# Authors: Craig Warren and Antonis Giannopoulos
#
# This file is part of gprMax.
#
# gprMax is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# gprMax is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with gprMax. If not, see <http://www.gnu.org/licenses/>.
import gprMax.config as config
def write_simulation_config(args):
"""Write simulation level configuration parameters to config module.
Args:
args (Namespace): Arguments from either API or CLI.
"""
if args.mpi:
config.sim_config = config.SimulationConfigMPI(args)
else:
config.sim_config = config.SimulationConfig(args)
def write_model_config():
"""Write model level configuration parameters to config module. As there can
only be one instance of the config module objects are always found via
config.get_model_config()
"""
model_config = config.ModelConfig()
config.model_configs.append(model_config)

查看文件

@@ -21,7 +21,6 @@ import logging
import gprMax.config as config
from ._version import __version__, codename
from .config_parser import write_model_config
from .model_build_run import ModelBuildRun
from .solvers import create_solver
from .solvers import create_G
@@ -30,12 +29,12 @@ from .utilities import human_size
from .utilities import logo
from .utilities import timer
log = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
class Context:
"""Generic context for the model to run in. Sub-class with specific contexts
e.g. an MPI context.
"""Standard context - models are run one after another and each model
can exploit parallelisation using either OpenMP (CPU) or CUDA (GPU).
"""
def __init__(self):
@@ -50,33 +49,33 @@ class Context:
if config.sim_config.general['cuda']:
self.print_gpu_info()
self.tsimstart = timer()
# Clear list of model configs. It can be retained when gprMax is
# called in a loop, and want to avoid this.
config.model_configs = []
self._run()
for i in self.model_range:
config.model_num = i
model_config = config.ModelConfig()
config.model_configs.append(model_config)
# Always create a grid for the first model. The next model to run
# only gets a new grid if the geometry is not re-used.
if i != 0 and config.sim_config.args.geometry_fixed:
config.get_model_config().reuse_geometry = True
else:
G = create_G()
model = ModelBuildRun(G)
model.build()
solver = create_solver(G)
if not config.sim_config.args.geometry_only:
model.solve(solver)
self.tsimend = timer()
self.print_time_report()
def _run_model(self, i):
"""Process for running a single model."""
config.model_num = i
write_model_config()
# Always create a grid for the first model. The next model to run
# only gets a new grid if the geometry is not re-used.
if i != 0 and config.sim_config.args.geometry_fixed:
config.get_model_config().reuse_geometry = True
else:
G = create_G()
model = ModelBuildRun(G)
model.build()
solver = create_solver(G)
if not config.sim_config.args.geometry_only:
model.solve(solver)
def print_logo_copyright(self):
"""Print gprMax logo, version, and copyright/licencing information."""
logo(__version__ + ' (' + codename + ')')
@@ -84,30 +83,19 @@ class Context:
def print_host_info(self):
"""Print information about the host machine."""
hyperthreadingstr = f", {config.sim_config.hostinfo['logicalcores']} cores with Hyper-Threading" if config.sim_config.hostinfo['hyperthreading'] else ''
log.info(f"\nHost: {config.sim_config.hostinfo['hostname']} | {config.sim_config.hostinfo['machineID']} | {config.sim_config.hostinfo['sockets']} x {config.sim_config.hostinfo['cpuID']} ({config.sim_config.hostinfo['physicalcores']} cores{hyperthreadingstr}) | {human_size(config.sim_config.hostinfo['ram'], a_kilobyte_is_1024_bytes=True)} RAM | {config.sim_config.hostinfo['osversion']}")
logger.basic(f"\nHost: {config.sim_config.hostinfo['hostname']} | {config.sim_config.hostinfo['machineID']} | {config.sim_config.hostinfo['sockets']} x {config.sim_config.hostinfo['cpuID']} ({config.sim_config.hostinfo['physicalcores']} cores{hyperthreadingstr}) | {human_size(config.sim_config.hostinfo['ram'], a_kilobyte_is_1024_bytes=True)} RAM | {config.sim_config.hostinfo['osversion']}")
def print_gpu_info(self):
"""Print information about any NVIDIA CUDA GPUs detected."""
gpus_info = []
for gpu in config.sim_config.cuda['gpus']:
gpus_info.append(f'{gpu.deviceID} - {gpu.name}, {human_size(gpu.totalmem, a_kilobyte_is_1024_bytes=True)}')
log.info(f" with GPU(s): {' | '.join(gpus_info)}")
logger.basic(f" with GPU(s): {' | '.join(gpus_info)}")
def print_time_report(self):
"""Print the total simulation time based on context."""
s = f"\n=== Simulation on {config.sim_config.hostinfo['hostname']} completed in [HH:MM:SS]: {datetime.timedelta(seconds=self.tsimend - self.tsimstart)}"
log.info(f"{s} {'=' * (get_terminal_width() - 1 - len(s))}\n")
class NoMPIContext(Context):
"""Standard context - models are run one after another and each model
can exploit parallelisation using either OpenMP (CPU) or CUDA (GPU).
"""
def _run(self):
"""Specialise how models are run."""
for i in self.model_range:
self._run_model(i)
logger.basic(f"{s} {'=' * (get_terminal_width() - 1 - len(s))}\n")
class MPIContext(Context):
@@ -125,21 +113,65 @@ class MPIContext(Context):
self.rank = self.comm.rank
self.MPIExecutor = MPIExecutor
def _run(self):
def _run_model(self, i, GPUdeviceID):
"""Process for running a single model."""
config.model_num = i
model_config = config.ModelConfig()
if config.sim_config.general['cuda']:
config.sim_config.set_model_gpu(GPUdeviceID)
config.model_configs = model_config
G = create_G()
model = ModelBuildRun(G)
model.build()
solver = create_solver(G)
if not config.sim_config.args.geometry_only:
model.solve(solver)
def run(self):
"""Specialise how the models are run."""
# compile jobs
jobs = []
for i in range(config.sim_config.args.n):
jobs.append({'i': i})
self.tsimstart = timer()
# Execute jobs
log.info(f'Starting execution of {config.sim_config.args.n} gprMax model runs.')
with self.MPIExecutor(self._run_model, comm=self.comm) as executor:
if executor is not None:
results = executor.submit(jobs)
log.info('Results: %s' % str(results))
log.info('Finished.')
# Contruct MPIExecutor
executor = self.MPIExecutor(self._run_model, comm=self.comm)
# Compile jobs
jobs = []
for i in self.model_range:
jobs.append({'i': i,
'GPUdeviceID': 0})
# if executor.is_master():
# self.print_logo_copyright()
# self.print_host_info()
# if config.sim_config.general['cuda']:
# self.print_gpu_info()
# Send the workers to their work loop
executor.start()
if executor.is_master():
results = executor.submit(jobs)
# Make the workers exit their work loop and join the main loop again
executor.join()
# with self.MPIExecutor(self._run_model, comm=self.comm) as executor:
# if executor is not None:
# results = executor.submit(jobs)
# logger.info('Results: %s' % str(results))
# logger.basic('Finished.')
self.tsimend = timer()
if executor.is_master():
self.print_time_report()
def print_time_report(self):
"""Print the total simulation time based on context."""
s = f"\n=== Simulation on {config.sim_config.hostinfo['hostname']} completed in [HH:MM:SS]: {datetime.timedelta(seconds=self.tsimend - self.tsimstart)}"
logger.basic(f"{s} {'=' * (get_terminal_width() - 1 - len(s))}\n")
def create_context():
@@ -152,6 +184,6 @@ def create_context():
if config.sim_config.args.mpi:
context = MPIContext()
else:
context = NoMPIContext()
context = Context()
return context

查看文件

@@ -126,7 +126,7 @@ def write_hdf5_outputfile(outputfile, G):
write_grid(grp, sg, is_subgrid=True)
if G.rxs or sg_rxs:
logger.info(f'Written output file: {outputfile.name}')
logger.basic(f'Written output file: {outputfile.name}')
def write_grid(basegrp, G, is_subgrid=False):

查看文件

@@ -19,12 +19,12 @@
import argparse
import logging
from .config_parser import write_simulation_config
import gprMax.config as config
from .contexts import create_context
from .utilities import setup_logging
logger = logging.getLogger(__name__)
setup_logging(level=25)
def run(
scenes=None,
@@ -112,6 +112,7 @@ def run(
args = ImportArguments()
args.scenes = scenes
args.subgrid = subgrid
args.inputfile = inputfile
args.outputfile = outputfile
args.n = n
@@ -119,7 +120,6 @@ def run(
args.restart = restart
args.mpi = mpi
args.gpu = gpu
args.subgrid = subgrid
args.autotranslate = autotranslate
args.geometry_only = geometry_only
args.geometry_fixed = geometry_fixed
@@ -156,14 +156,8 @@ def main():
parser.add_argument('--write-processed', action='store_true', default=False,
help='flag to write an input file after any Python code and include commands '
'in the original input file have been processed')
parser.add_argument('-l', '--logfile', action='store_true', default=False,
help='flag to enable writing to a log file')
parser.add_argument('-v', '--verbose', action='store_true', default=False,
help="flag to increase output")
args = parser.parse_args()
setup_logging()
try:
run_main(args)
except Exception:
@@ -177,6 +171,7 @@ def run_main(args):
args (Namespace): arguments from either API or CLI.
"""
write_simulation_config(args)
config.sim_config = config.SimulationConfig(args)
context = create_context()
context.run()

查看文件

@@ -132,7 +132,7 @@ class ModelBuildRun:
def build_geometry(self):
G = self.G
logger.info(config.get_model_config().inputfilestr)
logger.basic(config.get_model_config().inputfilestr)
scene = self.build_scene()
@@ -189,7 +189,7 @@ class ModelBuildRun:
self.G.iteration = 0
s = f'\n--- Model {config.get_model_config().appendmodelnumber}/{config.sim_config.model_end}, input file (not re-processed, i.e. geometry fixed): {config.sim_config.input_file_path}'
config.get_model_config().inputfilestr = Fore.GREEN + f"{s} {'-' * (get_terminal_width() - 1 - len(s))}\n" + Style.RESET_ALL
logger.info(config.get_model_config().inputfilestr)
logger.basic(config.get_model_config().inputfilestr)
for grid in [self.G] + self.G.subgrids:
grid.reset_fields()
@@ -260,12 +260,12 @@ class ModelBuildRun:
# Check number of OpenMP threads
if config.sim_config.general['cpu']:
logger.info(f'CPU (OpenMP) threads for solving: {config.get_model_config().ompthreads}\n')
logger.basic(f'CPU (OpenMP) threads for solving: {config.get_model_config().ompthreads}\n')
if config.get_model_config().ompthreads > config.sim_config.hostinfo['physicalcores']:
logger.warning(Fore.RED + f"You have specified more threads ({config.get_model_config().ompthreads}) than available physical CPU cores ({config.sim_config.hostinfo['physicalcores']}). This may lead to degraded performance." + Style.RESET_ALL)
# Print information about any GPU in use
elif config.sim_config.general['cuda']:
logger.info(f"GPU for solving: {config.get_model_config().cuda['gpu'].deviceID} - {config.get_model_config().cuda['gpu'].name}\n")
logger.basic(f"GPU for solving: {config.get_model_config().cuda['gpu'].deviceID} - {config.get_model_config().cuda['gpu'].name}\n")
# Prepare iterator
if config.sim_config.general['progressbars']:

查看文件

@@ -22,7 +22,7 @@ import time
from mpi4py import MPI
_log = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
"""
@@ -151,9 +151,9 @@ class MPIExecutor(object):
master = int(master)
if master < 0:
raise ValueError('master rank must be non-negative')
raise ValueError('Master rank must be non-negative')
elif master >= self.size:
raise ValueError('master not in comm')
raise ValueError('Master not in comm')
else:
self.master = master
@@ -161,16 +161,16 @@ class MPIExecutor(object):
self.workers = tuple(set(range(self.size)) - {self.master})
# the worker function
if not callable(func):
raise TypeError('func must be a callable')
raise TypeError('Func must be a callable')
self.func = func
# holds the state of workers on the master
self.busy = [False] * len(self.workers)
_log.debug(f'MPIExecutor on comm: {self.comm.name}, Master: {self.master}, Workers: {self.workers}')
logger.basic(f'Rank {self.rank}: MPIExecutor on comm: {self.comm.name}, Master: {self.master}, Workers: {self.workers}')
if self.is_master():
_log.debug('*** MASTER ***')
logger.debug(f'Rank {self.rank} = MASTER')
else:
_log.debug('*** WORKER ***')
logger.debug(f'Rank {self.rank} = WORKER')
def __enter__(self):
"""Context manager enter.
@@ -186,12 +186,11 @@ class MPIExecutor(object):
"""Context manager exit.
"""
if exc_type is not None:
_log.exception(exc_val)
logger.exception(exc_val)
return False
# no exception handling necessary
# since we catch everything in __guarded_work
# exc_type should always be None
# No exception handling necessary since we catch everything
# in __guarded_work exc_type should always be None
self.join()
return True
@@ -223,10 +222,10 @@ class MPIExecutor(object):
"""
if self.is_master():
if self._up:
raise RuntimeError('start has already been called')
raise RuntimeError('Start has already been called')
self._up = True
_log.info('Starting up MPIExecutor master/workers')
logger.basic(f'Rank {self.rank}: Starting up MPIExecutor master/workers')
if self.is_worker():
self.__wait()
@@ -235,12 +234,12 @@ class MPIExecutor(object):
"""
if self.is_master():
_log.debug('Terminating. Sending sentinel to all workers.')
# send sentinel to all workers
logger.basic(f'Rank {self.rank}: Terminating. Sending sentinel to all workers.')
# Send sentinel to all workers
for worker in self.workers:
self.comm.send(None, dest=worker, tag=Tags.EXIT)
_log.debug('Waiting for all workers to terminate.')
logger.basic(f'Rank {self.rank}: Waiting for all workers to terminate.')
down = [False] * len(self.workers)
while True:
@@ -252,7 +251,7 @@ class MPIExecutor(object):
break
self._up = False
_log.debug('All workers terminated.')
logger.basic(f'Rank {self.rank}: All workers terminated.')
def submit(self, jobs, sleep=0.0):
"""Submits a list of jobs to the workers and returns the results.
@@ -273,9 +272,9 @@ class MPIExecutor(object):
the order of `jobs`.
"""
if not self._up:
raise RuntimeError('cannot run jobs without a call to start()')
raise RuntimeError('Cannot run jobs without a call to start()')
_log.info('Running {:d} jobs.'.format(len(jobs)))
logger.basic(f'Rank {self.rank}: Running {len(jobs):d} jobs.')
assert self.is_master(), 'run() must not be called on a worker process'
my_jobs = jobs.copy()
@@ -287,7 +286,7 @@ class MPIExecutor(object):
if self.comm.Iprobe(source=worker, tag=Tags.DONE):
job_idx, result = self.comm.recv(source=worker, tag=Tags.DONE)
_log.debug(f'Received finished job {job_idx} from worker {worker:d}.')
logger.basic(f'Rank {self.rank}: Received finished job {job_idx} from worker {worker:d}.')
results[job_idx] = result
self.busy[i] = False
elif self.comm.Iprobe(source=worker, tag=Tags.READY):
@@ -295,16 +294,17 @@ class MPIExecutor(object):
self.comm.recv(source=worker, tag=Tags.READY)
self.busy[i] = True
job_idx = num_jobs - len(my_jobs)
_log.debug(f'Sending job {job_idx} to worker {worker:d}.')
logger.basic(f'Rank {self.rank}: Sending job {job_idx} to worker {worker:d}.')
self.comm.send((job_idx, my_jobs.pop(0)), dest=worker, tag=Tags.START)
elif self.comm.Iprobe(source=worker, tag=Tags.EXIT):
_log.debug(f'Worker on rank {worker:d} has terminated.')
logger.basic(f'Rank {self.rank}: Worker on rank {worker:d} has terminated.')
self.comm.recv(source=worker, tag=Tags.EXIT)
self.busy[i] = False
time.sleep(sleep)
_log.info('Finished all jobs.')
logger.basic(f'Rank {self.rank}: Finished all jobs.')
return results
def __wait(self):
@@ -318,27 +318,26 @@ class MPIExecutor(object):
status = MPI.Status()
_log.debug(f'Starting up worker.')
logger.basic(f'Rank {self.rank}: Starting up worker.')
while True:
self.comm.send(None, dest=self.master, tag=Tags.READY)
_log.debug(f'Worker on rank {self.rank} waiting for job.')
logger.basic(f'Rank {self.rank}: Worker on rank {self.rank} waiting for job.')
data = self.comm.recv(source=self.master, tag=MPI.ANY_TAG, status=status)
tag = status.tag
if tag == Tags.START:
job_idx, work = data
_log.debug(f'Received job {job_idx} (work={work}).')
logger.basic(f'Rank {self.rank}: Received job {job_idx} (work={work}).')
result = self.__guarded_work(work)
_log.debug(f'Finished job. Sending results to master.')
logger.basic(f'Rank {self.rank}: Finished job. Sending results to master.')
self.comm.send((job_idx, result), dest=self.master, tag=Tags.DONE)
elif tag == Tags.EXIT:
_log.debug(f'Received sentinel from master.')
logger.basic(f'Rank {self.rank}: Received sentinel from master.')
break
_log.debug('Terminating worker.')
logger.basic(f'Rank {self.rank}: Terminating worker.')
self.comm.send(None, dest=self.master, tag=Tags.EXIT)
def __guarded_work(self, work):
@@ -358,7 +357,7 @@ class MPIExecutor(object):
try:
return self.func(**work)
except Exception as e:
_log.exception(str(e))
logger.exception(str(e))
return None

查看文件

@@ -55,6 +55,14 @@ def setup_logging(level=logging.INFO, logfile=False):
logfile (bool): additional logging to file.
"""
# Add a custom log level
BASIC_NUM = 25
logging.addLevelName(BASIC_NUM, "BASIC")
def basic(self, message, *args, **kws):
if self.isEnabledFor(BASIC_NUM):
self._log(BASIC_NUM, message, args, **kws)
logging.Logger.basic = basic
# Get root logger
logger = logging.getLogger()
logger.setLevel(level)
@@ -111,14 +119,14 @@ def logo(version):
|___/|_|
v""" + version
logger.info(f"{description} {'=' * (get_terminal_width() - len(description) - 1)}\n")
logger.info(Fore.CYAN + f'{logo}\n')
logger.info(Style.RESET_ALL + textwrap.fill(copyright, width=get_terminal_width() - 1, initial_indent=' '))
logger.info(textwrap.fill(authors, width=get_terminal_width() - 1, initial_indent=' '))
logger.info('')
logger.info(textwrap.fill(licenseinfo1, width=get_terminal_width() - 1, initial_indent=' ', subsequent_indent=' '))
logger.info(textwrap.fill(licenseinfo2, width=get_terminal_width() - 1, initial_indent=' ', subsequent_indent=' '))
logger.info(textwrap.fill(licenseinfo3, width=get_terminal_width() - 1, initial_indent=' ', subsequent_indent=' '))
logger.basic(f"{description} {'=' * (get_terminal_width() - len(description) - 1)}\n")
logger.basic(Fore.CYAN + f'{logo}\n')
logger.basic(Style.RESET_ALL + textwrap.fill(copyright, width=get_terminal_width() - 1, initial_indent=' '))
logger.basic(textwrap.fill(authors, width=get_terminal_width() - 1, initial_indent=' '))
logger.basic('')
logger.basic(textwrap.fill(licenseinfo1, width=get_terminal_width() - 1, initial_indent=' ', subsequent_indent=' '))
logger.basic(textwrap.fill(licenseinfo2, width=get_terminal_width() - 1, initial_indent=' ', subsequent_indent=' '))
logger.basic(textwrap.fill(licenseinfo3, width=get_terminal_width() - 1, initial_indent=' ', subsequent_indent=' '))
def round_value(value, decimalplaces=0):