-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbatch_example.py
More file actions
263 lines (220 loc) · 13.9 KB
/
Copy pathbatch_example.py
File metadata and controls
263 lines (220 loc) · 13.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# script for a batch_run, to compute various informations/complexities
from pathlib import Path
from time import time # for timings
#from numpy.random import default_rng # for random numbers generation (new mmethod)
#rng = default_rng() # https://numpy.org/doc/stable/reference/random/index.html
import numpy
#from scipy.stats.stats import pearsonr # will give *normalized* correlation coefficient
from scipy import signal
from scipy.fftpack import fft, ifft, fftfreq, fftshift # faster than numpy fft
from scipy.io import loadmat # to load Matlab files
#numpy.random.seed(1234) # probably not correct for new method
pi = numpy.pi
import entropy.entropy as entropy # 2022-06-02: just for setting parameters (lines 37 and 38 of this script)
import causality_tools # .py file created on 2022-01-31 to clarify scripts
import couples_tools # .py file created on 2022-01-31 to clarify scripts
import couples_IT # .py file created on 2022-03-15 to simplify this batch script <- this is where all the analysis is implemented
# where is the data:
datapath="/home/ngarnier/codes/TE/raw_peaks"
# batch parameters
#
fs = 20 # 10Hz...
filename_prefix = "complexities_all_" # name of the output file wontaining the results; here, we have chosen to compute ApEn/SampEn
# for TE:
lag_values = numpy.arange(1, 10*fs, 1)
stride = -1
# for entropy, ApEn/SampEn, entropy rate and MI:
stride_values = numpy.arange(1, 10*fs, 1) # 2022-08-25: remove 0 for perm entropy
# for bias estimation :
N_shuffles = 0
N_eff = 2000
n_embed = 1
entropy.set_sampling(Theiler=4, N_eff=N_eff, N_real=10)
entropy.set_verbosity(1)
# what to compute:
do_TE = 0
do_cross_MI = 0
do_H = 1 # <- we will compute the entropy, but with the parameter "complexities" (cf line 172), it will be ApEn and SampEn
do_h = 0
do_Hinc = 0
do_MI = 0
# set some indexing depending on (mother,fetus) couple (stressed or control, male or female fetuses, etc)
couples_full=couples_tools.set_couples(do_discard=1) # since 2022-04-11, we discard couples with (suspiciously) low fHR values
# couples_full = couples_full[46:] # short-cut correction to start after the 4th one
N_couples=couples_full.shape[0]
print("working on %d couples, with first couple being %d" %(N_couples, couples_full[0]))
# prepare variables (ndarrays) to store results:
if (do_TE>0):
N_lags=lag_values.shape[0]
print("computing TE with %d lags" %N_lags)
full_TE_mf =numpy.zeros((N_couples, N_lags), dtype=float)
full_TE_mf_std =numpy.zeros((N_couples, N_lags), dtype=float)
full_TE_mf_bias=numpy.zeros((N_couples, N_lags), dtype=float)
full_TE_fm =numpy.zeros((N_couples, N_lags), dtype=float)
full_TE_fm_std =numpy.zeros((N_couples, N_lags), dtype=float)
full_TE_fm_bias=numpy.zeros((N_couples, N_lags), dtype=float)
if (do_cross_MI>0):
N_lags=lag_values.shape[0]
print("computing cross_MI with %d lags" %N_lags)
full_cMI_mf =numpy.zeros((N_couples, N_lags), dtype=float)
full_cMI_mf_std =numpy.zeros((N_couples, N_lags), dtype=float)
full_cMI_mf_bias=numpy.zeros((N_couples, N_lags), dtype=float)
full_cMI_fm =numpy.zeros((N_couples, N_lags), dtype=float)
full_cMI_fm_std =numpy.zeros((N_couples, N_lags), dtype=float)
full_cMI_fm_bias=numpy.zeros((N_couples, N_lags), dtype=float)
if (do_H>0):
N_stride = stride_values.shape[0]
print("computing ApEn/SampEn with %d strides" %N_stride)
full_ApEn_f =numpy.zeros((N_couples, N_stride, n_embed+1), dtype=float)
full_ApEn_f_std =numpy.zeros((N_couples, N_stride, 1), dtype=float)
full_ApEn_f_bias =numpy.zeros((N_couples, N_stride, n_embed+1), dtype=float)
full_ApEn_m =numpy.zeros((N_couples, N_stride, n_embed+1), dtype=float)
full_ApEn_m_std =numpy.zeros((N_couples, N_stride, 1), dtype=float)
full_ApEn_m_bias =numpy.zeros((N_couples, N_stride, n_embed+1), dtype=float)
full_SampEn_f =numpy.zeros((N_couples, N_stride, n_embed+1), dtype=float)
full_SampEn_f_std =numpy.zeros((N_couples, N_stride, 1), dtype=float)
full_SampEn_f_bias=numpy.zeros((N_couples, N_stride, n_embed+1), dtype=float)
full_SampEn_m =numpy.zeros((N_couples, N_stride, n_embed+1), dtype=float)
full_SampEn_m_std =numpy.zeros((N_couples, N_stride, 1), dtype=float)
full_SampEn_m_bias=numpy.zeros((N_couples, N_stride, n_embed+1), dtype=float)
if (do_MI>0):
N_stride = stride_values.shape[0]
print("computing MI with %d strides" %N_stride)
full_MI =numpy.zeros((N_couples, N_stride), dtype=float)
full_MI_std =numpy.zeros((N_couples, N_stride), dtype=float)
full_MI_bias=numpy.zeros((N_couples, N_stride), dtype=float)
if (do_h>0):
N_stride = stride_values.shape[0]
print("computing h with %d strides" %N_stride)
full_h_f =numpy.zeros((N_couples, N_stride), dtype=float)
full_h_f_std =numpy.zeros((N_couples, N_stride), dtype=float)
full_h_f_bias=numpy.zeros((N_couples, N_stride), dtype=float)
full_h_m =numpy.zeros((N_couples, N_stride), dtype=float)
full_h_m_std =numpy.zeros((N_couples, N_stride), dtype=float)
full_h_m_bias=numpy.zeros((N_couples, N_stride), dtype=float)
if (do_Hinc>0):
N_stride = stride_values.shape[0]
print("computing H_inc with %d strides" %N_stride)
full_H_inc_f =numpy.zeros((N_couples, N_stride), dtype=float)
full_H_inc_f_std =numpy.zeros((N_couples, N_stride), dtype=float)
full_H_inc_f_bias=numpy.zeros((N_couples, N_stride), dtype=float)
full_H_inc_m =numpy.zeros((N_couples, N_stride), dtype=float)
full_H_inc_m_std =numpy.zeros((N_couples, N_stride), dtype=float)
full_H_inc_m_bias=numpy.zeros((N_couples, N_stride), dtype=float)
full_std_inc_f =numpy.zeros((N_couples, N_stride), dtype=float)
full_std_inc2_f =numpy.zeros((N_couples, N_stride), dtype=float)
full_std_inc_m =numpy.zeros((N_couples, N_stride), dtype=float)
full_std_inc2_m =numpy.zeros((N_couples, N_stride), dtype=float)
# main loop over the (mother,fetus) couples/diads:
# this is where the computations are done
i_couple=0
for couple in couples_full:
print("%d " %couple, end="")
# data=couples_tools.load_couple(couple, fs=fs, do_fix_peaks=0) # since 2022-04-11, we do not use "fixpeaks"
# data=couples_tools.load_couple(couple, fs=fs, quantity="RRI", noise_level=0.1)
data=couples_tools.load_couple(couple, do_filter=0, noise_level=0.) # sampled at 1kHz => we'll downsample later
if (do_TE>0):
# TE_mf, TE_fm, TE_mf_std, TE_fm_std, TE_mf_bias, TE_fm_bias = couples_IT.compute_couple_TE(data, fs, lag_values, n_embed=n_embed, stride=stride, N_shuffles=N_shuffles, mask="accel", use_mother=True) # using mother decel mask
TE_mf, TE_fm, TE_mf_std, TE_fm_std, TE_mf_bias, TE_fm_bias = couples_IT.compute_couple_TE(data, fs, lag_values, n_embed=n_embed, stride=stride, N_shuffles=N_shuffles, mask="accel", use_mother=False) # using foetus decel mask
# if mask is "accel" (or "decel"), conditioning is done using accelerations (or decelerations)
# if use_mother is False, conditioning is done using fetal HR data
full_TE_mf [i_couple] = TE_mf
full_TE_mf_std [i_couple] = TE_mf_std
full_TE_mf_bias[i_couple] = TE_mf_bias
full_TE_fm [i_couple] = TE_fm
full_TE_fm_std [i_couple] = TE_fm_std
full_TE_fm_bias[i_couple] = TE_fm_bias
filename="results/"+filename_prefix+"fs_"+str(fs)+"_stride_"+str(stride)+"_shuffle_"+str(N_shuffles)+"_couple_"+str(couple)+".npz"
numpy.savez(filename, TE_mf=TE_mf, TE_fm=TE_fm, TE_mf_std=TE_mf_std, TE_fm_std=TE_fm_std, TE_mf_bias=TE_mf_bias, TE_fm_bias=TE_fm_bias, lag_values=lag_values)
if (do_cross_MI>0):
# cMI_mf, cMI_fm, cMI_mf_std, cMI_fm_std, cMI_mf_bias, cMI_fm_bias = couples_IT.compute_couple_cMI(data, lag_values, stride=stride, N_shuffles=N_shuffles)
cMI_mf, cMI_fm, cMI_mf_std, cMI_fm_std, cMI_mf_bias, cMI_fm_bias = couples_IT.compute_couple_cMI(data, fs, lag_values, stride=stride, N_shuffles=N_shuffles, mask="decel")
full_cMI_mf [i_couple] = cMI_mf
full_cMI_mf_std [i_couple] = cMI_mf_std
full_cMI_mf_bias[i_couple] = cMI_mf_bias
full_cMI_fm [i_couple] = cMI_fm
full_cMI_fm_std [i_couple] = cMI_fm_std
full_cMI_fm_bias[i_couple] = cMI_fm_bias
if (do_H>0): # various type of entropies, including eventually ApEn and SampEn:
# H_f, H_m, H_f_std, H_m_std, H_f_bias, H_m_bias = couples_IT.compute_couple_entropy(data, stride_values, N_shuffles=N_shuffles)
# H_f, H_m, H_f_std, H_m_std, H_f_bias, H_m_bias = couples_IT.compute_couple_entropy(data, fs, stride_values, N_shuffles=N_shuffles, mask="accel", use_mother=False)
# ApEn_f, ApEn_m, ApEn_f_std, ApEn_m_std, ApEn_f_bias, ApEn_m_bias, SampEn_f, SampEn_m, SampEn_f_std, SampEn_m_std, SampEn_f_bias, SampEn_m_bias = couples_IT.compute_couple_entropy(data, fs, stride_values, n_embed=n_embed, N_shuffles=N_shuffles, type="complexities", mask="accel", use_mother=False)
ApEn_f, ApEn_m, ApEn_f_std, ApEn_m_std, ApEn_f_bias, ApEn_m_bias, SampEn_f, SampEn_m, SampEn_f_std, SampEn_m_std, SampEn_f_bias, SampEn_m_bias = couples_IT.compute_couple_entropy(data, fs, stride_values, n_embed=n_embed, N_shuffles=N_shuffles, type="complexities")
full_ApEn_f [i_couple,:] = ApEn_f
full_ApEn_f_std [i_couple] = ApEn_f_std
full_ApEn_f_bias[i_couple,:] = ApEn_f_bias
full_ApEn_m [i_couple,:] = ApEn_m
full_ApEn_m_std [i_couple] = ApEn_m_std
full_ApEn_m_bias[i_couple,:] = ApEn_m_bias
full_SampEn_f [i_couple,:] = SampEn_f
full_SampEn_f_std [i_couple] = SampEn_f_std
full_SampEn_f_bias[i_couple,:] = SampEn_f_bias
full_SampEn_m [i_couple,:] = SampEn_m
full_SampEn_m_std [i_couple] = SampEn_m_std
full_SampEn_m_bias[i_couple,:] = SampEn_m_bias
if (do_h>0): # entropy rate:
H_f, H_m, H_f_std, H_m_std, H_f_bias, H_m_bias = couples_IT.compute_couple_entropy_rate(data, stride_values, N_shuffles=N_shuffles)
full_h_f [i_couple] = H_f
full_h_f_std [i_couple] = H_f_std
full_h_f_bias[i_couple] = H_f_bias
full_h_m [i_couple] = H_m
full_h_m_std [i_couple] = H_m_std
full_h_m_bias[i_couple] = H_m_bias
if (do_Hinc>0):
Hi_f, Hi_m, Hi_f_std, Hi_m_std, Hi_f_bias, Hi_m_bias, std_inc_f, std_inc_m, std_inc2_f, std_inc2_m = couples_IT.compute_couple_Hinc(data, stride_values, N_shuffles=N_shuffles)
full_H_inc_f [i_couple] = Hi_f
full_H_inc_f_std [i_couple] = Hi_f_std
full_H_inc_f_bias[i_couple] = Hi_f_bias
full_H_inc_m [i_couple] = Hi_m
full_H_inc_m_std [i_couple] = Hi_m_std
full_H_inc_m_bias[i_couple] = Hi_m_bias
full_std_inc_f [i_couple] = std_inc_f
full_std_inc2_f [i_couple] = std_inc2_f
full_std_inc_m [i_couple] = std_inc_m
full_std_inc2_m [i_couple] = std_inc2_m
if (do_MI>0): # MI:
full_MI[i_couple,:], full_MI_std[i_couple,:], full_MI_bias[i_couple,:] = couples_IT.compute_couple_MI(data, stride_values, N_shuffles=N_shuffles)
print("... done")
i_couple +=1
# then save results in a (single, large) .npz file:
if (do_TE>0):
filename = filename_prefix+"all_"+"fs_"+str(fs)+"_stride_"+str(stride)+"_n_embed_"+str(n_embed)+"_N_eff_"+str(N_eff)+"_shuffle_"+str(N_shuffles)+"_newcode.npz"
numpy.savez(filename, full_TE_mf=full_TE_mf, full_TE_fm=full_TE_fm,
full_TE_mf_std=full_TE_mf_std, full_TE_fm_std=full_TE_fm_std,
full_TE_mf_bias=full_TE_mf_bias, full_TE_fm_bias=full_TE_fm_bias,
lag_values=lag_values, stride=stride, N_shuffles=N_shuffles)
if (do_cross_MI>0):
filename = "cMI_decel_all"+"_fs_"+str(fs)+"_stride_"+str(stride)+"_N_eff_"+str(N_eff)+"_shuffle_"+str(N_shuffles)+".npz"
numpy.savez(filename, full_cMI_mf=full_cMI_mf, full_cMI_fm=full_cMI_fm,
full_cMI_mf_std=full_cMI_mf_std, full_cMI_fm_std=full_cMI_fm_std,
full_cMI_mf_bias=full_cMI_mf_bias, full_cMI_fm_bias=full_cMI_fm_bias,
lag_values=lag_values, stride=stride, N_shuffles=N_shuffles)
if (do_H>0):
filename = filename_prefix+"all_"+str(fs)+"Hz"+"_N_eff_"+str(N_eff)+"_shuffle_"+str(N_shuffles)+".npz"
numpy.savez(filename, full_ApEn_f=full_ApEn_f, full_ApEn_m=full_ApEn_m,
full_ApEn_f_std=full_ApEn_f_std, full_ApEn_m_std=full_ApEn_m_std,
full_ApEn_f_bias=full_ApEn_f_bias, full_ApEn_m_bias=full_ApEn_m_bias,
full_SampEn_f=full_SampEn_f, full_SampEn_m=full_SampEn_m,
full_SampEn_f_std=full_SampEn_f_std, full_SampEn_m_std=full_SampEn_m_std,
full_SampEn_f_bias=full_SampEn_f_bias, full_SampEn_m_bias=full_SampEn_m_bias,
stride_values=stride_values, N_shuffles=N_shuffles)
if (do_h>0):
filename = "hr_all_"+str(fs)+"Hz"+"_N_eff_"+str(N_eff)+"_n_embed_"+str(n_embed)+"_shuffle_"+str(N_shuffles)+".npz"
numpy.savez(filename, full_h_f=full_h_f, full_h_m=full_h_m,
full_h_m_std=full_h_m_std, full_h_f_std=full_h_f_std,
full_h_m_bias=full_h_m_bias, full_h_f_bias=full_h_f_bias,
stride_values=stride_values, N_shuffles=N_shuffles)
if (do_Hinc>0):
filename = "Hinc_all_"+str(fs)+"Hz"+"_N_eff_"+str(N_eff)+"_shuffle_"+str(N_shuffles)+".npz"
numpy.savez(filename, full_H_inc_f=full_H_inc_f, full_H_inc_m=full_H_inc_m,
full_H_inc_m_std=full_H_inc_m_std, full_H_inc_f_std=full_H_inc_f_std,
full_H_inc_m_bias=full_H_inc_m_bias, full_H_inc_f_bias=full_H_inc_f_bias,
full_std_inc_f=full_std_inc_f, full_std_inc_m=full_std_inc_m,
full_std_inc2_f=full_std_inc2_f, full_std_inc2_m=full_std_inc2_m,
stride_values=stride_values, N_shuffles=N_shuffles)
if (do_MI>0):
filename = "MI_all_"+str(fs)+"Hz"+"_N_eff_"+str(N_eff)+"_shuffle_"+str(N_shuffles)+".npz"
numpy.savez(filename, full_MI=full_MI, full_MI_std=full_MI_std,
full_MI_bias=full_MI_bias,
stride_values=stride_values, N_shuffles=N_shuffles)
print("done.")