你已经派生过 gpr-sidl-inv
镜像自地址
https://gitee.com/sduem/gpr-sidl-inv.git
已同步 2025-09-19 01:03:50 +08:00
104
6_extract_impulse.py
普通文件
104
6_extract_impulse.py
普通文件
@@ -0,0 +1,104 @@
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import matplotlib.pyplot as plt
|
||||
from scipy.interpolate import interp1d
|
||||
from config import Field_data_test_Config as cfg
|
||||
from config import Path_Config as pcfg
|
||||
|
||||
# taper cut
|
||||
def apply_taper_window(data, start_idx, end_idx, left_frac=0.15, right_frac=0.15):
|
||||
"""
|
||||
left_frac/right_frac: taper ratio (0–1)
|
||||
"""
|
||||
n = len(data)
|
||||
start = int(max(0, min(n, start_idx)))
|
||||
end = int(max(0, min(n, end_idx)))
|
||||
if end <= start:
|
||||
raise ValueError("end_idx must be greater than start_idx.")
|
||||
|
||||
N = end - start
|
||||
L = int(np.clip(int(N * float(left_frac)), 0, N))
|
||||
R = int(np.clip(int(N * float(right_frac)), 0, N - L))
|
||||
|
||||
win = np.ones(N, dtype=float)
|
||||
if L > 0:
|
||||
win[:L] = 0.5 - 0.5 * np.cos(np.linspace(0, np.pi, L)) # 0 -> 1
|
||||
if R > 0:
|
||||
win[-R:] = 0.5 - 0.5 * np.cos(np.linspace(np.pi, 0, R)) # 1 -> 0
|
||||
|
||||
mask = np.zeros(n, dtype=float)
|
||||
mask[start:end] = win
|
||||
return data * mask
|
||||
|
||||
def shift_data_to_end(data, n):
|
||||
if n < 0 or n > len(data):
|
||||
raise ValueError("Shift length n must be between 0 and the data length.")
|
||||
return np.concatenate((data[n:], data[:n]))
|
||||
|
||||
def read_file(file_path, idx):
|
||||
if file_path.endswith('.csv'):
|
||||
data = pd.read_csv(file_path, header=None)
|
||||
one_d_data = data.iloc[1:, idx].values
|
||||
elif file_path.endswith('.txt'):
|
||||
data = np.loadtxt(file_path)
|
||||
one_d_data = data if data.ndim == 1 else data[:, 0]
|
||||
else:
|
||||
raise ValueError("Unsupported file format. Please provide a .csv or .txt file.")
|
||||
return one_d_data
|
||||
|
||||
def normalize_data(data):
|
||||
max_abs_value = np.max(np.abs(data))
|
||||
return data if max_abs_value == 0 else data / max_abs_value
|
||||
|
||||
def interpolate_data(data, target_length=1000):
|
||||
original_length = len(data)
|
||||
x_original = np.linspace(0, 1, original_length)
|
||||
x_target = np.linspace(0, 1, target_length)
|
||||
interpolator = interp1d(x_original, data, kind='cubic')
|
||||
return interpolator(x_target)
|
||||
|
||||
def save_to_csv(data, output_path):
|
||||
pd.DataFrame(data).to_csv(output_path, index=False, header=False)
|
||||
|
||||
def plot_data(original_data, interpolated_data):
|
||||
plt.figure(figsize=(10, 5))
|
||||
plt.plot(original_data, label='Original (after taper)', linestyle='-')
|
||||
plt.plot(
|
||||
np.linspace(0, len(original_data)-1, len(interpolated_data)),
|
||||
interpolated_data, label='Interpolated', linewidth=2
|
||||
)
|
||||
plt.legend(); plt.xlabel('Index'); plt.ylabel('Value')
|
||||
plt.title('Soft-gated (tapered) Window + Interpolation')
|
||||
plt.grid(alpha=0.3)
|
||||
plt.show()
|
||||
|
||||
# —— Main pipeline update: use a soft taper instead of a hard cut ——
|
||||
if __name__ == "__main__":
|
||||
input_file = pcfg.PROCESSED_TEST_FILE
|
||||
output_csv = pcfg.field_impulse
|
||||
idx = cfg.refer_wave_idx
|
||||
static_time = cfg.static_time
|
||||
wavelet_range = cfg.wavelet_range # [start_idx, end_idx]
|
||||
|
||||
# Optional: allow taper ratios to be configured in cfg; fall back to defaults here
|
||||
taper_left_frac = getattr(cfg, 'taper_left_frac', 0.15) # 15% on the left
|
||||
taper_right_frac = getattr(cfg, 'taper_right_frac', 0.15) # 15% on the right
|
||||
|
||||
data = read_file(input_file, idx)
|
||||
normalized_data = -normalize_data(data)
|
||||
|
||||
# Key step: apply a soft-gated window (taper at both ends) to the selected interval; samples outside become 0
|
||||
tapered_data = apply_taper_window(
|
||||
normalized_data,
|
||||
wavelet_range[0], wavelet_range[1],
|
||||
left_frac=taper_left_frac, right_frac=taper_right_frac
|
||||
)
|
||||
|
||||
save_to_csv(tapered_data, './impulse/initial_impulse.csv')
|
||||
|
||||
# Then interpolate to the target length (recommended: interpolate after tapering)
|
||||
interpolated_data = interpolate_data(tapered_data, cfg.time_window_length)
|
||||
interpolated_data = shift_data_to_end(interpolated_data, static_time)
|
||||
|
||||
save_to_csv(interpolated_data, output_csv)
|
||||
plot_data(tapered_data, interpolated_data)
|
在新工单中引用
屏蔽一个用户