Build subgrids from Model and simplify FDTDGrid.build()

Subgrids are no longer contained within an FDTDGrid. The FDTDGrid.build()
function now just builds the grid it is called on. The Model class now
checks memory requirements etc.
这个提交包含在:
nmannall
2024-05-20 16:00:47 +01:00
父节点 d4a3f3e3a4
当前提交 ce1e811fb1
共有 2 个文件被更改,包括 246 次插入238 次删除

查看文件

@@ -23,7 +23,6 @@ import sys
from collections import OrderedDict
from typing import Any, Iterable, List, Union
import humanize
import numpy as np
from terminaltables import SingleTable
from tqdm import tqdm
@@ -40,7 +39,6 @@ from gprMax.snapshots import Snapshot
from gprMax.sources import HertzianDipole, MagneticDipole, Source, TransmissionLine, VoltageSource
# from gprMax.subgrids.grid import SubGridBaseGrid
from gprMax.utilities.host_info import mem_check_build_all, mem_check_run_all
from gprMax.utilities.utilities import fft_power, get_terminal_width, round_value
from gprMax.waveforms import Waveform
@@ -138,105 +136,22 @@ class FDTDGrid:
self.dl[2] = value
def build(self) -> None:
# Print info on any subgrids
for subgrid in self.subgrids:
subgrid.print_info()
# Combine available grids
grids = [self] + self.subgrids
# Check for dispersive materials (and specific type)
if config.get_model_config().materials["maxpoles"] != 0:
# TODO: This sets materials["drudelorentz"] based only the
# last grid/subgrid. Is that correct?
for grid in grids:
config.get_model_config().materials["drudelorentz"] = any(
[m for m in grid.materials if "drude" in m.type or "lorentz" in m.type]
)
# Set data type if any dispersive materials (must be done before memory checks)
config.get_model_config().set_dispersive_material_types()
# Check memory requirements to build model/scene (different to memory
# requirements to run model when FractalVolumes/FractalSurfaces are
# used as these can require significant additional memory)
total_mem_build, mem_strs_build = mem_check_build_all(grids)
# Check memory requirements to run model
total_mem_run, mem_strs_run = mem_check_run_all(grids)
if total_mem_build > total_mem_run:
logger.info(
f'\nMemory required (estimated): {" + ".join(mem_strs_build)} + '
f"~{humanize.naturalsize(config.get_model_config().mem_overhead)} "
f"overhead = {humanize.naturalsize(total_mem_build)}"
)
else:
logger.info(
f'\nMemory required (estimated): {" + ".join(mem_strs_run)} + '
f"~{humanize.naturalsize(config.get_model_config().mem_overhead)} "
f"overhead = {humanize.naturalsize(total_mem_run)}"
)
# Build grids
for grid in grids:
# Set default CFS parameter for PMLs if not user provided
if not grid.pmls["cfs"]:
grid.pmls["cfs"] = [CFS()]
logger.info(print_pml_info(grid))
if not all(value == 0 for value in grid.pmls["thickness"].values()):
grid._build_pmls()
if grid.averagevolumeobjects:
grid._build_components()
grid._tm_grid_update()
grid._update_voltage_source_materials()
grid.initialise_field_arrays()
grid.initialise_std_update_coeff_arrays()
if config.get_model_config().materials["maxpoles"] > 0:
grid.initialise_dispersive_arrays()
grid.initialise_dispersive_update_coeff_array()
grid._build_materials()
# Check to see if numerical dispersion might be a problem
results = dispersion_analysis(grid)
if results["error"]:
logger.warning(
f"\nNumerical dispersion analysis [{grid.name}] "
f"not carried out as {results['error']}"
)
elif results["N"] < config.get_model_config().numdispersion["mingridsampling"]:
logger.exception(
f"\nNon-physical wave propagation in [{grid.name}] "
f"detected. Material '{results['material'].ID}' "
f"has wavelength sampled by {results['N']} cells, "
f"less than required minimum for physical wave "
f"propagation. Maximum significant frequency "
f"estimated as {results['maxfreq']:g}Hz"
)
raise ValueError
elif (
results["deltavp"]
and np.abs(results["deltavp"])
> config.get_model_config().numdispersion["maxnumericaldisp"]
):
logger.warning(
f"\n[{grid.name}] has potentially significant "
f"numerical dispersion. Estimated largest physical "
f"phase-velocity error is {results['deltavp']:.2f}% "
f"in material '{results['material'].ID}' whose "
f"wavelength sampled by {results['N']} cells. "
f"Maximum significant frequency estimated as "
f"{results['maxfreq']:g}Hz"
)
elif results["deltavp"]:
logger.info(
f"\nNumerical dispersion analysis [{grid.name}]: "
f"estimated largest physical phase-velocity error is "
f"{results['deltavp']:.2f}% in material '{results['material'].ID}' "
f"whose wavelength sampled by {results['N']} cells. "
f"Maximum significant frequency estimated as "
f"{results['maxfreq']:g}Hz"
)
# Set default CFS parameter for PMLs if not user provided
if not self.pmls["cfs"]:
self.pmls["cfs"] = [CFS()]
logger.info(print_pml_info(self))
if not all(value == 0 for value in self.pmls["thickness"].values()):
self._build_pmls()
if self.averagevolumeobjects:
self._build_components()
self._tm_grid_update()
self._create_voltage_source_materials()
self.initialise_field_arrays()
self.initialise_std_update_coeff_arrays()
if config.get_model_config().materials["maxpoles"] > 0:
self.initialise_dispersive_arrays()
self.initialise_dispersive_update_coeff_array()
self._build_materials()
def _build_pmls(self) -> None:
pbar = tqdm(
@@ -277,7 +192,7 @@ class FDTDGrid:
elif config.get_model_config().mode == "2D TMz":
self.tmz()
def _update_voltage_source_materials(self):
def _create_voltage_source_materials(self):
# Process any voltage sources (that have resistance) to create a new
# material at the source location
for voltagesource in self.voltagesources:
@@ -651,138 +566,183 @@ class FDTDGrid:
return Iz
def dispersion_analysis(G):
"""Analysis of numerical dispersion (Taflove et al, 2005, p112) -
worse case of maximum frequency and minimum wavelength
Args:
G: FDTDGrid class describing a grid in a model.
Returns:
results: dict of results from dispersion analysis.
"""
# deltavp: physical phase velocity error (percentage)
# N: grid sampling density
# material: material with maximum permittivity
# maxfreq: maximum significant frequency
# error: error message
results = {"deltavp": None, "N": None, "material": None, "maxfreq": [], "error": ""}
# Find maximum significant frequency
if G.waveforms:
for waveform in G.waveforms:
if waveform.type in ["sine", "contsine"]:
results["maxfreq"].append(4 * waveform.freq)
elif waveform.type == "impulse":
results["error"] = "impulse waveform used."
elif waveform.type == "user":
results["error"] = "user waveform detected."
else:
# Time to analyse waveform - 4*pulse_width as using entire
# time window can result in demanding FFT
waveform.calculate_coefficients()
iterations = round_value(4 * waveform.chi / G.dt)
iterations = min(iterations, G.iterations)
waveformvalues = np.zeros(G.iterations)
for iteration in range(G.iterations):
waveformvalues[iteration] = waveform.calculate_value(iteration * G.dt, G.dt)
# Ensure source waveform is not being overly truncated before attempting any FFT
if np.abs(waveformvalues[-1]) < np.abs(np.amax(waveformvalues)) / 100:
# FFT
freqs, power = fft_power(waveformvalues, G.dt)
# Get frequency for max power
freqmaxpower = np.where(np.isclose(power, 0))[0][0]
# Set maximum frequency to a threshold drop from maximum power, ignoring DC value
try:
freqthres = (
np.where(
power[freqmaxpower:]
< -config.get_model_config().numdispersion["highestfreqthres"]
)[0][0]
+ freqmaxpower
)
results["maxfreq"].append(freqs[freqthres])
except ValueError:
results["error"] = (
"unable to calculate maximum power "
+ "from waveform, most likely due to "
+ "undersampling."
)
# Ignore case where someone is using a waveform with zero amplitude, i.e. on a receiver
elif waveform.amp == 0:
pass
# If waveform is truncated don't do any further analysis
else:
results["error"] = (
"waveform does not fit within specified "
+ "time window and is therefore being truncated."
)
else:
results["error"] = "no waveform detected."
if results["maxfreq"]:
results["maxfreq"] = max(results["maxfreq"])
# Find minimum wavelength (material with maximum permittivity)
maxer = 0
matmaxer = ""
for x in G.materials:
if x.se != float("inf"):
er = x.er
# If there are dispersive materials calculate the complex
# relative permittivity at maximum frequency and take the real part
if x.__class__.__name__ == "DispersiveMaterial":
er = x.calculate_er(results["maxfreq"])
er = er.real
if er > maxer:
maxer = er
matmaxer = x.ID
results["material"] = next(x for x in G.materials if x.ID == matmaxer)
# Minimum velocity
minvelocity = config.c / np.sqrt(maxer)
# Minimum wavelength
minwavelength = minvelocity / results["maxfreq"]
# Maximum spatial step
if "3D" in config.get_model_config().mode:
delta = max(G.dx, G.dy, G.dz)
elif "2D" in config.get_model_config().mode:
if G.nx == 1:
delta = max(G.dy, G.dz)
elif G.ny == 1:
delta = max(G.dx, G.dz)
elif G.nz == 1:
delta = max(G.dx, G.dy)
# Courant stability factor
S = (config.c * G.dt) / delta
# Grid sampling density
results["N"] = minwavelength / delta
# Check grid sampling will result in physical wave propagation
if (
int(np.floor(results["N"]))
>= config.get_model_config().numdispersion["mingridsampling"]
def dispersion_analysis(self, iterations: int):
# Check to see if numerical dispersion might be a problem
results = self._dispersion_analysis(iterations)
if results["error"]:
logger.warning(
f"\nNumerical dispersion analysis [{self.name}] "
f"not carried out as {results['error']}"
)
elif results["N"] < config.get_model_config().numdispersion["mingridsampling"]:
logger.exception(
f"\nNon-physical wave propagation in [{self.name}] "
f"detected. Material '{results['material'].ID}' "
f"has wavelength sampled by {results['N']} cells, "
f"less than required minimum for physical wave "
f"propagation. Maximum significant frequency "
f"estimated as {results['maxfreq']:g}Hz"
)
raise ValueError
elif (
results["deltavp"]
and np.abs(results["deltavp"])
> config.get_model_config().numdispersion["maxnumericaldisp"]
):
# Numerical phase velocity
vp = np.pi / (results["N"] * np.arcsin((1 / S) * np.sin((np.pi * S) / results["N"])))
logger.warning(
f"\n[{self.name}] has potentially significant "
f"numerical dispersion. Estimated largest physical "
f"phase-velocity error is {results['deltavp']:.2f}% "
f"in material '{results['material'].ID}' whose "
f"wavelength sampled by {results['N']} cells. "
f"Maximum significant frequency estimated as "
f"{results['maxfreq']:g}Hz"
)
elif results["deltavp"]:
logger.info(
f"\nNumerical dispersion analysis [{self.name}]: "
f"estimated largest physical phase-velocity error is "
f"{results['deltavp']:.2f}% in material '{results['material'].ID}' "
f"whose wavelength sampled by {results['N']} cells. "
f"Maximum significant frequency estimated as "
f"{results['maxfreq']:g}Hz"
)
# Physical phase velocity error (percentage)
results["deltavp"] = (((vp * config.c) - config.c) / config.c) * 100
def _dispersion_analysis(self, iterations: int):
"""Analysis of numerical dispersion (Taflove et al, 2005, p112) -
worse case of maximum frequency and minimum wavelength
# Store rounded down value of grid sampling density
results["N"] = int(np.floor(results["N"]))
Args:
G: FDTDGrid class describing a grid in a model.
return results
Returns:
results: dict of results from dispersion analysis.
"""
# deltavp: physical phase velocity error (percentage)
# N: grid sampling density
# material: material with maximum permittivity
# maxfreq: maximum significant frequency
# error: error message
results = {"deltavp": None, "N": None, "material": None, "maxfreq": [], "error": ""}
# Find maximum significant frequency
if self.waveforms:
for waveform in self.waveforms:
if waveform.type in ["sine", "contsine"]:
results["maxfreq"].append(4 * waveform.freq)
elif waveform.type == "impulse":
results["error"] = "impulse waveform used."
elif waveform.type == "user":
results["error"] = "user waveform detected."
else:
# Time to analyse waveform - 4*pulse_width as using entire
# time window can result in demanding FFT
waveform.calculate_coefficients()
iterations = round_value(4 * waveform.chi / self.dt)
iterations = min(iterations, iterations)
waveformvalues = np.zeros(iterations)
for iteration in range(iterations):
waveformvalues[iteration] = waveform.calculate_value(
iteration * self.dt, self.dt
)
# Ensure source waveform is not being overly truncated before attempting any FFT
if np.abs(waveformvalues[-1]) < np.abs(np.amax(waveformvalues)) / 100:
# FFT
freqs, power = fft_power(waveformvalues, self.dt)
# Get frequency for max power
freqmaxpower = np.where(np.isclose(power, 0))[0][0]
# Set maximum frequency to a threshold drop from maximum power, ignoring DC value
try:
freqthres = (
np.where(
power[freqmaxpower:]
< -config.get_model_config().numdispersion["highestfreqthres"]
)[0][0]
+ freqmaxpower
)
results["maxfreq"].append(freqs[freqthres])
except ValueError:
results["error"] = (
"unable to calculate maximum power "
+ "from waveform, most likely due to "
+ "undersampling."
)
# Ignore case where someone is using a waveform with zero amplitude, i.e. on a receiver
elif waveform.amp == 0:
pass
# If waveform is truncated don't do any further analysis
else:
results["error"] = (
"waveform does not fit within specified "
+ "time window and is therefore being truncated."
)
else:
results["error"] = "no waveform detected."
if results["maxfreq"]:
results["maxfreq"] = max(results["maxfreq"])
# Find minimum wavelength (material with maximum permittivity)
maxer = 0
matmaxer = ""
for x in self.materials:
if x.se != float("inf"):
er = x.er
# If there are dispersive materials calculate the complex
# relative permittivity at maximum frequency and take the real part
if x.__class__.__name__ == "DispersiveMaterial":
er = x.calculate_er(results["maxfreq"])
er = er.real
if er > maxer:
maxer = er
matmaxer = x.ID
results["material"] = next(x for x in self.materials if x.ID == matmaxer)
# Minimum velocity
minvelocity = config.c / np.sqrt(maxer)
# Minimum wavelength
minwavelength = minvelocity / results["maxfreq"]
# Maximum spatial step
if "3D" in config.get_model_config().mode:
delta = max(self.dx, self.dy, self.dz)
elif "2D" in config.get_model_config().mode:
if self.nx == 1:
delta = max(self.dy, self.dz)
elif self.ny == 1:
delta = max(self.dx, self.dz)
elif self.nz == 1:
delta = max(self.dx, self.dy)
# Courant stability factor
S = (config.c * self.dt) / delta
# Grid sampling density
results["N"] = minwavelength / delta
# Check grid sampling will result in physical wave propagation
if (
int(np.floor(results["N"]))
>= config.get_model_config().numdispersion["mingridsampling"]
):
# Numerical phase velocity
vp = np.pi / (
results["N"] * np.arcsin((1 / S) * np.sin((np.pi * S) / results["N"]))
)
# Physical phase velocity error (percentage)
results["deltavp"] = (((vp * config.c) - config.c) / config.c) * 100
# Store rounded down value of grid sampling density
results["N"] = int(np.floor(results["N"]))
return results

查看文件

@@ -19,7 +19,7 @@
import datetime
import logging
import sys
from typing import List, Union
from typing import List
import humanize
import numpy as np
@@ -28,7 +28,6 @@ from colorama import Fore, Style, init
from gprMax.grid.cuda_grid import CUDAGrid
from gprMax.grid.opencl_grid import OpenCLGrid
from gprMax.materials import ListMaterial, Material, PeplinskiSoil, RangeMaterial
from gprMax.subgrids.grid import SubGridBaseGrid
init()
@@ -41,7 +40,7 @@ from .fields_outputs import write_hdf5_outputfile
from .geometry_outputs import GeometryObjects, GeometryView, save_geometry_views
from .grid.fdtd_grid import FDTDGrid
from .snapshots import save_snapshots
from .utilities.host_info import set_omp_threads
from .utilities.host_info import mem_check_build_all, mem_check_run_all, set_omp_threads
from .utilities.utilities import get_terminal_width
logger = logging.getLogger(__name__)
@@ -206,11 +205,60 @@ class Model:
def build_geometry(self):
logger.info(config.get_model_config().inputfilestr)
# TODO: Make this correctly sets local nx, ny and nz when using MPI (likely use a function inside FDTDGrid/MPIGrid)
# Print info on any subgrids
for subgrid in self.subgrids:
subgrid.print_info()
# Combine available grids
grids = [self.G] + self.subgrids
self._check_for_dispersive_materials(grids)
self._check_memory_requirements(grids)
# TODO: Make this correctly set local nx, ny and nz when using MPI (likely use a function inside FDTDGrid/MPIGrid)
self.G.nx = self.gnx
self.G.ny = self.gny
self.G.nz = self.gnz
self.G.build()
for grid in grids:
grid.build()
grid.dispersion_analysis(self.iterations)
def _check_for_dispersive_materials(self, grids: List[FDTDGrid]):
# Check for dispersive materials (and specific type)
if config.get_model_config().materials["maxpoles"] != 0:
# TODO: This sets materials["drudelorentz"] based only the
# last grid/subgrid. Is that correct?
for grid in grids:
config.get_model_config().materials["drudelorentz"] = any(
[m for m in grid.materials if "drude" in m.type or "lorentz" in m.type]
)
# Set data type if any dispersive materials (must be done before memory checks)
config.get_model_config().set_dispersive_material_types()
def _check_memory_requirements(self, grids: List[FDTDGrid]):
# Check memory requirements to build model/scene (different to memory
# requirements to run model when FractalVolumes/FractalSurfaces are
# used as these can require significant additional memory)
total_mem_build, mem_strs_build = mem_check_build_all(grids)
# Check memory requirements to run model
total_mem_run, mem_strs_run = mem_check_run_all(grids)
if total_mem_build > total_mem_run:
logger.info(
f'\nMemory required (estimated): {" + ".join(mem_strs_build)} + '
f"~{humanize.naturalsize(config.get_model_config().mem_overhead)} "
f"overhead = {humanize.naturalsize(total_mem_build)}"
)
else:
logger.info(
f'\nMemory required (estimated): {" + ".join(mem_strs_run)} + '
f"~{humanize.naturalsize(config.get_model_config().mem_overhead)} "
f"overhead = {humanize.naturalsize(total_mem_run)}"
)
def reuse_geometry(self):
s = (