Mirrored from https://gitee.com/sunhf/gprMax.git
Last synced 2025-08-08 07:24:19 +08:00
gprMax refactor rough
gprMax/gprMax.py: 485 changed lines
@@ -16,58 +16,11 @@
 # You should have received a copy of the GNU General Public License
 # along with gprMax. If not, see <http://www.gnu.org/licenses/>.

-"""gprMax.gprMax: provides entry point main()."""
+from .config import create_simulation_config
+from .contexts import create_runner
+from .solvers import create_solver

 import argparse
-import datetime
-import os
-import sys
-
-from enum import Enum
-
-# There is a bug with threading and MKL on macOS
-# (https://github.com/gprMax/gprMax/issues/195) . Setting the MKL threading
-# layer to sequential solves it, but must be done before numpy is imported.
-if sys.platform == 'darwin':
-    os.environ["MKL_THREADING_LAYER"] = 'sequential'
-
-import h5py
-import numpy as np
-
-from ._version import __version__
-from ._version import codename
-import gprMax.config as config
-from .exceptions import GeneralError
-from .model_build_run import run_model
-from .utilities import detect_check_gpus
-from .utilities import get_terminal_width
-from .utilities import human_size
-from .utilities import logo
-from .utilities import open_path_file
-from .utilities import timer
-
-
-def main():
-    """This is the main function for gprMax."""
-
-    # Parse command line arguments
-    parser = argparse.ArgumentParser(prog='gprMax', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-    parser.add_argument('inputfile', help='path to, and name of inputfile or file object')
-    parser.add_argument('-n', default=1, type=int, help='number of times to run the input file, e.g. to create a B-scan')
-    parser.add_argument('-task', type=int, help='task identifier (model number) for job array on Open Grid Scheduler/Grid Engine (http://gridscheduler.sourceforge.net/index.html)')
-    parser.add_argument('-restart', type=int, help='model number to restart from, e.g. when creating B-scan')
-    parser.add_argument('-mpi', type=int, help='number of MPI tasks, i.e. master + workers')
-    parser.add_argument('--mpi-no-spawn', action='store_true', default=False, help='flag to use MPI without spawn mechanism')
-    parser.add_argument('--mpi-worker', action='store_true', default=False, help=argparse.SUPPRESS)
-    parser.add_argument('-gpu', type=int, action='append', nargs='*', help='flag to use Nvidia GPU or option to give list of device ID(s)')
-    parser.add_argument('-benchmark', action='store_true', default=False, help='flag to switch on benchmarking mode')
-    parser.add_argument('--geometry-only', action='store_true', default=False, help='flag to only build model and produce geometry file(s)')
-    parser.add_argument('--geometry-fixed', action='store_true', default=False, help='flag to not reprocess model geometry, e.g. for B-scans where the geometry is fixed')
-    parser.add_argument('--write-processed', action='store_true', default=False, help='flag to write an input file after any Python code and include commands in the original input file have been processed')
-    args = parser.parse_args()
-
-    run_main(args)
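Side note on the removed macOS workaround in this hunk: the old module set the MKL threading layer to sequential before NumPy was first imported, to avoid the threading bug tracked in gprMax issue 195. If that guard is still needed after the refactor it presumably lives elsewhere; this diff does not show where. A minimal sketch of the guard as a standalone helper (the helper name is illustrative, not part of gprMax):

    import os
    import sys

    def set_mkl_sequential_on_macos():
        # Must run before the first `import numpy` anywhere in the process,
        # otherwise MKL has already chosen its threading layer.
        if sys.platform == 'darwin':
            os.environ["MKL_THREADING_LAYER"] = 'sequential'

    set_mkl_sequential_on_macos()
    import numpy as np  # safe to import NumPy/MKL-backed packages from here on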
 def api(
     inputfile,
@@ -106,412 +59,30 @@ def api(
     run_main(args)


+def main():
+    """This is the main function for gprMax."""
+
+    # Parse command line arguments
+    parser = argparse.ArgumentParser(prog='gprMax', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument('inputfile', help='path to, and name of inputfile or file object')
+    parser.add_argument('-n', default=1, type=int, help='number of times to run the input file, e.g. to create a B-scan')
+    parser.add_argument('-task', type=int, help='task identifier (model number) for job array on Open Grid Scheduler/Grid Engine (http://gridscheduler.sourceforge.net/index.html)')
+    parser.add_argument('-restart', type=int, help='model number to restart from, e.g. when creating B-scan')
+    parser.add_argument('-mpi', type=int, help='number of MPI tasks, i.e. master + workers')
+    parser.add_argument('--mpi-no-spawn', action='store_true', default=False, help='flag to use MPI without spawn mechanism')
+    parser.add_argument('--mpi-worker', action='store_true', default=False, help=argparse.SUPPRESS)
+    parser.add_argument('-gpu', type=int, action='append', nargs='*', help='flag to use Nvidia GPU or option to give list of device ID(s)')
+    parser.add_argument('-benchmark', action='store_true', default=False, help='flag to switch on benchmarking mode')
+    parser.add_argument('--geometry-only', action='store_true', default=False, help='flag to only build model and produce geometry file(s)')
+    parser.add_argument('--geometry-fixed', action='store_true', default=False, help='flag to not reprocess model geometry, e.g. for B-scans where the geometry is fixed')
+    parser.add_argument('--write-processed', action='store_true', default=False, help='flag to write an input file after any Python code and include commands in the original input file have been processed')
+    args = parser.parse_args()
+
+    run_main(args)


 def run_main(args):
-    """
-    Top-level function that controls what mode of simulation (standard/optimsation/benchmark etc...) is run.
-
-    Args:
-        args (dict): Namespace with input arguments from command line or api.
-    """
+    sim_config = create_simulation_config()
+    solver = create_solver(sim_config)
+    run = create_runner(sim_config)
+    run(sim_config, solver)
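With this change, the mode dispatch that the old run_main() performed inline (standard, benchmark, MPI with or without spawn) is presumably delegated to the factories imported at the top of the file: create_simulation_config(), create_solver() and create_runner(). Their internals live in .config, .solvers and .contexts and are not part of this diff, so the sketch below only illustrates the shape of that config-solver-runner flow with hypothetical stand-ins, not the real gprMax implementations. Everything from here to the end of the diff is straight removal of the old inline implementation.

    # Hypothetical stand-ins illustrating the config -> solver -> runner flow;
    # the real factories live in gprMax.config, gprMax.solvers and gprMax.contexts.
    from dataclasses import dataclass

    @dataclass
    class SimulationConfig:
        inputfile: str = 'user_models/mymodel.in'
        n: int = 1

    def create_simulation_config(**overrides):
        return SimulationConfig(**overrides)

    def create_solver(sim_config):
        # A solver is anything callable on a single model run.
        return lambda modelrun: print(f'solving model {modelrun} of {sim_config.inputfile}')

    def create_runner(sim_config):
        # The runner decides how models are iterated (serial here; an MPI task
        # farm would presumably be selected from sim_config in the real code).
        def run(config, solver):
            for modelrun in range(1, config.n + 1):
                solver(modelrun)
        return run

    sim_config = create_simulation_config(n=2)
    solver = create_solver(sim_config)
    run = create_runner(sim_config)
    run(sim_config, solver)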
-
-    # Print gprMax logo, version, and licencing/copyright information
-    logo(__version__ + ' (' + codename + ')')
-
-    # Print information about host machine
-    hyperthreading = ', {} cores with HT'.format(config.hostinfo['logicalcores']) if config.hostinfo['hyperthreading'] else ''
-    print('\nHost: {} | {} | {} x {} ({} cores{}) | {} RAM | {}'.format(config.hostinfo['hostname'],
-          config.hostinfo['machineID'], config.hostinfo['sockets'], config.hostinfo['cpuID'], config.hostinfo['physicalcores'],
-          hyperthreading, human_size(config.hostinfo['ram'], a_kilobyte_is_1024_bytes=True), config.hostinfo['osversion']))
-
-    # Get information/setup any Nvidia GPU(s)
-    if args.gpu is not None:
-        # Flatten a list of lists
-        if any(isinstance(element, list) for element in args.gpu):
-            args.gpu = [val for sublist in args.gpu for val in sublist]
-        config.cuda['gpus'], allgpustext = detect_check_gpus(args.gpu)
-        print('GPU(s): {}'.format(' | '.join(allgpustext)))
-
-    # Process input file
-    with open_path_file(args.inputfile) as inputfile:
-
-        # Create a separate namespace that users can access in any Python code blocks in the input file
-        usernamespace = {'c': config.c, 'e0': config.e0, 'm0': config.m0, 'z0': config.z0, 'number_model_runs': args.n, 'inputfile': os.path.abspath(inputfile.name)}
-
-        #######################################
-        # Process for benchmarking simulation #
-        #######################################
-        if args.benchmark:
-            if args.mpi or args.task or args.n > 1:
-                raise GeneralError('Benchmarking mode cannot be combined with MPI, job array mode, or multiple model runs.')
-            run_benchmark_sim(args, inputfile, usernamespace)
-
-        ################################################
-        # Process for standard simulation (CPU or GPU) #
-        ################################################
-        else:
-            # Mixed mode MPI with OpenMP or CUDA - MPI task farm for models with each model parallelised with OpenMP (CPU) or CUDA (GPU)
-            if args.mpi:
-                if args.n == 1:
-                    raise GeneralError('MPI is not beneficial when there is only one model to run')
-                if args.task:
-                    raise GeneralError('MPI cannot be combined with job array mode')
-                run_mpi_sim(args, inputfile, usernamespace)
-
-            # Alternate MPI configuration that does not use MPI spawn mechanism
-            elif args.mpi_no_spawn:
-                if args.n == 1:
-                    raise GeneralError('MPI is not beneficial when there is only one model to run')
-                if args.task:
-                    raise GeneralError('MPI cannot be combined with job array mode')
-                run_mpi_no_spawn_sim(args, inputfile, usernamespace)
-
-            # Standard behaviour - models run serially with each model parallelised with OpenMP (CPU) or CUDA (GPU)
-            else:
-                if args.task and args.restart:
-                    raise GeneralError('Job array and restart modes cannot be used together')
-                if config.cuda['gpus']:
-                    config.cuda['gpus'] = config.cuda['gpus'][0]
-                run_std_sim(args, inputfile, usernamespace)
-
-
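One detail worth noting in the removed GPU setup above: because -gpu is declared with action='append' and nargs='*', argparse hands back a list of lists (one inner list per occurrence of the flag), which is why the old run_main() flattened it before calling detect_check_gpus(). A small self-contained sketch of that behaviour:

    import argparse

    parser = argparse.ArgumentParser(prog='gprMax')
    parser.add_argument('-gpu', type=int, action='append', nargs='*')
    args = parser.parse_args(['-gpu', '0', '1', '-gpu', '2'])
    print(args.gpu)          # [[0, 1], [2]] - a list of lists

    # Flatten, exactly as the removed code did
    if any(isinstance(element, list) for element in args.gpu):
        args.gpu = [val for sublist in args.gpu for val in sublist]
    print(args.gpu)          # [0, 1, 2]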
-def run_std_sim(args, inputfile, usernamespace):
-    """
-    Run standard simulation - models are run one after another and each model
-    is parallelised using either OpenMP (CPU) or CUDA (GPU)
-
-    Args:
-        args (dict): Namespace with command line arguments
-        inputfile (object): File object for the input file.
-        usernamespace (dict): Namespace that can be accessed by user in any
-            Python code blocks in input file.
-    """
-
-    # Set range for number of models to run
-    if args.task:
-        # Job array feeds args.n number of single tasks
-        modelstart = args.task
-        modelend = args.task + 1
-    elif args.restart:
-        modelstart = args.restart
-        modelend = modelstart + args.n
-    else:
-        modelstart = 1
-        modelend = modelstart + args.n
-    numbermodelruns = args.n
-
-    tsimstart = timer()
-    for currentmodelrun in range(modelstart, modelend):
-        run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
-    tsimend = timer()
-    simcompletestr = '\n=== Simulation on {} completed in [HH:MM:SS]: {}'.format(config.hostinfo['hostname'], datetime.timedelta(seconds=tsimend - tsimstart))
-    print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
-
-
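The removed run_std_sim() chose its model range from three mutually exclusive sources: a job-array task ID (exactly one model), a restart index, or the default start at model 1; in the first case only the single model handed to that task is executed, otherwise args.n models are run. A short sketch of just that range selection, using hypothetical plain arguments in place of the argparse Namespace:

    def model_range(n, task=None, restart=None):
        # Mirrors the removed range logic: task -> single model, restart -> offset start.
        if task:
            modelstart = task
            modelend = task + 1
        elif restart:
            modelstart = restart
            modelend = modelstart + n
        else:
            modelstart = 1
            modelend = modelstart + n
        return range(modelstart, modelend)

    print(list(model_range(n=3)))             # [1, 2, 3]
    print(list(model_range(n=3, restart=4)))  # [4, 5, 6]
    print(list(model_range(n=3, task=7)))     # [7] - job array runs one model per task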
-def run_benchmark_sim(args, inputfile, usernamespace):
-    """
-    Run standard simulation in benchmarking mode - models are run one
-    after another and each model is parallelised using either OpenMP (CPU)
-    or CUDA (GPU)
-
-    Args:
-        args (dict): Namespace with command line arguments
-        inputfile (object): File object for the input file.
-        usernamespace (dict): Namespace that can be accessed by user in any
-            Python code blocks in input file.
-    """
-
-    # Store information about host machine
-    hyperthreading = ', {} cores with HT'.format(config.hostinfo['logicalcores']) if config.hostinfo['hyperthreading'] else ''
-    machineIDlong = '{}; {} x {} ({} cores{}); {} RAM; {}'.format(config.hostinfo['machineID'], config.hostinfo['sockets'], config.hostinfo['cpuID'], config.hostinfo['physicalcores'], hyperthreading, human_size(config.hostinfo['ram'], a_kilobyte_is_1024_bytes=True), config.hostinfo['osversion'])
-
-    # Initialise arrays to hold CPU thread info and times, and GPU info and times
-    cputhreads = np.array([], dtype=np.int32)
-    cputimes = np.array([])
-    gpuIDs = []
-    gputimes = np.array([])
-
-    # CPU only benchmarking
-    if args.gpu is None:
-        # Number of CPU threads to benchmark - start from single thread and double threads until maximum number of physical cores
-        threads = 1
-        maxthreads = config.hostinfo['physicalcores']
-        maxthreadspersocket = config.hostinfo['physicalcores'] / config.hostinfo['sockets']
-        while threads < maxthreadspersocket:
-            cputhreads = np.append(cputhreads, int(threads))
-            threads *= 2
-        # Check for system with only single thread
-        if cputhreads.size == 0:
-            cputhreads = np.append(cputhreads, threads)
-        # Add maxthreadspersocket and maxthreads if necessary
-        if cputhreads[-1] != maxthreadspersocket:
-            cputhreads = np.append(cputhreads, int(maxthreadspersocket))
-        if cputhreads[-1] != maxthreads:
-            cputhreads = np.append(cputhreads, int(maxthreads))
-        cputhreads = cputhreads[::-1]
-        cputimes = np.zeros(len(cputhreads))
-
-        numbermodelruns = len(cputhreads)
-
-    # GPU only benchmarking
-    else:
-        # Set size of array to store GPU runtimes and number of runs of model required
-        for gpu in args.gpu:
-            gpuIDs.append(gpu.name)
-        gputimes = np.zeros(len(args.gpu))
-        numbermodelruns = len(args.gpu)
-        # Store GPU information in a temp variable
-        gpus = args.gpu
-
-    usernamespace['number_model_runs'] = numbermodelruns
-    modelend = numbermodelruns + 1
-
-    for currentmodelrun in range(1, modelend):
-        # Run CPU benchmark
-        if args.gpu is None:
-            os.environ['OMP_NUM_THREADS'] = str(cputhreads[currentmodelrun - 1])
-            cputimes[currentmodelrun - 1] = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
-        # Run GPU benchmark
-        else:
-            args.gpu = gpus[(currentmodelrun - 1)]
-            os.environ['OMP_NUM_THREADS'] = str(config.hostinfo['physicalcores'])
-            gputimes[(currentmodelrun - 1)] = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
-
-        # Get model size (in cells) and number of iterations
-        if currentmodelrun == 1:
-            if numbermodelruns == 1:
-                outputfile = os.path.splitext(args.inputfile)[0] + '.out'
-            else:
-                outputfile = os.path.splitext(args.inputfile)[0] + str(currentmodelrun) + '.out'
-            f = h5py.File(outputfile, 'r')
-            iterations = f.attrs['Iterations']
-            numcells = f.attrs['nx_ny_nz']
-
-    # Save number of threads and benchmarking times to NumPy archive
-    np.savez(os.path.splitext(inputfile.name)[0], machineID=machineIDlong, gpuIDs=gpuIDs, cputhreads=cputhreads, cputimes=cputimes, gputimes=gputimes, iterations=iterations, numcells=numcells, version=__version__)
-
-    simcompletestr = '\n=== Simulation completed'
-    print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
-
-
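The removed benchmarking code built its list of CPU thread counts by doubling from 1 while below the per-socket core count, then appending the per-socket maximum and the machine-wide maximum if they were not already present, and finally reversing the list so the largest run comes first. A standalone sketch of just that sequence, with the host core counts passed in rather than read from config.hostinfo:

    import numpy as np

    def benchmark_thread_counts(physicalcores, sockets):
        # Doubling series 1, 2, 4, ... below the per-socket core count,
        # then the per-socket and whole-machine maxima, largest first.
        cputhreads = np.array([], dtype=np.int32)
        threads = 1
        maxthreads = physicalcores
        maxthreadspersocket = physicalcores / sockets
        while threads < maxthreadspersocket:
            cputhreads = np.append(cputhreads, int(threads))
            threads *= 2
        if cputhreads.size == 0:          # single-threaded host
            cputhreads = np.append(cputhreads, threads)
        if cputhreads[-1] != maxthreadspersocket:
            cputhreads = np.append(cputhreads, int(maxthreadspersocket))
        if cputhreads[-1] != maxthreads:
            cputhreads = np.append(cputhreads, int(maxthreads))
        return cputhreads[::-1]

    print(benchmark_thread_counts(physicalcores=16, sockets=2))  # [16  8  4  2  1]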
-def run_mpi_sim(args, inputfile, usernamespace):
-    """
-    Run mixed mode MPI/OpenMP simulation - MPI task farm for models with
-    each model parallelised using either OpenMP (CPU) or CUDA (GPU)
-
-    Args:
-        args (dict): Namespace with command line arguments
-        inputfile (object): File object for the input file.
-        usernamespace (dict): Namespace that can be accessed by user in any
-            Python code blocks in input file.
-    """
-
-    from mpi4py import MPI
-
-    status = MPI.Status()
-    hostname = MPI.Get_processor_name()
-
-    # Set range for number of models to run
-    modelstart = args.restart if args.restart else 1
-    modelend = modelstart + args.n
-    numbermodelruns = args.n
-
-    # Command line flag used to indicate a spawned worker instance
-    workerflag = '--mpi-worker'
-    numworkers = args.mpi - 1
-
-    ##################
-    # Master process #
-    ##################
-    if workerflag not in sys.argv:
-        # N.B Spawned worker flag (--mpi-worker) applied to sys.argv when MPI.Spawn is called
-
-        # See if the MPI communicator object is being passed as an argument (likely from a MPI.Split)
-        if hasattr(args, 'mpicomm'):
-            comm = args.mpicomm
-        else:
-            comm = MPI.COMM_WORLD
-        tsimstart = timer()
-        mpistartstr = '\n=== MPI task farm (USING MPI Spawn)'
-        print('{} {}'.format(mpistartstr, '=' * (get_terminal_width() - 1 - len(mpistartstr))))
-        print('=== MPI master ({}, rank: {}) on {} spawning {} workers...'.format(comm.name, comm.Get_rank(), hostname, numworkers))
-
-        # Assemble a sys.argv replacement to pass to spawned worker
-        # N.B This is required as sys.argv not available when gprMax is called via api()
-        # Ignore mpicomm object if it exists as only strings can be passed via spawn
-        myargv = []
-        for key, value in vars(args).items():
-            if value:
-                # Input file name always comes first
-                if 'inputfile' in key:
-                    myargv.append(value)
-                elif 'gpu' in key:
-                    myargv.append('-' + key)
-                    # Add GPU device ID(s) from GPU objects
-                    for gpu in args.gpu:
-                        myargv.append(str(gpu.deviceID))
-                elif 'mpicomm' in key:
-                    pass
-                elif '_' in key:
-                    key = key.replace('_', '-')
-                    myargv.append('--' + key)
-                else:
-                    myargv.append('-' + key)
-                    if value is not True:
-                        myargv.append(str(value))
-
-        # Create a list of work
-        worklist = []
-        for model in range(modelstart, modelend):
-            workobj = dict()
-            workobj['currentmodelrun'] = model
-            workobj['mpicommname'] = comm.name
-            worklist.append(workobj)
-        # Add stop sentinels
-        worklist += ([StopIteration] * numworkers)
-
-        # Spawn workers
-        newcomm = comm.Spawn(sys.executable, args=['-m', 'gprMax'] + myargv + [workerflag], maxprocs=numworkers)
-
-        # Reply to whoever asks until done
-        for work in worklist:
-            newcomm.recv(source=MPI.ANY_SOURCE, status=status)
-            newcomm.send(obj=work, dest=status.Get_source())
-
-        # Shutdown communicators
-        newcomm.Disconnect()
-
-        tsimend = timer()
-        simcompletestr = '\n=== MPI master ({}, rank: {}) on {} completed simulation in [HH:MM:SS]: {}'.format(comm.name, comm.Get_rank(), hostname, datetime.timedelta(seconds=tsimend - tsimstart))
-        print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
-
-    ##################
-    # Worker process #
-    ##################
-    elif workerflag in sys.argv:
-        # Connect to parent to get communicator
-        try:
-            comm = MPI.Comm.Get_parent()
-            rank = comm.Get_rank()
-        except ValueError:
-            raise ValueError('MPI worker could not connect to parent')
-
-        # Select GPU and get info
-        gpuinfo = ''
-        if args.gpu is not None:
-            # Set device ID based on rank from list of GPUs
-            args.gpu = args.gpu[rank]
-            gpuinfo = ' using {} - {}, {} RAM '.format(args.gpu.deviceID, args.gpu.name, human_size(args.gpu.totalmem, a_kilobyte_is_1024_bytes=True))
-
-        # Ask for work until stop sentinel
-        for work in iter(lambda: comm.sendrecv(0, dest=0), StopIteration):
-            currentmodelrun = work['currentmodelrun']
-
-            # Run the model
-            print('Starting MPI spawned worker (parent: {}, rank: {}) on {} with model {}/{}{}\n'.format(work['mpicommname'], rank, hostname, currentmodelrun, numbermodelruns, gpuinfo))
-            tsolve = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
-            print('Completed MPI spawned worker (parent: {}, rank: {}) on {} with model {}/{}{} in [HH:MM:SS]: {}\n'.format(work['mpicommname'], rank, hostname, currentmodelrun, numbermodelruns, gpuinfo, datetime.timedelta(seconds=tsolve)))
-
-        # Shutdown
-        comm.Disconnect()
-
-
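The spawned-worker loop above relies on Python's two-argument iter(): iter(callable, sentinel) keeps calling the callable until it returns the sentinel, which is why the master can simply append StopIteration objects to the work list to shut its workers down. The same idiom, demonstrated without MPI using a local queue standing in for comm.sendrecv():

    from collections import deque

    # Work items followed by a StopIteration sentinel, as in the removed master code.
    worklist = deque([{'currentmodelrun': 1}, {'currentmodelrun': 2}, StopIteration])

    def request_work():
        # Stand-in for comm.sendrecv(0, dest=0): ask the "master" for the next item.
        return worklist.popleft()

    for work in iter(request_work, StopIteration):
        print('received model', work['currentmodelrun'])
    # The loop ends cleanly when the sentinel is returned - no explicit break needed.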
-def run_mpi_no_spawn_sim(args, inputfile, usernamespace):
-    """
-    Alternate MPI implementation that avoids using the MPI spawn mechanism.
-    This implementation is designed to be used as
-    e.g. 'mpirun -n 5 python -m gprMax user_models/mymodel.in -n 10 --mpi-no-spawn'
-
-    Run mixed mode MPI/OpenMP simulation - MPI task farm for models with
-    each model parallelised using either OpenMP (CPU) or CUDA (GPU)
-
-    Args:
-        args (dict): Namespace with command line arguments
-        inputfile (object): File object for the input file.
-        usernamespace (dict): Namespace that can be accessed by user in any
-            Python code blocks in input file.
-    """
-
-    from mpi4py import MPI
-
-    # Define MPI message tags
-    tags = Enum('tags', {'READY': 0, 'DONE': 1, 'EXIT': 2, 'START': 3})
-
-    # Initializations and preliminaries
-    comm = MPI.COMM_WORLD
-    size = comm.Get_size()  # total number of processes
-    rank = comm.Get_rank()  # rank of this process
-    status = MPI.Status()  # get MPI status object
-    hostname = MPI.Get_processor_name()  # get name of processor/host
-
-    # Set range for number of models to run
-    modelstart = args.restart if args.restart else 1
-    modelend = modelstart + args.n
-    numbermodelruns = args.n
-    currentmodelrun = modelstart  # can use -task argument to start numbering from something other than 1
-    numworkers = size - 1
-
-    ##################
-    # Master process #
-    ##################
-    if rank == 0:
-        tsimstart = timer()
-        mpistartstr = '\n=== MPI task farm (WITHOUT using MPI Spawn)'
-        print('{} {}'.format(mpistartstr, '=' * (get_terminal_width() - 1 - len(mpistartstr))))
-        print('=== MPI master ({}, rank: {}) on {} using {} workers...'.format(comm.name, comm.Get_rank(), hostname, numworkers))
-
-        closedworkers = 0
-        while closedworkers < numworkers:
-            comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
-            source = status.Get_source()
-            tag = status.Get_tag()
-
-            # Worker is ready, so send it a task
-            if tag == tags.READY.value:
-                if currentmodelrun < modelend:
-                    comm.send(currentmodelrun, dest=source, tag=tags.START.value)
-                    currentmodelrun += 1
-                else:
-                    comm.send(None, dest=source, tag=tags.EXIT.value)
-
-            # Worker has completed a task
-            elif tag == tags.DONE.value:
-                pass
-
-            # Worker has completed all tasks
-            elif tag == tags.EXIT.value:
-                closedworkers += 1
-
-        tsimend = timer()
-        simcompletestr = '\n=== MPI master ({}, rank: {}) on {} completed simulation in [HH:MM:SS]: {}'.format(comm.name, comm.Get_rank(), hostname, datetime.timedelta(seconds=tsimend - tsimstart))
-        print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
-
-    ##################
-    # Worker process #
-    ##################
-    else:
-        # Get info and setup device ID for GPU(s)
-        gpuinfo = ''
-        if args.gpu is not None:
-            # Set device ID based on rank from list of GPUs
-            deviceID = (rank - 1) % len(args.gpu)
-            args.gpu = next(gpu for gpu in args.gpu if gpu.deviceID == deviceID)
-            gpuinfo = ' using {} - {}, {}'.format(args.gpu.deviceID, args.gpu.name, human_size(args.gpu.totalmem, a_kilobyte_is_1024_bytes=True))
-
-        while True:
-            comm.send(None, dest=0, tag=tags.READY.value)
-            # Receive a model number to run from the master
-            currentmodelrun = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
-            tag = status.Get_tag()
-
-            # Run a model
-            if tag == tags.START.value:
-                print('Starting MPI worker (parent: {}, rank: {}) on {} with model {}/{}{}\n'.format(comm.name, rank, hostname, currentmodelrun, numbermodelruns, gpuinfo))
-                tsolve = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
-                comm.send(None, dest=0, tag=tags.DONE.value)
-                print('Completed MPI worker (parent: {}, rank: {}) on {} with model {}/{}{} in [HH:MM:SS]: {}\n'.format(comm.name, rank, hostname, currentmodelrun, numbermodelruns, gpuinfo, datetime.timedelta(seconds=tsolve)))
-
-            # Break out of loop when work receives exit message
-            elif tag == tags.EXIT.value:
-                break
-
-        comm.send(None, dest=0, tag=tags.EXIT.value)
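For reference, the no-spawn task farm removed above follows a conventional tag-based MPI pattern: workers announce READY, the master answers with START plus a model number or with EXIT once the range is exhausted, and workers report DONE after each model and EXIT on shutdown. A minimal runnable skeleton of that protocol follows; it is launched under mpirun (for example 'mpirun -n 3 python taskfarm.py', where the filename is illustrative) and the workload is a placeholder print, not gprMax's run_model():

    # Minimal READY/START/DONE/EXIT task farm, same protocol as the removed code.
    from enum import Enum
    from mpi4py import MPI

    tags = Enum('tags', {'READY': 0, 'DONE': 1, 'EXIT': 2, 'START': 3})
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    status = MPI.Status()
    models = list(range(1, 11))           # placeholder work items

    if rank == 0:                         # master
        closedworkers = 0
        numworkers = comm.Get_size() - 1
        while closedworkers < numworkers:
            comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
            source, tag = status.Get_source(), status.Get_tag()
            if tag == tags.READY.value:
                if models:
                    comm.send(models.pop(0), dest=source, tag=tags.START.value)
                else:
                    comm.send(None, dest=source, tag=tags.EXIT.value)
            elif tag == tags.EXIT.value:
                closedworkers += 1
    else:                                 # worker
        while True:
            comm.send(None, dest=0, tag=tags.READY.value)
            model = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
            if status.Get_tag() == tags.EXIT.value:
                break
            print(f'rank {rank} running model {model}')   # placeholder for the real solve
            comm.send(None, dest=0, tag=tags.DONE.value)
        comm.send(None, dest=0, tag=tags.EXIT.value)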