Merge branch '30-parallel-build' into mpi

This commit is contained in:
Nathan Mannall
2025-03-21 16:36:30 +00:00
Current commit 47265ff2b0
5 changed files with 145 additions and 241 deletions

View file

@@ -21,7 +21,7 @@ import itertools
import logging
import sys
from collections import OrderedDict
from typing import Any, Iterable, List, Literal, Tuple, Union
from typing import Any, Iterable, List, Tuple, Union
import numpy as np
import numpy.typing as npt
@@ -68,6 +68,9 @@ class FDTDGrid:
self.iterations = 0 # Total number of iterations
self.timewindow = 0.0
self.srcsteps = np.zeros(3, dtype=np.int32)
self.rxsteps = np.zeros(3, dtype=np.int32)
# Field Arrays
self.Ex: npt.NDArray[np.float32]
self.Ey: npt.NDArray[np.float32]
@@ -421,6 +424,16 @@ class FDTDGrid:
logger.info("")
logger.info(f"Materials [{self.name}]:\n{materialstable.table}\n")
def update_sources_and_recievers(self):
"""Update position of sources and receivers."""
# Adjust position of simple sources and receivers if required
model_num = config.sim_config.current_model
if any(self.srcsteps != 0):
self.update_simple_source_positions(model_num)
if any(self.rxsteps != 0):
self.update_receiver_positions(model_num)
def _update_positions(
self, items: Iterable[Union[Source, Rx]], step_size: npt.NDArray[np.int32], step_number: int
) -> None:
@@ -435,31 +448,22 @@ class FDTDGrid:
ValueError: Raised if any of the items would be stepped
outside of the grid.
"""
if step_size[0] != 0 or step_size[1] != 0 or step_size[2] != 0:
if any(step_size > 0):
for item in items:
if step_number == 0:
if (
item.xcoord + step_size[0] * config.sim_config.model_end < 0
or item.xcoord + step_size[0] * config.sim_config.model_end > self.nx
or item.ycoord + step_size[1] * config.sim_config.model_end < 0
or item.ycoord + step_size[1] * config.sim_config.model_end > self.ny
or item.zcoord + step_size[2] * config.sim_config.model_end < 0
or item.zcoord + step_size[2] * config.sim_config.model_end > self.nz
):
raise ValueError
item.coord = item.coordorigin + step_number * step_size
# Check item won't be stepped outside of the grid
end_coord = item.coord + step_size * config.sim_config.model_end
self.within_bounds(end_coord)
else:
item.coord = item.coordorigin + step_number * step_size
def update_simple_source_positions(
self, step_size: npt.NDArray[np.int32], step: int = 0
) -> None:
def update_simple_source_positions(self, step: int = 0) -> None:
"""Update the positions of sources in the grid.
Move hertzian dipole and magnetic dipole sources. Transmission
line sources and voltage sources will not be moved.
Args:
step_size: Number of grid cells to move the sources each
step.
step: Number of steps to move the sources by.
Raises:
@@ -468,18 +472,16 @@ class FDTDGrid:
"""
try:
self._update_positions(
itertools.chain(self.hertziandipoles, self.magneticdipoles), step_size, step
itertools.chain(self.hertziandipoles, self.magneticdipoles), self.srcsteps, step
)
except ValueError as e:
logger.exception("Source(s) will be stepped to a position outside the domain.")
raise ValueError from e
def update_receiver_positions(self, step_size: npt.NDArray[np.int32], step: int = 0) -> None:
def update_receiver_positions(self, step: int = 0) -> None:
"""Update the positions of receivers in the grid.
Args:
step_size: Number of grid cells to move the receivers each
step.
step: Number of steps to move the receivers by.
Raises:
@@ -487,7 +489,7 @@ class FDTDGrid:
outside of the grid.
"""
try:
self._update_positions(self.rxs, step_size, step)
self._update_positions(self.rxs, self.rxsteps, step)
except ValueError as e:
logger.exception("Receiver(s) will be stepped to a position outside the domain.")
raise ValueError from e
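To make the stepping behaviour above concrete, here is a minimal, self-contained sketch (not part of the commit) of the arithmetic used by _update_positions: each item's position is recomputed from its original coordinates, and on the first model the final position is checked against the grid extents. The grid size, origin and model_end values are invented for illustration.

import numpy as np

grid_size = np.array([100, 100, 100], dtype=np.int32)   # illustrative nx, ny, nz
coordorigin = np.array([10, 10, 10], dtype=np.int32)    # item's original position
step_size = np.array([2, 0, 0], dtype=np.int32)         # e.g. self.srcsteps
model_end = 20                                          # stand-in for config.sim_config.model_end

for step_number in range(3):
    # Recompute from the origin so repeated models never accumulate drift
    coord = coordorigin + step_number * step_size
    if step_number == 0:
        # Check the item will still be inside the grid on the final model
        end_coord = coord + step_size * model_end
        if np.any(end_coord < 0) or np.any(end_coord > grid_size):
            raise ValueError("Item would be stepped outside the grid")
    print(step_number, coord)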

View file

@@ -24,7 +24,7 @@ from typing import List, Optional, Tuple, TypeVar, Union
import numpy as np
import numpy.typing as npt
from mpi4py import MPI
from numpy import empty, ndarray
from numpy import ndarray
from gprMax import config
from gprMax.cython.pml_build import pml_sum_er_mr
@@ -157,6 +157,15 @@ class MPIGrid(FDTDGrid):
def get_grid_coord_from_local_coordinate(
self, local_coord: npt.NDArray[np.int32]
) -> npt.NDArray[np.int32]:
"""Get the MPI grid coordinate for a local grid coordinate.
Args:
local_coord: Local grid coordinate.
Returns:
grid_coord: Coordinate of the MPI rank containing the local
grid coordinate.
"""
coord = self.local_to_global_coordinate(local_coord)
return self.get_grid_coord_from_coordinate(coord)
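The method above composes a local-to-global conversion with a lookup of the owning rank's coordinate in the MPI Cartesian topology. The sketch below shows what such a lookup could look like under the assumption of a uniform block decomposition; mpi_dims, global_size and cells_per_rank are invented names, not gprMax attributes.

import numpy as np

mpi_dims = np.array([2, 2, 1], dtype=np.int32)          # ranks per dimension (assumed)
global_size = np.array([100, 100, 50], dtype=np.int32)  # total cells per dimension (assumed)
cells_per_rank = global_size // mpi_dims                 # uniform decomposition assumed

def grid_coord_from_coordinate(coord):
    # Integer division gives the Cartesian coordinate of the rank owning coord;
    # clamp so the last cell in each dimension maps to the last rank.
    return np.minimum(coord // cells_per_rank, mpi_dims - 1)

print(grid_coord_from_coordinate(np.array([75, 20, 10])))  # -> [1 0 0]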
@@ -277,73 +286,24 @@ class MPIGrid(FDTDGrid):
def local_bounds_overlap_grid(
self, local_start: npt.NDArray[np.int32], local_stop: npt.NDArray[np.int32]
) -> bool:
return all(local_start < self.size) and all(local_stop > self.negative_halo_offset)
"""Check if local bounds overlap with the grid.
def limit_global_bounds_to_within_local_grid(
self,
start: npt.NDArray[np.int32],
stop: npt.NDArray[np.int32],
step: npt.NDArray[np.int32] = np.ones(3, dtype=np.int32),
) -> Tuple[npt.NDArray[np.int32], npt.NDArray[np.int32], npt.NDArray[np.int32]]:
local_start = self.global_to_local_coordinate(start)
# Bring start into the local grid (and not in the negative halo)
# local_start must still be aligned with the provided step.
local_start = np.where(
local_start < self.negative_halo_offset,
self.negative_halo_offset + ((local_start - self.negative_halo_offset) % step),
local_start,
)
local_stop = self.global_to_local_coordinate(stop)
# Limit local_stop such that it is at most one step beyond the
# max index of the grid. As local_stop is the upper bound, it is
# exclusive, meaning when used to slice an array (with the
# provided step), the last element accessed will be one step below
# local_stop.
# Note: using self.size as an index in any dimension would fall
# in the positive halo (this counts as outside the local grid).
local_stop = np.where(
local_stop > self.size,
self.size + ((local_stop - self.size) % step),
local_stop,
)
offset = self.local_to_global_coordinate(local_start) - start
return local_start, local_stop, offset
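A worked example (values invented) may help with the modular arithmetic above: the start is lifted onto the first step-aligned index at or inside the negative halo edge, and the stop is pulled back so it sits at most one step beyond the last local index, both without changing their congruence class modulo step.

import numpy as np

negative_halo_offset = np.array([1, 1, 1])
size = np.array([10, 10, 10])
step = np.array([3, 3, 3])

local_start = np.array([-2, 5, 0])
local_start = np.where(
    local_start < negative_halo_offset,
    negative_halo_offset + ((local_start - negative_halo_offset) % step),
    local_start,
)
print(local_start)  # [1 5 3]: -2 and 0 are lifted into the grid, keeping step alignment

local_stop = np.array([14, 8, 16])
local_stop = np.where(
    local_stop > size,
    size + ((local_stop - size) % step),
    local_stop,
)
print(local_stop)   # [11  8 10]: out-of-range stops move to within one step of the grid end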
def scatter_coord_objects(self, objects: List[CoordType]) -> List[CoordType]:
"""Scatter coord objects to the correct MPI rank.
Coord objects (sources and receivers) are scattered to the MPI
rank based on their location in the grid. The receiving MPI rank
converts the object locations to its own local grid.
The bounds overlap if any part of the 3D box defined by the lower
and upper bounds overlaps with the local grid (excluding the
halo).
Args:
objects: Coord objects to be scattered.
local_start: Lower bound in the local grid coordinate space.
local_stop: Upper bound in the local grid coordinate space.
Returns:
scattered_objects: List of Coord objects belonging to the
current MPI rank.
overlaps_grid: True if the box generated by the lower and
upper bound overlaps with the local grid.
"""
if self.is_coordinator():
objects_by_rank: List[List[CoordType]] = [[] for _ in range(self.comm.size)]
for o in objects:
objects_by_rank[self.get_rank_from_coordinate(o.coord)].append(o)
else:
objects_by_rank = None
objects = self.comm.scatter(objects_by_rank, root=self.COORDINATOR_RANK)
for o in objects:
o.coord = self.global_to_local_coordinate(o.coord)
return objects
return all(local_start < self.size) and all(local_stop > self.negative_halo_offset)
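The scatter_coord_objects pattern shown above (bucket objects per owning rank on the coordinator, then one collective scatter) can be reduced to a few lines. In this sketch the get_rank_from_coordinate lookup is replaced by an invented modulo rule and the payloads are arbitrary tuples.

from mpi4py import MPI

comm = MPI.COMM_WORLD
COORDINATOR_RANK = 0

if comm.rank == COORDINATOR_RANK:
    items = [("rx", 5), ("src", 42), ("rx", 77)]      # illustrative payloads
    buckets = [[] for _ in range(comm.size)]
    for item in items:
        owner = item[1] % comm.size                   # stand-in for get_rank_from_coordinate
        buckets[owner].append(item)
else:
    buckets = None

local_items = comm.scatter(buckets, root=COORDINATOR_RANK)
print(comm.rank, local_items)                         # each rank gets only its own objects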
def gather_coord_objects(self, objects: List[CoordType]) -> List[CoordType]:
"""Scatter coord objects to the correct MPI rank.
"""Gather coord objects on the coordinator MPI rank.
The sending MPI rank converts the object locations to the global
grid. The coord objects (sources and receivers) are all sent to
@@ -359,6 +319,7 @@ class MPIGrid(FDTDGrid):
"""
for o in objects:
o.coord = self.local_to_global_coordinate(o.coord)
gathered_objects: Optional[List[List[CoordType]]] = self.comm.gather(
objects, root=self.COORDINATOR_RANK
)
@@ -368,152 +329,6 @@ class MPIGrid(FDTDGrid):
else:
return objects
def scatter_snapshots(self):
"""Scatter snapshots to the correct MPI rank.
Each snapshot is sent by the coordinator to the MPI ranks
containing the snapshot. A new communicator is created for each
snapshot, and each rank bounds the snapshot to within its own
local grid.
"""
if self.is_coordinator():
snapshots_by_rank = [[] for _ in range(self.comm.size)]
for snapshot in self.snapshots:
ranks = self.get_ranks_between_coordinates(snapshot.start, snapshot.stop)
for rank in range(
self.comm.size
): # TODO: Loop over ranks in snapshot, not all ranks
if rank in ranks:
snapshots_by_rank[rank].append(snapshot)
else:
# All ranks need the same number of 'snapshots'
# (which may be None) to ensure snapshot
# communicators are setup correctly and to avoid
# deadlock.
snapshots_by_rank[rank].append(None)
else:
snapshots_by_rank = None
snapshots = self.comm.scatter(snapshots_by_rank, root=self.COORDINATOR_RANK)
for snapshot in snapshots:
if snapshot is None:
self.comm.Split(MPI.UNDEFINED)
else:
comm = self.comm.Split()
assert isinstance(comm, MPI.Intracomm)
start = self.get_grid_coord_from_coordinate(snapshot.start)
stop = self.get_grid_coord_from_coordinate(snapshot.stop) + 1
snapshot.comm = comm.Create_cart((stop - start).tolist())
snapshot.start = self.global_to_local_coordinate(snapshot.start)
# Calculate number of steps needed to bring the start
# into the local grid (and not in the negative halo)
snapshot.offset = np.where(
snapshot.start < self.negative_halo_offset,
np.abs((snapshot.start - self.negative_halo_offset) // snapshot.step),
snapshot.offset,
)
snapshot.start += snapshot.step * snapshot.offset
snapshot.stop = self.global_to_local_coordinate(snapshot.stop)
snapshot.stop = np.where(
snapshot.stop > self.size,
self.size + ((snapshot.stop - self.size) % snapshot.step),
snapshot.stop,
)
self.snapshots = [s for s in snapshots if s is not None]
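scatter_snapshots relies on MPI_Comm_split being collective: every rank calls Split once per snapshot slot, and ranks that do not hold the snapshot pass MPI.UNDEFINED so they are left out without deadlocking. A minimal sketch of that pattern (membership rule invented):

from mpi4py import MPI

comm = MPI.COMM_WORLD
participates = comm.rank % 2 == 0          # illustrative membership test

if participates:
    sub = comm.Split()                     # default colour 0: participants share one communicator
    print(comm.rank, "joined sub-communicator of size", sub.size)
else:
    sub = comm.Split(MPI.UNDEFINED)        # excluded ranks get MPI.COMM_NULL
    assert sub == MPI.COMM_NULL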
def scatter_3d_array(self, array: npt.NDArray) -> npt.NDArray:
"""Scatter a 3D array to each MPI rank
Use to distribute a 3D array across MPI ranks. Each rank will
receive its own segment of the array including a negative halo,
but NOT a positive halo.
Args:
array: Array to be scattered
Returns:
scattered_array: Local extent of the array for the current
MPI rank.
"""
# TODO: Use Scatter instead of Bcast
self.comm.Bcast(array, root=self.COORDINATOR_RANK)
return array[
self.lower_extent[Dim.X] : self.upper_extent[Dim.X],
self.lower_extent[Dim.Y] : self.upper_extent[Dim.Y],
self.lower_extent[Dim.Z] : self.upper_extent[Dim.Z],
].copy(order="C")
def scatter_4d_array(self, array: npt.NDArray) -> npt.NDArray:
"""Scatter a 4D array to each MPI rank
Use to distribute a 4D array across MPI ranks. The first
dimension is ignored when partitioning the array. Each rank will
receive its own segment of the array including a negative halo,
but NOT a positive halo.
Args:
array: Array to be scattered
Returns:
scattered_array: Local extent of the array for the current
MPI rank.
"""
# TODO: Use Scatter instead of Bcast
self.comm.Bcast(array, root=self.COORDINATOR_RANK)
return array[
:,
self.lower_extent[Dim.X] : self.upper_extent[Dim.X],
self.lower_extent[Dim.Y] : self.upper_extent[Dim.Y],
self.lower_extent[Dim.Z] : self.upper_extent[Dim.Z],
].copy(order="C")
def scatter_4d_array_with_positive_halo(self, array: npt.NDArray) -> npt.NDArray:
"""Scatter a 4D array to each MPI rank
Use to distribute a 4D array across MPI ranks. The first
dimension is ignored when partitioning the array. Each rank will
receive its own segment of the array including both a negative
and positive halo.
Args:
array: Array to be scattered
Returns:
scattered_array: Local extent of the array for the current
MPI rank.
"""
# TODO: Use Scatter instead of Bcast
self.comm.Bcast(array, root=self.COORDINATOR_RANK)
return array[
:,
self.lower_extent[Dim.X] : self.upper_extent[Dim.X] + 1,
self.lower_extent[Dim.Y] : self.upper_extent[Dim.Y] + 1,
self.lower_extent[Dim.Z] : self.upper_extent[Dim.Z] + 1,
].copy(order="C")
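The three scatter_* helpers above currently broadcast the whole array and let each rank slice out its own extent; the only difference between the 4D variants is whether the positive halo (one extra cell per dimension) is kept. The slicing step, with invented extents, looks like this:

import numpy as np

array = np.arange(6 * 8 * 8 * 8, dtype=np.float32).reshape(6, 8, 8, 8)
lower_extent = np.array([0, 0, 3])   # invented local extents
upper_extent = np.array([4, 4, 7])

# Without the positive halo (as in scatter_4d_array)
local = array[:, lower_extent[0]:upper_extent[0],
                 lower_extent[1]:upper_extent[1],
                 lower_extent[2]:upper_extent[2]].copy(order="C")

# With the positive halo (as in scatter_4d_array_with_positive_halo): one extra cell
local_halo = array[:, lower_extent[0]:upper_extent[0] + 1,
                      lower_extent[1]:upper_extent[1] + 1,
                      lower_extent[2]:upper_extent[2] + 1].copy(order="C")

print(local.shape, local_halo.shape)   # (6, 4, 4, 4) (6, 5, 5, 5)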
def distribute_grid(self):
"""Distribute grid properties and objects to all MPI ranks.
Global properties/objects are broadcast to all ranks whereas
local properties/objects are scattered to the relevant ranks.
"""
pass
# self.scatter_snapshots()
# self._halo_swap_array(self.ID[0])
# self._halo_swap_array(self.ID[1])
# self._halo_swap_array(self.ID[2])
# self._halo_swap_array(self.ID[3])
# self._halo_swap_array(self.ID[4])
# self._halo_swap_array(self.ID[5])
def gather_grid_objects(self):
"""Gather sources and receivers."""
@@ -670,7 +485,6 @@ class MPIGrid(FDTDGrid):
raise ValueError
self.set_halo_map()
self.distribute_grid()
# TODO: Check PML is not thicker than the grid size
@@ -688,6 +502,88 @@ class MPIGrid(FDTDGrid):
super().build()
def update_sources_and_recievers(self):
"""Update position of sources and receivers.
If any sources or receivers have stepped outside of the local
grid, they will be moved to the correct MPI rank.
"""
super().update_sources_and_recievers()
# Check it is possible for sources and receivers to have moved
model_num = config.sim_config.current_model
if (all(self.srcsteps == 0) and all(self.rxsteps == 0)) or model_num == 0:
return
# Get items that are outside the local bounds of the grid
items_to_send = list(
itertools.filterfalse(
lambda x: self.within_bounds(x.coord),
itertools.chain(
self.voltagesources,
self.hertziandipoles,
self.magneticdipoles,
self.transmissionlines,
self.discreteplanewaves,
self.rxs,
),
)
)
# Map items being sent to the global coordinate space
for item in items_to_send:
item.coord = self.local_to_global_coordinate(item.coord)
send_count_by_rank = np.zeros(self.comm.size, dtype=np.int32)
# Send items to correct rank
for rank, items in itertools.groupby(
items_to_send, lambda x: self.get_rank_from_coordinate(x.coord)
):
self.comm.isend(list(items), rank)
send_count_by_rank[rank] += 1
# Communicate the number of messages sent to each rank
if self.is_coordinator():
self.comm.Reduce(MPI.IN_PLACE, [send_count_by_rank, MPI.INT32_T], op=MPI.SUM)
else:
self.comm.Reduce([send_count_by_rank, MPI.INT32_T], None, op=MPI.SUM)
# Get number of messages this rank will receive
messages_to_receive = np.zeros(1, dtype=np.int32)
if self.is_coordinator():
self.comm.Scatter([send_count_by_rank, MPI.INT32_T], [messages_to_receive, MPI.INT32_T])
else:
self.comm.Scatter(None, [messages_to_receive, MPI.INT32_T])
# Receive new items for this rank
for _ in range(messages_to_receive[0]):
new_items = self.comm.recv(None, MPI.ANY_SOURCE)
for item in new_items:
item.coord = self.global_to_local_coordinate(item.coord)
if isinstance(item, Rx):
self.add_receiver(item)
else:
self.add_source(item)
# If this rank sent any items, remove them from our source and
# receiver lists
if len(items_to_send) > 0:
# Map items sent back to the local coordinate space
for item in items_to_send:
item.coord = self.global_to_local_coordinate(item.coord)
filter_items = lambda items: list(
filter(lambda item: self.within_bounds(item.coord), items)
)
self.voltagesources = filter_items(self.voltagesources)
self.hertziandipoles = filter_items(self.hertziandipoles)
self.magneticdipoles = filter_items(self.magneticdipoles)
self.transmissionlines = filter_items(self.transmissionlines)
self.discreteplanewaves = filter_items(self.discreteplanewaves)
self.rxs = filter_items(self.rxs)
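The message-count handshake above deserves a standalone illustration: each rank counts how many point-to-point messages it sends to every destination, a Reduce sums those counts per destination on the coordinator, and a Scatter then tells every rank how many receives to post. The sketch below reproduces that pattern with an invented one-message-to-the-right-neighbour send.

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
COORDINATOR_RANK = 0

# Each rank sends a single message to its right-hand neighbour (illustrative)
dest = (comm.rank + 1) % comm.size
send_count_by_rank = np.zeros(comm.size, dtype=np.int32)
request = comm.isend({"from": comm.rank}, dest)
send_count_by_rank[dest] += 1

# Sum the per-destination counts on the coordinator
if comm.rank == COORDINATOR_RANK:
    comm.Reduce(MPI.IN_PLACE, [send_count_by_rank, MPI.INT32_T], op=MPI.SUM)
else:
    comm.Reduce([send_count_by_rank, MPI.INT32_T], None, op=MPI.SUM)

# Tell every rank how many messages it should expect to receive
messages_to_receive = np.zeros(1, dtype=np.int32)
if comm.rank == COORDINATOR_RANK:
    comm.Scatter([send_count_by_rank, MPI.INT32_T], [messages_to_receive, MPI.INT32_T])
else:
    comm.Scatter(None, [messages_to_receive, MPI.INT32_T])

for _ in range(int(messages_to_receive[0])):
    print(comm.rank, "received", comm.recv(source=MPI.ANY_SOURCE))
request.wait()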
def has_neighbour(self, dim: Dim, dir: Dir) -> bool:
"""Test if the current rank has a specified neighbour.

View file

@@ -59,9 +59,6 @@ class Model:
self.iteration = 0 # Current iteration number
self.srcsteps = np.zeros(3, dtype=np.int32)
self.rxsteps = np.zeros(3, dtype=np.int32)
self.G = self._create_grid()
self.subgrids: List[SubGridBaseGrid] = []
@@ -157,6 +154,22 @@ class Model:
def timewindow(self, value: float):
self.G.timewindow = value
@property
def srcsteps(self) -> npt.NDArray[np.int32]:
return self.G.srcsteps
@srcsteps.setter
def srcsteps(self, value: npt.NDArray[np.int32]):
self.G.srcsteps = value
@property
def rxsteps(self) -> npt.NDArray[np.int32]:
return self.G.rxsteps
@rxsteps.setter
def rxsteps(self, value: npt.NDArray[np.int32]):
self.G.rxsteps = value
def _create_grid(self) -> FDTDGrid:
"""Create grid object according to solver.
@@ -320,8 +333,6 @@ class Model:
def build(self):
"""Builds the Yee cells for a model."""
G = self.G
# Monitor memory usage
self.p = psutil.Process()
@@ -335,10 +346,7 @@ class Model:
f"Output directory: {config.get_model_config().output_file_path.parent.resolve()}\n"
)
# Adjust position of simple sources and receivers if required
model_num = config.sim_config.current_model
G.update_simple_source_positions(self.srcsteps, step=model_num)
G.update_receiver_positions(self.rxsteps, step=model_num)
self.G.update_sources_and_recievers()
self._output_geometry()

View file

@@ -242,6 +242,7 @@ class MPIModel(Model):
if self.G.snapshots:
save_snapshots(self.G.snapshots)
# TODO: Output sources and receivers using parallel I/O
self.G.gather_grid_objects()
# Write output data to file if there are any receivers in any grids

View file

@@ -105,9 +105,6 @@ class CPUUpdates(Updates[GridType]):
precompute=False,
)
# if(self.grid.iteration % 10 == 0):
# np.save('./snapshots/electric_z_{}.npy'.format(self.grid.iteration), self.grid.Ex)
def update_electric_a(self):
"""Updates electric field components."""
# All materials are non-dispersive so do standard update.