import os import sys import torch import numpy as np import matplotlib.pyplot as plt from torch.utils.data import Dataset from scipy.ndimage import zoom, gaussian_filter1d import torch.nn.functional as F # Add parent directory to path for config import sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from config import Network_train_Config as cfg data_length=cfg.data_length def convolve_signals(signal1, signal2): """ Perform cyclic convolution using FFT. Output length is max(len(signal1), len(signal2)). """ len_signal = max(len(signal1), len(signal2)) return np.fft.ifft(np.fft.fft(signal1, len_signal) * np.fft.fft(signal2, len_signal)).real def apply_exponential_gain(data, gain_factor): """ Apply exponential gain to a signal. """ data = np.asarray(data) indices = np.arange(len(data)) gain = np.exp(gain_factor * indices) return data * gain def shift_data_to_end(data, n): """ Shift the first n elements of a 1D array to the end. """ if n < 0 or n > len(data): raise ValueError("Shift length n must be within data length range.") return np.concatenate((data[n:], data[:n])) def shift_data_to_front(data, n): """ Shift the last n elements of a 1D array to the front. """ if n < 0 or n > len(data): raise ValueError("Shift length n must be within data length range.") return np.concatenate((data[-n:], data[:-n])) class MyDataset(Dataset): def __init__(self, data_file, label_file, impulse_field_file, impulse_sim_file, mode='train', check=False, noise_coff=cfg.noise_coff, initial_params=None): super(MyDataset, self).__init__() self.mode = mode self.check = check self.noise_coff = noise_coff self.data = np.delete(np.loadtxt(data_file, delimiter=","), [0], axis=0) self.labels = np.delete(np.loadtxt(label_file, delimiter=","), [0], axis=0) if self.mode in ['train', 'apply']: self.impulse_field = np.loadtxt(impulse_field_file, delimiter=",") self.impulse_sim = np.loadtxt(impulse_sim_file, delimiter=",") else: raise ValueError("Mode must be either 'train' or 'apply'") if self.mode == 'apply': self.initial_model = self.generate_initial_model(*initial_params, total_length=data_length) self.data = self.data.T self.labels = self.labels.T def __len__(self): return self.data.shape[0] def __getitem__(self, index): data_raw = self.data[index] label_data = self.labels[index] impulse_field = self.impulse_field impulse_sim = self.impulse_sim data_raw = zoom(data_raw, data_length / len(data_raw)) label_data = zoom(label_data, data_length / len(label_data)) impulse_field = zoom(impulse_field, data_length / len(impulse_field)) impulse_sim = zoom(impulse_sim, data_length / len(impulse_sim)) if self.mode == 'train': data_gained = apply_exponential_gain(data_raw, 0.00) data_noise1 = np.random.normal(0, self.noise_coff * np.max(abs(data_gained)), size=data_gained.shape) data_noise2 = np.random.normal(0, self.noise_coff * np.max(abs(data_gained)), size=data_gained.shape) data_noise1 = convolve_signals(impulse_field, data_noise1) data_noise1 = convolve_signals(impulse_sim, data_noise1) data_noise2 = convolve_signals(impulse_sim, data_noise2) data_gained = convolve_signals(impulse_field, data_gained) data_gained = shift_data_to_end(data_gained, cfg.shift_distance) data_noised = data_gained + self.noise_coff * data_noise1 + self.noise_coff * data_noise2 data_data = data_noised elif self.mode == 'apply': data_meta = data_raw data_data = convolve_signals(impulse_sim, data_meta) data_data = shift_data_to_end(data_data, cfg.shift_distance) data_data = data_data / np.max(abs(data_data)) # Construct initial model if self.mode == 'train': initial_model = np.full(data_length, label_data[1]) elif self.mode == 'apply': initial_model = self.initial_model initial_model = zoom(initial_model, data_length / len(initial_model)) initial_model = gaussian_filter1d(initial_model, sigma=data_length / 5) data_data = data_data / np.max(abs(data_data)) * 0.5 if self.check: plt.figure(figsize=(10, 7)) titles = [ ('label_data', label_data), ('initial_model', initial_model), ('data_raw', data_raw), ('data_noise1', data_noise1), ('data_noise2', data_noise2), ('impulse_field', impulse_field), ('impulse_sim', impulse_sim), ('data_without_noise', data_gained), ('data_noised', data_data), ] for i, (title, signal) in enumerate(titles): plt.subplot(9, 1, i + 1) plt.plot(signal, label=title, color='blue') plt.grid(alpha=0.3) plt.legend() plt.show() # Convert to PyTorch tensors data_data = torch.from_numpy(data_data).float().unsqueeze(0) label_data = torch.from_numpy(label_data).float().unsqueeze(0) initial_model = torch.from_numpy(initial_model).float().unsqueeze(0) data_data = data_data + 0.5 label_data = label_data / cfg.max_permittivity initial_model = initial_model / cfg.max_permittivity input_data = torch.cat((data_data, initial_model), dim=0) return input_data, label_data def generate_initial_model(self, epsilon, thickness, total_length=data_length): """ Generate an initial layered model. Parameters: - epsilon: List of permittivity values for each layer. - thickness: Corresponding layer thicknesses. - total_length: Total model length. Returns: - A 1D numpy array of length total_length. """ layer_points = (np.array(thickness) / np.sum(thickness) * total_length).astype(int) diff = total_length - np.sum(layer_points) layer_points[0] += diff # Adjust rounding difference initial_model = np.concatenate([np.full(points, eps) for eps, points in zip(epsilon, layer_points)]) return initial_model