Compare commits

..

7 commits

Author SHA1 Message Date
Stephan I. Böttcher
6d099311f9 remove */__pycache__ 2024-12-02 15:54:42 +01:00
mslrad
f92beaf9ed Merge branch 'master' into etsasa 2023-09-04 15:07:07 +02:00
mslrad
93b67198a1 ignore .pids 2023-09-04 15:05:47 +02:00
mslrad
0727e8f585 add patch to bokeh core templates 2023-09-04 14:59:24 +02:00
b2dab2d2db Replace selector.py 2022-12-14 18:58:01 +00:00
6530b914e9 Update het_science.py 2022-12-14 18:49:43 +00:00
6fc60ba748 Counters.py 2022-12-14 18:31:57 +00:00
29 changed files with 342 additions and 310 deletions

2
.gitignore vendored
View file

@ -1,2 +1,2 @@
__pycache__
.pid
.pid*

View file

@ -20,6 +20,7 @@ install:
python3 -m venv --system-site-packages --symlinks $(VENV)
$(VENV)/bin/pip3 install $(PIPP) -r requirements.txt
-[ -f requirements-et.txt ] && $(VENV)/bin/pip3 install $(PIPP) -r requirements-et.txt
patch < bokeh.patch ../env/lib/python*/site-packages/bokeh/core/templates.py
start:
bash -c 'nohup $(VENV)/bin/python3 app.py >> ../plotter.log 2>&1 & echo $$! > .pid'

View file

@ -17,6 +17,7 @@ from markupsafe import Markup
#from plots.step_science import *
from plots.Dose import *
from plots.Temp import *
from plots.Counters import *
from plots.LET import *
@ -34,7 +35,7 @@ import MSL.flight_tools as ft
plot_types = [
DoseTimeprofilePlotsB, DoseTimeprofilePlotsE, DoseTimeprofilePlotsB_E
, LETPlots,TempTimeprofilePlots,LETPlots_A1B,LETPlots_A2B
, LETPlots,TempTimeprofilePlots,LETPlots_A1B,LETPlots_A2B , CountersTimeprofilePlots_4,CountersTimeprofilePlots_5,CountersTimeprofilePlots_15
]

13
bokeh.patch Normal file
View file

@ -0,0 +1,13 @@
--- ../env/lib/python3.11/site-packages/bokeh/core/templates.py 2023-09-04 14:52:37.214044761 +0200
+++ /home/asterix/khaksari/Stephan_radwebserver/templates.py 2022-07-10 21:26:03.808304000 +0200
@@ -39,7 +39,9 @@
from os.path import dirname, join
# External imports
-from jinja2 import Environment, FileSystemLoader, Markup
+from jinja2 import Environment, FileSystemLoader
+from jinja2.utils import markupsafe
+from markupsafe import Markup
#-----------------------------------------------------------------------------
# Globals and constants

325
plots/Counters.py Normal file
View file

