Signed-off-by: 葛峻恺 <202115006@mail.sdu.edu.cn>
这个提交包含在:
葛峻恺
2025-04-07 12:17:39 +00:00
提交者 Gitee
父节点 06ab241903
当前提交 699f32f283
共有 11 个文件被更改,包括 819 次插入和 0 次删除

68
5_data_preprocess.py 普通文件
查看文件

@@ -0,0 +1,68 @@
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.ndimage
from readgssi.dzt import readdzt

# SciPy 1.13 removed the window functions from the top-level ``scipy.signal``
# namespace; ``scipy.signal.windows`` is the supported location. The legacy
# import is kept only as a fallback for very old SciPy releases.
try:
    from scipy.signal.windows import tukey
except ImportError:
    from scipy.signal import tukey

from config import Field_data_test_Config as cfg
from config import Path_Config as pcfg
from utils.plot import plot_BSCAN_data
def dewow_windowed(in_array, window_size=40):
    """
    Remove low-frequency drift ("wow") from a 1-D trace by subtracting a running mean.

    Traces shorter than ``window_size`` are simply de-meaned. Otherwise the mean of
    every full window is derived from a cumulative sum, and that series is extended
    at both ends with its first/last value so it has one entry per input sample.
    """
    n_samples = len(in_array)
    if n_samples < window_size:
        return in_array - np.mean(in_array)

    # Prefix sums with a leading zero: the difference of two entries that are
    # ``window_size`` apart is the sum of one full window.
    running = np.cumsum(np.insert(in_array, 0, 0))
    window_means = (running[window_size:] - running[:-window_size]) / window_size

    # Repeat the edge means so the baseline lines up with the input length.
    lead = window_size // 2
    trail = n_samples - len(window_means) - lead
    baseline = np.pad(window_means, (lead, trail), mode="edge")

    return in_array - baseline
def remove_bad_channels(data, remove_start, remove_end):
    """
    Return a copy of a 2D array with the columns ``[remove_start, remove_end)`` dropped.

    ``remove_start`` is clamped to be no less than 0 and ``remove_end`` to be no
    greater than the number of columns before the range is deleted.
    """
    n_columns = data.shape[1]
    first = remove_start if remove_start > 0 else 0
    last = remove_end if remove_end < n_columns else n_columns
    return np.delete(data, slice(first, last), axis=1)
def taper_field(in_array, alpha=0.7, power=3):
    """
    Fade in the start of a 1-D trace with the rising edge of a Tukey window.

    The first ``len(in_array) // 2`` samples are multiplied by the leading half
    of a Tukey window of twice that length, raised to ``power``. The remaining
    samples are multiplied by 1 and therefore left unchanged.

    Args:
        in_array: 1-D array-like trace.
        alpha: Tukey shape parameter passed to ``tukey``. Defaults to 0.7, the
            value that was previously hard-coded.
        power: Exponent applied to the window. Defaults to 3, the value that
            was previously hard-coded.

    Returns:
        numpy.ndarray with the same length as ``in_array``.
    """
    # Accept plain sequences as well as arrays.
    trace = np.asarray(in_array)
    n_samples = trace.shape[0]
    taper_length = n_samples // 2

    window = np.ones(n_samples)
    # Only the first half of the symmetric Tukey window is used, so the end of
    # the trace is not attenuated.
    window[:taper_length] = tukey(2 * taper_length, alpha)[:taper_length]

    return trace * window ** power
if __name__ == "__main__":
    # Run settings come from the project configuration modules.
    source_csv = pcfg.CONVERTED_TEST_FILE
    bad_range = cfg.bad_trace
    target_cols = cfg.detection_distance
    target_rows = cfg.time_window_length

    raw = np.loadtxt(source_csv, delimiter=",", skiprows=0)
    bscan = raw[1:, :]  # Remove first row

    # Drop the configured column range, then dewow each column independently.
    bscan = remove_bad_channels(bscan, bad_range[0], bad_range[1])
    bscan = np.apply_along_axis(dewow_windowed, 0, bscan)

    # Linear resampling towards (time_window_length, detection_distance).
    row_factor = target_rows / bscan.shape[0]
    col_factor = target_cols / bscan.shape[1]
    bscan = scipy.ndimage.zoom(bscan, (row_factor, col_factor), order=1)

    # NOTE(review): the taper is applied twice in a row, as in the original;
    # confirm this is intentional and not a duplicated line.
    for _ in range(2):
        bscan = np.apply_along_axis(taper_field, 0, bscan)

    # Scale so the largest absolute amplitude is 1.
    # NOTE(review): an all-zero array would divide by zero here.
    peak = np.max(np.abs(bscan))
    bscan = bscan / peak

    # Save processed data
    plot_BSCAN_data(bscan, pcfg.PROCESSED_TEST_FILE_img, line_length=100, time_length=200, ratio=0.5)
    pd.DataFrame(bscan).to_csv(pcfg.PROCESSED_TEST_FILE, index=False)