diff --git a/gprMax/gprMax.py b/gprMax/gprMax.py
index 1a154601..a22df27e 100644
--- a/gprMax/gprMax.py
+++ b/gprMax/gprMax.py
@@ -16,58 +16,11 @@
 # You should have received a copy of the GNU General Public License
 # along with gprMax. If not, see <http://www.gnu.org/licenses/>.
 
-"""gprMax.gprMax: provides entry point main()."""
+from .config import create_simulation_config
+from .contexts import create_runner
+from .solvers import create_solver
 
 import argparse
-import datetime
-import os
-import sys
-
-from enum import Enum
-
-# There is a bug with threading and MKL on macOS
-# (https://github.com/gprMax/gprMax/issues/195) . Setting the MKL threading
-# layer to sequential solves it, but must be done before numpy is imported.
-if sys.platform == 'darwin':
-    os.environ["MKL_THREADING_LAYER"] = 'sequential'
-
-import h5py
-import numpy as np
-
-from ._version import __version__
-from ._version import codename
-import gprMax.config as config
-from .exceptions import GeneralError
-from .model_build_run import run_model
-from .utilities import detect_check_gpus
-from .utilities import get_terminal_width
-from .utilities import human_size
-from .utilities import logo
-from .utilities import open_path_file
-from .utilities import timer
-
-
-def main():
-    """This is the main function for gprMax."""
-
-    # Parse command line arguments
-    parser = argparse.ArgumentParser(prog='gprMax', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-    parser.add_argument('inputfile', help='path to, and name of inputfile or file object')
-    parser.add_argument('-n', default=1, type=int, help='number of times to run the input file, e.g. to create a B-scan')
-    parser.add_argument('-task', type=int, help='task identifier (model number) for job array on Open Grid Scheduler/Grid Engine (http://gridscheduler.sourceforge.net/index.html)')
-    parser.add_argument('-restart', type=int, help='model number to restart from, e.g. when creating B-scan')
-    parser.add_argument('-mpi', type=int, help='number of MPI tasks, i.e. master + workers')
-    parser.add_argument('--mpi-no-spawn', action='store_true', default=False, help='flag to use MPI without spawn mechanism')
-    parser.add_argument('--mpi-worker', action='store_true', default=False, help=argparse.SUPPRESS)
-    parser.add_argument('-gpu', type=int, action='append', nargs='*', help='flag to use Nvidia GPU or option to give list of device ID(s)')
-    parser.add_argument('-benchmark', action='store_true', default=False, help='flag to switch on benchmarking mode')
-    parser.add_argument('--geometry-only', action='store_true', default=False, help='flag to only build model and produce geometry file(s)')
-    parser.add_argument('--geometry-fixed', action='store_true', default=False, help='flag to not reprocess model geometry, e.g. for B-scans where the geometry is fixed')
-    parser.add_argument('--write-processed', action='store_true', default=False, help='flag to write an input file after any Python code and include commands in the original input file have been processed')
-    args = parser.parse_args()
-
-    run_main(args)
-
 
 def api(
     inputfile,
@@ -106,412 +59,30 @@ def api(
 
     run_main(args)
 
 
+def main():
+    """This is the main function for gprMax."""
+
+    # Parse command line arguments
+    parser = argparse.ArgumentParser(prog='gprMax', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument('inputfile', help='path to, and name of inputfile or file object')
+    parser.add_argument('-n', default=1, type=int, help='number of times to run the input file, e.g. to create a B-scan')
+    parser.add_argument('-task', type=int, help='task identifier (model number) for job array on Open Grid Scheduler/Grid Engine (http://gridscheduler.sourceforge.net/index.html)')
+    parser.add_argument('-restart', type=int, help='model number to restart from, e.g. when creating B-scan')
+    parser.add_argument('-mpi', type=int, help='number of MPI tasks, i.e. master + workers')
+    parser.add_argument('--mpi-no-spawn', action='store_true', default=False, help='flag to use MPI without spawn mechanism')
+    parser.add_argument('--mpi-worker', action='store_true', default=False, help=argparse.SUPPRESS)
+    parser.add_argument('-gpu', type=int, action='append', nargs='*', help='flag to use Nvidia GPU or option to give list of device ID(s)')
+    parser.add_argument('-benchmark', action='store_true', default=False, help='flag to switch on benchmarking mode')
+    parser.add_argument('--geometry-only', action='store_true', default=False, help='flag to only build model and produce geometry file(s)')
+    parser.add_argument('--geometry-fixed', action='store_true', default=False, help='flag to not reprocess model geometry, e.g. for B-scans where the geometry is fixed')
+    parser.add_argument('--write-processed', action='store_true', default=False, help='flag to write an input file after any Python code and include commands in the original input file have been processed')
+    args = parser.parse_args()
+
+    run_main(args)
+
 def run_main(args):
-    """
-    Top-level function that controls what mode of simulation (standard/optimsation/benchmark etc...) is run.
-    Args:
-        args (dict): Namespace with input arguments from command line or api.
-    """
-
-    # Print gprMax logo, version, and licencing/copyright information
-    logo(__version__ + ' (' + codename + ')')
-
-    # Print information about host machine
-    hyperthreading = ', {} cores with HT'.format(config.hostinfo['logicalcores']) if config.hostinfo['hyperthreading'] else ''
-    print('\nHost: {} | {} | {} x {} ({} cores{}) | {} RAM | {}'.format(config.hostinfo['hostname'],
-          config.hostinfo['machineID'], config.hostinfo['sockets'], config.hostinfo['cpuID'], config.hostinfo['physicalcores'],
-          hyperthreading, human_size(config.hostinfo['ram'], a_kilobyte_is_1024_bytes=True), config.hostinfo['osversion']))
-
-    # Get information/setup any Nvidia GPU(s)
-    if args.gpu is not None:
-        # Flatten a list of lists
-        if any(isinstance(element, list) for element in args.gpu):
-            args.gpu = [val for sublist in args.gpu for val in sublist]
-        config.cuda['gpus'], allgpustext = detect_check_gpus(args.gpu)
-        print('GPU(s): {}'.format(' | '.join(allgpustext)))
-
-    # Process input file
-    with open_path_file(args.inputfile) as inputfile:
-
-        # Create a separate namespace that users can access in any Python code blocks in the input file
-        usernamespace = {'c': config.c, 'e0': config.e0, 'm0': config.m0, 'z0': config.z0, 'number_model_runs': args.n, 'inputfile': os.path.abspath(inputfile.name)}
-
-        #######################################
-        # Process for benchmarking simulation #
-        #######################################
-        if args.benchmark:
-            if args.mpi or args.task or args.n > 1:
-                raise GeneralError('Benchmarking mode cannot be combined with MPI, job array mode, or multiple model runs.')
-            run_benchmark_sim(args, inputfile, usernamespace)
-
-        ################################################
-        # Process for standard simulation (CPU or GPU) #
-        ################################################
-        else:
-            # Mixed mode MPI with OpenMP or CUDA - MPI task farm for models with each model parallelised with OpenMP (CPU) or CUDA (GPU)
-            if args.mpi:
-                if args.n == 1:
-                    raise GeneralError('MPI is not beneficial when there is only one model to run')
-                if args.task:
-                    raise GeneralError('MPI cannot be combined with job array mode')
-                run_mpi_sim(args, inputfile, usernamespace)
-
-            # Alternate MPI configuration that does not use MPI spawn mechanism
-            elif args.mpi_no_spawn:
-                if args.n == 1:
-                    raise GeneralError('MPI is not beneficial when there is only one model to run')
-                if args.task:
-                    raise GeneralError('MPI cannot be combined with job array mode')
-                run_mpi_no_spawn_sim(args, inputfile, usernamespace)
-
-            # Standard behaviour - models run serially with each model parallelised with OpenMP (CPU) or CUDA (GPU)
-            else:
-                if args.task and args.restart:
-                    raise GeneralError('Job array and restart modes cannot be used together')
-                if config.cuda['gpus']:
-                    config.cuda['gpus'] = config.cuda['gpus'][0]
-                run_std_sim(args, inputfile, usernamespace)
-
-
-def run_std_sim(args, inputfile, usernamespace):
-    """
-    Run standard simulation - models are run one after another and each model
-    is parallelised using either OpenMP (CPU) or CUDA (GPU)
-
-    Args:
-        args (dict): Namespace with command line arguments
-        inputfile (object): File object for the input file.
-        usernamespace (dict): Namespace that can be accessed by user in any
-                Python code blocks in input file.
-    """
-
-    # Set range for number of models to run
-    if args.task:
-        # Job array feeds args.n number of single tasks
-        modelstart = args.task
-        modelend = args.task + 1
-    elif args.restart:
-        modelstart = args.restart
-        modelend = modelstart + args.n
-    else:
-        modelstart = 1
-        modelend = modelstart + args.n
-    numbermodelruns = args.n
-
-    tsimstart = timer()
-    for currentmodelrun in range(modelstart, modelend):
-        run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
-    tsimend = timer()
-    simcompletestr = '\n=== Simulation on {} completed in [HH:MM:SS]: {}'.format(config.hostinfo['hostname'], datetime.timedelta(seconds=tsimend - tsimstart))
-    print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
-
-
-def run_benchmark_sim(args, inputfile, usernamespace):
-    """
-    Run standard simulation in benchmarking mode - models are run one
-    after another and each model is parallelised using either OpenMP (CPU)
-    or CUDA (GPU)
-
-    Args:
-        args (dict): Namespace with command line arguments
-        inputfile (object): File object for the input file.
-        usernamespace (dict): Namespace that can be accessed by user in any
-                Python code blocks in input file.
- """ - - # Store information about host machine - hyperthreading = ', {} cores with HT'.format(config.hostinfo['logicalcores']) if config.hostinfo['hyperthreading'] else '' - machineIDlong = '{}; {} x {} ({} cores{}); {} RAM; {}'.format(config.hostinfo['machineID'], config.hostinfo['sockets'], config.hostinfo['cpuID'], config.hostinfo['physicalcores'], hyperthreading, human_size(config.hostinfo['ram'], a_kilobyte_is_1024_bytes=True), config.hostinfo['osversion']) - - # Initialise arrays to hold CPU thread info and times, and GPU info and times - cputhreads = np.array([], dtype=np.int32) - cputimes = np.array([]) - gpuIDs = [] - gputimes = np.array([]) - - # CPU only benchmarking - if args.gpu is None: - # Number of CPU threads to benchmark - start from single thread and double threads until maximum number of physical cores - threads = 1 - maxthreads = config.hostinfo['physicalcores'] - maxthreadspersocket = config.hostinfo['physicalcores'] / config.hostinfo['sockets'] - while threads < maxthreadspersocket: - cputhreads = np.append(cputhreads, int(threads)) - threads *= 2 - # Check for system with only single thread - if cputhreads.size == 0: - cputhreads = np.append(cputhreads, threads) - # Add maxthreadspersocket and maxthreads if necessary - if cputhreads[-1] != maxthreadspersocket: - cputhreads = np.append(cputhreads, int(maxthreadspersocket)) - if cputhreads[-1] != maxthreads: - cputhreads = np.append(cputhreads, int(maxthreads)) - cputhreads = cputhreads[::-1] - cputimes = np.zeros(len(cputhreads)) - - numbermodelruns = len(cputhreads) - - # GPU only benchmarking - else: - # Set size of array to store GPU runtimes and number of runs of model required - for gpu in args.gpu: - gpuIDs.append(gpu.name) - gputimes = np.zeros(len(args.gpu)) - numbermodelruns = len(args.gpu) - # Store GPU information in a temp variable - gpus = args.gpu - - usernamespace['number_model_runs'] = numbermodelruns - modelend = numbermodelruns + 1 - - for currentmodelrun in range(1, modelend): - # Run CPU benchmark - if args.gpu is None: - os.environ['OMP_NUM_THREADS'] = str(cputhreads[currentmodelrun - 1]) - cputimes[currentmodelrun - 1] = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace) - # Run GPU benchmark - else: - args.gpu = gpus[(currentmodelrun - 1)] - os.environ['OMP_NUM_THREADS'] = str(config.hostinfo['physicalcores']) - gputimes[(currentmodelrun - 1)] = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace) - - # Get model size (in cells) and number of iterations - if currentmodelrun == 1: - if numbermodelruns == 1: - outputfile = os.path.splitext(args.inputfile)[0] + '.out' - else: - outputfile = os.path.splitext(args.inputfile)[0] + str(currentmodelrun) + '.out' - f = h5py.File(outputfile, 'r') - iterations = f.attrs['Iterations'] - numcells = f.attrs['nx_ny_nz'] - - # Save number of threads and benchmarking times to NumPy archive - np.savez(os.path.splitext(inputfile.name)[0], machineID=machineIDlong, gpuIDs=gpuIDs, cputhreads=cputhreads, cputimes=cputimes, gputimes=gputimes, iterations=iterations, numcells=numcells, version=__version__) - - simcompletestr = '\n=== Simulation completed' - print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr)))) - - -def run_mpi_sim(args, inputfile, usernamespace): - """ - Run mixed mode MPI/OpenMP simulation - MPI task farm for models with - each model parallelised using either OpenMP (CPU) or CUDA (GPU) - - Args: - args (dict): Namespace with command 
-        args (dict): Namespace with command line arguments
-        inputfile (object): File object for the input file.
-        usernamespace (dict): Namespace that can be accessed by user in any
-                Python code blocks in input file.
-    """
-
-    from mpi4py import MPI
-
-    status = MPI.Status()
-    hostname = MPI.Get_processor_name()
-
-    # Set range for number of models to run
-    modelstart = args.restart if args.restart else 1
-    modelend = modelstart + args.n
-    numbermodelruns = args.n
-
-    # Command line flag used to indicate a spawned worker instance
-    workerflag = '--mpi-worker'
-    numworkers = args.mpi - 1
-
-    ##################
-    # Master process #
-    ##################
-    if workerflag not in sys.argv:
-        # N.B Spawned worker flag (--mpi-worker) applied to sys.argv when MPI.Spawn is called
-
-        # See if the MPI communicator object is being passed as an argument (likely from a MPI.Split)
-        if hasattr(args, 'mpicomm'):
-            comm = args.mpicomm
-        else:
-            comm = MPI.COMM_WORLD
-        tsimstart = timer()
-        mpistartstr = '\n=== MPI task farm (USING MPI Spawn)'
-        print('{} {}'.format(mpistartstr, '=' * (get_terminal_width() - 1 - len(mpistartstr))))
-        print('=== MPI master ({}, rank: {}) on {} spawning {} workers...'.format(comm.name, comm.Get_rank(), hostname, numworkers))
-
-        # Assemble a sys.argv replacement to pass to spawned worker
-        # N.B This is required as sys.argv not available when gprMax is called via api()
-        # Ignore mpicomm object if it exists as only strings can be passed via spawn
-        myargv = []
-        for key, value in vars(args).items():
-            if value:
-                # Input file name always comes first
-                if 'inputfile' in key:
-                    myargv.append(value)
-                elif 'gpu' in key:
-                    myargv.append('-' + key)
-                    # Add GPU device ID(s) from GPU objects
-                    for gpu in args.gpu:
-                        myargv.append(str(gpu.deviceID))
-                elif 'mpicomm' in key:
-                    pass
-                elif '_' in key:
-                    key = key.replace('_', '-')
-                    myargv.append('--' + key)
-                else:
-                    myargv.append('-' + key)
-                    if value is not True:
-                        myargv.append(str(value))
-
-        # Create a list of work
-        worklist = []
-        for model in range(modelstart, modelend):
-            workobj = dict()
-            workobj['currentmodelrun'] = model
-            workobj['mpicommname'] = comm.name
-            worklist.append(workobj)
-        # Add stop sentinels
-        worklist += ([StopIteration] * numworkers)
-
-        # Spawn workers
-        newcomm = comm.Spawn(sys.executable, args=['-m', 'gprMax'] + myargv + [workerflag], maxprocs=numworkers)
-
-        # Reply to whoever asks until done
-        for work in worklist:
-            newcomm.recv(source=MPI.ANY_SOURCE, status=status)
-            newcomm.send(obj=work, dest=status.Get_source())
-
-        # Shutdown communicators
-        newcomm.Disconnect()
-
-        tsimend = timer()
-        simcompletestr = '\n=== MPI master ({}, rank: {}) on {} completed simulation in [HH:MM:SS]: {}'.format(comm.name, comm.Get_rank(), hostname, datetime.timedelta(seconds=tsimend - tsimstart))
-        print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
-
-    ##################
-    # Worker process #
-    ##################
-    elif workerflag in sys.argv:
-        # Connect to parent to get communicator
-        try:
-            comm = MPI.Comm.Get_parent()
-            rank = comm.Get_rank()
-        except ValueError:
-            raise ValueError('MPI worker could not connect to parent')
-
-        # Select GPU and get info
-        gpuinfo = ''
-        if args.gpu is not None:
-            # Set device ID based on rank from list of GPUs
-            args.gpu = args.gpu[rank]
-            gpuinfo = ' using {} - {}, {} RAM '.format(args.gpu.deviceID, args.gpu.name, human_size(args.gpu.totalmem, a_kilobyte_is_1024_bytes=True))
-
-        # Ask for work until stop sentinel
-        for work in iter(lambda: comm.sendrecv(0, dest=0), StopIteration):
-            currentmodelrun = work['currentmodelrun']
-
-            # Run the model
-            print('Starting MPI spawned worker (parent: {}, rank: {}) on {} with model {}/{}{}\n'.format(work['mpicommname'], rank, hostname, currentmodelrun, numbermodelruns, gpuinfo))
-            tsolve = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
-            print('Completed MPI spawned worker (parent: {}, rank: {}) on {} with model {}/{}{} in [HH:MM:SS]: {}\n'.format(work['mpicommname'], rank, hostname, currentmodelrun, numbermodelruns, gpuinfo, datetime.timedelta(seconds=tsolve)))
-
-        # Shutdown
-        comm.Disconnect()
-
-
-def run_mpi_no_spawn_sim(args, inputfile, usernamespace):
-    """
-    Alternate MPI implementation that avoids using the MPI spawn mechanism.
-    This implementation is designed to be used as
-    e.g. 'mpirun -n 5 python -m gprMax user_models/mymodel.in -n 10 --mpi-no-spawn'
-
-    Run mixed mode MPI/OpenMP simulation - MPI task farm for models with
-    each model parallelised using either OpenMP (CPU) or CUDA (GPU)
-
-    Args:
-        args (dict): Namespace with command line arguments
-        inputfile (object): File object for the input file.
-        usernamespace (dict): Namespace that can be accessed by user in any
-                Python code blocks in input file.
-    """
-
-    from mpi4py import MPI
-
-    # Define MPI message tags
-    tags = Enum('tags', {'READY': 0, 'DONE': 1, 'EXIT': 2, 'START': 3})
-
-    # Initializations and preliminaries
-    comm = MPI.COMM_WORLD
-    size = comm.Get_size()  # total number of processes
-    rank = comm.Get_rank()  # rank of this process
-    status = MPI.Status()  # get MPI status object
-    hostname = MPI.Get_processor_name()  # get name of processor/host
-
-    # Set range for number of models to run
-    modelstart = args.restart if args.restart else 1
-    modelend = modelstart + args.n
-    numbermodelruns = args.n
-    currentmodelrun = modelstart  # can use -task argument to start numbering from something other than 1
-    numworkers = size - 1
-
-    ##################
-    # Master process #
-    ##################
-    if rank == 0:
-        tsimstart = timer()
-        mpistartstr = '\n=== MPI task farm (WITHOUT using MPI Spawn)'
-        print('{} {}'.format(mpistartstr, '=' * (get_terminal_width() - 1 - len(mpistartstr))))
-        print('=== MPI master ({}, rank: {}) on {} using {} workers...'.format(comm.name, comm.Get_rank(), hostname, numworkers))
-
-        closedworkers = 0
-        while closedworkers < numworkers:
-            comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
-            source = status.Get_source()
-            tag = status.Get_tag()
-
-            # Worker is ready, so send it a task
-            if tag == tags.READY.value:
-                if currentmodelrun < modelend:
-                    comm.send(currentmodelrun, dest=source, tag=tags.START.value)
-                    currentmodelrun += 1
-                else:
-                    comm.send(None, dest=source, tag=tags.EXIT.value)
-
-            # Worker has completed a task
-            elif tag == tags.DONE.value:
-                pass
-
-            # Worker has completed all tasks
-            elif tag == tags.EXIT.value:
-                closedworkers += 1
-
-        tsimend = timer()
-        simcompletestr = '\n=== MPI master ({}, rank: {}) on {} completed simulation in [HH:MM:SS]: {}'.format(comm.name, comm.Get_rank(), hostname, datetime.timedelta(seconds=tsimend - tsimstart))
-        print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
-
-    ##################
-    # Worker process #
-    ##################
-    else:
-        # Get info and setup device ID for GPU(s)
-        gpuinfo = ''
-        if args.gpu is not None:
-            # Set device ID based on rank from list of GPUs
-            deviceID = (rank - 1) % len(args.gpu)
-            args.gpu = next(gpu for gpu in args.gpu if gpu.deviceID == deviceID)
-            gpuinfo = ' using {} - {}, {}'.format(args.gpu.deviceID, args.gpu.name, human_size(args.gpu.totalmem, a_kilobyte_is_1024_bytes=True))
-
-        while True:
-            comm.send(None, dest=0, tag=tags.READY.value)
-            # Receive a model number to run from the master
-            currentmodelrun = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
-            tag = status.Get_tag()
-
-            # Run a model
-            if tag == tags.START.value:
-                print('Starting MPI worker (parent: {}, rank: {}) on {} with model {}/{}{}\n'.format(comm.name, rank, hostname, currentmodelrun, numbermodelruns, gpuinfo))
-                tsolve = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
-                comm.send(None, dest=0, tag=tags.DONE.value)
-                print('Completed MPI worker (parent: {}, rank: {}) on {} with model {}/{}{} in [HH:MM:SS]: {}\n'.format(comm.name, rank, hostname, currentmodelrun, numbermodelruns, gpuinfo, datetime.timedelta(seconds=tsolve)))
-
-            # Break out of loop when work receives exit message
-            elif tag == tags.EXIT.value:
-                break
-
-        comm.send(None, dest=0, tag=tags.EXIT.value)
+    # Build the simulation configuration from the parsed arguments, then
+    # create a matching solver and runner and hand the model run(s) over to them
+    sim_config = create_simulation_config(args)
+    solver = create_solver(sim_config)
+    run = create_runner(sim_config)
+    run(sim_config, solver)
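
Review note: the new run_main reduces gprMax.py to config -> solver -> runner wiring, with the serial/MPI/GPU/benchmark dispatch that previously lived in this module pushed behind create_simulation_config, create_solver and create_runner. The sketch below only illustrates that call order with throwaway stand-ins; the real factories live in gprMax's .config, .solvers and .contexts modules, and their signatures and return types are not shown in this diff, so everything here apart from the final four-line wiring is an assumption.

```python
# Illustrative stand-ins only: gprMax's real create_* factories return richer
# objects than the toy classes used here.
from argparse import Namespace


class SimulationConfig:
    """Toy stand-in for the object assumed to come from create_simulation_config()."""

    def __init__(self, args):
        self.args = args
        self.gpu = bool(args.gpu)
        self.mpi = bool(args.mpi)


def create_simulation_config(args):
    # Capture everything the run needs in one configuration object
    return SimulationConfig(args)


def create_solver(sim_config):
    # A real implementation would pick a CPU (OpenMP) or GPU (CUDA) solver here
    return 'gpu-solver' if sim_config.gpu else 'cpu-solver'


def create_runner(sim_config):
    # A real implementation would return a serial or MPI task-farm context
    def run(sim_config, solver):
        for model in range(1, sim_config.args.n + 1):
            print(f'model {model}/{sim_config.args.n} -> {solver}')
    return run


if __name__ == '__main__':
    # Mirrors the four lines of the new run_main()
    args = Namespace(inputfile='mymodel.in', n=2, gpu=None, mpi=None)
    sim_config = create_simulation_config(args)
    solver = create_solver(sim_config)
    run = create_runner(sim_config)
    run(sim_config, solver)
```

From the caller's side nothing changes: both main() and api() still end in run_main(args), so the refactoring only moves the mode-selection logic out of this module.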