文件
gpr-sidl-inv/5_data_preprocess.py
葛峻恺 699f32f283 program
Signed-off-by: 葛峻恺 <202115006@mail.sdu.edu.cn>
2025-04-07 12:17:39 +00:00

69 行
2.4 KiB
Python

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.ndimage
from readgssi.dzt import readdzt

try:
    # Window functions live in scipy.signal.windows; the scipy.signal alias
    # was removed in SciPy 1.13, so prefer the dedicated namespace.
    from scipy.signal.windows import tukey
except ImportError:
    # Fallback for old SciPy releases that only expose the legacy alias.
    from scipy.signal import tukey

from config import Field_data_test_Config as cfg
from config import Path_Config as pcfg
from utils.plot import plot_BSCAN_data
def dewow_windowed(in_array, window_size=40):
    """
    Subtract a moving-average baseline from a 1D signal (dewowing).

    The mean over every run of ``window_size`` consecutive samples is obtained
    from a cumulative sum. The resulting baseline is extended to the input
    length by repeating its first value ``window_size // 2`` times at the
    front and its last value at the back. Inputs shorter than ``window_size``
    simply have their overall mean removed.

    Returns an array of the same length as ``in_array``.
    """
    n_samples = len(in_array)

    # Too short for even one full window: fall back to plain mean removal.
    if n_samples < window_size:
        return in_array - np.mean(in_array)

    # Prepending a zero lets each window sum be a difference of two entries.
    running_total = np.cumsum(np.insert(in_array, 0, 0))
    window_sums = running_total[window_size:] - running_total[:-window_size]
    rolling_mean = window_sums / window_size

    # Edge padding so the baseline lines up sample-for-sample with the input.
    lead = window_size // 2
    trail = n_samples - len(rolling_mean) - lead
    baseline = np.concatenate((
        np.full(lead, rolling_mean[0]),
        rolling_mean,
        np.full(trail, rolling_mean[-1]),
    ))
    return in_array - baseline
def remove_bad_channels(data, remove_start, remove_end):
    """
    Return a copy of a 2D array with columns [remove_start, remove_end) dropped.

    ``remove_start`` is clamped to 0 and ``remove_end`` to the number of
    columns before the range is applied. An empty range leaves every column
    in place.
    """
    n_columns = data.shape[1]
    first = max(0, remove_start)
    last = min(n_columns, remove_end)

    # Mark the columns to keep; the slice follows normal Python slice rules.
    keep = np.ones(n_columns, dtype=bool)
    keep[first:last] = False
    return data[:, keep]
def taper_field(in_array):
    """
    Fade in the leading half of a signal with a cubed Tukey ramp.

    The first ``in_array.shape[0] // 2`` samples are multiplied by the first
    half of a Tukey window (shape parameter 0.7) raised to the third power.
    The remaining samples are multiplied by 1 and therefore left unchanged.
    """
    n_samples = in_array.shape[0]
    half = n_samples // 2

    # First half of a symmetric Tukey window of length 2 * half.
    ramp = tukey(2 * half, 0.7)[:half]

    # Ramp followed by ones, cubed to steepen the fade-in.
    weights = np.concatenate((ramp, np.ones(n_samples - half))) ** 3
    return in_array * weights
if __name__ == "__main__":
    # Preprocess the converted field B-scan: drop bad channels, dewow, resample,
    # taper and normalise it, then save a preview image and a CSV.
    TEST_FILE = pcfg.CONVERTED_TEST_FILE
    bad_trace = cfg.bad_trace
    detection_distance = cfg.detection_distance
    time_window_length = cfg.time_window_length

    # Load the CSV and discard its first row.
    data = np.loadtxt(TEST_FILE, delimiter=",", skiprows=0)[1:, :]

    # Drop the column range [bad_trace[0], bad_trace[1]).
    data = remove_bad_channels(data, bad_trace[0], bad_trace[1])

    # Remove the moving-average baseline from every column.
    data = np.apply_along_axis(dewow_windowed, 0, data)

    # Resample to roughly (time_window_length, detection_distance) samples
    # with first-order (linear) spline interpolation.
    zoom_factors = (
        time_window_length / data.shape[0],
        detection_distance / data.shape[1],
    )
    data = scipy.ndimage.zoom(data, zoom_factors, order=1)

    # The taper is applied twice, compounding the fade-in on each column.
    # NOTE(review): confirm the repeated call is intentional, not a duplicate.
    data = np.apply_along_axis(taper_field, 0, data)
    data = np.apply_along_axis(taper_field, 0, data)

    # Scale to a peak absolute amplitude of 1. Skip the division when the
    # peak is zero (or NaN) so an all-zero section does not become NaN.
    peak = np.max(np.abs(data))
    if peak > 0:
        data = data / peak

    # Save processed data
    plot_BSCAN_data(data, pcfg.PROCESSED_TEST_FILE_img, line_length=100, time_length=200, ratio=0.5)
    pd.DataFrame(data).to_csv(pcfg.PROCESSED_TEST_FILE, index=False)