From 8e39207272285f1f679d6ad662468f889ca70475 Mon Sep 17 00:00:00 2001
From: nmannall
Date: Thu, 23 May 2024 11:47:06 +0100
Subject: [PATCH] Add halo swap function to MPIGrid

Distribute Model to each rank
---
 gprMax/contexts.py      |  68 +++++++++----
 gprMax/gprMax.py        |   5 +-
 gprMax/grid/mpi_grid.py | 206 ++++++++++++++++++++++++++++++++--------
 gprMax/mpi_model.py     |  67 +++++++++++++
 4 files changed, 288 insertions(+), 58 deletions(-)
 create mode 100644 gprMax/mpi_model.py

diff --git a/gprMax/contexts.py b/gprMax/contexts.py
index 833e0dc8..6dc59e19 100644
--- a/gprMax/contexts.py
+++ b/gprMax/contexts.py
@@ -23,9 +23,11 @@ import sys
 from typing import Any, Dict, List, Optional
 
 import humanize
+import numpy as np
 from colorama import Fore, Style, init
 
 from gprMax.hash_cmds_file import parse_hash_commands
+from gprMax.mpi_model import MPIModel
 from gprMax.scene import Scene
 
 init()
@@ -157,37 +159,71 @@ class Context:
 
 
 class MPIContext(Context):
-    def __init__(self, x_dim: int, y_dim: int, z_dim: int):
+    def __init__(self):
         super().__init__()
         from mpi4py import MPI
 
         self.comm = MPI.COMM_WORLD
         self.rank = self.comm.rank
 
-        if self.rank >= x_dim * y_dim * z_dim:
+        requested_mpi_size = np.prod(config.sim_config.mpi)
+        if self.comm.size < requested_mpi_size:
+            logger.exception(
+                (
+                    f"MPI_COMM_WORLD size of {self.comm.size} is too small for requested dimensions of"
+                    f" {config.sim_config.mpi}. {requested_mpi_size} ranks are required."
+                )
+            )
+            raise ValueError
+
+        if self.rank >= requested_mpi_size:
             logger.warn(
                 (
-                    f"Rank {self.rank}: Only {x_dim * y_dim * z_dim} MPI ranks required for the"
+                    f"Rank {self.rank}: Only {requested_mpi_size} MPI ranks required for the"
                     " dimensions specified. Either increase your MPI dimension size, or request"
                     " fewer MPI tasks."
                 )
             )
-            self.x = -1
-            self.y = -1
-            self.z = -1
-        else:
-            self.x = self.rank % x_dim
-            self.y = (self.rank // x_dim) % y_dim
-            self.z = (self.rank // (x_dim * y_dim)) % z_dim
+            exit()
+
+    def _create_model(self) -> MPIModel:
+        return MPIModel()
 
     def run(self) -> Dict:
-        print(f"I am rank {self.rank} and I will run at grid position {self.x}, {self.y}, {self.z}")
+        return super().run()
 
-        if self.rank == 0:
-            print("Rank 0 is running the simulation")
-            return super().run()
-        else:
-            return {}
+    def _run_model(self, model_num: int) -> None:
+        """Process for running a single model.
+
+        Args:
+            model_num: index of model to be run
+        """
+
+        config.sim_config.set_current_model(model_num)
+        model_config = self._create_model_config(model_num)
+        config.sim_config.set_model_config(model_config)
+
+        if not model_config.reuse_geometry():
+            model = self._create_model()
+            if model.is_coordinator():
+                scene = self._get_scene(model_num)
+                scene.create_internal_objects(model)
+
+            model.build()
+
+        return
+
+        if not config.sim_config.geometry_only:
+            solver = create_solver(model)
+            model.solve(solver)
+            del solver
+
+        if not config.sim_config.geometry_fixed:
+            # Manual garbage collection required to stop memory leak on GPUs
+            # when using pycuda
+            del model.G, model
+
+        gc.collect()
 
 
 class TaskfarmContext(Context):
diff --git a/gprMax/gprMax.py b/gprMax/gprMax.py
index bc3fec28..545daac4 100644
--- a/gprMax/gprMax.py
+++ b/gprMax/gprMax.py
@@ -269,10 +269,7 @@ def run_main(args):
         context = TaskfarmContext()
     # MPI running to divide model between ranks
     elif config.sim_config.args.mpi is not None:
-        x = config.sim_config.args.mpi[0]
-        y = config.sim_config.args.mpi[1]
-        z = config.sim_config.args.mpi[2]
-        context = MPIContext(x, y, z)
+        context = MPIContext()
     # Standard running (OpenMP/CUDA/OpenCL)
     else:
         context = Context()
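
Note: the standalone sketch below is not part of the patch; the file name and hard-coded values are hypothetical. It shows, in isolation, the decomposition check and Cartesian communicator setup that MPIContext and MPIGrid rely on, using only mpi4py calls that appear in the patch (Create_cart, Shift) plus an MPI.COMM_NULL check for surplus ranks.

    # cart_demo.py - illustration only; run e.g.: mpirun -n 8 python cart_demo.py 2 2 2
    import sys

    import numpy as np
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    dims = [int(v) for v in sys.argv[1:4]] or [comm.size, 1, 1]
    requested = np.prod(dims)

    if comm.size < requested:
        # Mirrors the check in MPIContext.__init__: not enough ranks for the requested grid.
        raise ValueError(f"{requested} ranks required, but only {comm.size} available")

    # Every rank must take part in the collective Cart_create call; surplus ranks
    # receive MPI.COMM_NULL and take no further part.
    cart = comm.Create_cart(dims)
    if cart == MPI.COMM_NULL:
        sys.exit(0)

    # Shift() gives the (negative, positive) neighbour ranks in a dimension,
    # which MPIGrid stores in its neighbours array.
    print(f"rank {comm.rank}: coords {cart.coords}, x neighbours {cart.Shift(direction=0, disp=1)}")
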
diff --git a/gprMax/grid/mpi_grid.py b/gprMax/grid/mpi_grid.py
index 146aba93..80a009ee 100644
--- a/gprMax/grid/mpi_grid.py
+++ b/gprMax/grid/mpi_grid.py
@@ -16,58 +16,188 @@
 # You should have received a copy of the GNU General Public License
 # along with gprMax. If not, see <http://www.gnu.org/licenses/>.
 
-from typing import Optional
+from enum import IntEnum, unique
+from typing import List
 
+import numpy as np
+import numpy.typing as npt
 from mpi4py import MPI
+from numpy import ndarray
 
 from gprMax.grid.fdtd_grid import FDTDGrid
 
 
+@unique
+class Dim(IntEnum):
+    X = 0
+    Y = 1
+    Z = 2
+
+
+@unique
+class Dir(IntEnum):
+    NEG = 0
+    POS = 1
+
+
 class MPIGrid(FDTDGrid):
-    def __init__(
-        self,
-        mpi_tasks_x: int,
-        mpi_tasks_y: int,
-        mpi_tasks_z: int,
-        comm: Optional[MPI.Intracomm] = None,
-    ):
+    HALO_SIZE = 1
+
+    def __init__(self, comm: MPI.Cartcomm):
+        self.size = np.zeros(3, dtype=int)
+
         super().__init__()
 
-        if comm is None:
-            self.comm = MPI.COMM_WORLD
+        self.comm = comm
+
+        self.mpi_tasks = np.array(self.comm.dims)
+
+        self.lower_extent: npt.NDArray[np.intc] = np.zeros(3, dtype=int)
+        self.upper_extent: npt.NDArray[np.intc] = np.zeros(3, dtype=int)
+        self.global_size: npt.NDArray[np.intc] = np.zeros(3, dtype=int)
+
+        self.neighbours = np.full((3, 2), -1, dtype=int)
+        self.neighbours[Dim.X] = self.comm.Shift(direction=Dim.X, disp=1)
+        self.neighbours[Dim.Y] = self.comm.Shift(direction=Dim.Y, disp=1)
+        self.neighbours[Dim.Z] = self.comm.Shift(direction=Dim.Z, disp=1)
+        print(f"[Rank {self.rank}] Neighbours = {self.neighbours}")
+
+        self.send_halo_map = np.empty((3, 2), dtype=MPI.Datatype)
+        self.recv_halo_map = np.empty((3, 2), dtype=MPI.Datatype)
+
+    @property
+    def rank(self) -> int:
+        return self.comm.Get_rank()
+
+    @property
+    def coords(self) -> List[int]:
+        return self.comm.coords
+
+    @property
+    def nx(self) -> int:
+        return self.size[Dim.X]
+
+    @nx.setter
+    def nx(self, value: int):
+        self.size[Dim.X] = value
+
+    @property
+    def ny(self) -> int:
+        return self.size[Dim.Y]
+
+    @ny.setter
+    def ny(self, value: int):
+        self.size[Dim.Y] = value
+
+    @property
+    def nz(self) -> int:
+        return self.size[Dim.Z]
+
+    @nz.setter
+    def nz(self, value: int):
+        self.size[Dim.Z] = value
+
+    def broadcast_grid(self):
+        self.calculate_local_extents()
+        print(
+            f"[Rank {self.rank}] - Global size: {self.global_size}, Local size: {self.size},"
+            f" Cart comm dims: {self.mpi_tasks}, Local coordinates: {self.coords},"
+            f" Lower extent = {self.lower_extent}, Upper extent = {self.upper_extent}"
+        )
+
+    def _get_halo(self, array: ndarray, dim: Dim, direction: Dir) -> ndarray:
+        if direction == Dir.NEG:
+            index = 0
+        else:  # Dir.POS
+            index = -1
+
+        if dim == Dim.X:
+            halo = array[:, index, index]
+        elif dim == Dim.Y:
+            halo = array[index, :, index]
+        else:  # Dim.Z
+            halo = array[index, index, :]
+
+        return halo
+
+    def _set_halo(self, array: ndarray, halo: ndarray, dim: Dim, direction: Dir):
+        if direction == Dir.NEG:
+            index = 0
+        else:  # Dir.POS
+            index = -1
+
+        if dim == Dim.X:
+            array[:, index, index] = halo
+        elif dim == Dim.Y:
+            array[index, :, index] = halo
+        else:  # Dim.Z
+            array[index, index, :] = halo
+
+    def _halo_swap(self, array: ndarray, dim: Dim, dir: Dir):
+        neighbour = self.neighbours[dim][dir]
+        if neighbour != -1:
+            self.comm.Sendrecv(
+                [array, self.send_halo_map[dim][dir]],
+                neighbour,
+                0,
+                [array, self.recv_halo_map[dim][dir]],
+                neighbour,
+                0,
+                None,
+            )
+
+    def _halo_swap_dimension(self, array: ndarray, dim: Dim):
+        # Even and odd ranks order the two exchanges oppositely so neighbours pair up.
+        if self.coords[dim] % 2 == 0:
+            self._halo_swap(array, dim, Dir.NEG)
+            self._halo_swap(array, dim, Dir.POS)
         else:
-            self.comm = comm
+            self._halo_swap(array, dim, Dir.POS)
+            self._halo_swap(array, dim, Dir.NEG)
 
-        if mpi_tasks_x * mpi_tasks_y * mpi_tasks_z > self.comm.size:
-            # TODO: Raise expection - insufficient MPI tasks to create the grid as requested
-            pass
+    def halo_swap(self, array: ndarray):
+        self._halo_swap_dimension(array, Dim.X)
+        self._halo_swap_dimension(array, Dim.Y)
+        self._halo_swap_dimension(array, Dim.Z)
 
-        self.mpi_tasks_x = mpi_tasks_x
-        self.mpi_tasks_y = mpi_tasks_y
-        self.mpi_tasks_z = mpi_tasks_z
+    def build(self):
+        self.calculate_local_extents()
+        self.set_halo_map()
+        super().build()
+        self.halo_swap(self.Ex)
 
-        self.rank = self.comm.rank
-        self.size = self.comm.size
+    def set_halo_map(self):
+        # Subarray datatypes describing the face sent to / received from each neighbour.
+        size = self.size.tolist()
 
-        self.xmin = 0
-        self.ymin = 0
-        self.zmin = 0
-        self.xmax = 0
-        self.ymax = 0
-        self.zmax = 0
+        for dim in Dim:
+            halo_size = (self.size - 2).tolist()
+            halo_size[dim] = 1
+
+            start = [1, 1, 1]
+            self.send_halo_map[dim][Dir.NEG] = MPI.DOUBLE.Create_subarray(size, halo_size, start)
+            start[dim] = size[dim] - 2
+            self.send_halo_map[dim][Dir.POS] = MPI.DOUBLE.Create_subarray(size, halo_size, start)
+
+            start[dim] = 0
+            self.recv_halo_map[dim][Dir.NEG] = MPI.DOUBLE.Create_subarray(size, halo_size, start)
+            start[dim] = size[dim] - 1
+            self.recv_halo_map[dim][Dir.POS] = MPI.DOUBLE.Create_subarray(size, halo_size, start)
+
+            self.send_halo_map[dim][Dir.NEG].Commit()
+            self.send_halo_map[dim][Dir.POS].Commit()
+            self.recv_halo_map[dim][Dir.NEG].Commit()
+            self.recv_halo_map[dim][Dir.POS].Commit()
+
+    def calculate_local_extents(self):
+        self.size = self.global_size // self.mpi_tasks
+
+        self.lower_extent = self.size * self.coords
+
+        # Ranks at the far end of a dimension absorb any remainder cells.
+        at_end = (self.mpi_tasks - self.coords) <= 1
+        self.size += at_end * (self.global_size % self.mpi_tasks)
+
+        self.upper_extent = self.lower_extent + self.size
+
+        # Account for halo
+        self.size += 2
 
     def initialise_field_arrays(self):
         super().initialise_field_arrays()
-
-        local_grid_size_x = self.nx // self.mpi_tasks_x
-        local_grid_size_y = self.ny // self.mpi_tasks_y
-        local_grid_size_z = self.nz // self.mpi_tasks_z
-
-        self.xmin = (self.rank % self.nx) * local_grid_size_x
-        self.ymin = ((self.mpi_tasks_x * self.rank) % self.ny) * local_grid_size_y
-        self.zmin = (
-            (self.mpi_tasks_y * self.mpi_tasks_x * self.rank) % self.nz
-        ) * local_grid_size_z
-        self.xmax = self.xmin + local_grid_size_x
-        self.ymax = self.ymin + local_grid_size_y
-        self.zmax = self.zmin + local_grid_size_z
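
For reference, the standalone sketch below (not part of the patch, file name hypothetical) shows the same Create_subarray/Sendrecv pattern that set_halo_map() and halo_swap() use, reduced to a 1-D decomposition with a one-cell halo. Unlike set_halo_map() it exchanges whole faces rather than only their interior cells, and it tests against MPI.PROC_NULL, which is what Cartcomm.Shift returns for a missing neighbour.

    # subarray_halo_demo.py - illustration only; run e.g.: mpirun -n 2 python subarray_halo_demo.py
    import numpy as np
    from mpi4py import MPI

    cart = MPI.COMM_WORLD.Create_cart([MPI.COMM_WORLD.size])  # 1-D decomposition along x
    rank = cart.Get_rank()

    n = 4  # interior cells per rank in x
    field = np.full((n + 2, 3, 3), float(rank))  # one-cell halo at each end in x

    sizes = list(field.shape)
    face = [1, 3, 3]  # a single y-z face, one cell thick

    # Send the first/last interior face; receive into the halo cell beyond it.
    send_lo = MPI.DOUBLE.Create_subarray(sizes, face, [1, 0, 0]).Commit()
    send_hi = MPI.DOUBLE.Create_subarray(sizes, face, [n, 0, 0]).Commit()
    recv_lo = MPI.DOUBLE.Create_subarray(sizes, face, [0, 0, 0]).Commit()
    recv_hi = MPI.DOUBLE.Create_subarray(sizes, face, [n + 1, 0, 0]).Commit()

    lo, hi = cart.Shift(direction=0, disp=1)
    if hi != MPI.PROC_NULL:
        cart.Sendrecv([field, send_hi], hi, 0, [field, recv_hi], hi, 0)
    if lo != MPI.PROC_NULL:
        cart.Sendrecv([field, send_lo], lo, 0, [field, recv_lo], lo, 0)

    print(f"rank {rank}: lower halo = {field[0, 0, 0]}, upper halo = {field[n + 1, 0, 0]}")
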
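The block decomposition performed by calculate_local_extents() can also be checked offline; the sketch below (illustration only, with made-up sizes) reproduces the same arithmetic for every coordinate of a hypothetical 3 x 2 x 1 decomposition.

    # extents_demo.py - illustration of the arithmetic in MPIGrid.calculate_local_extents()
    import numpy as np

    global_size = np.array([100, 100, 90])  # total cells in the model (hypothetical)
    mpi_tasks = np.array([3, 2, 1])         # Cartesian communicator dimensions (hypothetical)

    for coords in np.ndindex(*mpi_tasks):
        c = np.array(coords)
        size = global_size // mpi_tasks
        lower_extent = size * c
        # Ranks at the far end of a dimension absorb any remainder cells.
        at_end = (mpi_tasks - c) <= 1
        size += at_end * (global_size % mpi_tasks)
        upper_extent = lower_extent + size
        size += 2  # one-cell halo on each side
        print(f"coords {coords}: extent {lower_extent}..{upper_extent}, local size {size}")
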
diff --git a/gprMax/mpi_model.py b/gprMax/mpi_model.py
new file mode 100644
index 00000000..2059c7fc
--- /dev/null
+++ b/gprMax/mpi_model.py
@@ -0,0 +1,67 @@
+import logging
+from typing import Optional
+
+import numpy as np
+from mpi4py import MPI
+
+from gprMax import config
+from gprMax.grid.mpi_grid import MPIGrid
+from gprMax.model import Model
+
+logger = logging.getLogger(__name__)
+
+
+class MPIModel(Model):
+    def __init__(self, comm: Optional[MPI.Intracomm] = None):
+        if comm is None:
+            self.comm = MPI.COMM_WORLD
+        else:
+            self.comm = comm
+
+        self.rank = self.comm.Get_rank()
+
+        self.G = self._create_grid()
+
+        return super().__init__()
+
+    def is_coordinator(self):
+        return self.rank == 0
+
+    def build(self):
+        self.build_geometry()
+        return
+        return super().build()
+
+    def build_geometry(self):
+        if self.is_coordinator():
+            self._check_for_dispersive_materials([self.G])
+            self._check_memory_requirements([self.G])
+        self._broadcast_model()
+
+        self.G.global_size = np.array([self.gnx, self.gny, self.gnz], dtype=int)
+
+        self.G.build()
+        return
+        self.G.dispersion_analysis(self.iterations)
+
+    def _broadcast_model(self):
+        self.gnx = self.comm.bcast(self.gnx)
+        self.gny = self.comm.bcast(self.gny)
+        self.gnz = self.comm.bcast(self.gnz)
+
+        self.comm.Bcast(self.dl)
+        self.dt = self.comm.bcast(self.dt)
+
+        self.iterations = self.comm.bcast(self.iterations)
+
+        self.srcsteps = self.comm.bcast(self.srcsteps)
+        self.rxsteps = self.comm.bcast(self.rxsteps)
+
+    def _output_geometry(self):
+        if self.is_coordinator():
+            logger.info("Geometry views and geometry objects are not currently supported with MPI.")
+
+    def _create_grid(self) -> MPIGrid:
+        cart_comm = MPI.COMM_WORLD.Create_cart(config.sim_config.mpi)
+        return MPIGrid(cart_comm)
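
Finally, a small standalone sketch (illustration only, file name hypothetical) of the two broadcast flavours used in _broadcast_model(): the lowercase bcast, which pickles a Python object and returns the root's value on every rank, and the uppercase Bcast, which fills an existing NumPy buffer in place.

    # bcast_demo.py - illustration only; run e.g.: mpirun -n 4 python bcast_demo.py
    import numpy as np
    from mpi4py import MPI

    comm = MPI.COMM_WORLD

    # Scalars and other picklable objects: bcast returns the root's value everywhere.
    iterations = 500 if comm.rank == 0 else None
    iterations = comm.bcast(iterations)  # root defaults to 0

    # NumPy arrays: Bcast overwrites the receive buffer in place on non-root ranks.
    dl = np.array([1e-3, 1e-3, 1e-3]) if comm.rank == 0 else np.empty(3)
    comm.Bcast(dl)

    print(f"rank {comm.rank}: iterations={iterations}, dl={dl}")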