Implement Snapshots for MPI

Works, except for small errors in the corners of 2D models. Requires
further investigation.
这个提交包含在:
nmannall
2024-07-18 17:33:27 +01:00
父节点 5c9d59d7d1
当前提交 960ced4761
共有 7 个文件被更改,包括 472 次插入49 次删除

查看文件

@@ -144,6 +144,8 @@ class FDTDGrid:
logger.info(print_pml_info(self))
if not all(value == 0 for value in self.pmls["thickness"].values()):
self._build_pmls()
for snapshot in self.snapshots: # TODO: Remove if implement parallel build
snapshot.initialise_snapfields()
if self.averagevolumeobjects:
self._build_components()
self._tm_grid_update()

查看文件

@@ -31,6 +31,7 @@ from gprMax.cython.pml_build import pml_sum_er_mr
from gprMax.grid.fdtd_grid import FDTDGrid
from gprMax.pml import MPIPML, PML
from gprMax.receivers import Rx
from gprMax.snapshots import MPISnapshot, Snapshot
from gprMax.sources import Source
logger = logging.getLogger(__name__)
@@ -129,10 +130,20 @@ class MPIGrid(FDTDGrid):
grid_coord = self.get_grid_coord_from_coordinate(coord)
return self.comm.Get_cart_rank(grid_coord.tolist())
def global_to_local_coordinate(self, global_coord: npt.NDArray) -> npt.NDArray:
def get_ranks_between_coordinates(
self, start_coord: npt.NDArray, stop_coord: npt.NDArray
) -> List[int]:
start = self.get_grid_coord_from_coordinate(start_coord)
stop = self.get_grid_coord_from_coordinate(stop_coord) + 1
coord_to_rank = lambda c: self.comm.Get_cart_rank((start + c).tolist())
return [coord_to_rank(coord) for coord in np.ndindex(*(stop - start))]
def global_to_local_coordinate(
self, global_coord: npt.NDArray[np.intc]
) -> npt.NDArray[np.intc]:
return global_coord - self.lower_extent
def local_to_global_coordinate(self, local_coord: npt.NDArray) -> npt.NDArray:
def local_to_global_coordinate(self, local_coord: npt.NDArray[np.intc]) -> npt.NDArray[np.intc]:
return local_coord + self.lower_extent
def scatter_coord_objects(self, objects: List[CoordType]) -> List[CoordType]:
@@ -162,6 +173,48 @@ class MPIGrid(FDTDGrid):
else:
return objects
def scatter_snapshots(self):
if self.is_coordinator():
snapshots_by_rank: List[List[Optional[Snapshot]]] = [[] for _ in range(self.comm.size)]
for s in self.snapshots:
ranks = self.get_ranks_between_coordinates(s.start, s.stop)
for rank in range(self.comm.size):
if rank in ranks:
snapshots_by_rank[rank].append(s)
else:
snapshots_by_rank[rank].append(None)
else:
snapshots_by_rank = None
snapshots: List[Optional[MPISnapshot]] = self.comm.scatter(
snapshots_by_rank, root=self.COORDINATOR_RANK
)
for s in snapshots:
if s is None:
self.comm.Split(MPI.UNDEFINED)
else:
comm = self.comm.Split()
assert isinstance(comm, MPI.Intracomm)
start = self.get_grid_coord_from_coordinate(s.start)
stop = self.get_grid_coord_from_coordinate(s.stop) + 1
s.comm = comm.Create_cart((stop - start).tolist())
s.start = self.global_to_local_coordinate(s.start)
# Calculate number of steps needed to bring the start
# into the local grid (and not in the negative halo)
s.offset = np.where(
s.start < self.negative_halo_offset,
np.abs((s.start - self.negative_halo_offset) // s.step),
s.offset,
)
s.start += s.step * s.offset
s.stop = self.global_to_local_coordinate(s.stop)
s.stop = np.where(s.stop > self.size, self.size, s.stop)
self.snapshots = [s for s in snapshots if s is not None]
def scatter_3d_array(self, array: npt.NDArray) -> npt.NDArray:
self.comm.Bcast(array, root=self.COORDINATOR_RANK)
@@ -199,6 +252,8 @@ class MPIGrid(FDTDGrid):
self.hertziandipoles = self.scatter_coord_objects(self.hertziandipoles)
self.transmissionlines = self.scatter_coord_objects(self.transmissionlines)
self.scatter_snapshots()
self.pmls = self.comm.bcast(self.pmls, root=self.COORDINATOR_RANK)
if self.coords[Dim.X] != 0:
self.pmls["thickness"]["x0"] = 0

查看文件

@@ -285,7 +285,7 @@ class Model:
# Write any snapshots to file for each grid
for grid in [self.G] + self.subgrids:
if grid.snapshots:
save_snapshots(grid)
save_snapshots(grid.snapshots)
def solve(self, solver):
"""Solve using FDTD method.

查看文件

@@ -1,12 +1,13 @@
import logging
from typing import Optional
import numpy as np
from mpi4py import MPI
from gprMax import config
from gprMax.fields_outputs import write_hdf5_outputfile
from gprMax.grid.mpi_grid import MPIGrid
from gprMax.model import Model
from gprMax.snapshots import save_snapshots
logger = logging.getLogger(__name__)
@@ -74,10 +75,19 @@ class MPIModel(Model):
model_config.ompthreads = self.comm.bcast(model_config.ompthreads)
def write_output_data(self):
"""Writes output data, i.e. field data for receivers and snapshots to
file(s).
"""
# Write any snapshots to file for each grid
if self.G.snapshots:
save_snapshots(self.G.snapshots)
self.G.gather_grid_objects()
if self.is_coordinator():
# Write output data to file if they are any receivers in any grids
if self.is_coordinator() and (self.G.rxs or self.G.transmissionlines):
self.G.size = self.G.global_size
super().write_output_data()
write_hdf5_outputfile(config.get_model_config().output_file_path_ext, self.title, self)
def _output_geometry(self):
logger.warn("Geometry views and geometry objects are not currently supported with MPI\n")

查看文件

@@ -18,24 +18,27 @@
import logging
import sys
from enum import IntEnum, unique
from pathlib import Path
from typing import Dict
from typing import Dict, List
import h5py
import numpy as np
import numpy.typing as npt
from evtk.hl import imageToVTK
from mpi4py import MPI
from tqdm import tqdm
import gprMax.config as config
from ._version import __version__
from .cython.snapshots import calculate_snapshot_fields
from .utilities.utilities import get_terminal_width, round_value
from .utilities.utilities import get_terminal_width
logger = logging.getLogger(__name__)
def save_snapshots(grid):
def save_snapshots(snapshots: List["Snapshot"]):
"""Saves snapshots to file(s).
Args:
@@ -48,7 +51,7 @@ def save_snapshots(grid):
logger.info("")
logger.info(f"Snapshot directory: {snapshotdir.resolve()}")
for i, snap in enumerate(grid.snapshots):
for i, snap in enumerate(snapshots):
fn = snapshotdir / Path(snap.filename)
snap.filename = fn.with_suffix(snap.fileext)
pbar = tqdm(
@@ -56,14 +59,12 @@ def save_snapshots(grid):
leave=True,
unit="byte",
unit_scale=True,
desc=f"Writing snapshot file {i + 1} "
f"of {len(grid.snapshots)}, "
f"{snap.filename.name}",
desc=f"Writing snapshot file {i + 1} of {len(snapshots)}, {snap.filename.name}",
ncols=get_terminal_width() - 1,
file=sys.stdout,
disable=not config.sim_config.general["progressbars"],
)
snap.write_file(pbar, grid)
snap.write_file(pbar)
pbar.close()
logger.info("")
@@ -71,7 +72,14 @@ def save_snapshots(grid):
class Snapshot:
"""Snapshots of the electric and magnetic field values."""
allowableoutputs = {"Ex": None, "Ey": None, "Ez": None, "Hx": None, "Hy": None, "Hz": None}
allowableoutputs = {
"Ex": None,
"Ey": None,
"Ez": None,
"Hx": None,
"Hy": None,
"Hz": None,
}
# Snapshots can be output as VTK ImageData (.vti) format or
# HDF5 format (.h5) files
@@ -102,6 +110,8 @@ class Snapshot:
filename: str,
fileext: str,
outputs: Dict[str, bool],
grid_dl: npt.NDArray[np.single],
grid_dt: float,
):
"""
Args:
@@ -117,29 +127,87 @@ class Snapshot:
self.filename = filename
self.time = time
self.outputs = outputs
self.xs = xs
self.ys = ys
self.zs = zs
self.xf = xf
self.yf = yf
self.zf = zf
self.dx = dx
self.dy = dy
self.dz = dz
self.nx = round_value((self.xf - self.xs) / self.dx)
self.ny = round_value((self.yf - self.ys) / self.dy)
self.nz = round_value((self.zf - self.zs) / self.dz)
self.sx = slice(self.xs, self.xf + self.dx, self.dx)
self.sy = slice(self.ys, self.yf + self.dy, self.dy)
self.sz = slice(self.zs, self.zf + self.dz, self.dz)
self.grid_dl = grid_dl
self.grid_dt = grid_dt
self.start = np.array([xs, ys, zs], dtype=np.intc)
self.stop = np.array([xf, yf, zf], dtype=np.intc)
self.step = np.array([dx, dy, dz], dtype=np.intc)
self.size = np.ceil((self.stop - self.start) / self.step).astype(np.intc)
self.slice: list[slice] = list(map(slice, self.start, self.stop + self.step, self.step))
self.nbytes = 0
# Create arrays to hold the field data for snapshot
self.snapfields = {}
# Properties for backwards compatibility
@property
def xs(self) -> int:
return self.start[0]
@property
def ys(self) -> int:
return self.start[1]
@property
def zs(self) -> int:
return self.start[2]
@property
def xf(self) -> int:
return self.stop[0]
@property
def yf(self) -> int:
return self.stop[1]
@property
def zf(self) -> int:
return self.stop[2]
@property
def dx(self) -> int:
return self.step[0]
@property
def dy(self) -> int:
return self.step[1]
@property
def dz(self) -> int:
return self.step[2]
@property
def nx(self) -> int:
return self.size[0]
@property
def ny(self) -> int:
return self.size[1]
@property
def nz(self) -> int:
return self.size[2]
@property
def sx(self) -> slice:
return self.slice[0]
@property
def sy(self) -> slice:
return self.slice[1]
@property
def sz(self) -> slice:
return self.slice[2]
def initialise_snapfields(self):
for k, v in self.outputs.items():
if v:
self.snapfields[k] = np.zeros(
(self.nx, self.ny, self.nz), dtype=config.sim_config.dtypes["float_or_double"]
(self.nx, self.ny, self.nz),
dtype=config.sim_config.dtypes["float_or_double"],
)
self.nbytes += self.snapfields[k].nbytes
else:
@@ -191,7 +259,7 @@ class Snapshot:
self.snapfields["Hz"],
)
def write_file(self, pbar: tqdm, G):
def write_file(self, pbar: tqdm):
"""Writes snapshot file either as VTK ImageData (.vti) format
or HDF5 format (.h5) files
@@ -201,16 +269,15 @@ class Snapshot:
"""
if self.fileext == ".vti":
self.write_vtk(pbar, G)
self.write_vtk(pbar)
elif self.fileext == ".h5":
self.write_hdf5(pbar, G)
self.write_hdf5(pbar)
def write_vtk(self, pbar: tqdm, G):
def write_vtk(self, pbar: tqdm):
"""Writes snapshot file in VTK ImageData (.vti) format.
Args:
pbar: Progress bar class instance.
G: FDTDGrid class describing a grid in a model.
"""
celldata = {
@@ -221,12 +288,8 @@ class Snapshot:
imageToVTK(
str(self.filename.with_suffix("")),
origin=(
(self.xs * self.dx * G.dx),
(self.ys * self.dy * G.dy),
(self.zs * self.dz * G.dz),
),
spacing=((self.dx * G.dx), (self.dy * G.dy), (self.dz * G.dz)),
origin=tuple(self.start * self.step * self.grid_dl),
spacing=tuple(self.step * self.grid_dl),
cellData=celldata,
)
@@ -238,12 +301,11 @@ class Snapshot:
* np.dtype(config.sim_config.dtypes["float_or_double"]).itemsize
)
def write_hdf5(self, pbar: tqdm, G):
def write_hdf5(self, pbar: tqdm):
"""Writes snapshot file in HDF5 (.h5) format.
Args:
pbar: Progress bar class instance.
G: FDTDGrid class describing a grid in a model.
"""
f = h5py.File(self.filename, "w")
@@ -251,8 +313,8 @@ class Snapshot:
# TODO: Output model name (title) and grid name? in snapshot output
# f.attrs["Title"] = G.title
f.attrs["nx_ny_nz"] = (self.nx, self.ny, self.nz)
f.attrs["dx_dy_dz"] = (self.dx * G.dx, self.dy * G.dy, self.dz * G.dz)
f.attrs["time"] = self.time * G.dt
f.attrs["dx_dy_dz"] = self.step * self.grid_dl
f.attrs["time"] = self.time * self.grid_dt
for key in ["Ex", "Ey", "Ez", "Hx", "Hy", "Hz"]:
if self.outputs[key]:
@@ -262,7 +324,283 @@ class Snapshot:
f.close()
def htod_snapshot_array(G, queue=None):
@unique
class Dim(IntEnum):
X = 0
Y = 1
Z = 2
@unique
class Dir(IntEnum):
NEG = 0
POS = 1
class MPISnapshot(Snapshot):
H_TAG = 0
EX_TAG = 1
EY_TAG = 2
EZ_TAG = 3
def __init__(
self,
xs: int,
ys: int,
zs: int,
xf: int,
yf: int,
zf: int,
dx: int,
dy: int,
dz: int,
time: int,
filename: str,
fileext: str,
outputs: Dict[str, bool],
grid_dl: npt.NDArray[np.single],
grid_dt: float,
):
super().__init__(
xs, ys, zs, xf, yf, zf, dx, dy, dz, time, filename, fileext, outputs, grid_dl, grid_dt
)
self.offset = np.zeros(3, dtype=np.intc)
self.global_size = self.size.copy()
self.comm: MPI.Cartcomm = None # type: ignore
def initialise_snapfields(self):
# Start and stop may have changed since initialisation
self.size = np.ceil((self.stop - self.start) / self.step).astype(np.intc)
return super().initialise_snapfields()
def store(self, G):
"""Store (in memory) electric and magnetic field values for snapshot.
Args:
G: FDTDGrid class describing a grid in a model.
"""
logger.debug(f"Saving snapshot for iteration: {self.time}")
# Get neighbours
self.neighbours = np.full((3, 2), -1, dtype=int)
self.neighbours[Dim.X] = self.comm.Shift(direction=Dim.X, disp=1)
self.neighbours[Dim.Y] = self.comm.Shift(direction=Dim.Y, disp=1)
self.neighbours[Dim.Z] = self.comm.Shift(direction=Dim.Z, disp=1)
# If we have a positive neighbour, an extra step may land in our
# local halo. The numbers here will be correct (except the top
# corner), but using it would require extra checks below.
slice_stop = np.where(
self.neighbours[:, Dir.POS] != -1,
self.stop,
self.stop + self.step,
)
self.slice = list(map(slice, self.start, slice_stop, self.step))
# Memory views of field arrays to dimensions required for the snapshot
Exslice = np.ascontiguousarray(G.Ex[self.sx, self.sy, self.sz])
Eyslice = np.ascontiguousarray(G.Ey[self.sx, self.sy, self.sz])
Ezslice = np.ascontiguousarray(G.Ez[self.sx, self.sy, self.sz])
Hxslice = np.ascontiguousarray(G.Hx[self.sx, self.sy, self.sz])
Hyslice = np.ascontiguousarray(G.Hy[self.sx, self.sy, self.sz])
Hzslice = np.ascontiguousarray(G.Hz[self.sx, self.sy, self.sz])
"""
Exslice - y + z halo
Eyslice - x + z halo
Ezslice - x + y halo
Hxslice - x halo
Hyslice - y halo
Hzslice - z halo
"""
# Shape and dtype should be the same for all field array slices
shape = Hxslice.shape
dtype = Hxslice.dtype
Hxhalo = np.empty((1, shape[Dim.Y], shape[Dim.Z]), dtype=dtype)
Hyhalo = np.empty((shape[Dim.X], 1, shape[Dim.Z]), dtype=dtype)
Hzhalo = np.empty((shape[Dim.X], shape[Dim.Y], 1), dtype=dtype)
Exyhalo = np.empty((shape[Dim.X], 1, shape[Dim.Z]), dtype=dtype)
Eyzhalo = np.empty((shape[Dim.X], shape[Dim.Y], 1), dtype=dtype)
Ezxhalo = np.empty((1, shape[Dim.Y], shape[Dim.Z]), dtype=dtype)
x_offset = self.neighbours[Dim.X][Dir.POS] != -1
y_offset = self.neighbours[Dim.Y][Dir.POS] != -1
z_offset = self.neighbours[Dim.Z][Dir.POS] != -1
Exzhalo = np.empty((shape[Dim.X], shape[Dim.Y] + y_offset, 1), dtype=dtype)
Eyxhalo = np.empty((1, shape[Dim.Y], shape[Dim.Z] + z_offset), dtype=dtype)
Ezyhalo = np.empty((shape[Dim.X] + x_offset, 1, shape[Dim.Z]), dtype=dtype)
blocking_requests: List[MPI.Request] = []
requests: List[MPI.Request] = []
if self.neighbours[Dim.X][Dir.NEG] != -1:
requests += [
self.comm.Isend(Hxslice[0, :, :], self.neighbours[Dim.X][Dir.NEG], self.H_TAG),
self.comm.Isend(Ezslice[0, :, :], self.neighbours[Dim.X][Dir.NEG], self.EZ_TAG),
]
if self.neighbours[Dim.X][Dir.POS] != -1:
blocking_requests.append(
self.comm.Irecv(Ezxhalo, self.neighbours[Dim.X][Dir.POS], self.EZ_TAG),
)
requests += [
self.comm.Irecv(Hxhalo, self.neighbours[Dim.X][Dir.POS], self.H_TAG),
self.comm.Irecv(Eyxhalo, self.neighbours[Dim.X][Dir.POS], self.EY_TAG),
]
if self.neighbours[Dim.Y][Dir.NEG] != -1:
requests += [
self.comm.Isend(
np.ascontiguousarray(Hyslice[:, 0, :]),
self.neighbours[Dim.Y][Dir.NEG],
self.H_TAG,
),
self.comm.Isend(
np.ascontiguousarray(Exslice[:, 0, :]),
self.neighbours[Dim.Y][Dir.NEG],
self.EX_TAG,
),
]
if self.neighbours[Dim.Y][Dir.POS] != -1:
blocking_requests.append(
self.comm.Irecv(Exyhalo, self.neighbours[Dim.Y][Dir.POS], self.EX_TAG),
)
requests += [
self.comm.Irecv(Hyhalo, self.neighbours[Dim.Y][Dir.POS], self.H_TAG),
self.comm.Irecv(Ezyhalo, self.neighbours[Dim.Y][Dir.POS], self.EZ_TAG),
]
if self.neighbours[Dim.Z][Dir.NEG] != -1:
requests += [
self.comm.Isend(
np.ascontiguousarray(Hzslice[:, :, 0]),
self.neighbours[Dim.Z][Dir.NEG],
self.H_TAG,
),
self.comm.Isend(
np.ascontiguousarray(Eyslice[:, :, 0]),
self.neighbours[Dim.Z][Dir.NEG],
self.EY_TAG,
),
]
if self.neighbours[Dim.Z][Dir.POS] != -1:
blocking_requests.append(
self.comm.Irecv(Eyzhalo, self.neighbours[Dim.Z][Dir.POS], self.EY_TAG),
)
requests += [
self.comm.Irecv(Hzhalo, self.neighbours[Dim.Z][Dir.POS], self.H_TAG),
self.comm.Irecv(Exzhalo, self.neighbours[Dim.Z][Dir.POS], self.EX_TAG),
]
if len(blocking_requests) > 0:
blocking_requests[0].Waitall(blocking_requests)
logger.debug(f"Initial halo exchanges complete")
if self.neighbours[Dim.X][Dir.POS] != -1:
Ezslice = np.concatenate((Ezslice, Ezxhalo), axis=Dim.X)
if self.neighbours[Dim.Y][Dir.POS] != -1:
Exslice = np.concatenate((Exslice, Exyhalo), axis=Dim.Y)
if self.neighbours[Dim.Z][Dir.POS] != -1:
Eyslice = np.concatenate((Eyslice, Eyzhalo), axis=Dim.Z)
if self.neighbours[Dim.X][Dir.NEG] != -1:
requests.append(
self.comm.Isend(Eyslice[0, :, :], self.neighbours[Dim.X][Dir.NEG], self.EY_TAG),
)
if self.neighbours[Dim.Y][Dir.NEG] != -1:
requests.append(
self.comm.Isend(
np.ascontiguousarray(Ezslice[:, 0, :]),
self.neighbours[Dim.Y][Dir.NEG],
self.EZ_TAG,
),
)
if self.neighbours[Dim.Z][Dir.NEG] != -1:
requests.append(
self.comm.Isend(
np.ascontiguousarray(Exslice[:, :, 0]),
self.neighbours[Dim.Z][Dir.NEG],
self.EX_TAG,
),
)
if len(requests) > 0:
requests[0].Waitall(requests)
logger.debug(f"All halo exchanges complete")
if self.neighbours[Dim.X][Dir.POS] != -1:
Eyslice = np.concatenate((Eyslice, Eyxhalo), axis=Dim.X)
Hxslice = np.concatenate((Hxslice, Hxhalo), axis=Dim.X)
if self.neighbours[Dim.Y][Dir.POS] != -1:
Ezslice = np.concatenate((Ezslice, Ezyhalo), axis=Dim.Y)
Hyslice = np.concatenate((Hyslice, Hyhalo), axis=Dim.Y)
if self.neighbours[Dim.Z][Dir.POS] != -1:
Exslice = np.concatenate((Exslice, Exzhalo), axis=Dim.Z)
Hzslice = np.concatenate((Hzslice, Hzhalo), axis=Dim.Z)
# Calculate field values at points (comes from averaging field
# components in cells)
calculate_snapshot_fields(
self.nx,
self.ny,
self.nz,
config.get_model_config().ompthreads,
self.outputs["Ex"],
self.outputs["Ey"],
self.outputs["Ez"],
self.outputs["Hx"],
self.outputs["Hy"],
self.outputs["Hz"],
Exslice,
Eyslice,
Ezslice,
Hxslice,
Hyslice,
Hzslice,
self.snapfields["Ex"],
self.snapfields["Ey"],
self.snapfields["Ez"],
self.snapfields["Hx"],
self.snapfields["Hy"],
self.snapfields["Hz"],
)
def write_hdf5(self, pbar: tqdm):
"""Writes snapshot file in HDF5 (.h5) format.
Args:
pbar: Progress bar class instance.
"""
f = h5py.File(self.filename, "w", driver="mpio", comm=self.comm)
f.attrs["gprMax"] = __version__
# TODO: Output model name (title) and grid name? in snapshot output
# f.attrs["Title"] = G.title
f.attrs["nx_ny_nz"] = self.global_size
f.attrs["dx_dy_dz"] = self.step * self.grid_dl
f.attrs["time"] = self.time * self.grid_dt
for key in ["Ex", "Ey", "Ez", "Hx", "Hy", "Hz"]:
if self.outputs[key]:
dset = f.create_dataset(key, self.global_size)
# TODO: Is there a better way to do this slice?
start = self.offset
stop = start + self.size
dset[start[0] : stop[0], start[1] : stop[1], start[2] : stop[2]] = self.snapfields[
key
]
pbar.update(n=self.snapfields[key].nbytes)
f.close()
def htod_snapshot_array(snapshots: List[Snapshot], queue=None):
"""Initialises arrays on compute device to store field data for snapshots.
Args:
@@ -274,7 +612,7 @@ def htod_snapshot_array(G, queue=None):
"""
# Get dimensions of largest requested snapshot
for snap in G.snapshots:
for snap in snapshots:
if snap.nx > Snapshot.nx_max:
Snapshot.nx_max = snap.nx
if snap.ny > Snapshot.ny_max:
@@ -304,7 +642,7 @@ def htod_snapshot_array(G, queue=None):
# 4D arrays to store snapshots on GPU, e.g. snapEx(time, x, y, z);
# if snapshots are not being stored on the GPU during the simulation then
# they are copied back to the host after each iteration, hence numsnaps = 1
numsnaps = 1 if config.get_model_config().device["snapsgpu2cpu"] else len(G.snapshots)
numsnaps = 1 if config.get_model_config().device["snapsgpu2cpu"] else len(snapshots)
snapEx = np.zeros(
(numsnaps, Snapshot.nx_max, Snapshot.ny_max, Snapshot.nz_max),
dtype=config.sim_config.dtypes["float_or_double"],