diff --git a/Network/Model.py b/Network/Model.py
new file mode 100644
index 0000000..bd588e7
--- /dev/null
+++ b/Network/Model.py
@@ -0,0 +1,137 @@
+import torch
+import torch.nn as nn
+
+# 1D convolution with kernel size 5 ("same" padding at stride 1)
+def conv1d_5(inplanes, outplanes, stride=1):
+    return nn.Conv1d(inplanes, outplanes, kernel_size=5, stride=stride,
+                     padding=2, bias=False)
+
+# Transformer-based self-attention module
+class TransformerLayer(nn.Module):
+    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
+        super(TransformerLayer, self).__init__()
+        self.attention = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout, batch_first=True)
+        self.norm1 = nn.LayerNorm(embed_dim)
+        self.ffn = nn.Sequential(
+            nn.Linear(embed_dim, ff_dim),
+            nn.ReLU(),
+            nn.Linear(ff_dim, embed_dim)
+        )
+        self.norm2 = nn.LayerNorm(embed_dim)
+        self.dropout = nn.Dropout(dropout)
+
+    def forward(self, x):
+        x = x.permute(0, 2, 1)  # [B, C, L] -> [B, L, C]
+        attn_output, _ = self.attention(x, x, x)
+        x = self.norm1(x + self.dropout(attn_output))
+
+        ffn_output = self.ffn(x)
+        x = self.norm2(x + self.dropout(ffn_output))
+
+        x = x.permute(0, 2, 1)  # [B, L, C] -> [B, C, L]
+        return x
+
+# Downsampling block with a ResNet-style skip connection
+class Block(nn.Module):
+    def __init__(self, inplanes, planes, stride=1, downsample=None, bn=False):
+        super(Block, self).__init__()
+        self.bn = bn
+        self.conv1 = conv1d_5(inplanes, planes, stride)
+        self.bn1 = nn.BatchNorm1d(planes)
+        self.relu = nn.ReLU(inplace=False)
+        self.conv2 = conv1d_5(planes, planes)
+        self.bn2 = nn.BatchNorm1d(planes)
+        self.downsample = downsample
+
+    def forward(self, x):
+        residual = x
+        out = self.conv1(x)
+        if self.bn:
+            out = self.bn1(out)
+        out = self.relu(out)
+
+        out = self.conv2(out)
+        if self.bn:
+            out = self.bn2(out)
+        out = self.relu(out)
+
+        if self.downsample is not None:
+            residual = self.downsample(x)
+
+        out = out + residual
+        out = self.relu(out)
+        return out
+
+# Upsampling block: transposed convolution, then fusion with the skip tensor
+class Decoder_block(nn.Module):
+    def __init__(self, inplanes, outplanes, kernel_size=5, stride=5):
+        super(Decoder_block, self).__init__()
+        self.upsample = nn.ConvTranspose1d(inplanes, outplanes,
+                                           kernel_size=kernel_size, stride=stride, bias=False)
+        # After concatenation the channel count is outplanes (upsampled) +
+        # outplanes (skip) = inplanes, so conv1 maps inplanes -> outplanes.
+        self.conv1 = conv1d_5(inplanes, outplanes)
+        self.relu = nn.ReLU(inplace=False)
+        self.conv2 = conv1d_5(outplanes, outplanes)
+
+    def forward(self, x1, x2):
+        x1 = self.upsample(x1)
+        out = torch.cat((x1, x2), dim=1)
+        out = self.conv1(out)
+        out = self.relu(out)
+        out = self.conv2(out)
+        out = self.relu(out)
+        return out
+
+# Main model: a 1D U-Net with a Transformer bottleneck
+class Model(nn.Module):
+    def __init__(self, inplanes=1, outplanes=2, layers=[2, 2, 2, 2]):
+        super(Model, self).__init__()
+        self.inplanes = inplanes
+        self.outplanes = outplanes
+        self.encoder1 = self._make_encoder(Block, 32, layers[0], 5)
+        self.encoder2 = self._make_encoder(Block, 64, layers[1], 5)
+        self.encoder3 = self._make_encoder(Block, 128, layers[2], 5)
+        self.encoder4 = self._make_encoder(Block, 256, layers[3], 4)
+
+        # Self-attention layer between encoder and decoder
+        self.self_attention = TransformerLayer(embed_dim=256, num_heads=8, ff_dim=512)
+
+        self.decoder3 = Decoder_block(256, 128, stride=4, kernel_size=4)
+        self.decoder2 = Decoder_block(128, 64)
+        self.decoder1 = Decoder_block(64, 32)
+        # Final stride-5 transposed convolution restores the input length
+        self.conv1x1 = nn.ConvTranspose1d(32, outplanes, kernel_size=5, stride=5, bias=False)
+
+    def _make_encoder(self, block, planes, blocks, stride=1):
+        downsample = None
+        if self.inplanes != planes or stride != 1:
+            downsample = nn.Conv1d(self.inplanes, planes, kernel_size=1, stride=stride, bias=False)
+        layers = [block(self.inplanes, planes, stride, downsample)]
+        self.inplanes = planes
+        for _ in range(1, blocks):
+            layers.append(block(self.inplanes, planes))
+        return nn.Sequential(*layers)
+
+    def forward(self, x):
+        down1 = self.encoder1(x)
+        down2 = self.encoder2(down1)
+        down3 = self.encoder3(down2)
+        down4 = self.encoder4(down3)
+
+        # Apply the self-attention bottleneck
+        attention_out = self.self_attention(down4)
+
+        up3 = self.decoder3(attention_out, down3)
+        up2 = self.decoder2(up3, down2)
+        up1 = self.decoder1(up2, down1)
+        out = self.conv1x1(up1)
+        return out
+
+# Quick check that input and output lengths match
+if __name__ == "__main__":
+    model = Model(inplanes=1, outplanes=1, layers=[3, 3, 3, 3])
+    model.eval()
+    image = torch.randn(1, 1, 1000)
+    with torch.no_grad():
+        output = model(image)
+    print(output.size())
+
+
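One constraint worth making explicit: the four encoder stages downsample by strides 5, 5, 5 and 4, so the input length must be a multiple of 5·5·5·4 = 500 for the decoder skip connections to line up. A minimal smoke test (assuming `Model.py` is importable, e.g. run from inside `Network/`):

```python
import torch
from Model import Model  # assumes the script runs from the Network/ directory

model = Model(inplanes=1, outplanes=1, layers=[2, 2, 2, 2])
model.eval()

# Strides 5, 5, 5, 4 shrink the sequence by a factor of 500, so only
# lengths divisible by 500 pass through the U-Net cleanly.
for length in (500, 1000, 2000):
    x = torch.randn(2, 1, length)
    with torch.no_grad():
        y = model(x)
    assert y.shape == (2, 1, length)
    print(length, "->", tuple(y.shape))
```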
diff --git a/Network/MyDataset.py b/Network/MyDataset.py
new file mode 100644
index 0000000..2e129a7
--- /dev/null
+++ b/Network/MyDataset.py
@@ -0,0 +1,171 @@
+import os
+import sys
+import torch
+import numpy as np
+import matplotlib.pyplot as plt
+from torch.utils.data import Dataset
+from scipy.ndimage import zoom, gaussian_filter1d
+
+# Add the parent directory to the path for the config import
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
+from config import Network_train_Config as cfg
+
+data_length = cfg.data_length
+
+def convolve_signals(signal1, signal2):
+    """
+    Perform cyclic convolution using the FFT. Output length is
+    max(len(signal1), len(signal2)).
+    """
+    len_signal = max(len(signal1), len(signal2))
+    return np.fft.ifft(np.fft.fft(signal1, len_signal) * np.fft.fft(signal2, len_signal)).real
+
+
+def apply_exponential_gain(data, gain_factor):
+    """
+    Apply an exponential gain curve to a signal.
+    """
+    data = np.asarray(data)
+    indices = np.arange(len(data))
+    gain = np.exp(gain_factor * indices)
+    return data * gain
+
+
+def shift_data_to_end(data, n):
+    """
+    Cyclically shift the first n elements of a 1D array to the end.
+    """
+    if n < 0 or n > len(data):
+        raise ValueError("Shift length n must be within data length range.")
+    return np.concatenate((data[n:], data[:n]))
+
+
+def shift_data_to_front(data, n):
+    """
+    Cyclically shift the last n elements of a 1D array to the front.
+    """
+    if n < 0 or n > len(data):
+        raise ValueError("Shift length n must be within data length range.")
+    return np.concatenate((data[-n:], data[:-n]))
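The helpers compose as rotations: convolving with a unit impulse at index k rotates a signal by k samples, `shift_data_to_end(data, n)` is `np.roll(data, -n)`, and `shift_data_to_front` is its inverse. A quick standalone check (`convolve_signals` is copied from the diff so no config is needed):

```python
import numpy as np

# Copied from the diff so the check runs standalone.
def convolve_signals(signal1, signal2):
    len_signal = max(len(signal1), len(signal2))
    return np.fft.ifft(np.fft.fft(signal1, len_signal)
                       * np.fft.fft(signal2, len_signal)).real

x = np.arange(6, dtype=float)
impulse = np.zeros(6)
impulse[2] = 1.0

# Cyclic convolution with a unit impulse at index 2 rotates x by 2 samples.
print(convolve_signals(impulse, x))   # ~ [4. 5. 0. 1. 2. 3.]
print(np.roll(x, 2))                  # the same rotation, exactly

# shift_data_to_end(data, n) is the inverse rotation np.roll(data, -n),
# which is how cfg.shift_distance realigns the trace after convolution.
print(np.roll(x, -2))                 # [2. 3. 4. 5. 0. 1.]
```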
+ """ + if n < 0 or n > len(data): + raise ValueError("Shift length n must be within data length range.") + return np.concatenate((data[-n:], data[:-n])) + + +class MyDataset(Dataset): + def __init__(self, data_file, label_file, impulse_field_file, impulse_sim_file, mode='train', + check=False, noise_coff=cfg.noise_coff, initial_params=None): + super(MyDataset, self).__init__() + self.mode = mode + self.check = check + self.noise_coff = noise_coff + + self.data = np.delete(np.loadtxt(data_file, delimiter=","), [0], axis=0) + self.labels = np.delete(np.loadtxt(label_file, delimiter=","), [0], axis=0) + + if self.mode in ['train', 'apply']: + self.impulse_field = np.loadtxt(impulse_field_file, delimiter=",") + self.impulse_sim = np.loadtxt(impulse_sim_file, delimiter=",") + else: + raise ValueError("Mode must be either 'train' or 'apply'") + + if self.mode == 'apply': + self.initial_model = self.generate_initial_model(*initial_params, total_length=data_length) + + self.data = self.data.T + self.labels = self.labels.T + + def __len__(self): + return self.data.shape[0] + + def __getitem__(self, index): + data_raw = self.data[index] + label_data = self.labels[index] + impulse_field = self.impulse_field + impulse_sim = self.impulse_sim + + data_raw = zoom(data_raw, data_length / len(data_raw)) + label_data = zoom(label_data, data_length / len(label_data)) + impulse_field = zoom(impulse_field, data_length / len(impulse_field)) + impulse_sim = zoom(impulse_sim, data_length / len(impulse_sim)) + + if self.mode == 'train': + data_gained = apply_exponential_gain(data_raw, 0.00) + data_noise1 = np.random.normal(0, self.noise_coff * np.max(abs(data_gained)), size=data_gained.shape) + data_noise2 = np.random.normal(0, self.noise_coff * np.max(abs(data_gained)), size=data_gained.shape) + + data_noise1 = convolve_signals(impulse_field, data_noise1) + data_noise1 = convolve_signals(impulse_sim, data_noise1) + + data_noise2 = convolve_signals(impulse_sim, data_noise2) + + data_gained = convolve_signals(impulse_field, data_gained) + data_gained = shift_data_to_end(data_gained, cfg.shift_distance) + + data_noised = data_gained + self.noise_coff * data_noise1 + self.noise_coff * data_noise2 + data_data = data_noised + + elif self.mode == 'apply': + data_meta = data_raw + data_data = convolve_signals(impulse_sim, data_meta) + data_data = shift_data_to_end(data_data, cfg.shift_distance) + data_data = data_data / np.max(abs(data_data)) + + # Construct initial model + if self.mode == 'train': + initial_model = np.full(data_length, label_data[1]) + elif self.mode == 'apply': + initial_model = self.initial_model + initial_model = zoom(initial_model, data_length / len(initial_model)) + initial_model = gaussian_filter1d(initial_model, sigma=data_length / 5) + + data_data = data_data / np.max(abs(data_data)) * 0.5 + + if self.check: + plt.figure(figsize=(10, 7)) + titles = [ + ('label_data', label_data), + ('initial_model', initial_model), + ('data_raw', data_raw), + ('data_noise1', data_noise1), + ('data_noise2', data_noise2), + ('impulse_field', impulse_field), + ('impulse_sim', impulse_sim), + ('data_without_noise', data_gained), + ('data_noised', data_data), + ] + for i, (title, signal) in enumerate(titles): + plt.subplot(9, 1, i + 1) + plt.plot(signal, label=title, color='blue') + plt.grid(alpha=0.3) + plt.legend() + plt.show() + + # Convert to PyTorch tensors + data_data = torch.from_numpy(data_data).float().unsqueeze(0) + label_data = torch.from_numpy(label_data).float().unsqueeze(0) + initial_model = 
diff --git a/Network/__pycache__/Model.cpython-310.pyc b/Network/__pycache__/Model.cpython-310.pyc
new file mode 100644
index 0000000..002e7f5
Binary files /dev/null and b/Network/__pycache__/Model.cpython-310.pyc differ
diff --git a/Network/__pycache__/MyDataset.cpython-310.pyc b/Network/__pycache__/MyDataset.cpython-310.pyc
new file mode 100644
index 0000000..b1c53cc
Binary files /dev/null and b/Network/__pycache__/MyDataset.cpython-310.pyc differ
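For orientation, a sketch of how the two files compose. The CSV file names below are placeholders (the real paths come from the config and training script, which are not part of this diff), and `cfg.data_length` must be a multiple of 500, per the note after Model.py. Because `__getitem__` stacks the processed trace and the initial model as two channels, the network has to be built with `inplanes=2`:

```python
import torch
from torch.utils.data import DataLoader
from Model import Model
from MyDataset import MyDataset

# Placeholder file names -- substitute the paths your config actually uses.
dataset = MyDataset("data.csv", "labels.csv",
                    "impulse_field.csv", "impulse_sim.csv", mode='train')
loader = DataLoader(dataset, batch_size=4, shuffle=True)

# __getitem__ returns a 2-channel input (trace + initial model), so inplanes=2.
model = Model(inplanes=2, outplanes=1)
inputs, labels = next(iter(loader))
print(inputs.shape, labels.shape)   # (4, 2, L) and (4, 1, L), L = cfg.data_length
print(model(inputs).shape)          # (4, 1, L)
```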