你已经派生过 gprMax
镜像自地址
https://gitee.com/sunhf/gprMax.git
已同步 2025-08-08 07:24:19 +08:00
Working MPI task farm with either CPU or GPU solver.
这个提交包含在:
@@ -264,7 +264,7 @@ class SimulationConfig:
|
|||||||
list of deviceID given.
|
list of deviceID given.
|
||||||
"""
|
"""
|
||||||
for gpu in self.cuda['gpus']:
|
for gpu in self.cuda['gpus']:
|
||||||
if gpu.deviceID == self.args.gpu[deviceID]:
|
if gpu.deviceID == deviceID:
|
||||||
return gpu
|
return gpu
|
||||||
|
|
||||||
def _set_precision(self):
|
def _set_precision(self):
|
||||||
|
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
|
|
||||||
import gprMax.config as config
|
import gprMax.config as config
|
||||||
from ._version import __version__, codename
|
from ._version import __version__, codename
|
||||||
@@ -113,13 +114,17 @@ class MPIContext(Context):
|
|||||||
self.rank = self.comm.rank
|
self.rank = self.comm.rank
|
||||||
self.MPIExecutor = MPIExecutor
|
self.MPIExecutor = MPIExecutor
|
||||||
|
|
||||||
def _run_model(self, i, GPUdeviceID):
|
def _run_model(self, i):
|
||||||
"""Process for running a single model."""
|
"""Process for running a single model."""
|
||||||
|
|
||||||
|
# Create configuration for model
|
||||||
config.model_num = i
|
config.model_num = i
|
||||||
model_config = config.ModelConfig()
|
model_config = config.ModelConfig()
|
||||||
|
# Set GPU deviceID according to worker rank
|
||||||
if config.sim_config.general['cuda']:
|
if config.sim_config.general['cuda']:
|
||||||
config.sim_config.set_model_gpu(GPUdeviceID)
|
gpu = config.sim_config.set_model_gpu(deviceID=self.rank - 1)
|
||||||
|
model_config.cuda = {'gpu': gpu,
|
||||||
|
'snapsgpu2cpu': False}
|
||||||
config.model_configs = model_config
|
config.model_configs = model_config
|
||||||
|
|
||||||
G = create_G()
|
G = create_G()
|
||||||
@@ -133,22 +138,24 @@ class MPIContext(Context):
|
|||||||
def run(self):
|
def run(self):
|
||||||
"""Specialise how the models are run."""
|
"""Specialise how the models are run."""
|
||||||
|
|
||||||
self.tsimstart = timer()
|
if self.rank == 0:
|
||||||
|
self.tsimstart = timer()
|
||||||
|
self.print_logo_copyright()
|
||||||
|
self.print_host_info()
|
||||||
|
if config.sim_config.general['cuda']:
|
||||||
|
self.print_gpu_info()
|
||||||
|
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
# Construct MPIExecutor
|
# Construct MPIExecutor
|
||||||
executor = self.MPIExecutor(self._run_model, comm=self.comm)
|
executor = self.MPIExecutor(self._run_model, comm=self.comm)
|
||||||
|
|
||||||
# Compile jobs
|
# Create job list
|
||||||
jobs = []
|
jobs = []
|
||||||
for i in self.model_range:
|
for i in self.model_range:
|
||||||
jobs.append({'i': i,
|
jobs.append({'i': i})
|
||||||
'GPUdeviceID': 0})
|
|
||||||
|
|
||||||
# if executor.is_master():
|
|
||||||
# self.print_logo_copyright()
|
|
||||||
# self.print_host_info()
|
|
||||||
# if config.sim_config.general['cuda']:
|
|
||||||
# self.print_gpu_info()
|
|
||||||
|
|
||||||
# Send the workers to their work loop
|
# Send the workers to their work loop
|
||||||
executor.start()
|
executor.start()
|
||||||
@@ -158,21 +165,10 @@ class MPIContext(Context):
|
|||||||
# Make the workers exit their work loop and join the main loop again
|
# Make the workers exit their work loop and join the main loop again
|
||||||
executor.join()
|
executor.join()
|
||||||
|
|
||||||
# with self.MPIExecutor(self._run_model, comm=self.comm) as executor:
|
|
||||||
# if executor is not None:
|
|
||||||
# results = executor.submit(jobs)
|
|
||||||
# logger.info('Results: %s' % str(results))
|
|
||||||
# logger.basic('Finished.')
|
|
||||||
|
|
||||||
self.tsimend = timer()
|
|
||||||
if executor.is_master():
|
if executor.is_master():
|
||||||
|
self.tsimend = timer()
|
||||||
self.print_time_report()
|
self.print_time_report()
|
||||||
|
|
||||||
def print_time_report(self):
|
|
||||||
"""Print the total simulation time based on context."""
|
|
||||||
s = f"\n=== Simulation on {config.sim_config.hostinfo['hostname']} completed in [HH:MM:SS]: {datetime.timedelta(seconds=self.tsimend - self.tsimstart)}"
|
|
||||||
logger.basic(f"{s} {'=' * (get_terminal_width() - 1 - len(s))}\n")
|
|
||||||
|
|
||||||
|
|
||||||
def create_context():
|
def create_context():
|
||||||
"""Create a context in which to run the simulation. i.e. MPI.
|
"""Create a context in which to run the simulation. i.e. MPI.
|
||||||
|
@@ -171,7 +171,6 @@ def run_main(args):
|
|||||||
args (Namespace): arguments from either API or CLI.
|
args (Namespace): arguments from either API or CLI.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
config.sim_config = config.SimulationConfig(args)
|
config.sim_config = config.SimulationConfig(args)
|
||||||
context = create_context()
|
context = create_context()
|
||||||
context.run()
|
context.run()
|
||||||
|
138
gprMax/mpi.py
138
gprMax/mpi.py
@@ -166,11 +166,8 @@ class MPIExecutor(object):
|
|||||||
# holds the state of workers on the master
|
# holds the state of workers on the master
|
||||||
self.busy = [False] * len(self.workers)
|
self.busy = [False] * len(self.workers)
|
||||||
|
|
||||||
logger.basic(f'Rank {self.rank}: MPIExecutor on comm: {self.comm.name}, Master: {self.master}, Workers: {self.workers}')
|
|
||||||
if self.is_master():
|
if self.is_master():
|
||||||
logger.debug(f'Rank {self.rank} = MASTER')
|
logger.basic(f'\nMPIExecutor on comm: {self.comm.name}, Master: {self.master}, Workers: {self.workers}')
|
||||||
else:
|
|
||||||
logger.debug(f'Rank {self.rank} = WORKER')
|
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
"""Context manager enter.
|
"""Context manager enter.
|
||||||
@@ -225,7 +222,7 @@ class MPIExecutor(object):
|
|||||||
raise RuntimeError('Start has already been called')
|
raise RuntimeError('Start has already been called')
|
||||||
self._up = True
|
self._up = True
|
||||||
|
|
||||||
logger.basic(f'Rank {self.rank}: Starting up MPIExecutor master/workers')
|
logger.debug('Starting up MPIExecutor master/workers...')
|
||||||
if self.is_worker():
|
if self.is_worker():
|
||||||
self.__wait()
|
self.__wait()
|
||||||
|
|
||||||
@@ -234,12 +231,12 @@ class MPIExecutor(object):
|
|||||||
"""
|
"""
|
||||||
if self.is_master():
|
if self.is_master():
|
||||||
|
|
||||||
logger.basic(f'Rank {self.rank}: Terminating. Sending sentinel to all workers.')
|
logger.debug('Terminating. Sending sentinel to all workers.')
|
||||||
# Send sentinel to all workers
|
# Send sentinel to all workers
|
||||||
for worker in self.workers:
|
for worker in self.workers:
|
||||||
self.comm.send(None, dest=worker, tag=Tags.EXIT)
|
self.comm.send(None, dest=worker, tag=Tags.EXIT)
|
||||||
|
|
||||||
logger.basic(f'Rank {self.rank}: Waiting for all workers to terminate.')
|
logger.debug('Waiting for all workers to terminate.')
|
||||||
|
|
||||||
down = [False] * len(self.workers)
|
down = [False] * len(self.workers)
|
||||||
while True:
|
while True:
|
||||||
@@ -251,7 +248,7 @@ class MPIExecutor(object):
|
|||||||
break
|
break
|
||||||
|
|
||||||
self._up = False
|
self._up = False
|
||||||
logger.basic(f'Rank {self.rank}: All workers terminated.')
|
logger.debug('All workers terminated.')
|
||||||
|
|
||||||
def submit(self, jobs, sleep=0.0):
|
def submit(self, jobs, sleep=0.0):
|
||||||
"""Submits a list of jobs to the workers and returns the results.
|
"""Submits a list of jobs to the workers and returns the results.
|
||||||
@@ -274,19 +271,17 @@ class MPIExecutor(object):
|
|||||||
if not self._up:
|
if not self._up:
|
||||||
raise RuntimeError('Cannot run jobs without a call to start()')
|
raise RuntimeError('Cannot run jobs without a call to start()')
|
||||||
|
|
||||||
logger.basic(f'Rank {self.rank}: Running {len(jobs):d} jobs.')
|
logger.basic(f'Running {len(jobs):d} jobs.')
|
||||||
assert self.is_master(), 'run() must not be called on a worker process'
|
assert self.is_master(), 'run() must not be called on a worker process'
|
||||||
|
|
||||||
my_jobs = jobs.copy()
|
my_jobs = jobs.copy()
|
||||||
num_jobs = len(my_jobs)
|
num_jobs = len(my_jobs)
|
||||||
results = [None] * num_jobs
|
results = [None] * num_jobs
|
||||||
while len(my_jobs) or not self.is_idle():
|
while len(my_jobs) or not self.is_idle():
|
||||||
|
|
||||||
for i, worker in enumerate(self.workers):
|
for i, worker in enumerate(self.workers):
|
||||||
|
|
||||||
if self.comm.Iprobe(source=worker, tag=Tags.DONE):
|
if self.comm.Iprobe(source=worker, tag=Tags.DONE):
|
||||||
job_idx, result = self.comm.recv(source=worker, tag=Tags.DONE)
|
job_idx, result = self.comm.recv(source=worker, tag=Tags.DONE)
|
||||||
logger.basic(f'Rank {self.rank}: Received finished job {job_idx} from worker {worker:d}.')
|
logger.debug(f'Received finished job {job_idx} from worker {worker:d}.')
|
||||||
results[job_idx] = result
|
results[job_idx] = result
|
||||||
self.busy[i] = False
|
self.busy[i] = False
|
||||||
elif self.comm.Iprobe(source=worker, tag=Tags.READY):
|
elif self.comm.Iprobe(source=worker, tag=Tags.READY):
|
||||||
@@ -294,16 +289,16 @@ class MPIExecutor(object):
|
|||||||
self.comm.recv(source=worker, tag=Tags.READY)
|
self.comm.recv(source=worker, tag=Tags.READY)
|
||||||
self.busy[i] = True
|
self.busy[i] = True
|
||||||
job_idx = num_jobs - len(my_jobs)
|
job_idx = num_jobs - len(my_jobs)
|
||||||
logger.basic(f'Rank {self.rank}: Sending job {job_idx} to worker {worker:d}.')
|
logger.debug(f'Sending job {job_idx} to worker {worker:d}.')
|
||||||
self.comm.send((job_idx, my_jobs.pop(0)), dest=worker, tag=Tags.START)
|
self.comm.send((job_idx, my_jobs.pop(0)), dest=worker, tag=Tags.START)
|
||||||
elif self.comm.Iprobe(source=worker, tag=Tags.EXIT):
|
elif self.comm.Iprobe(source=worker, tag=Tags.EXIT):
|
||||||
logger.basic(f'Rank {self.rank}: Worker on rank {worker:d} has terminated.')
|
logger.debug(f'Worker on rank {worker:d} has terminated.')
|
||||||
self.comm.recv(source=worker, tag=Tags.EXIT)
|
self.comm.recv(source=worker, tag=Tags.EXIT)
|
||||||
self.busy[i] = False
|
self.busy[i] = False
|
||||||
|
|
||||||
time.sleep(sleep)
|
time.sleep(sleep)
|
||||||
|
|
||||||
logger.basic(f'Rank {self.rank}: Finished all jobs.')
|
logger.debug('Finished all jobs.')
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
@@ -318,26 +313,26 @@ class MPIExecutor(object):
|
|||||||
|
|
||||||
status = MPI.Status()
|
status = MPI.Status()
|
||||||
|
|
||||||
logger.basic(f'Rank {self.rank}: Starting up worker.')
|
logger.debug('Starting up worker.')
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
self.comm.send(None, dest=self.master, tag=Tags.READY)
|
self.comm.send(None, dest=self.master, tag=Tags.READY)
|
||||||
logger.basic(f'Rank {self.rank}: Worker on rank {self.rank} waiting for job.')
|
logger.debug(f'Worker on rank {self.rank} waiting for job.')
|
||||||
|
|
||||||
data = self.comm.recv(source=self.master, tag=MPI.ANY_TAG, status=status)
|
data = self.comm.recv(source=self.master, tag=MPI.ANY_TAG, status=status)
|
||||||
tag = status.tag
|
tag = status.tag
|
||||||
|
|
||||||
if tag == Tags.START:
|
if tag == Tags.START:
|
||||||
job_idx, work = data
|
job_idx, work = data
|
||||||
logger.basic(f'Rank {self.rank}: Received job {job_idx} (work={work}).')
|
logger.debug(f'Received job {job_idx} (work={work}).')
|
||||||
result = self.__guarded_work(work)
|
result = self.__guarded_work(work)
|
||||||
logger.basic(f'Rank {self.rank}: Finished job. Sending results to master.')
|
logger.debug('Finished job. Sending results to master.')
|
||||||
self.comm.send((job_idx, result), dest=self.master, tag=Tags.DONE)
|
self.comm.send((job_idx, result), dest=self.master, tag=Tags.DONE)
|
||||||
elif tag == Tags.EXIT:
|
elif tag == Tags.EXIT:
|
||||||
logger.basic(f'Rank {self.rank}: Received sentinel from master.')
|
logger.debug('Received sentinel from master.')
|
||||||
break
|
break
|
||||||
|
|
||||||
logger.basic(f'Rank {self.rank}: Terminating worker.')
|
logger.debug('Terminating worker.')
|
||||||
self.comm.send(None, dest=self.master, tag=Tags.EXIT)
|
self.comm.send(None, dest=self.master, tag=Tags.EXIT)
|
||||||
|
|
||||||
def __guarded_work(self, work):
|
def __guarded_work(self, work):
|
||||||
@@ -359,104 +354,3 @@ class MPIExecutor(object):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(str(e))
|
logger.exception(str(e))
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
# def main(args=None):
|
|
||||||
# """CLI for gprMax in MPI mode.
|
|
||||||
# Example Usage:
|
|
||||||
# mpirun -np 4 python -m mpi -n 10 my_input_file.in
|
|
||||||
# """
|
|
||||||
# import argparse
|
|
||||||
# import os
|
|
||||||
# from gprMax.constants import c, e0, m0, z0
|
|
||||||
# from gprMax.model_build_run import run_model
|
|
||||||
#
|
|
||||||
# # Parse command line arguments
|
|
||||||
# parser = argparse.ArgumentParser(prog='gprMax', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
|
||||||
# parser.add_argument(
|
|
||||||
# 'inputfile',
|
|
||||||
# help='relative or absolute path to inputfile.')
|
|
||||||
# parser.add_argument(
|
|
||||||
# '-n', '--num-traces', type=int, default=1,
|
|
||||||
# help='number of model runs (traces) to create a B-scan')
|
|
||||||
# parser.add_argument(
|
|
||||||
# '--geometry-only', action='store_true', default=False,
|
|
||||||
# help='flag to only build model and produce geometry file(s)')
|
|
||||||
# parser.add_argument(
|
|
||||||
# '--geometry-fixed', action='store_true', default=False,
|
|
||||||
# help='flag to not reprocess model geometry, e.g. for B-scans where the geometry is fixed')
|
|
||||||
# parser.add_argument(
|
|
||||||
# '--write-processed', action='store_true', default=False,
|
|
||||||
# help='flag to write an input file after any Python code and include commands '
|
|
||||||
# 'in the original input file have been processed')
|
|
||||||
# parser.add_argument(
|
|
||||||
# '-r', '--restart', type=int, default=1,
|
|
||||||
# help='model number to restart from, e.g. when creating B-scan')
|
|
||||||
# parser.add_argument(
|
|
||||||
# '-l', '--logfile', action='store_true', default=False,
|
|
||||||
# help='flag to enable writing to a log file')
|
|
||||||
# parser.add_argument(
|
|
||||||
# '-v', '--verbose', action='store_true', default=False,
|
|
||||||
# help="flag to increase output")
|
|
||||||
# parser.add_argument(
|
|
||||||
# '--gpu', type=int, action='append', nargs='*',
|
|
||||||
# help='flag to use Nvidia GPU or option to give list of device ID(s)')
|
|
||||||
#
|
|
||||||
# args = parser.parse_args(args)
|
|
||||||
#
|
|
||||||
# comm = MPI.COMM_WORLD
|
|
||||||
# rank = comm.rank
|
|
||||||
#
|
|
||||||
# # set-up logging
|
|
||||||
# logger = logging.getLogger('gprMax')
|
|
||||||
# level = logging.DEBUG if args.verbose else logging.INFO
|
|
||||||
# logger.setLevel(level)
|
|
||||||
#
|
|
||||||
# if args.logfile != "":
|
|
||||||
# mh = logging.FileHandler(f"log_{rank}.txt", mode='w')
|
|
||||||
# mh.setLevel(level)
|
|
||||||
# formatter = logging.Formatter('%(asctime)s:%(name)s:%(levelname)s: %(message)s')
|
|
||||||
# mh.setFormatter(formatter)
|
|
||||||
# logger.addHandler(mh)
|
|
||||||
#
|
|
||||||
# namespace = {
|
|
||||||
# 'c': c,
|
|
||||||
# 'e0': e0,
|
|
||||||
# 'm0': m0,
|
|
||||||
# 'z0': z0,
|
|
||||||
# 'number_model_runs': args.num_traces,
|
|
||||||
# 'inputfile': os.path.abspath(args.inputfile)
|
|
||||||
# }
|
|
||||||
#
|
|
||||||
# model_args = argparse.Namespace(**{
|
|
||||||
# 'geometry_only': args.geometry_only,
|
|
||||||
# 'geometry_fixed': args.geometry_fixed,
|
|
||||||
# 'write_processed': args.write_processed,
|
|
||||||
# 'task': False,
|
|
||||||
# 'restart': False,
|
|
||||||
# 'gpu': args.gpu
|
|
||||||
# })
|
|
||||||
#
|
|
||||||
# # compile jobs
|
|
||||||
# jobs = []
|
|
||||||
# for i in range(args.num_traces):
|
|
||||||
# jobs.append({
|
|
||||||
# 'args': model_args,
|
|
||||||
# 'inputfile': args.inputfile,
|
|
||||||
# 'currentmodelrun': i + 1,
|
|
||||||
# 'modelend': args.num_traces,
|
|
||||||
# 'numbermodelruns': args.num_traces,
|
|
||||||
# 'usernamespace': namespace.copy()
|
|
||||||
# })
|
|
||||||
#
|
|
||||||
# # execute jobs
|
|
||||||
# logger.info(f'Starting execution of {args.num_traces} gprMax model runs.')
|
|
||||||
# with MPIExecutor(run_model, comm=comm) as gpr:
|
|
||||||
# if gpr is not None:
|
|
||||||
# results = gpr.submit(jobs)
|
|
||||||
# logger.info('Results: %s' % str(results))
|
|
||||||
# logger.info('Finished.')
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# if __name__ == '__main__':
|
|
||||||
# main()
|
|
||||||
|
@@ -105,7 +105,7 @@ def logo(version):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
description = '\n=== Electromagnetic modelling software based on the Finite-Difference Time-Domain (FDTD) method'
|
description = '\n=== Electromagnetic modelling software based on the Finite-Difference Time-Domain (FDTD) method'
|
||||||
copyright = 'Copyright (C) 2015-2019: The University of Edinburgh'
|
copyright = 'Copyright (C) 2015-2020: The University of Edinburgh'
|
||||||
authors = 'Authors: Craig Warren and Antonis Giannopoulos'
|
authors = 'Authors: Craig Warren and Antonis Giannopoulos'
|
||||||
licenseinfo1 = 'gprMax is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.\n'
|
licenseinfo1 = 'gprMax is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.\n'
|
||||||
licenseinfo2 = 'gprMax is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.'
|
licenseinfo2 = 'gprMax is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.'
|
||||||
|
在新工单中引用
屏蔽一个用户