Add halo swap function to MPIGrid

Distribute Model to each rank
This commit is contained in:
nmannall
2024-05-23 11:47:06 +01:00
Parent d004f64d81
Commit 8e39207272
4 changed files with 288 additions and 58 deletions


@@ -23,9 +23,11 @@ import sys
 from typing import Any, Dict, List, Optional
 
 import humanize
+import numpy as np
 from colorama import Fore, Style, init
 
 from gprMax.hash_cmds_file import parse_hash_commands
+from gprMax.mpi_model import MPIModel
 from gprMax.scene import Scene
 
 init()
@@ -157,37 +159,71 @@ class Context:
class MPIContext(Context): class MPIContext(Context):
def __init__(self, x_dim: int, y_dim: int, z_dim: int): def __init__(self):
super().__init__() super().__init__()
from mpi4py import MPI from mpi4py import MPI
self.comm = MPI.COMM_WORLD self.comm = MPI.COMM_WORLD
self.rank = self.comm.rank self.rank = self.comm.rank
if self.rank >= x_dim * y_dim * z_dim: requested_mpi_size = np.product(config.sim_config.mpi)
if self.comm.size < requested_mpi_size:
logger.exception(
(
"MPI_COMM_WORLD size of {self.comm.size} is too small for requested dimensions of"
f" {config.sim_config.mpi}. {requested_mpi_size} ranks are required."
)
)
raise ValueError
if self.rank >= requested_mpi_size:
logger.warn( logger.warn(
( (
f"Rank {self.rank}: Only {x_dim * y_dim * z_dim} MPI ranks required for the" f"Rank {self.rank}: Only {requested_mpi_size} MPI ranks required for the"
" dimensions specified. Either increase your MPI dimension size, or request" " dimensions specified. Either increase your MPI dimension size, or request"
" fewer MPI tasks." " fewer MPI tasks."
) )
) )
self.x = -1 exit()
self.y = -1
self.z = -1 def _create_model(self) -> MPIModel:
else: return MPIModel()
self.x = self.rank % x_dim
self.y = (self.rank // x_dim) % y_dim
self.z = (self.rank // (x_dim * y_dim)) % z_dim
def run(self) -> Dict: def run(self) -> Dict:
print(f"I am rank {self.rank} and I will run at grid position {self.x}, {self.y}, {self.z}") return super().run()
if self.rank == 0: def _run_model(self, model_num: int) -> None:
print("Rank 0 is running the simulation") """Process for running a single model.
return super().run()
else: Args:
return {} model_num: index of model to be run
"""
config.sim_config.set_current_model(model_num)
model_config = self._create_model_config(model_num)
config.sim_config.set_model_config(model_config)
if not model_config.reuse_geometry():
model = self._create_model()
if model.is_coordinator():
scene = self._get_scene(model_num)
scene.create_internal_objects(model)
model.build()
return
if not config.sim_config.geometry_only:
solver = create_solver(model)
model.solve(solver)
del solver
if not config.sim_config.geometry_fixed:
# Manual garbage collection required to stop memory leak on GPUs
# when using pycuda
del model.G, model
gc.collect()
class TaskfarmContext(Context): class TaskfarmContext(Context):
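The explicit (x, y, z) rank bookkeeping removed above is now delegated to an MPI Cartesian communicator, created in mpi_model.py below and consumed by MPIGrid. For reference, a minimal standalone mpi4py sketch of that primitive; it is not part of the commit, and the 2 x 2 x 1 decomposition is a made-up stand-in for config.sim_config.mpi:

# Standalone sketch (not from the commit). Run with: mpirun -n 4 python cart_demo.py
import numpy as np
from mpi4py import MPI

dims = [2, 2, 1]  # hypothetical decomposition, standing in for config.sim_config.mpi
comm = MPI.COMM_WORLD

if comm.size < np.prod(dims):  # the same guard MPIContext.__init__ now applies
    raise ValueError(f"{np.prod(dims)} ranks required, only {comm.size} available")

cart = comm.Create_cart(dims)  # analogous to MPIModel._create_grid() further down
if cart == MPI.COMM_NULL:      # surplus ranks take no part, like the exit() above
    raise SystemExit

coords = cart.Get_coords(cart.rank)           # this rank's (x, y, z) position
lo_x, hi_x = cart.Shift(direction=0, disp=1)  # neighbour ranks along x (MPI.PROC_NULL at edges)
print(f"rank {cart.rank}: coords {coords}, x neighbours ({lo_x}, {hi_x})")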


@@ -269,10 +269,7 @@ def run_main(args):
         context = TaskfarmContext()
     # MPI running to divide model between ranks
     elif config.sim_config.args.mpi is not None:
-        x = config.sim_config.args.mpi[0]
-        y = config.sim_config.args.mpi[1]
-        z = config.sim_config.args.mpi[2]
-        context = MPIContext(x, y, z)
+        context = MPIContext()
     # Standard running (OpenMP/CUDA/OpenCL)
     else:
         context = Context()


@@ -16,58 +16,188 @@
 # You should have received a copy of the GNU General Public License
 # along with gprMax. If not, see <http://www.gnu.org/licenses/>.
 
-from typing import Optional
+from enum import IntEnum, unique
+from typing import List
 
+import numpy as np
+import numpy.typing as npt
+from matplotlib.sankey import UP
 from mpi4py import MPI
+from numpy import ndarray
 
 from gprMax.grid.fdtd_grid import FDTDGrid
 
 
+@unique
+class Dim(IntEnum):
+    X = 0
+    Y = 1
+    Z = 2
+
+
+@unique
+class Dir(IntEnum):
+    NEG = 0
+    POS = 1
+
+
 class MPIGrid(FDTDGrid):
-    def __init__(
-        self,
-        mpi_tasks_x: int,
-        mpi_tasks_y: int,
-        mpi_tasks_z: int,
-        comm: Optional[MPI.Intracomm] = None,
-    ):
+    HALO_SIZE = 1
+
+    def __init__(self, comm: MPI.Cartcomm):
+        self.size = np.zeros(3, dtype=int)
         super().__init__()
-        if comm is None:
-            self.comm = MPI.COMM_WORLD
-        else:
-            self.comm = comm
-
-        if mpi_tasks_x * mpi_tasks_y * mpi_tasks_z > self.comm.size:
-            # TODO: Raise expection - insufficient MPI tasks to create the grid as requested
-            pass
-
-        self.mpi_tasks_x = mpi_tasks_x
-        self.mpi_tasks_y = mpi_tasks_y
-        self.mpi_tasks_z = mpi_tasks_z
-
-        self.rank = self.comm.rank
-        self.size = self.comm.size
-
-        self.xmin = 0
-        self.ymin = 0
-        self.zmin = 0
-        self.xmax = 0
-        self.ymax = 0
-        self.zmax = 0
+
+        self.comm = comm
+        self.mpi_tasks = np.array(self.comm.dims)
+
+        self.lower_extent: npt.NDArray[np.intc] = np.zeros(3, dtype=int)
+        self.upper_extent: npt.NDArray[np.intc] = np.zeros(3, dtype=int)
+        self.global_size: npt.NDArray[np.intc] = np.zeros(3, dtype=int)
+
+        self.neighbours = np.full((3, 2), -1, dtype=int)
+        self.neighbours[Dim.X] = self.comm.Shift(direction=Dim.X, disp=1)
+        self.neighbours[Dim.Y] = self.comm.Shift(direction=Dim.Y, disp=1)
+        self.neighbours[Dim.Z] = self.comm.Shift(direction=Dim.Z, disp=1)
+        print(f"[Rank {self.rank}] Neighbours = {self.neighbours}")
+
+        self.send_halo_map = np.empty((3, 2), dtype=MPI.Datatype)
+        self.recv_halo_map = np.empty((3, 2), dtype=MPI.Datatype)
+
+    @property
+    def rank(self) -> int:
+        return self.comm.Get_rank()
+
+    @property
+    def coords(self) -> List[int]:
+        return self.comm.coords
+
+    @property
+    def nx(self) -> int:
+        return self.size[Dim.X]
+
+    @nx.setter
+    def nx(self, value: int):
+        self.size[Dim.X] = value
+
+    @property
+    def ny(self) -> int:
+        return self.size[Dim.Y]
+
+    @ny.setter
+    def ny(self, value: int):
+        self.size[Dim.Y] = value
+
+    @property
+    def nz(self) -> int:
+        return self.size[Dim.Z]
+
+    @nz.setter
+    def nz(self, value: int):
+        self.size[Dim.Z] = value
+
+    def broadcast_grid(self):
+        self.calculate_local_extents()
+
+        print(
+            f"[Rank {self.rank}] - Global size: {self.global_size}, Local size: {self.size},"
+            f" Cart comm dims: {self.mpi_tasks}, Local coordinates: {self.coords},"
+            f" Lower extent = {self.lower_extent}, Upper extent = {self.upper_extent}"
+        )
+
+    def _get_halo(self, array: ndarray, dim: Dim, direction: Dir) -> ndarray:
+        if direction == Dir.NEG:
+            index = 0
+        else:  # Direction.UP
+            index = -1
+
+        if dim == Dim.X:
+            halo = array[:, index, index]
+        elif dim == Dim.Y:
+            halo = array[index, :, index]
+        else:  # Dim.Z
+            halo = array[index, index, :]
+
+        return halo
+
+    def _set_halo(self, array: ndarray, halo: ndarray, dim: Dim, direction: Dir):
+        if direction == Dir.NEG:
+            index = 0
+        else:  # Direction.UP
+            index = -1
+
+        if dim == Dim.X:
+            array[:, index, index] = halo
+        elif dim == Dim.Y:
+            array[index, :, index] = halo
+        else:  # Dim.Z
+            array[index, index, :] = halo
+
+    def _halo_swap(self, array: ndarray, dim: Dim, dir: Dir):
+        neighbour = self.neighbours[dim][dir]
+        if neighbour != -1:
+            self.comm.Sendrecv(
+                [array, self.send_halo_map[dim][dir]],
+                neighbour,
+                0,
+                [array, self.recv_halo_map[dim][dir]],
+                neighbour,
+                0,
+                None,
+            )
+
+    def _halo_swap_dimension(self, array: ndarray, dim: Dim):
+        if self.coords[dim] % 2 == 0:
+            self._halo_swap(array, dim, Dir.NEG)
+            self._halo_swap(array, dim, Dir.POS)
+        else:
+            self._halo_swap(array, dim, Dir.POS)
+            self._halo_swap(array, dim, Dir.NEG)
+
+    def halo_swap(self, array: ndarray):
+        self._halo_swap_dimension(array, Dim.X)
+        self._halo_swap_dimension(array, Dim.Y)
+        self._halo_swap_dimension(array, Dim.Z)
+
+    def build(self):
+        self.calculate_local_extents()
+        self.set_halo_map()
+        super().build()
+        self.halo_swap(self.Ex)
+
+    def set_halo_map(self):
+        size = self.size.tolist()
+        for dim in Dim:
+            halo_size = (self.size - 2).tolist()
+            halo_size[dim] = 1
+
+            start = [1, 1, 1]
+            self.send_halo_map[dim][Dir.NEG] = MPI.DOUBLE.Create_subarray(size, halo_size, start)
+            start[dim] = size[dim] - 2
+            self.send_halo_map[dim][Dir.POS] = MPI.DOUBLE.Create_subarray(size, halo_size, start)
+            start[dim] = 0
+            self.recv_halo_map[dim][Dir.NEG] = MPI.DOUBLE.Create_subarray(size, halo_size, start)
+            start[dim] = size[dim] - 1
+            self.recv_halo_map[dim][Dir.POS] = MPI.DOUBLE.Create_subarray(size, halo_size, start)
+
+            self.send_halo_map[dim][Dir.NEG].Commit()
+            self.send_halo_map[dim][Dir.POS].Commit()
+            self.recv_halo_map[dim][Dir.NEG].Commit()
+            self.recv_halo_map[dim][Dir.POS].Commit()
+
+    def calculate_local_extents(self):
+        self.size = self.global_size // self.mpi_tasks
+        self.lower_extent = self.size * self.coords
+
+        at_end = (self.mpi_tasks - self.coords) <= 1
+        self.size += at_end * self.global_size % self.mpi_tasks
+
+        self.upper_extent = self.lower_extent + self.size
+
+        # Account for halo
+        self.size += 2
 
     def initialise_field_arrays(self):
         super().initialise_field_arrays()
-
-        local_grid_size_x = self.nx // self.mpi_tasks_x
-        local_grid_size_y = self.ny // self.mpi_tasks_y
-        local_grid_size_z = self.nz // self.mpi_tasks_z
-
-        self.xmin = (self.rank % self.nx) * local_grid_size_x
-        self.ymin = ((self.mpi_tasks_x * self.rank) % self.ny) * local_grid_size_y
-        self.zmin = (
-            (self.mpi_tasks_y * self.mpi_tasks_x * self.rank) % self.nz
-        ) * local_grid_size_z
-        self.xmax = self.xmin + local_grid_size_x
-        self.ymax = self.ymin + local_grid_size_y
-        self.zmax = self.zmin + local_grid_size_z
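The heart of the change is set_halo_map plus halo_swap: each face of the local block gets a pair of MPI subarray datatypes, one describing the outermost interior layer to send and one describing the halo layer to receive into, and _halo_swap exchanges them with the neighbouring rank via Sendrecv. Below is a standalone sketch of the same technique for a 1-D decomposition along x only; the array shape and values are made up and it is not the commit's code:

# Standalone sketch of a subarray-based halo swap (1-D decomposition along x).
# Run with: mpirun -n 2 python halo_demo.py
import numpy as np
from mpi4py import MPI

cart = MPI.COMM_WORLD.Create_cart([MPI.COMM_WORLD.size])  # all ranks in a 1-D line
lo, hi = cart.Shift(direction=0, disp=1)                  # neighbours (MPI.PROC_NULL at the ends)

n = 6                                                     # interior cells per rank along x
field = np.full((n + 2, 4, 4), float(cart.rank))          # +2 for a one-cell halo on each side

sizes = list(field.shape)
face = [1, 4, 4]                                          # one x-layer, full y/z extent

# Send types cover the first/last *interior* layer; recv types cover the halo layers.
send_lo = MPI.DOUBLE.Create_subarray(sizes, face, [1, 0, 0])
send_hi = MPI.DOUBLE.Create_subarray(sizes, face, [n, 0, 0])
recv_lo = MPI.DOUBLE.Create_subarray(sizes, face, [0, 0, 0])
recv_hi = MPI.DOUBLE.Create_subarray(sizes, face, [n + 1, 0, 0])
for dtype in (send_lo, send_hi, recv_lo, recv_hi):
    dtype.Commit()

# Sending to MPI.PROC_NULL is a no-op, so the end ranks need no special casing.
cart.Sendrecv([field, send_lo], dest=lo, sendtag=0, recvbuf=[field, recv_hi], source=hi, recvtag=0)
cart.Sendrecv([field, send_hi], dest=hi, sendtag=0, recvbuf=[field, recv_lo], source=lo, recvtag=0)

# Interior ranks should now see their neighbours' rank numbers in the halo layers.
print(f"rank {cart.rank}: lower halo = {field[0, 0, 0]}, upper halo = {field[-1, 0, 0]}")

Because Sendrecv pairs its send and receive internally, the even/odd ordering in _halo_swap_dimension appears to be a convention that lines each rank's first exchange up with its neighbour's first exchange rather than a deadlock requirement.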

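calculate_local_extents splits the global cell counts evenly across the Cartesian dimensions, gives any remainder to the last rank along each axis, records the lower and upper extents in global coordinates, and finally pads the local size by two cells for the halos. A NumPy-only sketch of that arithmetic, using made-up numbers rather than anything from the commit:

import numpy as np

# Made-up values: 100 x 90 x 64 cells split over a 3 x 2 x 1 Cartesian grid,
# seen from the rank at coordinates (2, 0, 0) (last along x).
global_size = np.array([100, 90, 64])
mpi_tasks = np.array([3, 2, 1])
coords = np.array([2, 0, 0])

size = global_size // mpi_tasks           # even share: [33, 45, 64]
lower_extent = size * coords              # [66, 0, 0]

at_end = (mpi_tasks - coords) <= 1        # is this the last rank along each axis?
size += at_end * global_size % mpi_tasks  # last rank absorbs the remainder: [34, 45, 64]

upper_extent = lower_extent + size        # [100, 45, 64]
size += 2                                 # one halo cell on each side: [36, 47, 66]

print(size, lower_extent, upper_extent)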
gprMax/mpi_model.py (new file, 67 additions)

@@ -0,0 +1,67 @@
+import logging
+from re import L
+from typing import Optional
+
+import numpy as np
+from mpi4py import MPI
+
+from gprMax import config
+from gprMax.grid.mpi_grid import MPIGrid
+from gprMax.model import Model
+
+logger = logging.getLogger(__name__)
+
+
+class MPIModel(Model):
+    def __init__(self, comm: Optional[MPI.Intracomm] = None):
+        if comm is None:
+            self.comm = MPI.COMM_WORLD
+        else:
+            self.comm = comm
+
+        self.rank = self.comm.Get_rank()
+        self.G = self._create_grid()
+
+        return super().__init__()
+
+    def is_coordinator(self):
+        return self.rank == 0
+
+    def build(self):
+        self.build_geometry()
+        return
+        return super().build()
+
+    def build_geometry(self):
+        if self.is_coordinator():
+            self._check_for_dispersive_materials([self.G])
+            self._check_memory_requirements([self.G])
+
+        self._broadcast_model()
+
+        self.G.global_size = np.array([self.gnx, self.gny, self.gnz], dtype=int)
+        self.G.build()
+        return
+
+        self.G.dispersion_analysis(self.iterations)
+
+    def _broadcast_model(self):
+        self.gnx = self.comm.bcast(self.gnx)
+        self.gny = self.comm.bcast(self.gny)
+        self.gnz = self.comm.bcast(self.gnz)
+        self.comm.Bcast(self.dl)
+        self.dt = self.comm.bcast(self.dt)
+        self.iterations = self.comm.bcast(self.iterations)
+        self.srcsteps = self.comm.bcast(self.srcsteps)
+        self.rxsteps = self.comm.bcast(self.rxsteps)
+
+    def _output_geometry(self):
+        if self.is_coordinator():
+            logger.info("Geometry views and geometry objects are not currently supported with MPI.")
+
+    def _create_grid(self) -> MPIGrid:
+        cart_comm = MPI.COMM_WORLD.Create_cart(config.sim_config.mpi)
+        return MPIGrid(cart_comm)
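_broadcast_model mixes mpi4py's two broadcast flavours: lowercase bcast pickles a Python object and returns the root's value on every rank (so it must be reassigned), while uppercase Bcast fills an existing NumPy buffer, here self.dl, in place. A minimal standalone illustration with made-up values, not taken from the commit:

# Standalone sketch. Run with: mpirun -n 2 python bcast_demo.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD

# Lowercase bcast: pickle-based, returns the broadcast value on every rank.
iterations = 500 if comm.rank == 0 else None
iterations = comm.bcast(iterations)          # every rank now holds 500

# Uppercase Bcast: fills an existing buffer in place; the array must exist everywhere.
dl = np.array([0.002, 0.002, 0.002]) if comm.rank == 0 else np.empty(3)
comm.Bcast(dl)                               # root's values copied into dl on every rank

print(f"rank {comm.rank}: iterations={iterations}, dl={dl}")

Both calls default to root=0, so the coordinator's values win and the other ranks only need to supply a placeholder of the right shape.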