-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_preprocessing.py
More file actions
162 lines (130 loc) · 6.42 KB
/
data_preprocessing.py
File metadata and controls
162 lines (130 loc) · 6.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
'''
This Python script preprocesses the ECG data by
(i) fixing the signal length of each sample
(ii) encapsulating them in an HDF5 file
(iii) converting the signals to scalograms in parallelized loops
'''
import numpy as np
import pandas as pd
import h5py
import wfdb
import os
import glob
import neurokit2 as nk
import pywt
import matplotlib.pyplot as plt
from tqdm import tqdm
import time
from joblib import Parallel, delayed
from typing import List
def fix_signal_length(file_list: List[str]) -> np.ndarray:
    '''
    Fix every ECG recording to a common length of 18,000 samples.

    The maximum recording length is slightly longer than 60 s, which at a
    sampling rate of 300 Hz gives a signal length of just over 18,000 points.
    Recordings shorter than 18,000 samples are right-padded with zeros; the
    few recordings that slightly exceed 60 s (more than 18,000 data points)
    are truncated to 18,000. Each .mat/.hea record in the input list is read
    with wfdb, flattened to 1d and stored as one row of the output array.

    Args:
        file_list: list of record paths (without extension) readable by wfdb.rdrecord

    Returns:
        data_array: 2d Numpy array of shape (len(file_list), 18000) holding
        one fixed-length ECG signal per row
    '''
    TARGET_LEN = 18000
    # Pre-zeroed array: rows act as zero padding for short signals.
    data_array = np.zeros((len(file_list), TARGET_LEN))
    for i, path in enumerate(file_list):
        record = wfdb.rdrecord(path).__dict__
        signal = record['p_signal'].flatten()  # signals are stored as (N, 1) arrays; flatten them to 1d
        # Copy at most TARGET_LEN samples: shorter signals keep the zero
        # padding on the right, longer signals are truncated to TARGET_LEN.
        n = min(record['sig_len'], TARGET_LEN)
        data_array[i, :n] = signal[:n]
    return data_array
def make_HDF5(data_array: np.array, file_list: List[str]) -> None:
    '''
    Pack the fixed-length signal array into a single HDF5 file.

    One dataset is created per recording, keyed by the wfdb record name read
    back from the corresponding record file.
    Args: (i) data_array - Numpy array containing ECG values (one signal per row)
          (ii) file_list - list of file directories
    Returns: None
    '''
    with h5py.File('./data/data.h5', 'w') as f:
        # Walk the signal rows and their source records in lockstep.
        for row, path in zip(data_array, file_list):
            record_name = wfdb.rdrecord(path).__dict__['record_name']
            f.create_dataset(record_name, data=row)
def RandomScalogramGenerator(label_list: pd.DataFrame | pd.Series, data_array: np.array, type: int) -> None:
    '''
    Randomly pick one recording of a chosen cardiac-rhythm class and plot its
    raw signal (top) together with its scalogram (bottom).
    Type: 0 - Afib, 1 - Normal, 2 - Other, 3 - Noise
    Args: (i) label_list - list of encoded ECG labels as Pandas DataFrame or Series
          (ii) data_array - Numpy array containing the ECG values
          (iii) type - Type of ECG sample to be generated
    Returns: None
    '''
    label_subset = label_list[
        label_list['label'] == type
    ].index.to_list()
    random_idx = np.random.choice(label_subset, size=1).item()
    raw_signal = data_array[random_idx, :]
    sampling_times = np.linspace(0.0, 60.0, 18000)
    # BUG FIX: the previous code called signal_CWT(raw_signal), but signal_CWT
    # takes an HDF5 *key string*, saves a .png and returns None, so it can
    # never yield the coefficient matrix. Compute the continuous wavelet
    # transform inline instead, with the same scales/wavelet as signal_CWT.
    scales = np.linspace(10, 750, num=150)  # wavelet frequencies between roughly 0.5 Hz and 150 Hz
    sampling_period = np.diff(sampling_times).mean()
    coeff_mat, freq = pywt.cwt(
        raw_signal,
        scales=scales,
        wavelet='cmor2.5-1.0',
        sampling_period=sampling_period
    )
    coeff_mat = np.abs(coeff_mat)  # magnitude of the complex coefficients
    fig, ax = plt.subplots(2, 1, figsize=(12, 8), sharex=True)
    ax[0].plot(sampling_times, raw_signal)
    pcm = ax[1].pcolormesh(sampling_times, freq, coeff_mat, cmap='jet')
    ax[1].set_yscale('log')
    plt.show()