@ -0,0 +1,325 @@
import datetime as dt
from abc import ABC, abstractmethod
from typing import Dict, Any
import datetime as dt
import pandas as pd
import scipy.constants as const
import numpy as np
import scipy.constants as const
from time import *
import time
import MSL.flight_tools as ft
import MSL.LET_tools as lt
import MSL.time_tools as Mtt
#import js2py
from backend.plot import Plot, DynamicSpectrumPlot, LinePlot,AccumulatedSpectrumPlot
from backend.plotcollection import PlotCollection
from bokeh.io import show
import os
from bokeh.models import ColumnDataSource, HoverTool, Range1d, Label,CustomJS , LogColorMapper, ColorBar, LogTicker, \
LogTickFormatter, Title, Whisker
from bokeh.events import ButtonClick # for saving data
from bokeh.models.widgets import DataTable, DateFormatter, TableColumn
from bokeh.layouts import grid
from bokeh.models import Button # for saving data
from bokeh.plotting import output_notebook
output_notebook()
class CountersTimeprofilePlots_4(PlotCollection):
"""
Plot Total ionising dose
"""
group = 'RAD_Counters'
name = 'Counters(4)'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
plot = {'dcounters_4': LinePlot(self, data['counters_4'], title='Counter data (l2mc[4])', ylabel='l2mc[4](count rate)') }
t = ([x.strftime("%Y-%m-%d %H:%M:%S")for x in data['counters_4'].index])
#print(int(Mtt.posix_to_sol(Mtt.dt_to_posix(t))))
A_sol = data['sol']['sol']
source = ColumnDataSource({'A_sol':A_sol, 'A_time': t ,'Count_rate_(l2mc[4])': data['counters_4'] })
#source = ColumnDataSource(df)
#source = ColumnDataSource(data=dict(x=data['dose_B']['B'].index, y=data['dose_B']['B'].value ))
button = Button(label="Download", button_type="success")
button.js_on_click(CustomJS(args=dict(source=source),code=open(os.path.join(os.path.dirname(__file__),"download.js")).read()))
show(button)
return(plot)
def load_data(self, start: dt.datetime, end: dt.datetime):
#dose = LNDData(start, end).xmas.dose.to_frame()*1e6
start_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(start)))
end_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(end)))
counters = ft.load_data((np.arange(start_sol,end_sol)),load='counters')
msk = ft.get_clean_mask(counters)
counters_= pd.DataFrame(counters['l2mc'],columns = [4],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
accTime_T= pd.DataFrame(counters,columns = ['accTime_T'],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
frames = [counters_, accTime_T]
result = pd.concat(frames, axis=1)
result["L2_[4]"] = result[4]/result['accTime_T']
#print(result)
counters_4= pd.DataFrame(result, columns = ["L2_[4]"],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
#print(counters_4)
sol= pd.DataFrame(counters['l2mc'],columns = ['sol'],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
return({'counters_4': counters_4[msk], 'sol': sol[msk]})
class CountersTimeprofilePlots_5(PlotCollection):
"""
Plot Total ionising dose
"""
group = 'RAD_Counters'
name = 'Counters(5)'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
plot = {'dcounters_5': LinePlot(self, data['counters_5'], title='Counter data (l2mc[5])', ylabel='l2mc[5](count rate)') }
t = ([x.strftime("%Y-%m-%d %H:%M:%S")for x in data['counters_5'].index])
#print(int(Mtt.posix_to_sol(Mtt.dt_to_posix(t))))
A_sol = data['sol']['sol']
source = ColumnDataSource({'A_sol':A_sol, 'A_time': t ,'Count_rate_(l2mc[5])': data['counters_5'] })
#source = ColumnDataSource(df)
#source = ColumnDataSource(data=dict(x=data['dose_B']['B'].index, y=data['dose_B']['B'].value ))
button = Button(label="Download", button_type="success")
button.js_on_click(CustomJS(args=dict(source=source),code=open(os.path.join(os.path.dirname(__file__),"download.js")).read()))
show(button)
return(plot)
def load_data(self, start: dt.datetime, end: dt.datetime):
#dose = LNDData(start, end).xmas.dose.to_frame()*1e6
start_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(start)))
end_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(end)))
counters = ft.load_data((np.arange(start_sol,end_sol)),load='counters')
msk = ft.get_clean_mask(counters)
counters_= pd.DataFrame(counters['l2mc'],columns = [5],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
accTime_T= pd.DataFrame(counters,columns = ['accTime_T'],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
frames = [counters_, accTime_T]
result = pd.concat(frames, axis=1)
result["L2_[5]"] = result[5]/result['accTime_T']
#print(result)
counters_5= pd.DataFrame(result, columns = ["L2_[5]"],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
#print(counters_4)
sol= pd.DataFrame(counters['l2mc'],columns = ['sol'],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
return({'counters_5': counters_5[msk], 'sol': sol[msk]})
class CountersTimeprofilePlots_15(PlotCollection):
"""
Plot Total ionising dose
"""
group = 'RAD_Counters'
name = 'Counters(15)'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
plot = {'dcounters_15': LinePlot(self, data['counters_15'], title='Counter data (l2mc[15])', ylabel='l2mc[15](count rate)') }
t = ([x.strftime("%Y-%m-%d %H:%M:%S")for x in data['counters_15'].index])
#print(int(Mtt.posix_to_sol(Mtt.dt_to_posix(t))))
A_sol = data['sol']['sol']
source = ColumnDataSource({'A_sol':A_sol, 'A_time': t ,'Count_rate_(l2mc[15])': data['counters_15'] })
#source = ColumnDataSource(df)
#source = ColumnDataSource(data=dict(x=data['dose_B']['B'].index, y=data['dose_B']['B'].value ))
button = Button(label="Download", button_type="success")
button.js_on_click(CustomJS(args=dict(source=source),code=open(os.path.join(os.path.dirname(__file__),"download.js")).read()))
show(button)
return(plot)
def load_data(self, start: dt.datetime, end: dt.datetime):
#dose = LNDData(start, end).xmas.dose.to_frame()*1e6
start_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(start)))
end_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(end)))
counters = ft.load_data((np.arange(start_sol,end_sol)),load='counters')
msk = ft.get_clean_mask(counters)
counters_= pd.DataFrame(counters['l2mc'],columns = [15],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
accTime_T= pd.DataFrame(counters,columns = ['accTime_T'],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
frames = [counters_, accTime_T]
result = pd.concat(frames, axis=1)
result["L2_[15]"] = result[15]/result['accTime_T']
#print(result)
counters_15= pd.DataFrame(result, columns = ["L2_[15]"],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
#print(counters_4)
sol= pd.DataFrame(counters['l2mc'],columns = ['sol'],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
return({'counters_15': counters_15[msk], 'sol': sol[msk]})
"""
class DoseTimeprofilePlotsE(PlotCollection):
#Plot Total ionising dose
group = 'Dose-RAD'
name = 'Dose-E'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
plot = {'dose_E': LinePlot(self, data['dose_E']*8.64*1e7, title='RAD_E measurements', ylabel='dose rate [mGy/day]') }
A_sol = data['sol']['sol']
source = ColumnDataSource(({'A_sol':A_sol, 'A_time': ([x.strftime("%Y-%m-%d %H:%M:%S")for x in data['dose_E']['E'].index]),'E_dose [mGy/day]': data['dose_E']['E']*8.64*1e7}))
columns = [
TableColumn(field='E_dose [mGy/day]', title="E dose rate [mGy/day]"),
]
table = DataTable(
source=source,
columns=columns,
width=400,
height=600,
sortable=True,
selectable=True,
editable=True,
)
button = Button(label="Download", button_type="success")
button.js_on_click(CustomJS(args=dict(source=source),code=open(os.path.join(os.path.dirname(__file__),"download.js")).read()))
show(button)
return(plot)
def load_data(self, start: dt.datetime, end: dt.datetime):
#dose = LNDData(start, end).xmas.dose.to_frame()*1e6
start_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(start)))
end_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(end)))
dose = ft.load_data((np.arange(start_sol,end_sol)),load='dose', force_timesort=True)
msk = ft.get_clean_mask(dose)
dose_E= pd.DataFrame(dose,columns = ['E'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol']),name ='date'))
sol= pd.DataFrame(dose,columns = ['sol'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol']),name ='date'))
return({'dose_E': dose_E[msk], 'sol': sol[msk]})
class DoseTimeprofilePlotsB_E(PlotCollection):
#Plot Total ionising dose
group = 'Dose-RAD'
name = 'Dose-BE'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
plot = {'dose_BE': LinePlot(self, data['dose_BE']*8.64*1e7, title='RAD_BE measurements', ylabel='dose rate [mGy/day]')}
A_sol = data['sol']['sol']
source = ColumnDataSource(({'A_sol':A_sol, 'A_time': ([x.strftime("%Y-%m-%d %H:%M:%S")for x in data['dose_BE']['B'].index]),'B_dose[mGy/day][per day (i.e., 24 hours), not Sol]': data['dose_BE']['B']*8.64*1e7, 'E_dose[mGy/day][per day (i.e., 24 hours), not Sol]': data['dose_BE']['E']*8.64*1e7}))
columns = [
TableColumn(field='B_dose [mGy/day]', title="B dose rate [mGy/day]"),
TableColumn(field='E_dose [mGy/day]', title="E dose rate [mGy/day]"),
]
table = DataTable(
source=source,
columns=columns,
width=400,
height=600,
sortable=True,
selectable=True,
editable=True,
)
button = Button(label="Download", button_type="success")
button.js_on_click(CustomJS(args=dict(source=source),code=open(os.path.join(os.path.dirname(__file__),"download.js")).read()))
show(button)
return(plot)
def load_data(self, start: dt.datetime, end: dt.datetime):
#dose = LNDData(start, end).xmas.dose.to_frame()*1e6
start_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(start)))
end_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(end)))
dose = ft.load_data((np.arange(start_sol,end_sol)),load='dose', force_timesort=True)
msk = ft.get_clean_mask(dose, high_dose_resolution=False)
dose_BE= pd.DataFrame(dose,columns = ['B','E'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol'])))
dose_BE.index.name = 'date'
#dose_E= pd.DataFrame(dose,columns = ['B'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol']),name ='date'))
sol= pd.DataFrame(dose,columns = ['sol'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol']),name ='date'))
return({'dose_BE': dose_BE[msk], 'sol': sol[msk], 'start':start})
class Test_plot(PlotCollection):
name = 'Test Plot'
group = 'Test'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
# create two panels, each showing a line plot
return {
'foo': LinePlot(self, data['foo'],
title='Foo measurements',
ylabel='Temperature [°C]'),
#'bar': LinePlot(self, data['bar'],
# title='Bar measurements',
# ylabel='Voltage [V]'),
}
def load_data(self, start: dt.datetime, end: dt.datetime):
# just generate some example data for the plots:
return {
'foo': pd.DataFrame({
#'a': [2, 2],
'b': [2, 1]
}, index=pd.DatetimeIndex([start, end], name='date'))
#, 'bar': pd.DataFrame({
# 'x': [100, 222],
# 'y': [111, 150]
#}, index=pd.DatetimeIndex([start, end], name='date'))
}
"""
# savebutton = Button(label="Save", button_type="success")
# callback = CustomJS(
# args=dict(source_data=source),
# code="""
# var inds = source_data.selected.indices;
# var data = source_data.data;
# var out = "x, y, var inds, var data\\n";
# for (var i = 0; i < inds.length; i++) {
# out += data['X1'][inds[i]] + "," + data['Y1'][inds[i]] + "," + data['Y2'][inds[i]] +"\\n";
# }
# var file = new Blob([out], {type: 'text/plain'});
# var elem = window.document.createElement('a');
# elem.href = window.URL.createObjectURL(file);
# elem.download = 'selected-data.txt';
# document.body.appendChild(elem);
# elem.click();
# document.body.removeChild(elem);
# """,
# )
# savebutton.js_on_click(callback)
# layout = grid([table, savebutton], ncols=2)
# show(layout)
# out += data['X1'][inds[i]] + "," + data['Y1'][inds[i]] + "," + data['Y2'][inds[i]] +"\\n";
#

Binary file not shown.

Binary file not shown.

View file

@ -1,308 +0,0 @@
import datetime as dt
from abc import ABC, abstractmethod
from typing import Dict, Any
import numpy as np
import pandas as pd
import scipy.constants as const
from etspice import SOLO
from solo_loader.epd import EPDData
from solo_loader.exception import ConfigurationChangeError
from solo_loader.processing.velocity_dispersion import inv_velocity_spectrum, parker_spiral_length
from backend.plot import Plot, DynamicSpectrumPlot, LinePlot
from backend.plotcollection import PlotCollection
from plots.utils import handle_config_changes
class HETDynamicSpectrumPlots(PlotCollection, ABC):
"""
Plots an HET dynamic spectrum
"""
group = 'HET SCI'
titles = {
'sun': 'Sunward',
'asun': 'Anti-Sunward',
'north': 'Northward',
'south': 'Southward'
}
particles = {
'p': 'protons',
'e': 'electrons'
}
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
plots = {}
for direction in data:
dfs = data[direction]
# share colorbar with sun plot
share_colorbar = plots['sun'] if direction != 'sun' else None
plots[direction] = DynamicSpectrumPlot(self,
dfs,
title=f'{self.titles[direction]} HET {self.particles[self.species]}',
data_label='diff. flux /\n(cm² sr s keV)⁻¹', logy=True,
share_colorbar=share_colorbar, ylabel='E / MeV')
return plots
def load_data(self, start, end):
epd = EPDData(start, end)
if self.nominal:
fun = lambda x: x.het.science.nominal
else:
fun = lambda x: x.het.science.low_latency
if self.species == 'e':
types = ['B', 'C'] # electrons are only available in "stopping in B" and "stopping in C" trigger
else:
types = ['B', 'C', 'P'] # stopping in B, stopping in C, penetrating
try:
data = [fun(epd)(self.species, t) for t in types]
except ConfigurationChangeError as e:
# split loading of data between configuration changes
data = [fun(part)(self.species, t) for part in handle_config_changes(epd, e) for t in types]
fluxes = {}
for direction in ['sun', 'asun', 'north', 'south']:
dfs = [df[direction].to_flux(edges=True) for df in data]
for df in dfs:
df.columns = pd.IntervalIndex.from_arrays(df.columns.left / 1e3, df.columns.right / 1e3)
df.columns.name = "primary energy / MeV"
fluxes[direction] = dfs
return fluxes
@property
@abstractmethod
def species(self) -> str:
pass
@property
@abstractmethod
def nominal(self) -> bool:
pass
class HETDynamicSpectrumLLProtonPlots(HETDynamicSpectrumPlots):
name = 'Dynamic Spectrum (LL, Protons)'
nominal = False
species = 'p'
class HETDynamicSpectrumNominalProtonPlots(HETDynamicSpectrumPlots):
name = 'Dynamic Spectrum (NO, Protons)'
nominal = True
species = 'p'
class HETDynamicSpectrumLLElectronPlots(HETDynamicSpectrumPlots):
name = 'Dynamic Spectrum (LL, Electrons)'
nominal = False
species = 'e'
class HETDynamicSpectrumNominalElectronPlots(HETDynamicSpectrumPlots):
name = 'Dynamic Spectrum (NO, Electrons)'
nominal = True
species = 'e'
titles_p = {
'sun': 'Sunward HET protons',
'asun': 'Anti-Sunward HET protons',
'north': 'Northward HET protons',
'south': 'Southward HET protons'
}
titles_e = {
'sun': 'Sunward HET electrons',
'asun': 'Anti-Sunward HET electrons',
'north': 'Northward HET electrons',
'south': 'Southward HET electrons'
}
def columns_to_mev(df):
df.columns = ['{:.1f} MeV'.format(energy / 1e3) for energy in df.columns]
class HETTimeProfilesPlotsProtons(PlotCollection):
name = 'Time Profiles Proton'
group = 'HET SCI'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
return {
key: LinePlot(self, data[key], title=titles_p[key], logy=True,
ylabel='diff. particle flux / (mm² sr s keV)⁻¹')
for key in data
}
def load_data(self, start: dt.datetime, end: dt.datetime):
epd = EPDData(start, end)
data = {}
try:
het_pB = [epd.het.science.low_latency('p', 'B')]
het_pC = [epd.het.science.low_latency('p', 'C')]
het_pP = [epd.het.science.low_latency('p', 'P')]
except ConfigurationChangeError as e:
# split loading of data between configuration changes
het_pB = [part.het.science.low_latency('p', 'B') for part in handle_config_changes(epd, e)]
het_pC = [part.het.science.low_latency('p', 'C') for part in handle_config_changes(epd, e)]
het_pP = [part.het.science.low_latency('p', 'P') for part in handle_config_changes(epd, e)]
for direction in ['sun', 'asun', 'north', 'south']:
p_pds = pd.concat([
pd.concat([b[direction].to_flux(),
c[direction].to_flux(),
p[direction].to_flux()], axis=1)
for b, c, p in zip(het_pB, het_pC, het_pP)]).resample('30min').mean()
columns_to_mev(p_pds)
data[direction] = p_pds
return data
class HETTimeProfilesPlotsElectrons(PlotCollection):
name = 'Time Profiles Electron'
group = 'HET SCI'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
return {
key: LinePlot(self, data[key], title=titles_e[key], logy=True,
ylabel='diff. particle flux / (mm² sr s keV)⁻¹')
for key in data
}
def load_data(self, start: dt.datetime, end: dt.datetime):
epd = EPDData(start, end)
data = {}
try:
het_eB = [epd.het.science.low_latency('e', 'B')]
het_eC = [epd.het.science.low_latency('e', 'C')]
except ConfigurationChangeError as e:
# split loading of data between configuration changes
het_eB = [part.het.science.low_latency('e', 'B') for part in handle_config_changes(epd, e)]
het_eC = [part.het.science.low_latency('e', 'C') for part in handle_config_changes(epd, e)]
for direction in ['sun', 'asun', 'north', 'south']:
p_pds = pd.concat([
pd.concat([b[direction].to_flux(),
c[direction].to_flux()], axis=1)
for b, c in zip(het_eB, het_eC)]).resample('30min').mean()
columns_to_mev(p_pds)
data[direction] = p_pds
return data
class HETVelocityDispersion(PlotCollection):
name = 'Velocity Dispersion'
group = 'HET SCI'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
plots = {}
titles = {
'sun protons': 'Sunward Pointing Detector, protons',
'asun protons': 'Anti-Sunward Pointing Detector, protons',
'north protons': 'Northward Pointing Detector, protons',
'south protons': 'Southward Pointing Detector, protons',
'sun electrons': 'Sunward Pointing Detector, electrons',
'asun electrons': 'Anti-Sunward Pointing Detector, electrons',
'north electrons': 'Northward Pointing Detector, electrons',
'south electrons': 'Southward Pointing Detector, electrons'
}
for key in data:
# share colorbar with sun plot
share_colorbar = plots['sun ' + key.split(' ')[1]] if not key.startswith('sun') else None
plots[key] = DynamicSpectrumPlot(self, data[key], title=titles[key],
share_colorbar=share_colorbar, data_label='flux / counts/(s mm² sr keV)',
ylabel='1/v / h/AU')
# for pos in np.linspace(0, 1, 5):
# plot diagonal lines
# plot_diagonal(ax, (pos, 0), slope, color='k')
return plots # and diagonals
def load_data(self, start, end):
epd = EPDData(start, end)
het = epd.het
try:
dfs_p_B = [het.science.low_latency('p', 'B')]
dfs_p_C = [het.science.low_latency('p', 'C')]
dfs_p_P = [het.science.low_latency('p', 'P')]
dfs_e_B = [het.science.low_latency('e', 'B')]
dfs_e_C = [het.science.low_latency('e', 'C')]
except ConfigurationChangeError as e:
# split loading of data between configuration changes
dfs_p_B = [part.ept.science.low_latency('p', 'B') for part in handle_config_changes(epd, e)]
dfs_p_C = [part.ept.science.low_latency('p', 'C') for part in handle_config_changes(epd, e)]
dfs_p_P = [part.ept.science.low_latency('p', 'P') for part in handle_config_changes(epd, e)]
dfs_e_B = [part.ept.science.low_latency('e', 'B') for part in handle_config_changes(epd, e)]
dfs_e_C = [part.ept.science.low_latency('e', 'C') for part in handle_config_changes(epd, e)]
data = {}
for direction in ['sun', 'asun', 'north', 'south']:
data[f'{direction} protons'] = pd.concat([
pd.concat([inv_velocity_spectrum(b[direction].to_flux(edges=True), mass=const.m_p),
inv_velocity_spectrum(c[direction].to_flux(edges=True), mass=const.m_p),
inv_velocity_spectrum(p[direction].to_flux(edges=True), mass=const.m_p)], axis=1)
for b, c, p in zip(dfs_p_B, dfs_p_C, dfs_p_P)]).resample('30min').mean()
data[f'{direction} electrons'] = pd.concat([
pd.concat([inv_velocity_spectrum(b[direction].to_flux(edges=True), mass=const.m_e),
inv_velocity_spectrum(c[direction].to_flux(edges=True), mass=const.m_e)])
for b, c in zip(dfs_e_B, dfs_e_C)]).resample('30min').mean()
# get the position of SolO referred to the sun
position = SOLO.position((end - start) / 2 + start)
# get the total distance from SolO to the sun
distance = np.linalg.norm(position)
# convert from km to au
distance = distance / const.au * 1e3
# get the distance along the parker spiral
slope = parker_spiral_length(distance, v_sw=400)
# multiply by 24 because matplotlib represents dates as days since epoch, not hours
slope *= 24
return data
class HETGCRPlots(PlotCollection):
name = 'GCR counters'
group = 'HET SCI'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
return {
'c_counters': LinePlot(self, data['c_counters'],
title=f'HET C detector counters (isotropic GCR ≳ 15 MeV/nuc)', ylabel='counts / s'),
'gcr_channel': LinePlot(self, data['gcr_channel'],
title=f'HET GCR channel (BCB penetrating GCR ≳ 100 MeV/nuc)', ylabel='counts / s')
}
def load_data(self, start: dt.datetime, end: dt.datetime):
# sum all C high gain and all C low gain channels
c_counters = pd.DataFrame({
channel: pd.concat([
EPDData(start, end).het.housekeeping.l1_counters(unit)[[f'C1{channel}', f'C2{channel}']]
.sum(axis=1).resample('10min').sum(min_count=1) / 600
for unit in ['he1', 'he2']
], axis=1).sum(axis=1, skipna=False)
for channel in ['H', 'L']
})
# GCR channel
gcr = EPDData(start, end).het.science.gcr()
gcr = pd.DataFrame({
key: gcr[key].sum(axis=1, skipna=False).resample('60min').sum(min_count=1) / 3600
for key in gcr
})
for key in gcr:
gcr[f'{key} (smoothed)'] = gcr[key].rolling(10, center=True).mean()
return {
'c_counters': c_counters,
'gcr_channel': gcr
}