Add VtkUnstructuredGrid class

这个提交包含在:
nmannall
2024-11-08 12:16:43 +00:00
父节点 ad50e2ff91
当前提交 b886e8e3ee

查看文件

@@ -1,6 +1,7 @@
import logging
from abc import abstractmethod
from contextlib import AbstractContextManager
from enum import Enum
from os import PathLike
from pathlib import Path
from types import TracebackType
@@ -19,9 +20,15 @@ class VtkHdfFile(AbstractContextManager):
FILE_EXTENSION = ".vtkhdf"
ROOT_GROUP = "VTKHDF"
# TODO: Can this be moved to using an Enum like root datasets?
# Main barrier: Can't subclass an enum with members and any base
# Enum class would need VERSION and TYPE as members.
VERSION_ATTR = "Version"
TYPE_ATTR = "Type"
class Dataset(str, Enum):
pass
@property
@abstractmethod
def TYPE(self) -> str:
@@ -139,12 +146,72 @@ class VtkHdfFile(AbstractContextManager):
"""
return "/".join([self.ROOT_GROUP, *path])
def _get_root_dataset(self, name: "VtkHdfFile.Dataset") -> h5py.Dataset:
"""Get specified dataset from the root group of the VTKHDF file.
Args:
path: Name of the dataset.
Returns:
dataset: Returns specified h5py dataset.
"""
path = self._build_dataset_path(name)
return self._get_dataset(path)
def _get_dataset(self, path: str) -> h5py.Dataset:
"""Get specified dataset.
Args:
path: Absolute path to the dataset.
Returns:
dataset: Returns specified h5py dataset.
Raises:
KeyError: Raised if the dataset does not exist, or the path
points to some other object, e.g. a Group not a Dataset.
"""
cls = self.file_handler.get(path, getclass=True)
if cls == "default":
raise KeyError("Path does not exist")
elif cls != h5py.Dataset:
raise KeyError(f"Dataset not found. Found '{cls}' instead")
dataset = self.file_handler.get(path)
assert isinstance(dataset, h5py.Dataset)
return dataset
def _write_root_dataset(
self,
name: "VtkHdfFile.Dataset",
data: npt.ArrayLike,
shape: Optional[npt.NDArray[np.intc]] = None,
offset: Optional[npt.NDArray[np.intc]] = None,
xyz_data_ordering=True,
):
"""Write specified dataset to the root group of the VTKHDF file.
Args:
name: Name of the dataset.
data: Data to initialize the dataset.
shape (optional): Size of the full dataset being created.
Can be omitted if data provides the full dataset.
offset (optional): Offset to store the provided data at. Can
be omitted if data provides the full dataset.
xyz_data_ordering (optional): If True, the data will be
transposed as VTKHDF stores datasets using ZYX ordering.
Default True.
"""
path = self._build_dataset_path(name)
self._write_dataset(path, data, shape, offset, xyz_data_ordering)
def _write_dataset(
self,
path: str,
data: npt.ArrayLike,
shape: Optional[npt.NDArray[np.intc]] = None,
offset: Optional[npt.NDArray[np.intc]] = None,
xyz_data_ordering=True,
):
"""Write specified dataset to the VTKHDF file.
@@ -158,6 +225,9 @@ class VtkHdfFile(AbstractContextManager):
Can be omitted if data provides the full dataset.
offset (optional): Offset to store the provided data at. Can
be omitted if data provides the full dataset.
xyz_data_ordering (optional): If True, the data will be
transposed as VTKHDF stores datasets using ZYX ordering.
Default True.
Raises:
ValueError: Raised if the combination of data.shape, shape,
@@ -168,7 +238,8 @@ class VtkHdfFile(AbstractContextManager):
data = np.array([data])
# VTKHDF stores datasets using ZYX ordering rather than XYZ
data = data.transpose()
if xyz_data_ordering:
data = data.transpose()
if shape is not None:
shape = np.flip(shape)
@@ -448,3 +519,189 @@ class VtkImageData(VtkHdfFile):
f" VtkImageData object. {data.shape} != {self.shape}"
)
return super().add_cell_data(name, data, self.shape, offset)
class VtkCellType(np.uint8, Enum):
"""VTK cell types as defined here:
https://vtk.org/doc/nightly/html/vtkCellType_8h_source.html#l00019
"""
# Linear cells
EMPTY_CELL = 0
VERTEX = 1
POLY_VERTEX = 2
LINE = 3
POLY_LINE = 4
TRIANGLE = 5
TRIANGLE_STRIP = 6
POLYGON = 7
PIXEL = 8
QUAD = 9
TETRA = 10
VOXEL = 11
HEXAHEDRON = 12
WEDGE = 13
PYRAMID = 14
PENTAGONAL_PRISM = 15
HEXAGONAL_PRISM = 16
class VtkUnstructuredGrid(VtkHdfFile):
"""File handler for creating a VTKHDF Unstructured Grid file.
File format information is available here:
https://docs.vtk.org/en/latest/design_documents/VTKFileFormats.html#unstructured-grid
"""
class Dataset(VtkHdfFile.Dataset):
CONNECTIVITY = "Connectivity"
NUMBER_OF_CELLS = "NumberOfCells"
NUMBER_OF_CONNECTIVITY_IDS = "NumberOfConnectivityIds"
NUMBER_OF_POINTS = "NumberOfPoints"
OFFSETS = "Offsets"
POINTS = "Points"
TYPES = "Types"
@property
def TYPE(self) -> Literal["UnstructuredGrid"]:
return "UnstructuredGrid"
def __init__(
self,
filename: Union[str, PathLike],
points: npt.NDArray,
cell_types: npt.NDArray[VtkCellType],
connectivity: npt.NDArray,
cell_offsets: npt.NDArray,
comm: Optional[MPI.Comm] = None,
) -> None:
"""Create a new VtkUnstructuredGrid file.
An unstructured grid has N points and C cells. A cell is defined
as a collection of points which is specified by the connectivity
and cell_offsets arguments along with the list of cell_types.
If the file already exists, it will be overriden. Required
attributes (Type and Version) will be written to the file.
The file will be opened using the 'mpio' h5py driver if an MPI
communicator is provided.
Args:
filename: Name of the file (can be a file path). The file
extension will be set to '.vtkhdf'.
points: Array of point coordinates of shape (N, 3).
cell_types: Array of VTK cell types of shape (C,).
connectivity: Array of point IDs that together with
cell_offsets, defines the points that make up each cell.
Each point ID has a value between 0 and (N-1) inclusive
and corresponds to a point in the points array.
cell_offsets: Array listing where each cell starts and ends
in the connectivity array. It has shape (C + 1,).
comm (optional): MPI communicator containing all ranks that
want to write to the file.
Raises:
Value Error: Raised if argument dimensions are invalid.
"""
super().__init__(filename, comm)
if len(cell_offsets) != len(cell_types) + 1:
raise ValueError(
"cell_offsets should be one longer than cell_types."
" I.e. one longer than the number of cells"
)
is_sorted = lambda a: np.all(a[:-1] <= a[1:])
if not is_sorted(cell_offsets):
raise ValueError("cell_offsets should be sorted in ascending order")
if len(connectivity) < cell_offsets[-1]:
raise ValueError("Connectivity array is shorter than final cell_offsets value")
elif len(connectivity) > cell_offsets[-1]:
raise logger.warning(
"Connectivity array longer than final cell_offsets value."
" Some connectivity data will be ignored"
)
self._write_root_dataset(self.Dataset.CONNECTIVITY, connectivity)
self._write_root_dataset(self.Dataset.NUMBER_OF_CELLS, len(cell_types))
self._write_root_dataset(self.Dataset.NUMBER_OF_CONNECTIVITY_IDS, len(connectivity))
self._write_root_dataset(self.Dataset.NUMBER_OF_POINTS, len(points))
self._write_root_dataset(self.Dataset.OFFSETS, cell_offsets)
self._write_root_dataset(self.Dataset.POINTS, points, xyz_data_ordering=False)
self._write_root_dataset(self.Dataset.TYPES, cell_types)
@property
def number_of_cells(self) -> int:
number_of_cells = self._get_root_dataset(self.Dataset.NUMBER_OF_CELLS)
return np.sum(number_of_cells, dtype=np.intc)
@property
def number_of_connectivity_ids(self) -> int:
number_of_connectivity_ids = self._get_root_dataset(self.Dataset.NUMBER_OF_CONNECTIVITY_IDS)
return np.sum(number_of_connectivity_ids, dtype=np.intc)
@property
def number_of_points(self) -> int:
number_of_points = self._get_root_dataset(self.Dataset.NUMBER_OF_POINTS)
return np.sum(number_of_points, dtype=np.intc)
def add_point_data(
self, name: str, data: npt.NDArray, offset: Optional[npt.NDArray[np.intc]] = None
):
"""Add point data to the VTKHDF file.
Args:
name: Name of the dataset.
data: Data to be saved.
offset (optional): Offset to store the provided data at. Can
be omitted if data provides the full dataset.
Raises:
ValueError: Raised if data has invalid dimensions.
"""
shape = np.array(data.shape)
number_of_dimensions = len(shape)
if number_of_dimensions < 1 or number_of_dimensions > 2:
raise ValueError(f"Data must have 1 or 2 dimensions, not {number_of_dimensions}")
elif len(data) != self.number_of_points:
raise ValueError(
"Length of data must match the number of points in the vtkUnstructuredGrid."
f" {len(data)} != {self.number_of_points}"
)
elif number_of_dimensions == 2 and shape[1] != 1 and shape[1] != 3:
raise ValueError(f"The second dimension should have shape 1 or 3, not {shape[1]}")
return super().add_point_data(name, data, shape, offset)
def add_cell_data(
self, name: str, data: npt.NDArray, offset: Optional[npt.NDArray[np.intc]] = None
):
"""Add cell data to the VTKHDF file.
Args:
name: Name of the dataset.
data: Data to be saved.
offset (optional): Offset to store the provided data at. Can
be omitted if data provides the full dataset.
Raises:
ValueError: Raised if data has invalid dimensions.
"""
shape = np.array(data.shape)
number_of_dimensions = len(shape)
if number_of_dimensions < 1 or number_of_dimensions > 2:
raise ValueError(f"Data must have 1 or 2 dimensions, not {number_of_dimensions}.")
elif len(data) != self.number_of_cells:
raise ValueError(
"Length of data must match the number of cells in the vtkUnstructuredGrid."
f" {len(data)} != {self.number_of_cells}"
)
elif number_of_dimensions == 2 and shape[1] != 1 and shape[1] != 3:
raise ValueError(f"The second dimension should have shape 1 or 3, not {shape[1]}")
return super().add_cell_data(name, data, shape, offset)