import numpy as np from cdflib import CDF, cdfepoch import datetime as dt from datetime import datetime, timedelta from math import acos, cos import matplotlib.pyplot as plt import os from matplotlib.colors import LogNorm, Normalize import pandas as pd def cdfepoch_to_datetime(epoch_array): dt_list = cdfepoch.to_datetime(epoch_array) return [dt if isinstance(dt, datetime) else dt.astype('datetime64[ms]').astype(datetime) for dt in dt_list] def time_resolution(time_array): deltat = np.diff(time_array) dt_seconds = np.array([d.total_seconds() for d in deltat]) return np.median(dt_seconds) class RAPID: def __init__(self, sc_id='C3', time_period=None, path='D:/Daten/LISA/Studium/CAU/4.Semester/Python programme/Bachelorarbeit/RAPID/', particle_type="electron"): self.particle_type = particle_type.lower() self.ies_energy = np.array([39.2, 50.5, 68.1, 94.5, 127.5, 175.9, 244.1, 336.5]) self.iims_energy = np.array([ 2.770E+01, 7.530E+01, 9.220E+01, 1.597E+02, 3.740E+02, 9.620E+02, 1.885E+03, 4.007E+03 ]) self.sc_id = sc_id self.time_period = time_period self.path = path self.particle_type = particle_type.lower() if self.particle_type == "electron": self.spec = "e3dd" self.flux_var = "Electron_Dif_flux_3D" self.energy_table = self.ies_energy else: self.spec = "i3dm_h" self.flux_var = "Proton_Dif_flux_3DM" self.energy_table = self.iims_energy self.load() def load(self): self.load_rapid() def load_rapid(self): sc = self.sc_id.lower() self.data_rapid = None tt = self.time_period[0] while tt < self.time_period[1]: rapid_path = self.path + f'{tt.year}/' l = os.listdir(rapid_path) fname = f'{sc}_cp_rap_{self.spec}_%.4i%.2i%.2i' % (tt.year, tt.month, tt.day) ll = [fn for fn in l if fname in fn] if len(ll) > 0: ll.sort() rapid_name = ll[-1] self.data_rapid = CDF(rapid_path + rapid_name) tt += timedelta(days=1) print("Ordner:", rapid_path) print("Suchstring:", fname) print("Dateien:", l) print("Matches:", ll) return self.data_rapid def get_rapid_var(self, data_rapid): sc = self.sc_id.upper() suffix = self.spec.upper() self.time_rapid = cdfepoch_to_datetime( data_rapid.varget(f'Time_tags__{sc}_CP_RAP_{suffix}') ) self.az = data_rapid.varget(f'Dimension_Az__{sc}_CP_RAP_{suffix}') self.pol = data_rapid.varget(f'Dimension_Pol__{sc}_CP_RAP_{suffix}') self.energy = data_rapid.varget(f'Dimension_E__{sc}_CP_RAP_{suffix}') self.flux = data_rapid.varget( f'{self.flux_var}__{sc}_CP_RAP_{suffix}' ) self.flux = np.where(self.flux <= 0, np.nan, self.flux) self.flux_sd = data_rapid.varget( f'{self.flux_var}_SD__{sc}_CP_RAP_{suffix}' ) t0, t1 = self.time_period time = np.array(self.time_rapid) mask = (time >= t0) & (time <= t1) self.flux = np.array(self.flux) self.flux_sd = np.array(self.flux_sd) print("time shape:", time.shape) print("flux shape BEFORE:", self.flux.shape) if self.flux.shape[0] == len(time): self.flux = self.flux[mask] self.flux_sd = self.flux_sd[mask] elif self.flux.ndim > 1 and self.flux.shape[1] == len(time): self.flux = self.flux[:, mask] self.flux_sd = self.flux_sd[:, mask] d_rapid = {'sc': self.sc_id, 'time_rapid': self.time_rapid, 'az': self.az, 'pol': self.pol, 'energy': self.energy, 'flux': self.flux, 'flux_sd': self.flux_sd } return d_rapid def convert_angle(self, az, pol, nt): #nt: number of time stamps pol = np.deg2rad(pol) # polar angle az = np.deg2rad(az) # azimuth angle phi, theta = np.meshgrid(az, pol, indexing='ij') #grid of all combinations of az x pol #spherical to kathesian coordinates nx = np.sin(theta) * np.cos(phi) ny = np.sin(theta) * np.sin(phi) nz = np.cos(theta) n_single = np.stack((nx, ny, nz),axis=-1) #n for one time stamp n = np.repeat(n_single[np.newaxis,...],nt, axis =0) #print('normalenvector:', n) return n # IFLOW contains the matrix used to convert space craft coordinates to GSE -> needed for pitch angle calc. class FLOW: def __init__(self, sc_id='C3', time_period=None, path_base='D:/Daten/LISA/Studium/CAU/4.Semester/Python programme/Bachelorarbeit/FLOW/', particle_type="electron"): self.sc_id = sc_id self.time_period = time_period self.particle_type = particle_type.lower() if self.particle_type == "electron": self.prefix = "EFLOW" else: self.prefix = "IFLOW" self.path = path_base + self.prefix + '/' self.load() def load(self): self.load_flow() def load_flow(self): sc = self.sc_id.upper() self.data_flow = None t_start = self.time_period[0] path_flow = self.path + f'{t_start.year}/' l = os.listdir(path_flow) fname = (f'{sc}_CP_RAP_{self.prefix}_GSE__' f'{t_start.year:04d}{t_start.month:02d}{t_start.day:02d}') ll = [fn for fn in l if fname in fn] if len(ll) > 0: ll.sort() flow_name = ll[-1] self.data_flow = CDF(path_flow + flow_name) else: print('keine datei gefunden für:', fname) return self.data_flow def get_flow_var(self, data_flow): sc = self.sc_id.upper() self.time_flow = cdfepoch_to_datetime( data_flow.varget(f'Time_tags__{sc}_CP_RAP_{self.prefix}_GSE') ) self.sc2gse = data_flow.varget( f'SC2GSE__{sc}_CP_RAP_{self.prefix}_GSE' ) return { 'sc': self.sc_id, 'time_flow': np.array(self.time_flow), 'sc2gse': np.array(self.sc2gse) } def get_iflow_var(self, data_iflow): sc=self.sc_id.upper() if self.data_iflow is None: print('nicht geladen') self.time_iflow = cdfepoch_to_datetime(data_iflow.varget(f'Time_tags__{sc}_CP_RAP_IFLOW_GSE')) self.sc2gse = data_iflow.varget(f'SC2GSE__{sc}_CP_RAP_IFLOW_GSE') self.time_iflow = np.array(self.time_iflow) self.sc2gse = np.array(self.sc2gse) #save in dict d_iflow = {'sc': self.sc_id, 'time_iflow': self.time_iflow, 'sc2gse': self.sc2gse } return d_iflow class ORBIT: # file name example: cl_sp_aux_20000823_v02 def __init__(self, sc_id = 'C3', time_period = None, path = 'D:/Daten/LISA/Studium/CAU/4.Semester/Python programme/Bachelorarbeit/ORBIT/'): self.sc_id = sc_id self.time_period = time_period self.path = path self.position = np.zeros((0,3),dtype = np.float64) self.load() self.distance = self.calc_distance(self.position) def load(self): self.load_orbit() #self.load_rapid() def load_orbit(self): self.data_aux = None tt = self.time_period[0] while tt < self.time_period[1]: aux_path = self.path + f'orbit/cl/{tt.year}/' l = os.listdir(aux_path) fname = 'cl_sp_aux_%.4i%.2i%.2i'%(tt.year,tt.month,tt.day) ll = [] for fn in l: if fname in fn: ll.append(fn) if len(ll)>0: #multiple versions -> use last one ll.sort() aux_name = ll[-1] self.data_aux = CDF(aux_path + aux_name) #self.position = np.append(self.position,data_aux.varget('sc_r_xyz_gse__CL_SP_AUX'), axis = 0) tt+= timedelta(days = 1) return self.data_aux def get_orbit_var(self, data_aux): #self.time_2= np.array(data_aux.varget('Epoch__CL_SP_AUX')) self.time_aux = cdfepoch_to_datetime(data_aux.varget('Epoch__CL_SP_AUX')) self.position = data_aux['sc_r_xyz_gse__CL_SP_AUX'] self.L_value = data_aux['L_sc_r_xyz_gse__CL_SP_AUX'] d_orbit = {'time_aux': self.time_aux, 'position': self.position, 'L_value': self.L_value} return d_orbit def calc_distance(self, position): distance = np.sqrt(position[:,0]**2 + position[:,1]**2 + position[:,2]**2) return distance class FGM: #file name expample: c3_cp_fgm_5vps_20020101_v20140305 def __init__(self, sc_id = 'C3', time_period = None, path = 'D:/Daten/LISA/Studium/CAU/4.Semester/Python programme/Bachelorarbeit/FGM/'): self.sc_id = sc_id self.time_period = time_period self.path = path def load_fgm(self): sc=self.sc_id.lower() self.data_fgm = None tt = self.time_period[0] while tt < self.time_period[1]: fgm_path = self.path + f'{tt.year}/' l = os.listdir(fgm_path) #list of all data in one year folder fname = f'{sc}_cp_fgm_5vps_%.4i%.2i%.2i_v'%(tt.year,tt.month,tt.day) ll = [] for fn in l: if fname in fn: ll.append(fn) if len(ll)>0: ll.sort() fgm_name = ll[-1] #print('Here') self.data_fgm = CDF(fgm_path + fgm_name) tt+= timedelta(days = 1) return self.data_fgm def get_fgm_var(self, data_fgm): sc = self.sc_id.lower() #self.time_3 = np.array(data_fgm.varget(f'time_tags__{sc}_CP_FGM_5VPS')) self.time_fgm = cdfepoch_to_datetime(data_fgm.varget(f'time_tags__{sc}_CP_FGM_5VPS')) self.B_mag = np.array(data_fgm.varget(f'B_mag__{sc}_CP_FGM_5VPS')) self.B_vec = np.array(data_fgm.varget(f'B_vec_xyz_gse__{sc}_CP_FGM_5VPS')) self.pos_fgm = np.array(data_fgm.varget(f'sc_pos_xyz_gse__{sc}_CP_FGM_5VPS')) d_fgm = {'sc': self.sc_id, 'time_fgm':self.time_fgm, 'B_mag': self.B_mag, 'B_vec': self.B_vec, 'pos_fgm': self.pos_fgm} return d_fgm #class for calculated pitch angle data of CSA # file name example: C3_CP_RAP_PAD_I3DM_H__20020101_00000_20021231_000000_V210130 class PITCH_CSA: def __init__(self, sc_id = 'C3', time_period = None, path = 'D:/Daten/LISA/Studium/CAU/4.Semester/Python programme/Bachelorarbeit/RAPID/PAD/'): self.sc_id = sc_id self.time_period = time_period self.path = path self.load() def load(self): self.load_pad() def load_pad(self): sc=self.sc_id.upper() self.data_pad = None t_start = self.time_period[0] path_pad = self.path + f'{t_start.year}/' l = os.listdir(path_pad) fname = (f'{sc}_CP_RAP_PAD_I3DM_H__20020101_000000_20021231_000000_V210130') ll = [] for fn in l: if fname in fn: ll.append(fn) if len(ll)>0: ll.sort() pad_name = ll[-1] self.data_pad = CDF(path_pad + pad_name) else: print('keine datei gefunden für:', fname) return self.data_pad def get_pad_var(self, data_pad): sc=self.sc_id.upper() if self.data_pad is None: print("PAD data is None") return None if self.data_pad is None: print('Error load PAD file') self.time_pad = cdfepoch_to_datetime(data_pad.varget(f'Time_tags__{sc}_CP_RAP_PAD_I3DM_H')) #print('time iflow:', self.time_iflow) self.pad = data_pad.varget(f'Dimension_Pitch__{sc}_CP_RAP_PAD_I3DM_H') self.time_pad = np.array(self.time_pad) self.pad = np.array(self.pad) #save in dict d_pad = {'sc': self.sc_id, 'time_pad': self.time_pad, 'pad': self.pad } return d_pad def nearest_neighbor_indice(time_source, time_target): t0 = time_source[0] ts = np.array([(t - t0).total_seconds() for t in time_source]) tt = np.array([(t - t0).total_seconds() for t in time_target]) idx = np.searchsorted(ts, tt) idx = np.clip(idx, 1, len(ts) - 1) left = ts[idx - 1] right = ts[idx] idx -= tt - left < right - tt return idx class WORKSPACE: def __init__(self): self.SC_data = {} def sync_all_by_resolution(self, rapid: RAPID, fgm: FGM, iflow: FLOW): t_r = rapid.time_rapid t_f = fgm.time_fgm t_e = iflow.time_flow dr = time_resolution(t_r) df = time_resolution(t_f) de = time_resolution(t_e) t_ref = t_r #RAPID -> t_ref if t_ref is not t_r: idx_r = nearest_neighbor_indice(t_r, t_ref) rapid_sync = { 'flux': rapid.flux[idx_r], 'flux_sd': rapid.flux_sd[idx_r], 'az': rapid.az, 'pol': rapid.pol, 'energy': rapid.energy } print("idx_r max:", np.max(idx_r)) print("len rapid.flux:", len(rapid.flux)) else: rapid_sync = { 'flux': rapid.flux, 'flux_sd': rapid.flux_sd, 'az': rapid.az, 'pol': rapid.pol, 'energy': rapid.energy } print('RAPID is synced') # FGM -> t_ref if t_ref is not t_f: idx_f = nearest_neighbor_indice(t_f, t_ref) fgm_sync = { 'B_vec': fgm.B_vec[idx_f], 'B_mag': fgm.B_mag[idx_f], 'pos_fgm': fgm.pos_fgm[idx_f] } else: fgm_sync = { 'B_vec': fgm.B_vec, 'B_mag': fgm.B_mag, 'pos_fgm': fgm.pos_fgm } print('FGM is synced') # IFLOW -> t_ref if t_ref is not t_e: idx_e = nearest_neighbor_indice(t_e, t_ref) sc2gse = iflow.sc2gse[idx_e] all_sync = { 'time': t_ref, 'rapid': rapid_sync, 'fgm': fgm_sync, 'sc2gse': sc2gse } print('IFLOW is synced :)') else: sc2gse = iflow.sc2gse all_sync = { 'time': t_ref, 'rapid': rapid_sync, 'fgm': fgm_sync, 'sc2gse': sc2gse } print('IFLOW is synced') return all_sync print('IFLOW is synced') def rapid_n_sc2gse(self, sc_id): data = self.SC_data[sc_id] nt = len(data['time']) n_sc = RAPID.convert_angle(self = None, az = data['az'], pol=data['pol'], nt=nt) sc2gse = data['sc2gse'] n_gse = np.zeros_like(n_sc) for t in range(nt): R = sc2gse[t] n_gse[t] = (-1)*np.einsum('ij,abj->abi', R, n_sc[t]) self.SC_data[sc_id]['n_sc'] = n_sc self.SC_data[sc_id]['n_gse'] = n_gse return n_gse #calc median of time for all files seperate -> biggest delta_t = smallest posible resolution -> reference time def spacecrafts(self, sc_id, t_sync, rapid_sync, fgm_sync, sc2gse, time_period = None): if time_period is not None: t0, t1 = time_period mask = np.array([(t >= t0 and t <= t1) for t in t_sync]) else: mask = slice(None) self.SC_data[sc_id] = {'time': np.array(t_sync)[mask], 'flux': rapid_sync['flux'][mask], 'flux_sd': rapid_sync['flux_sd'][mask], 'pol': rapid_sync['pol'], 'az': rapid_sync['az'], 'energy': rapid_sync['energy'], 'B_vec': fgm_sync['B_vec'][mask], 'B_mag': fgm_sync['B_mag'][mask], 'fgm_pos': fgm_sync['pos_fgm'][mask], 'sc2gse': None if sc2gse is None else sc2gse[mask] } return self.SC_data[sc_id] @staticmethod def time_overlap(t1,t2): return max(t1[0], t2[0])<= min(t1[-1], t2[-1]) def calc_pitch_angle(self, sc_id): data = self.SC_data[sc_id] #nt = data['B_vec'].shape[0] #number of time stamps (synced time) n = data['n_gse'] print('shape n:', n.shape) B_vec = data['B_vec'] B_mag = data['B_mag'] B_vec_exp = B_vec[:, None, None, :] dot = np.sum(n*B_vec_exp, axis = -1) #scalar product cos_pitch = dot/B_mag[:, None, None] #calc norm cos_pitch = np.clip(cos_pitch, -1.0,1.0) pitch_angle = np.degrees(np.arccos(cos_pitch)) self.SC_data[sc_id]['pitch_angle'] = pitch_angle self.SC_data[sc_id]['cos_pitch_angle'] = cos_pitch return pitch_angle, cos_pitch # get intex of datetime object def time_to_index(self, time_array, time_target): print(time_array[0]) print(time_target) diff = [abs(t-np.datetime64(time_target)) for t in time_array] return int(np.argmin(diff)) def plot_flux_of_pitch(self, sc_id, time_target, e_idx): data = self.SC_data[sc_id] idx = self.time_to_index(data['time'], time_target) pitch = data['pitch_angle'][idx]#[..., e_idx] flux = data['flux'][idx][e_idx] print(pitch.shape) print(flux.shape) flux_sd = data['flux_sd'][idx][e_idx] pitch_1d = pitch.flatten() flux_1d = flux.flatten() flux_sd1d = np.abs(flux_sd.flatten()) mask = (np.isfinite(flux_1d) & np.isfinite(pitch_1d) & (flux_1d > 0)) print("flux min:", np.min(flux_1d)) print("flux max:", np.max(flux_1d)) print("pitch min:", np.min(pitch_1d)) print("pitch max:", np.max(pitch_1d)) print("finite flux:", np.sum(np.isfinite(flux_1d))) print("positive flux:", np.sum(flux_1d > 0)) print("finite pitch:", np.sum(np.isfinite(pitch_1d))) return pitch_1d[mask], flux_1d[mask], flux_sd1d[mask] def histo_flux_pitch(self, sc_id, time_target, e_idx): data = self.SC_data[sc_id] idx = self.time_to_index(data['time'], time_target) flux = data['flux'][idx][e_idx] pitch = data['pitch_angle'][idx] #pitch_1d = pitch.flatten() #flux_1d = flux.flatten() flux_1d = flux.flatten() pitch_1d = pitch.flatten() valid = (np.isfinite(flux_1d) & np.isfinite(pitch_1d) & (flux_1d > 0)) flux_1d = flux_1d[valid] pitch_1d = pitch_1d[valid] binedge = np.array([ 0, 20, 40, 60, 80, 100, 120, 140, 160, 180]) nhist, x = np.histogram(pitch_1d, bins=binedge, weights=None) fhist, x = np.histogram(pitch_1d, bins=binedge, weights=flux_1d) mflux = fhist / nhist bcenter = binedge[:-1]+np.diff(binedge)/2 fhist2, x = np.histogram(pitch_1d, bins=binedge, weights=flux_1d**2) varflux = fhist2 / nhist - mflux**2 sflux = np.sqrt(varflux) bcenter = binedge[:-1] + np.diff(binedge) / 2 print("nhist:", nhist) print("sflux:", sflux) return bcenter, mflux, sflux, nhist def histPAD(self, sc_id): self.bin_edges_pa = np.array([0.,20.,40.,60.,80.,100.,120.,140.,160.,180.]) self.dimension_pa = 0.5 * (self.bin_edges_pa[1:] + self.bin_edges_pa[:-1]) data = self.SC_data[sc_id] pitch = data['pitch_angle'] flux = data['flux'] Nt, Ne = flux.shape[:2] Nbins = len(self.dimension_pa) histograms = np.full((Nt, Ne, Nbins), np.nan) for i in range(Nt): pitch_1d = pitch[i].reshape(-1) for j in range(Ne): flux_1d = flux[i, j].reshape(-1) mask = np.isfinite(pitch_1d) & np.isfinite(flux_1d) & (pitch_1d >= 0.0) & (flux_1d >= 0.0) if not np.any(mask): continue pitch_valid = pitch_1d[mask] flux_valid = flux_1d[mask] counts = np.histogram(pitch_valid, bins=self.bin_edges_pa)[0] weighted = np.histogram( pitch_valid, bins=self.bin_edges_pa, weights=flux_valid )[0] hist = weighted / counts hist[counts == 0] = np.nan histograms[i, j] = hist self.pitch_hist = histograms def plot_pitch_timeseries(self, sc_id, start = datetime(2002, 6, 28, 0, ), end = datetime(2002, 6, 28, 23, 0),vmin=None, vmax=None, norm=None, title=None): data = self.SC_data[sc_id] time = np.array(data['time']) time_mask = (time > start) & (time < end) energy = data['energy'] flux_hist = self.pitch_hist pitch_axis = self.dimension_pa Nt, Ne, Nbins = flux_hist.shape fig, ax = plt.subplots( Ne + 2, 1, figsize=(8.0, 12.0), sharex=True, constrained_layout=False ) fig.subplots_adjust(hspace=0.01) #Norm if vmin is None: valid = flux_hist[np.isfinite(flux_hist) & (flux_hist > 0)] vmin = np.nanmin(valid) if len(valid) > 0 else 1e-3 if vmax is None: vmax = np.nanmax(flux_hist) # PAD Plots for i in range(Ne): flux_2d = flux_hist[:, i, :] if norm == 'max_flux': max_per_time = np.nanmax(flux_2d, axis=1, keepdims=True) max_per_time[max_per_time == 0] = np.nan flux_plot = flux_2d / max_per_time n = Normalize(vmin=0, vmax=1) else: flux_plot = flux_2d n = LogNorm(vmin=vmin, vmax=vmax) pcm = ax[i].pcolormesh( pd.to_datetime(time[time_mask]), pitch_axis, flux_plot[time_mask].T, cmap='jet', norm=n, shading='auto' ) print(flux_plot[0]) ax[i].set_yticks([0, 90, 180]) ax[i].set_ylabel(r'$\alpha$ / °') ax[i].text( 0.85, 0.8, f"{energy[i]:.1f} keV", transform=ax[i].transAxes ) # Colorbar cbar_ax = fig.add_axes([0.91, 0.1, 0.02, 0.8]) cbar = fig.colorbar(pcm, cax=cbar_ax) if norm == 'max_flux': cbar.set_label(r'$\frac{J}{J_\text{max}}$') else: cbar.set_label(r'$J_e$ / (cm$^2$ sr s keV)$^{-1}$') # Magnetic Field B = data['B_vec'][time_mask] Bmag = data['B_mag'][time_mask] ax[-2].plot(time[time_mask], B[:,0], label='Bx') ax[-2].plot(time[time_mask], B[:,1], label='By') ax[-2].plot(time[time_mask], B[:,2], label='Bz') ax[-2].plot(time[time_mask], Bmag, label='|B|') ax[-2].set_ylabel("nT") ax[-2].legend() # Orbit R_earth = 6371.0 pos = data['fgm_pos'][time_mask] X = pos[:,0] / R_earth Y = pos[:,1] / R_earth Z = pos[:,2] / R_earth R = np.sqrt(X**2 + Y**2 + Z**2) ax[-1].plot(time[time_mask], X, label=r'$X_{GSE}$') ax[-1].plot(time[time_mask], Y, label=r'$Y_{GSE}$') ax[-1].plot(time[time_mask], Z, label=r'$Z_{GSE}$') ax[-1].plot(time[time_mask], R, label=r'$R$') ax[-1].set_ylabel(r'$R_E$') ax[-1].legend(ncols=2) ax[-1].set_xlabel('Time') if title: fig.suptitle(title) return fig, ax