diff --git a/6_extract_impulse.py b/6_extract_impulse.py new file mode 100644 index 0000000..d0db9c4 --- /dev/null +++ b/6_extract_impulse.py @@ -0,0 +1,104 @@ +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt +from scipy.interpolate import interp1d +from config import Field_data_test_Config as cfg +from config import Path_Config as pcfg + +# taper cut +def apply_taper_window(data, start_idx, end_idx, left_frac=0.15, right_frac=0.15): + """ + left_frac/right_frac: taper ratio (0–1) + """ + n = len(data) + start = int(max(0, min(n, start_idx))) + end = int(max(0, min(n, end_idx))) + if end <= start: + raise ValueError("end_idx must be greater than start_idx.") + + N = end - start + L = int(np.clip(int(N * float(left_frac)), 0, N)) + R = int(np.clip(int(N * float(right_frac)), 0, N - L)) + + win = np.ones(N, dtype=float) + if L > 0: + win[:L] = 0.5 - 0.5 * np.cos(np.linspace(0, np.pi, L)) # 0 -> 1 + if R > 0: + win[-R:] = 0.5 - 0.5 * np.cos(np.linspace(np.pi, 0, R)) # 1 -> 0 + + mask = np.zeros(n, dtype=float) + mask[start:end] = win + return data * mask + +def shift_data_to_end(data, n): + if n < 0 or n > len(data): + raise ValueError("Shift length n must be between 0 and the data length.") + return np.concatenate((data[n:], data[:n])) + +def read_file(file_path, idx): + if file_path.endswith('.csv'): + data = pd.read_csv(file_path, header=None) + one_d_data = data.iloc[1:, idx].values + elif file_path.endswith('.txt'): + data = np.loadtxt(file_path) + one_d_data = data if data.ndim == 1 else data[:, 0] + else: + raise ValueError("Unsupported file format. Please provide a .csv or .txt file.") + return one_d_data + +def normalize_data(data): + max_abs_value = np.max(np.abs(data)) + return data if max_abs_value == 0 else data / max_abs_value + +def interpolate_data(data, target_length=1000): + original_length = len(data) + x_original = np.linspace(0, 1, original_length) + x_target = np.linspace(0, 1, target_length) + interpolator = interp1d(x_original, data, kind='cubic') + return interpolator(x_target) + +def save_to_csv(data, output_path): + pd.DataFrame(data).to_csv(output_path, index=False, header=False) + +def plot_data(original_data, interpolated_data): + plt.figure(figsize=(10, 5)) + plt.plot(original_data, label='Original (after taper)', linestyle='-') + plt.plot( + np.linspace(0, len(original_data)-1, len(interpolated_data)), + interpolated_data, label='Interpolated', linewidth=2 + ) + plt.legend(); plt.xlabel('Index'); plt.ylabel('Value') + plt.title('Soft-gated (tapered) Window + Interpolation') + plt.grid(alpha=0.3) + plt.show() + +# —— Main pipeline update: use a soft taper instead of a hard cut —— +if __name__ == "__main__": + input_file = pcfg.PROCESSED_TEST_FILE + output_csv = pcfg.field_impulse + idx = cfg.refer_wave_idx + static_time = cfg.static_time + wavelet_range = cfg.wavelet_range # [start_idx, end_idx] + + # Optional: allow taper ratios to be configured in cfg; fall back to defaults here + taper_left_frac = getattr(cfg, 'taper_left_frac', 0.15) # 15% on the left + taper_right_frac = getattr(cfg, 'taper_right_frac', 0.15) # 15% on the right + + data = read_file(input_file, idx) + normalized_data = -normalize_data(data) + + # Key step: apply a soft-gated window (taper at both ends) to the selected interval; samples outside become 0 + tapered_data = apply_taper_window( + normalized_data, + wavelet_range[0], wavelet_range[1], + left_frac=taper_left_frac, right_frac=taper_right_frac + ) + + save_to_csv(tapered_data, './impulse/initial_impulse.csv') + + # Then interpolate to the target length (recommended: interpolate after tapering) + interpolated_data = interpolate_data(tapered_data, cfg.time_window_length) + interpolated_data = shift_data_to_end(interpolated_data, static_time) + + save_to_csv(interpolated_data, output_csv) + plot_data(tapered_data, interpolated_data)