def signal_CWT(key: str, sampling_rate: int=300, method: str='hamilton2002', wavelet: str='cmor2.5-1.0', output_dir: str='./signal_cwt_images_training/hamilton_filter/') -> None:
    '''
    Convert one ECG signal from the HDF5 data file into a scalogram image.

    The signal identified by `key` is loaded from ./data/data.h5, cleaned
    with neurokit2's nk.ecg_clean() using the given method, and passed
    through pywt's continuous wavelet transform with the complex Morlet
    wavelet as default. The scalogram plot is saved as a .png image in
    `output_dir` (224 x 224 x 3 matrices once rendered).

    Args: (i) key - HDF5 key identifying each ECG data file
          (ii) sampling_rate - number of samples per second of the signal; defaults to 300 Hz
          (iii) method - ECG cleaning method passed to nk.ecg_clean(); defaults to `hamilton2002`
          (iv) wavelet - wavelet transform kernel; defaults to `cmor2.5-1.0`
          (v) output_dir - directory where the .png is written; defaults to
              './signal_cwt_images_training/hamilton_filter/'
    Returns: None
    '''
    with h5py.File('./data/data.h5', 'r') as f:
        raw_signal = np.array(f[key])
    sampling_times = np.linspace(0.0, 60.0, 18000)
    clean_signal = nk.ecg_clean(raw_signal, sampling_rate, method)
    # Use f = scale2frequency(wavelet, scale)/sampling_period to check the
    # frequencies corresponding to the chosen scales.
    scales = np.linspace(10, 750, num=150)  # choosing wavelet frequencies between 0.5 Hz to 150 Hz
    sampling_period = np.diff(sampling_times).mean()
    # continuous wavelet transform; cwt() returns coefficients and frequencies
    coeff_mat, freqs = pywt.cwt(
        clean_signal,
        scales=scales,
        wavelet=wavelet,
        sampling_period=sampling_period
    )
    coeff_mat = np.abs(coeff_mat)  # magnitude of the complex coefficients
    fig, ax = plt.subplots(figsize=(2.90, 2.91))
    ax.pcolormesh(sampling_times, freqs, coeff_mat, cmap='jet')
    ax.set_yscale('log')
    ax.axis('off')
    # images saved in the (pre-existing) output directory
    fig.savefig(os.path.join(output_dir, key + '.png'), dpi=100, bbox_inches='tight', pad_inches=0.0)
    plt.close(fig)  # free the figure so parallel workers don't accumulate memory
if __name__ == '__main__':
    # Collect record paths; wfdb expects them without the .mat extension.
    mat_files = sorted(glob.glob('./data/*.mat'))
    file_list = [os.path.splitext(path)[0] for path in mat_files]
    # The signals are set to a fixed length (padding and truncation), stored
    # in a 2d array and converted into an HDF5 file for convenience.
    #data_array = fix_signal_length(file_list)
    #make_HDF5(data_array, file_list)
    # The signal to scalogram conversions are performed in parallel using joblib.
    with h5py.File('./data/data.h5') as f:
        keys = list(f.keys())
    start = time.time()
    Parallel(n_jobs=8)(delayed(signal_CWT)(key) for key in tqdm(keys))  # joblib's parallelized for loop
    elapsed = time.time() - start
    print(f'Time taken for execution: {elapsed} seconds')