Add initial MPI implementation

This commit is contained in:
nmannall
2024-05-31 17:21:00 +01:00
Parent d8005d35d8
Commit 1931617a49
8 changed files with 287 additions and 83 deletions

View file

@@ -26,6 +26,7 @@ from scipy import interpolate
import gprMax.config as config
from gprMax.grid.fdtd_grid import FDTDGrid
from gprMax.grid.mpi_grid import MPIGrid
from gprMax.model import Model
from gprMax.user_inputs import MainGridUserInput
@@ -1139,6 +1140,11 @@ class Snapshot(UserObjectMulti):
def build(self, model, uip):
grid = uip.grid
if isinstance(grid, MPIGrid):
logger.exception(
f"{self.params_str()} Snapshots are not currently compatible with MPI."
)
raise ValueError
if isinstance(grid, SubGridBaseGrid):
logger.exception(f"{self.params_str()} do not add snapshots to subgrids.")
raise ValueError

View file

@@ -170,7 +170,7 @@ class MPIContext(Context):
if self.comm.size < requested_mpi_size:
logger.exception(
(
"MPI_COMM_WORLD size of {self.comm.size} is too small for requested dimensions of"
f"MPI_COMM_WORLD size of {self.comm.size} is too small for requested dimensions of"
f" {config.sim_config.mpi}. {requested_mpi_size} ranks are required."
)
)
@@ -211,8 +211,6 @@ class MPIContext(Context):
model.build()
return
if not config.sim_config.geometry_only:
solver = create_solver(model)
model.solve(solver)
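
The check above guards against launching with fewer ranks than the requested x * y * z decomposition needs. A minimal standalone sketch of that validation with mpi4py (the dims tuple and the plain ValueError are illustrative; gprMax routes this through config.sim_config.mpi and its logger):

import numpy as np
from mpi4py import MPI


def check_decomposition(comm: MPI.Comm, dims) -> None:
    # Number of ranks implied by an x * y * z Cartesian decomposition.
    requested = int(np.prod(dims))
    if comm.size < requested:
        raise ValueError(
            f"MPI_COMM_WORLD size of {comm.size} is too small for requested "
            f"dimensions of {dims}. {requested} ranks are required."
        )


if __name__ == "__main__":
    # e.g. mpirun -n 4 python check_decomposition.py
    check_decomposition(MPI.COMM_WORLD, (2, 2, 1))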

View file

@@ -16,16 +16,20 @@
# You should have received a copy of the GNU General Public License
# along with gprMax. If not, see <http://www.gnu.org/licenses/>.
import itertools
from enum import IntEnum, unique
from typing import List
from typing import List, Optional, TypeVar, Union
import numpy as np
import numpy.typing as npt
from matplotlib.sankey import UP
from mpi4py import MPI
from numpy import ndarray
from gprMax.grid.fdtd_grid import FDTDGrid
from gprMax.receivers import Rx
from gprMax.sources import Source
CoordType = TypeVar("CoordType", bound=Union[Rx, Source])
@unique
@@ -43,6 +47,7 @@ class Dir(IntEnum):
class MPIGrid(FDTDGrid):
HALO_SIZE = 1
COORDINATOR_RANK = 0
def __init__(self, comm: MPI.Cartcomm):
self.size = np.zeros(3, dtype=int)
@@ -61,7 +66,6 @@ class MPIGrid(FDTDGrid):
self.neighbours[Dim.X] = self.comm.Shift(direction=Dim.X, disp=1)
self.neighbours[Dim.Y] = self.comm.Shift(direction=Dim.Y, disp=1)
self.neighbours[Dim.Z] = self.comm.Shift(direction=Dim.Z, disp=1)
print(f"[Rank {self.rank}] Neighbours = {self.neighbours}")
self.send_halo_map = np.empty((3, 2), dtype=MPI.Datatype)
self.recv_halo_map = np.empty((3, 2), dtype=MPI.Datatype)
@@ -98,39 +102,109 @@ class MPIGrid(FDTDGrid):
def nz(self, value: int):
self.size[Dim.Z] = value
    def broadcast_grid(self):
        self.calculate_local_extents()
        print(
            f"[Rank {self.rank}] - Global size: {self.global_size}, Local size: {self.size}, Cart comm dims: {self.mpi_tasks}, Local coordinates: {self.coords}, Lower extent = {self.lower_extent}, Upper extent = {self.upper_extent}"
        )

    def is_coordinator(self) -> bool:
        return self.rank == self.COORDINATOR_RANK
def get_rank_from_coordinate(self, coord: npt.NDArray) -> int:
step_size = self.global_size // self.mpi_tasks
overflow = self.global_size % self.mpi_tasks
grid_coord = np.where(
(step_size + 1) * overflow > coord,
coord // (step_size + 1),
(coord - overflow) // step_size,
)
return self.comm.Get_cart_rank(grid_coord.tolist())
def global_to_local_coordinate(self, global_coord: npt.NDArray) -> npt.NDArray:
return global_coord - self.lower_extent
def local_to_global_coordinate(self, local_coord: npt.NDArray) -> npt.NDArray:
return local_coord + self.lower_extent
def scatter_coord_objects(self, objects: List[CoordType]) -> List[CoordType]:
if self.is_coordinator():
objects_by_rank: List[List[CoordType]] = [[] for _ in range(self.comm.size)]
for o in objects:
objects_by_rank[self.get_rank_from_coordinate(o.coord)].append(o)
else:
objects_by_rank = None
objects = self.comm.scatter(objects_by_rank, root=self.COORDINATOR_RANK)
for o in objects:
o.coord = self.global_to_local_coordinate(o.coord)
return objects
    def gather_coord_objects(self, objects: List[CoordType]) -> List[CoordType]:
        for o in objects:
            o.coord = self.local_to_global_coordinate(o.coord)
        gathered_objects: Optional[List[List[CoordType]]] = self.comm.gather(
            objects, root=self.COORDINATOR_RANK
        )
        if gathered_objects is not None:
            return list(itertools.chain(*gathered_objects))
        else:
            return objects

    def _get_halo(self, array: ndarray, dim: Dim, direction: Dir) -> ndarray:
        if direction == Dir.NEG:
            index = 0
        else:  # Direction.UP
            index = -1

        if dim == Dim.X:
            halo = array[:, index, index]
        elif dim == Dim.Y:
            halo = array[index, :, index]
        else:  # Dim.Z
            halo = array[index, index, :]

        return halo

    def scatter_3d_array(self, array: npt.NDArray) -> npt.NDArray:
        self.comm.Bcast(array, root=self.COORDINATOR_RANK)
        return array[
            self.lower_extent[Dim.X] : self.upper_extent[Dim.X],
            self.lower_extent[Dim.Y] : self.upper_extent[Dim.Y],
            self.lower_extent[Dim.Z] : self.upper_extent[Dim.Z],
        ].copy(order="C")

    def _set_halo(self, array: ndarray, halo: ndarray, dim: Dim, direction: Dir):
        if direction == Dir.NEG:
            index = 0
        else:  # Direction.UP
            index = -1

        if dim == Dim.X:
            array[:, index, index] = halo
        elif dim == Dim.Y:
            array[index, :, index] = halo
        else:  # Dim.Z
            array[index, index, :] = halo

    def scatter_4d_array(self, array: npt.NDArray) -> npt.NDArray:
        self.comm.Bcast(array, root=self.COORDINATOR_RANK)
        return array[
            :,
            self.lower_extent[Dim.X] : self.upper_extent[Dim.X],
            self.lower_extent[Dim.Y] : self.upper_extent[Dim.Y],
            self.lower_extent[Dim.Z] : self.upper_extent[Dim.Z],
        ].copy(order="C")
def scatter_grid(self):
self.materials = self.comm.bcast(self.materials, root=self.COORDINATOR_RANK)
self.rxs = self.scatter_coord_objects(self.rxs)
self.voltagesources = self.scatter_coord_objects(self.voltagesources)
self.magneticdipoles = self.scatter_coord_objects(self.magneticdipoles)
self.hertziandipoles = self.scatter_coord_objects(self.hertziandipoles)
self.transmissionlines = self.scatter_coord_objects(self.transmissionlines)
self.pmls = self.comm.bcast(self.pmls, root=self.COORDINATOR_RANK)
if self.coords[Dim.X] != 0:
self.pmls["thickness"]["x0"] = 0
if self.coords[Dim.X] != self.mpi_tasks[Dim.X] - 1:
self.pmls["thickness"]["xmax"] = 0
if self.coords[Dim.Y] != 0:
self.pmls["thickness"]["y0"] = 0
if self.coords[Dim.Y] != self.mpi_tasks[Dim.Y] - 1:
self.pmls["thickness"]["ymax"] = 0
if self.coords[Dim.Z] != 0:
self.pmls["thickness"]["z0"] = 0
if self.coords[Dim.Z] != self.mpi_tasks[Dim.Z] - 1:
self.pmls["thickness"]["zmax"] = 0
old_size = self.size
self.size = self.global_size
if not self.is_coordinator():
self.initialise_geometry_arrays()
self.size = old_size
self.solid = self.scatter_3d_array(self.solid)
self.rigidE = self.scatter_4d_array(self.rigidE)
self.rigidH = self.scatter_4d_array(self.rigidH)
def gather_grid_objects(self):
self.rxs = self.gather_coord_objects(self.rxs)
self.voltagesources = self.gather_coord_objects(self.voltagesources)
self.magneticdipoles = self.gather_coord_objects(self.magneticdipoles)
self.hertziandipoles = self.gather_coord_objects(self.hertziandipoles)
self.transmissionlines = self.gather_coord_objects(self.transmissionlines)
def _halo_swap(self, array: ndarray, dim: Dim, dir: Dir):
neighbour = self.neighbours[dim][dir]
@@ -145,7 +219,7 @@ class MPIGrid(FDTDGrid):
None,
)
def _halo_swap_dimension(self, array: ndarray, dim: Dim):
def _halo_swap_by_dimension(self, array: ndarray, dim: Dim):
if self.coords[dim] % 2 == 0:
self._halo_swap(array, dim, Dir.NEG)
self._halo_swap(array, dim, Dir.POS)
@@ -153,51 +227,94 @@ class MPIGrid(FDTDGrid):
self._halo_swap(array, dim, Dir.POS)
self._halo_swap(array, dim, Dir.NEG)
def halo_swap(self, array: ndarray):
self._halo_swap_dimension(array, Dim.X)
self._halo_swap_dimension(array, Dim.Y)
self._halo_swap_dimension(array, Dim.Z)
def _halo_swap_array(self, array: ndarray):
self._halo_swap_by_dimension(array, Dim.X)
self._halo_swap_by_dimension(array, Dim.Y)
self._halo_swap_by_dimension(array, Dim.Z)
def halo_swap_electric(self):
self._halo_swap_array(self.Ex)
self._halo_swap_array(self.Ey)
self._halo_swap_array(self.Ez)
def halo_swap_magnetic(self):
self._halo_swap_array(self.Hx)
self._halo_swap_array(self.Hy)
self._halo_swap_array(self.Hz)
def build(self):
self.calculate_local_extents()
self.set_halo_map()
self.scatter_grid()
super().build()
self.halo_swap(self.Ex)
def has_neighbour(self, dim: Dim, dir: Dir) -> bool:
return self.neighbours[dim][dir] != -1
    def set_halo_map(self):
-        size = self.size.tolist()
+        print(f"[Rank {self.rank}] Size = {self.size}")
+        size = (self.size + 1).tolist()
        for dim in Dim:
-            halo_size = (self.size - 2).tolist()
+            halo_size = (self.size + 1 - np.sum(self.neighbours >= 0, axis=1)).tolist()
            halo_size[dim] = 1
-            start = [1, 1, 1]
-            self.send_halo_map[dim][Dir.NEG] = MPI.DOUBLE.Create_subarray(size, halo_size, start)
-            start[dim] = size[dim] - 2
-            self.send_halo_map[dim][Dir.POS] = MPI.DOUBLE.Create_subarray(size, halo_size, start)
-            start[dim] = 0
-            self.recv_halo_map[dim][Dir.NEG] = MPI.DOUBLE.Create_subarray(size, halo_size, start)
-            start[dim] = size[dim] - 1
-            self.recv_halo_map[dim][Dir.POS] = MPI.DOUBLE.Create_subarray(size, halo_size, start)
-            self.send_halo_map[dim][Dir.NEG].Commit()
-            self.send_halo_map[dim][Dir.POS].Commit()
-            self.recv_halo_map[dim][Dir.NEG].Commit()
-            self.recv_halo_map[dim][Dir.POS].Commit()
+            start = [1 if self.has_neighbour(dim, Dir.NEG) else 0 for dim in Dim]
+            if self.has_neighbour(dim, Dir.NEG):
+                start[dim] = 1
+                print(
+                    f"[Rank {self.rank}, Dim {dim}, Dir {Dir.NEG}] Grid of size {size}, creating halo map of size {halo_size} at start {start}"
+                )
+                self.send_halo_map[dim][Dir.NEG] = MPI.FLOAT.Create_subarray(size, halo_size, start)
+                start[dim] = 0
+                print(
+                    f"[Rank {self.rank}, Dim {dim}, Dir {Dir.NEG}] Grid of size {size}, creating halo map of size {halo_size} at start {start}"
+                )
+                self.recv_halo_map[dim][Dir.NEG] = MPI.FLOAT.Create_subarray(size, halo_size, start)
+                self.send_halo_map[dim][Dir.NEG].Commit()
+                self.recv_halo_map[dim][Dir.NEG].Commit()
+            if self.has_neighbour(dim, Dir.POS):
+                start[dim] = size[dim] - 2
+                print(
+                    f"[Rank {self.rank}, Dim {dim}, Dir {Dir.POS}] Grid of size {size}, creating halo map of size {halo_size} at start {start}"
+                )
+                self.send_halo_map[dim][Dir.POS] = MPI.FLOAT.Create_subarray(size, halo_size, start)
+                start[dim] = size[dim] - 1
+                print(
+                    f"[Rank {self.rank}, Dim {dim}, Dir {Dir.POS}] Grid of size {size}, creating halo map of size {halo_size} at start {start}"
+                )
+                self.recv_halo_map[dim][Dir.POS] = MPI.FLOAT.Create_subarray(size, halo_size, start)
+                self.send_halo_map[dim][Dir.POS].Commit()
+                self.recv_halo_map[dim][Dir.POS].Commit()
    def calculate_local_extents(self):
+        print(f"[Rank {self.rank}] Global size = {self.global_size}")
        self.size = self.global_size // self.mpi_tasks
+        print(f"[Rank {self.rank}] Initial size = {self.size}")
-        self.lower_extent = self.size * self.coords
+        self.lower_extent = self.size * self.coords + np.minimum(
+            self.coords, self.global_size % self.mpi_tasks
+        )
-        at_end = (self.mpi_tasks - self.coords) <= 1
-        self.size += at_end * self.global_size % self.mpi_tasks
+        print(f"[Rank {self.rank}] Lower extent = {self.lower_extent}")
+        self.size += self.coords < self.global_size % self.mpi_tasks
+        print(f"[Rank {self.rank}] Expanded size = {self.size}")
+        # at_end = (self.mpi_tasks - self.coords) <= 1
+        # self.size += at_end * self.global_size % self.mpi_tasks
+        # Account for negative halo
+        # Field arrays are created with dimensions size + 1 so space for
+        # a positive halo will always exist. Grids not needing a
+        # positive halo, still need the extra size as that makes the
+        # global grid on the whole one larger than the user dimensions.
+        self.size += self.neighbours[:, 0] >= 0
+        self.lower_extent -= self.neighbours[:, 0] >= 0
        self.upper_extent = self.lower_extent + self.size
-        # Account for halo
-        self.size += 2

+    def initialise_field_arrays(self):
+        super().initialise_field_arrays()
+        print(f"[Rank {self.rank}] With positive halo size = {self.size}")

View file

@@ -19,7 +19,7 @@
import datetime
import logging
import sys
from typing import List
from typing import List, Sequence
import humanize
import numpy as np
@@ -52,6 +52,7 @@ class Model:
def __init__(self):
self.title = ""
# TODO: Remove these in favour of properties below (only needed in MPI Grid)
self.gnx = 0
self.gny = 0
self.gnz = 0
@@ -179,6 +180,9 @@ class Model:
G.update_simple_source_positions(self.srcsteps, step=model_num)
G.update_receiver_positions(self.rxsteps, step=model_num)
self._output_geometry()
def _output_geometry(self):
# Write files for any geometry views and geometry object outputs
if (
not self.geometryviews
@@ -219,16 +223,11 @@ class Model:
self._check_for_dispersive_materials(grids)
self._check_memory_requirements(grids)
# TODO: Make this correctly set local nx, ny and nz when using MPI (likely use a function inside FDTDGrid/MPIGrid)
self.G.nx = self.gnx
self.G.ny = self.gny
self.G.nz = self.gnz
for grid in grids:
grid.build()
grid.dispersion_analysis(self.iterations)
def _check_for_dispersive_materials(self, grids: List[FDTDGrid]):
def _check_for_dispersive_materials(self, grids: Sequence[FDTDGrid]):
# Check for dispersive materials (and specific type)
if config.get_model_config().materials["maxpoles"] != 0:
# TODO: This sets materials["drudelorentz"] based only the
@@ -241,7 +240,7 @@ class Model:
# Set data type if any dispersive materials (must be done before memory checks)
config.get_model_config().set_dispersive_material_types()
def _check_memory_requirements(self, grids: List[FDTDGrid]):
def _check_memory_requirements(self, grids: Sequence[FDTDGrid]):
# Check memory requirements to build model/scene (different to memory
# requirements to run model when FractalVolumes/FractalSurfaces are
# used as these can require significant additional memory)
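
The List to Sequence change in _check_for_dispersive_materials and _check_memory_requirements is about variance: List is invariant, so a List[MPIGrid] is not accepted where List[FDTDGrid] is expected, whereas the read-only Sequence is covariant. A short illustration with placeholder classes (not the real gprMax types):

from typing import List, Sequence


class Grid: ...
class DistributedGrid(Grid): ...


def check_list(grids: List[Grid]) -> None: ...
def check_seq(grids: Sequence[Grid]) -> None: ...


grids: List[DistributedGrid] = [DistributedGrid()]
check_seq(grids)   # accepted: Sequence[Grid] is covariant in its element type
check_list(grids)  # runs, but a type checker rejects it: List is invariant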

View file

@@ -28,11 +28,6 @@ class MPIModel(Model):
def is_coordinator(self):
return self.rank == 0
def build(self):
self.build_geometry()
return
return super().build()
def build_geometry(self):
if self.is_coordinator():
self._check_for_dispersive_materials([self.G])
@@ -41,9 +36,7 @@ class MPIModel(Model):
self.G.global_size = np.array([self.gnx, self.gny, self.gnz], dtype=int)
self.G.build()
return
self.G.dispersion_analysis(self.iterations)
return super().build_geometry()
def _broadcast_model(self):
self.gnx = self.comm.bcast(self.gnx)
@@ -58,6 +51,12 @@ class MPIModel(Model):
self.srcsteps = self.comm.bcast(self.srcsteps)
self.rxsteps = self.comm.bcast(self.rxsteps)
def write_output_data(self):
self.G.gather_grid_objects()
if self.is_coordinator():
self.G.size = self.G.global_size
super().write_output_data()
def _output_geometry(self):
if self.is_coordinator():
logger.info("Geometry views and geometry objects are not currently supported with MPI.")

View file

@@ -17,7 +17,9 @@
# along with gprMax. If not, see <http://www.gnu.org/licenses/>.
import gprMax.config as config
from gprMax.grid.mpi_grid import MPIGrid
from gprMax.model import Model
from gprMax.updates.mpi_updates import MPIUpdates
from .grid.cuda_grid import CUDAGrid
from .grid.fdtd_grid import FDTDGrid
@@ -59,6 +61,8 @@ class Solver:
self.updates.update_magnetic()
self.updates.update_magnetic_pml()
self.updates.update_magnetic_sources(iteration)
if isinstance(self.updates, MPIUpdates):
self.updates.halo_swap_magnetic()
if isinstance(self.updates, SubgridUpdates):
self.updates.hsg_2()
self.updates.update_electric_a()
@@ -68,6 +72,8 @@ class Solver:
if isinstance(self.updates, SubgridUpdates):
self.updates.hsg_1()
self.updates.update_electric_b()
if isinstance(self.updates, MPIUpdates):
self.updates.halo_swap_electric()
if isinstance(self.updates, CUDAUpdates):
self.memused = self.updates.calculate_memory_used(iteration)
@@ -106,6 +112,10 @@ def create_solver(model: Model) -> Solver:
updates = CPUUpdates(grid)
if config.get_model_config().materials["maxpoles"] != 0:
updates.set_dispersive_updates()
elif type(grid) is MPIGrid:
updates = MPIUpdates(grid)
if config.get_model_config().materials["maxpoles"] != 0:
updates.set_dispersive_updates()
elif type(grid) is CUDAGrid:
updates = CUDAUpdates(grid)
elif type(grid) is OpenCLGrid:
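
The two isinstance branches added above place the halo exchanges at fixed points in the time-stepping loop: H halos are exchanged once the magnetic field (plus its PML and source updates) is current, and E halos once update_electric_b has finished, so neighbouring ranks read up-to-date boundary values in the next half-step. A schematic of that ordering with the update calls stubbed out (not the real Solver/MPIUpdates classes; the elided electric PML/source steps are marked):

class RecordingUpdates:
    """Stand-in for MPIUpdates that records the call order instead of doing FDTD work."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        return lambda *args, **kwargs: self.calls.append(name)


def run_iteration(updates, iteration):
    updates.update_magnetic()
    updates.update_magnetic_pml()
    updates.update_magnetic_sources(iteration)
    updates.halo_swap_magnetic()   # exchange H halos before the E update reads them
    updates.update_electric_a()
    # ... electric PML and source updates (not shown in the hunks above) ...
    updates.update_electric_b()
    updates.halo_swap_electric()   # exchange E halos before the next H update


updates = RecordingUpdates()
run_iteration(updates, iteration=0)
print(updates.calls)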

View file

@@ -19,6 +19,7 @@
from copy import deepcopy
import numpy as np
import numpy.typing as npt
import gprMax.config as config
from gprMax.waveforms import Waveform
@@ -32,16 +33,60 @@ class Source:
def __init__(self):
self.ID: str
self.polarisation = None
self.xcoord: int
self.ycoord: int
self.zcoord: int
self.xcoordorigin: int
self.ycoordorigin: int
self.zcoordorigin: int
self.coord: npt.NDArray[np.int_] = np.zeros(3, dtype=int)
self.coordorigin: npt.NDArray[np.int_] = np.zeros(3, dtype=int)
self.start = 0.0
self.stop = 0.0
self.waveform: Waveform
@property
def xcoord(self) -> int:
return self.coord[0]
@xcoord.setter
def xcoord(self, value: int):
self.coord[0] = value
@property
def ycoord(self) -> int:
return self.coord[1]
@ycoord.setter
def ycoord(self, value: int):
self.coord[1] = value
@property
def zcoord(self) -> int:
return self.coord[2]
@zcoord.setter
def zcoord(self, value: int):
self.coord[2] = value
@property
def xcoordorigin(self) -> int:
return self.coordorigin[0]
@xcoordorigin.setter
def xcoordorigin(self, value: int):
self.coordorigin[0] = value
@property
def ycoordorigin(self) -> int:
return self.coordorigin[1]
@ycoordorigin.setter
def ycoordorigin(self, value: int):
self.coordorigin[1] = value
@property
def zcoordorigin(self) -> int:
return self.coordorigin[2]
@zcoordorigin.setter
def zcoordorigin(self, value: int):
self.coordorigin[2] = value
def calculate_waveform_values(self, iterations: int, dt: float):
"""Calculates all waveform values for source for duration of simulation.

View file

@@ -0,0 +1,30 @@
# Copyright (C) 2015-2024: The University of Edinburgh, United Kingdom
# Authors: Craig Warren, Antonis Giannopoulos, and John Hartley
#
# This file is part of gprMax.
#
# gprMax is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# gprMax is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with gprMax. If not, see <http://www.gnu.org/licenses/>.
from gprMax.grid.mpi_grid import MPIGrid
from gprMax.updates.cpu_updates import CPUUpdates
class MPIUpdates(CPUUpdates[MPIGrid]):
"""Defines update functions for MPI CPU-based solver."""
def halo_swap_electric(self):
self.grid.halo_swap_electric()
def halo_swap_magnetic(self):
self.grid.halo_swap_magnetic()
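
Subscripting the base class as CPUUpdates[MPIGrid] implies CPUUpdates is generic over the grid type it updates; MPIUpdates then only forwards the two halo-swap calls to that grid, and create_solver selects it whenever the model's grid is an MPIGrid. A self-contained sketch of the same generic-delegation pattern with stand-in classes (not the real gprMax types):

from typing import Generic, TypeVar


class Grid:
    def halo_swap_electric(self) -> None:
        print("swapping E halos")


GridT = TypeVar("GridT", bound=Grid)


class Updates(Generic[GridT]):
    """Stand-in for CPUUpdates: owns the grid it updates."""

    def __init__(self, grid: GridT):
        self.grid = grid


class HaloUpdates(Updates[Grid]):
    """Stand-in for MPIUpdates: adds halo swaps that delegate to the grid."""

    def halo_swap_electric(self) -> None:
        self.grid.halo_swap_electric()


HaloUpdates(Grid()).halo_swap_electric()  # prints "swapping E halos"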