Compare commits

...

10 commits

Author SHA1 Message Date
Stephan I. Böttcher
84d3b98a1a add .gitignore and remove */__pycache__ 2024-12-02 15:51:47 +01:00
b2dab2d2db Replace selector.py 2022-12-14 18:58:01 +00:00
6530b914e9 Update het_science.py 2022-12-14 18:49:43 +00:00
6fc60ba748 Counters.py 2022-12-14 18:31:57 +00:00
f216b3f662 Replace Dose.py 2022-10-04 08:58:18 +00:00
5f7eb71a7c Replace Dose.py 2022-09-19 09:40:58 +00:00
88e904fc80 Replace selector.py 2022-09-19 09:39:33 +00:00
9e1766b30b Replace plotcollection.py 2022-09-19 09:38:49 +00:00
92517157bd Replace README.md 2022-08-12 11:18:56 +00:00
c698ebc8f6 Replace README.md 2022-08-12 11:17:44 +00:00
30 changed files with 347 additions and 371 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
__pycache__
.pid*

View file

@ -4,7 +4,11 @@ RAD Webserver
A web service to view quicklook plots of MSL RAD data. Built using [Bokeh](https://bokeh.org/) and the
[RAD Loader](https://gitlab.physik.uni-kiel.de/mslrad/flight_tools).
The current version of the app is running at https://solar-orbiter.physik.uni-kiel.de/mslrad/plotter.
The current version of the app is running at https://solar-orbiter.physik.uni-kiel.de/rad/plotter/.
The dose rate [mGy/day] in the B and E detectors are per day (i.e., 24 hours), not Sol.
The absorbed dose in the silicon B detector is not subtracted by the background contribution by the rovers radioisotope thermoelectric generator (RTG) because it changes during the time.
How to set up
-------------

View file

@ -134,8 +134,9 @@ class PlotCollection(ABC):
self._native = True
return df
else:
self._native = False
return df.resample(f'{self._tres}S').mean()
self._native = True
return df
#return df.resample(f'{self._tres}S').mean()
@property
def is_native_tres(self):

View file

@ -17,6 +17,7 @@ from markupsafe import Markup
#from plots.step_science import *
from plots.Dose import *
from plots.Temp import *
from plots.Counters import *
from plots.LET import *
@ -34,7 +35,7 @@ import MSL.flight_tools as ft
plot_types = [
DoseTimeprofilePlotsB, DoseTimeprofilePlotsE, DoseTimeprofilePlotsB_E
,Test_plot, LETPlots,TempTimeprofilePlots,LETPlots_A1B,LETPlots_A2B
, LETPlots,TempTimeprofilePlots,LETPlots_A1B,LETPlots_A2B , CountersTimeprofilePlots_4,CountersTimeprofilePlots_5,CountersTimeprofilePlots_15
]

325
plots/Counters.py Normal file
View file

@ -0,0 +1,325 @@
import datetime as dt
from abc import ABC, abstractmethod
from typing import Dict, Any
import datetime as dt
import pandas as pd
import scipy.constants as const
import numpy as np
import scipy.constants as const
from time import *
import time
import MSL.flight_tools as ft
import MSL.LET_tools as lt
import MSL.time_tools as Mtt
#import js2py
from backend.plot import Plot, DynamicSpectrumPlot, LinePlot,AccumulatedSpectrumPlot
from backend.plotcollection import PlotCollection
from bokeh.io import show
import os
from bokeh.models import ColumnDataSource, HoverTool, Range1d, Label,CustomJS , LogColorMapper, ColorBar, LogTicker, \
LogTickFormatter, Title, Whisker
from bokeh.events import ButtonClick # for saving data
from bokeh.models.widgets import DataTable, DateFormatter, TableColumn
from bokeh.layouts import grid
from bokeh.models import Button # for saving data
from bokeh.plotting import output_notebook
output_notebook()
class CountersTimeprofilePlots_4(PlotCollection):
"""
Plot Total ionising dose
"""
group = 'RAD_Counters'
name = 'Counters(4)'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
plot = {'dcounters_4': LinePlot(self, data['counters_4'], title='Counter data (l2mc[4])', ylabel='l2mc[4](count rate)') }
t = ([x.strftime("%Y-%m-%d %H:%M:%S")for x in data['counters_4'].index])
#print(int(Mtt.posix_to_sol(Mtt.dt_to_posix(t))))
A_sol = data['sol']['sol']
source = ColumnDataSource({'A_sol':A_sol, 'A_time': t ,'Count_rate_(l2mc[4])': data['counters_4'] })
#source = ColumnDataSource(df)
#source = ColumnDataSource(data=dict(x=data['dose_B']['B'].index, y=data['dose_B']['B'].value ))
button = Button(label="Download", button_type="success")
button.js_on_click(CustomJS(args=dict(source=source),code=open(os.path.join(os.path.dirname(__file__),"download.js")).read()))
show(button)
return(plot)
def load_data(self, start: dt.datetime, end: dt.datetime):
#dose = LNDData(start, end).xmas.dose.to_frame()*1e6
start_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(start)))
end_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(end)))
counters = ft.load_data((np.arange(start_sol,end_sol)),load='counters')
msk = ft.get_clean_mask(counters)
counters_= pd.DataFrame(counters['l2mc'],columns = [4],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
accTime_T= pd.DataFrame(counters,columns = ['accTime_T'],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
frames = [counters_, accTime_T]
result = pd.concat(frames, axis=1)
result["L2_[4]"] = result[4]/result['accTime_T']
#print(result)
counters_4= pd.DataFrame(result, columns = ["L2_[4]"],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
#print(counters_4)
sol= pd.DataFrame(counters['l2mc'],columns = ['sol'],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
return({'counters_4': counters_4[msk], 'sol': sol[msk]})
class CountersTimeprofilePlots_5(PlotCollection):
"""
Plot Total ionising dose
"""
group = 'RAD_Counters'
name = 'Counters(5)'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
plot = {'dcounters_5': LinePlot(self, data['counters_5'], title='Counter data (l2mc[5])', ylabel='l2mc[5](count rate)') }
t = ([x.strftime("%Y-%m-%d %H:%M:%S")for x in data['counters_5'].index])
#print(int(Mtt.posix_to_sol(Mtt.dt_to_posix(t))))
A_sol = data['sol']['sol']
source = ColumnDataSource({'A_sol':A_sol, 'A_time': t ,'Count_rate_(l2mc[5])': data['counters_5'] })
#source = ColumnDataSource(df)
#source = ColumnDataSource(data=dict(x=data['dose_B']['B'].index, y=data['dose_B']['B'].value ))
button = Button(label="Download", button_type="success")
button.js_on_click(CustomJS(args=dict(source=source),code=open(os.path.join(os.path.dirname(__file__),"download.js")).read()))
show(button)
return(plot)
def load_data(self, start: dt.datetime, end: dt.datetime):
#dose = LNDData(start, end).xmas.dose.to_frame()*1e6
start_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(start)))
end_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(end)))
counters = ft.load_data((np.arange(start_sol,end_sol)),load='counters')
msk = ft.get_clean_mask(counters)
counters_= pd.DataFrame(counters['l2mc'],columns = [5],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
accTime_T= pd.DataFrame(counters,columns = ['accTime_T'],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
frames = [counters_, accTime_T]
result = pd.concat(frames, axis=1)
result["L2_[5]"] = result[5]/result['accTime_T']
#print(result)
counters_5= pd.DataFrame(result, columns = ["L2_[5]"],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
#print(counters_4)
sol= pd.DataFrame(counters['l2mc'],columns = ['sol'],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
return({'counters_5': counters_5[msk], 'sol': sol[msk]})
class CountersTimeprofilePlots_15(PlotCollection):
"""
Plot Total ionising dose
"""
group = 'RAD_Counters'
name = 'Counters(15)'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
plot = {'dcounters_15': LinePlot(self, data['counters_15'], title='Counter data (l2mc[15])', ylabel='l2mc[15](count rate)') }
t = ([x.strftime("%Y-%m-%d %H:%M:%S")for x in data['counters_15'].index])
#print(int(Mtt.posix_to_sol(Mtt.dt_to_posix(t))))
A_sol = data['sol']['sol']
source = ColumnDataSource({'A_sol':A_sol, 'A_time': t ,'Count_rate_(l2mc[15])': data['counters_15'] })
#source = ColumnDataSource(df)
#source = ColumnDataSource(data=dict(x=data['dose_B']['B'].index, y=data['dose_B']['B'].value ))
button = Button(label="Download", button_type="success")
button.js_on_click(CustomJS(args=dict(source=source),code=open(os.path.join(os.path.dirname(__file__),"download.js")).read()))
show(button)
return(plot)
def load_data(self, start: dt.datetime, end: dt.datetime):
#dose = LNDData(start, end).xmas.dose.to_frame()*1e6
start_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(start)))
end_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(end)))
counters = ft.load_data((np.arange(start_sol,end_sol)),load='counters')
msk = ft.get_clean_mask(counters)
counters_= pd.DataFrame(counters['l2mc'],columns = [15],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
accTime_T= pd.DataFrame(counters,columns = ['accTime_T'],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
frames = [counters_, accTime_T]
result = pd.concat(frames, axis=1)
result["L2_[15]"] = result[15]/result['accTime_T']
#print(result)
counters_15= pd.DataFrame(result, columns = ["L2_[15]"],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
#print(counters_4)
sol= pd.DataFrame(counters['l2mc'],columns = ['sol'],index=pd.DatetimeIndex(Mtt.sol2datetime(counters['sol']),name ='date'))
return({'counters_15': counters_15[msk], 'sol': sol[msk]})
"""
class DoseTimeprofilePlotsE(PlotCollection):
#Plot Total ionising dose
group = 'Dose-RAD'
name = 'Dose-E'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
plot = {'dose_E': LinePlot(self, data['dose_E']*8.64*1e7, title='RAD_E measurements', ylabel='dose rate [mGy/day]') }
A_sol = data['sol']['sol']
source = ColumnDataSource(({'A_sol':A_sol, 'A_time': ([x.strftime("%Y-%m-%d %H:%M:%S")for x in data['dose_E']['E'].index]),'E_dose [mGy/day]': data['dose_E']['E']*8.64*1e7}))
columns = [
TableColumn(field='E_dose [mGy/day]', title="E dose rate [mGy/day]"),
]
table = DataTable(
source=source,
columns=columns,
width=400,
height=600,
sortable=True,
selectable=True,
editable=True,
)
button = Button(label="Download", button_type="success")
button.js_on_click(CustomJS(args=dict(source=source),code=open(os.path.join(os.path.dirname(__file__),"download.js")).read()))
show(button)
return(plot)
def load_data(self, start: dt.datetime, end: dt.datetime):
#dose = LNDData(start, end).xmas.dose.to_frame()*1e6
start_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(start)))
end_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(end)))
dose = ft.load_data((np.arange(start_sol,end_sol)),load='dose', force_timesort=True)
msk = ft.get_clean_mask(dose)
dose_E= pd.DataFrame(dose,columns = ['E'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol']),name ='date'))
sol= pd.DataFrame(dose,columns = ['sol'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol']),name ='date'))
return({'dose_E': dose_E[msk], 'sol': sol[msk]})
class DoseTimeprofilePlotsB_E(PlotCollection):
#Plot Total ionising dose
group = 'Dose-RAD'
name = 'Dose-BE'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
plot = {'dose_BE': LinePlot(self, data['dose_BE']*8.64*1e7, title='RAD_BE measurements', ylabel='dose rate [mGy/day]')}
A_sol = data['sol']['sol']
source = ColumnDataSource(({'A_sol':A_sol, 'A_time': ([x.strftime("%Y-%m-%d %H:%M:%S")for x in data['dose_BE']['B'].index]),'B_dose[mGy/day][per day (i.e., 24 hours), not Sol]': data['dose_BE']['B']*8.64*1e7, 'E_dose[mGy/day][per day (i.e., 24 hours), not Sol]': data['dose_BE']['E']*8.64*1e7}))
columns = [
TableColumn(field='B_dose [mGy/day]', title="B dose rate [mGy/day]"),
TableColumn(field='E_dose [mGy/day]', title="E dose rate [mGy/day]"),
]
table = DataTable(
source=source,
columns=columns,
width=400,
height=600,
sortable=True,
selectable=True,
editable=True,
)
button = Button(label="Download", button_type="success")
button.js_on_click(CustomJS(args=dict(source=source),code=open(os.path.join(os.path.dirname(__file__),"download.js")).read()))
show(button)
return(plot)
def load_data(self, start: dt.datetime, end: dt.datetime):
#dose = LNDData(start, end).xmas.dose.to_frame()*1e6
start_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(start)))
end_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(end)))
dose = ft.load_data((np.arange(start_sol,end_sol)),load='dose', force_timesort=True)
msk = ft.get_clean_mask(dose, high_dose_resolution=False)
dose_BE= pd.DataFrame(dose,columns = ['B','E'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol'])))
dose_BE.index.name = 'date'
#dose_E= pd.DataFrame(dose,columns = ['B'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol']),name ='date'))
sol= pd.DataFrame(dose,columns = ['sol'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol']),name ='date'))
return({'dose_BE': dose_BE[msk], 'sol': sol[msk], 'start':start})
class Test_plot(PlotCollection):
name = 'Test Plot'
group = 'Test'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
# create two panels, each showing a line plot
return {
'foo': LinePlot(self, data['foo'],
title='Foo measurements',
ylabel='Temperature [°C]'),
#'bar': LinePlot(self, data['bar'],
# title='Bar measurements',
# ylabel='Voltage [V]'),
}
def load_data(self, start: dt.datetime, end: dt.datetime):
# just generate some example data for the plots:
return {
'foo': pd.DataFrame({
#'a': [2, 2],
'b': [2, 1]
}, index=pd.DatetimeIndex([start, end], name='date'))
#, 'bar': pd.DataFrame({
# 'x': [100, 222],
# 'y': [111, 150]
#}, index=pd.DatetimeIndex([start, end], name='date'))
}
"""
# savebutton = Button(label="Save", button_type="success")
# callback = CustomJS(
# args=dict(source_data=source),
# code="""
# var inds = source_data.selected.indices;
# var data = source_data.data;
# var out = "x, y, var inds, var data\\n";
# for (var i = 0; i < inds.length; i++) {
# out += data['X1'][inds[i]] + "," + data['Y1'][inds[i]] + "," + data['Y2'][inds[i]] +"\\n";
# }
# var file = new Blob([out], {type: 'text/plain'});
# var elem = window.document.createElement('a');
# elem.href = window.URL.createObjectURL(file);
# elem.download = 'selected-data.txt';
# document.body.appendChild(elem);
# elem.click();
# document.body.removeChild(elem);
# """,
# )
# savebutton.js_on_click(callback)
# layout = grid([table, savebutton], ncols=2)
# show(layout)
# out += data['X1'][inds[i]] + "," + data['Y1'][inds[i]] + "," + data['Y2'][inds[i]] +"\\n";
#

View file

@ -32,7 +32,7 @@ class DoseTimeprofilePlotsB(PlotCollection):
"""
Plot Total ionising dose
"""
group = 'RAD-Dose'
group = 'Dose-RAD'
name = 'Dose-B'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
@ -41,64 +41,23 @@ class DoseTimeprofilePlotsB(PlotCollection):
t = ([x.strftime("%Y-%m-%d %H:%M:%S")for x in data['dose_B']['B'].index])
#print(int(Mtt.posix_to_sol(Mtt.dt_to_posix(t))))
A_sol = data['sol']['sol']
#print(A_sol)
#, 'A_sol':data['sol']
source = ColumnDataSource({'A_sol':A_sol, 'A_time': t ,'B_dose [mGy/day]': data['dose_B']['B']*8.64*1e7 })
#source = ColumnDataSource(df)
#source = ColumnDataSource(data=dict(x=data['dose_B']['B'].index, y=data['dose_B']['B'].value ))
#print('here',source.data)
#print(source.data)
button = Button(label="Download", button_type="success")
button.js_on_click(CustomJS(args=dict(source=source),code=open(os.path.join(os.path.dirname(__file__),"download.js")).read()))
show(button)
return(plot)
'''
button.js_on_event(ButtonClick, CustomJS(
args=dict(source_data=source),
code="""
var inds = source_data.selected.indices;
var data = source_data.data;
var nrows = source_data.get_length()
var out = "x, y\\n";
for (var i = 0; i < nrows; i++) {
out += data['dose_B']['B'].index + "," + data['dose_B']['B']*8.64*1e7 + "\\n";
}
var file = new Blob([out], {type: 'text/plain'});
var elem = window.document.createElement('a');
elem.href = window.URL.createObjectURL(file);
elem.download = 'selected-data.txt';
document.body.appendChild(elem);
elem.click();
document.body.removeChild(elem);
"""
)
)
'''
"""
def create_plots(self, data):
#plot = {'Dose-B': plt.plot(data['E'])}
print(data['dose_B'])
#temp = pd.DataFrame(data['Frontal'])
plot = {'dose_B': LinePlot(data['time'],data['dose_B']* 1e6 * 3600*24, title='Total Dose', logy=True,ylabel=r"Total Ionizing Dose[μGy hour⁻¹]")}
#plot = {'dose_B': plt.plot(data['time'],data['dose_B']* 1e6 * 3600*24)}
#print(data['B'])
return(plot)
"""
def load_data(self, start: dt.datetime, end: dt.datetime):
#dose = LNDData(start, end).xmas.dose.to_frame()*1e6
#print('here',start)
start_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(start)))
end_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(end)))
#print('I am here', start_sol, end_sol)
dose = ft.load_data((np.arange(start_sol,end_sol)),load='dose')
msk = ft.get_clean_mask(dose)
#print(msk)
#print("Now here")
dose_B= pd.DataFrame(dose,columns = ['B'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol']),name ='date'))
sol= pd.DataFrame(dose,columns = ['sol'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol']),name ='date'))
@ -108,7 +67,7 @@ class DoseTimeprofilePlotsE(PlotCollection):
"""
Plot Total ionising dose
"""
group = 'RAD-Dose'
group = 'Dose-RAD'
name = 'Dose-E'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
@ -140,11 +99,9 @@ class DoseTimeprofilePlotsE(PlotCollection):
start_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(start)))
end_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(end)))
#print('I am here', start_sol, end_sol)
dose = ft.load_data((np.arange(start_sol,end_sol)),load='dose', force_timesort=True)
msk = ft.get_clean_mask(dose)
#print("Now here")
dose_E= pd.DataFrame(dose,columns = ['E'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol']),name ='date'))
sol= pd.DataFrame(dose,columns = ['sol'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol']),name ='date'))
@ -155,15 +112,12 @@ class DoseTimeprofilePlotsB_E(PlotCollection):
"""
Plot Total ionising dose
"""
group = 'RAD-Dose'
group = 'Dose-RAD'
name = 'Dose-BE'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
#print(data)
plot = {'dose_BE': LinePlot(self, data['dose_BE']*8.64*1e7, title='RAD_BE measurements', ylabel='dose rate [mGy/day]')}
A_sol = data['sol']['sol']
source = ColumnDataSource(({'A_sol':A_sol, 'A_time': ([x.strftime("%Y-%m-%d %H:%M:%S")for x in data['dose_BE']['B'].index]),'B_dose [mGy/day]': data['dose_BE']['B']*8.64*1e7, 'E_dose [mGy/day]': data['dose_BE']['E']*8.64*1e7}))
source = ColumnDataSource(({'A_sol':A_sol, 'A_time': ([x.strftime("%Y-%m-%d %H:%M:%S")for x in data['dose_BE']['B'].index]),'B_dose[mGy/day][per day (i.e., 24 hours), not Sol]': data['dose_BE']['B']*8.64*1e7, 'E_dose[mGy/day][per day (i.e., 24 hours), not Sol]': data['dose_BE']['E']*8.64*1e7}))
columns = [
TableColumn(field='B_dose [mGy/day]', title="B dose rate [mGy/day]"),
TableColumn(field='E_dose [mGy/day]', title="E dose rate [mGy/day]"),
@ -192,18 +146,15 @@ class DoseTimeprofilePlotsB_E(PlotCollection):
start_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(start)))
end_sol = int(Mtt.posix_to_sol(Mtt.dt_to_posix(end)))
#print('I am here', start_sol, end_sol)
dose = ft.load_data((np.arange(start_sol,end_sol)),load='dose', force_timesort=True)
msk = ft.get_clean_mask(dose)
#print(msk)
msk = ft.get_clean_mask(dose, high_dose_resolution=False)
#print("Now here")
dose_BE= pd.DataFrame(dose,columns = ['B','E'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol'])))
dose_BE.index.name = 'date'
#dose_E= pd.DataFrame(dose,columns = ['B'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol']),name ='date'))
sol= pd.DataFrame(dose,columns = ['sol'],index=pd.DatetimeIndex(Mtt.sol2datetime(dose['sol']),name ='date'))
return({'dose_BE': dose_BE[msk], 'sol': sol[msk]})
return({'dose_BE': dose_BE[msk], 'sol': sol[msk], 'start':start})
class Test_plot(PlotCollection):

Binary file not shown.

Binary file not shown.

View file

@ -1,308 +0,0 @@
import datetime as dt
from abc import ABC, abstractmethod
from typing import Dict, Any
import numpy as np
import pandas as pd
import scipy.constants as const
from etspice import SOLO
from solo_loader.epd import EPDData
from solo_loader.exception import ConfigurationChangeError
from solo_loader.processing.velocity_dispersion import inv_velocity_spectrum, parker_spiral_length
from backend.plot import Plot, DynamicSpectrumPlot, LinePlot
from backend.plotcollection import PlotCollection
from plots.utils import handle_config_changes
class HETDynamicSpectrumPlots(PlotCollection, ABC):
"""
Plots an HET dynamic spectrum
"""
group = 'HET SCI'
titles = {
'sun': 'Sunward',
'asun': 'Anti-Sunward',
'north': 'Northward',
'south': 'Southward'
}
particles = {
'p': 'protons',
'e': 'electrons'
}
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
plots = {}
for direction in data:
dfs = data[direction]
# share colorbar with sun plot
share_colorbar = plots['sun'] if direction != 'sun' else None
plots[direction] = DynamicSpectrumPlot(self,
dfs,
title=f'{self.titles[direction]} HET {self.particles[self.species]}',
data_label='diff. flux /\n(cm² sr s keV)⁻¹', logy=True,
share_colorbar=share_colorbar, ylabel='E / MeV')
return plots
def load_data(self, start, end):
epd = EPDData(start, end)
if self.nominal:
fun = lambda x: x.het.science.nominal
else:
fun = lambda x: x.het.science.low_latency
if self.species == 'e':
types = ['B', 'C'] # electrons are only available in "stopping in B" and "stopping in C" trigger
else:
types = ['B', 'C', 'P'] # stopping in B, stopping in C, penetrating
try:
data = [fun(epd)(self.species, t) for t in types]
except ConfigurationChangeError as e:
# split loading of data between configuration changes
data = [fun(part)(self.species, t) for part in handle_config_changes(epd, e) for t in types]
fluxes = {}
for direction in ['sun', 'asun', 'north', 'south']:
dfs = [df[direction].to_flux(edges=True) for df in data]
for df in dfs:
df.columns = pd.IntervalIndex.from_arrays(df.columns.left / 1e3, df.columns.right / 1e3)
df.columns.name = "primary energy / MeV"
fluxes[direction] = dfs
return fluxes
@property
@abstractmethod
def species(self) -> str:
pass
@property
@abstractmethod
def nominal(self) -> bool:
pass
class HETDynamicSpectrumLLProtonPlots(HETDynamicSpectrumPlots):
name = 'Dynamic Spectrum (LL, Protons)'
nominal = False
species = 'p'
class HETDynamicSpectrumNominalProtonPlots(HETDynamicSpectrumPlots):
name = 'Dynamic Spectrum (NO, Protons)'
nominal = True
species = 'p'
class HETDynamicSpectrumLLElectronPlots(HETDynamicSpectrumPlots):
name = 'Dynamic Spectrum (LL, Electrons)'
nominal = False
species = 'e'
class HETDynamicSpectrumNominalElectronPlots(HETDynamicSpectrumPlots):
name = 'Dynamic Spectrum (NO, Electrons)'
nominal = True
species = 'e'
titles_p = {
'sun': 'Sunward HET protons',
'asun': 'Anti-Sunward HET protons',
'north': 'Northward HET protons',
'south': 'Southward HET protons'
}
titles_e = {
'sun': 'Sunward HET electrons',
'asun': 'Anti-Sunward HET electrons',
'north': 'Northward HET electrons',
'south': 'Southward HET electrons'
}
def columns_to_mev(df):
df.columns = ['{:.1f} MeV'.format(energy / 1e3) for energy in df.columns]
class HETTimeProfilesPlotsProtons(PlotCollection):
name = 'Time Profiles Proton'
group = 'HET SCI'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
return {
key: LinePlot(self, data[key], title=titles_p[key], logy=True,
ylabel='diff. particle flux / (mm² sr s keV)⁻¹')
for key in data
}
def load_data(self, start: dt.datetime, end: dt.datetime):
epd = EPDData(start, end)
data = {}
try:
het_pB = [epd.het.science.low_latency('p', 'B')]
het_pC = [epd.het.science.low_latency('p', 'C')]
het_pP = [epd.het.science.low_latency('p', 'P')]
except ConfigurationChangeError as e:
# split loading of data between configuration changes
het_pB = [part.het.science.low_latency('p', 'B') for part in handle_config_changes(epd, e)]
het_pC = [part.het.science.low_latency('p', 'C') for part in handle_config_changes(epd, e)]
het_pP = [part.het.science.low_latency('p', 'P') for part in handle_config_changes(epd, e)]
for direction in ['sun', 'asun', 'north', 'south']:
p_pds = pd.concat([
pd.concat([b[direction].to_flux(),
c[direction].to_flux(),
p[direction].to_flux()], axis=1)
for b, c, p in zip(het_pB, het_pC, het_pP)]).resample('30min').mean()
columns_to_mev(p_pds)
data[direction] = p_pds
return data
class HETTimeProfilesPlotsElectrons(PlotCollection):
name = 'Time Profiles Electron'
group = 'HET SCI'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
return {
key: LinePlot(self, data[key], title=titles_e[key], logy=True,
ylabel='diff. particle flux / (mm² sr s keV)⁻¹')
for key in data
}
def load_data(self, start: dt.datetime, end: dt.datetime):
epd = EPDData(start, end)
data = {}
try:
het_eB = [epd.het.science.low_latency('e', 'B')]
het_eC = [epd.het.science.low_latency('e', 'C')]
except ConfigurationChangeError as e:
# split loading of data between configuration changes
het_eB = [part.het.science.low_latency('e', 'B') for part in handle_config_changes(epd, e)]
het_eC = [part.het.science.low_latency('e', 'C') for part in handle_config_changes(epd, e)]
for direction in ['sun', 'asun', 'north', 'south']:
p_pds = pd.concat([
pd.concat([b[direction].to_flux(),
c[direction].to_flux()], axis=1)
for b, c in zip(het_eB, het_eC)]).resample('30min').mean()
columns_to_mev(p_pds)
data[direction] = p_pds
return data
class HETVelocityDispersion(PlotCollection):
name = 'Velocity Dispersion'
group = 'HET SCI'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
plots = {}
titles = {
'sun protons': 'Sunward Pointing Detector, protons',
'asun protons': 'Anti-Sunward Pointing Detector, protons',
'north protons': 'Northward Pointing Detector, protons',
'south protons': 'Southward Pointing Detector, protons',
'sun electrons': 'Sunward Pointing Detector, electrons',
'asun electrons': 'Anti-Sunward Pointing Detector, electrons',
'north electrons': 'Northward Pointing Detector, electrons',
'south electrons': 'Southward Pointing Detector, electrons'
}
for key in data:
# share colorbar with sun plot
share_colorbar = plots['sun ' + key.split(' ')[1]] if not key.startswith('sun') else None
plots[key] = DynamicSpectrumPlot(self, data[key], title=titles[key],
share_colorbar=share_colorbar, data_label='flux / counts/(s mm² sr keV)',
ylabel='1/v / h/AU')
# for pos in np.linspace(0, 1, 5):
# plot diagonal lines
# plot_diagonal(ax, (pos, 0), slope, color='k')
return plots # and diagonals
def load_data(self, start, end):
epd = EPDData(start, end)
het = epd.het
try:
dfs_p_B = [het.science.low_latency('p', 'B')]
dfs_p_C = [het.science.low_latency('p', 'C')]
dfs_p_P = [het.science.low_latency('p', 'P')]
dfs_e_B = [het.science.low_latency('e', 'B')]
dfs_e_C = [het.science.low_latency('e', 'C')]
except ConfigurationChangeError as e:
# split loading of data between configuration changes
dfs_p_B = [part.ept.science.low_latency('p', 'B') for part in handle_config_changes(epd, e)]
dfs_p_C = [part.ept.science.low_latency('p', 'C') for part in handle_config_changes(epd, e)]
dfs_p_P = [part.ept.science.low_latency('p', 'P') for part in handle_config_changes(epd, e)]
dfs_e_B = [part.ept.science.low_latency('e', 'B') for part in handle_config_changes(epd, e)]
dfs_e_C = [part.ept.science.low_latency('e', 'C') for part in handle_config_changes(epd, e)]
data = {}
for direction in ['sun', 'asun', 'north', 'south']:
data[f'{direction} protons'] = pd.concat([
pd.concat([inv_velocity_spectrum(b[direction].to_flux(edges=True), mass=const.m_p),
inv_velocity_spectrum(c[direction].to_flux(edges=True), mass=const.m_p),
inv_velocity_spectrum(p[direction].to_flux(edges=True), mass=const.m_p)], axis=1)
for b, c, p in zip(dfs_p_B, dfs_p_C, dfs_p_P)]).resample('30min').mean()
data[f'{direction} electrons'] = pd.concat([
pd.concat([inv_velocity_spectrum(b[direction].to_flux(edges=True), mass=const.m_e),
inv_velocity_spectrum(c[direction].to_flux(edges=True), mass=const.m_e)])
for b, c in zip(dfs_e_B, dfs_e_C)]).resample('30min').mean()
# get the position of SolO referred to the sun
position = SOLO.position((end - start) / 2 + start)
# get the total distance from SolO to the sun
distance = np.linalg.norm(position)
# convert from km to au
distance = distance / const.au * 1e3
# get the distance along the parker spiral
slope = parker_spiral_length(distance, v_sw=400)
# multiply by 24 because matplotlib represents dates as days since epoch, not hours
slope *= 24
return data
class HETGCRPlots(PlotCollection):
name = 'GCR counters'
group = 'HET SCI'
def create_plots(self, data: Dict[str, Any]) -> Dict[str, Plot]:
return {
'c_counters': LinePlot(self, data['c_counters'],
title=f'HET C detector counters (isotropic GCR ≳ 15 MeV/nuc)', ylabel='counts / s'),
'gcr_channel': LinePlot(self, data['gcr_channel'],
title=f'HET GCR channel (BCB penetrating GCR ≳ 100 MeV/nuc)', ylabel='counts / s')
}
def load_data(self, start: dt.datetime, end: dt.datetime):
# sum all C high gain and all C low gain channels
c_counters = pd.DataFrame({
channel: pd.concat([
EPDData(start, end).het.housekeeping.l1_counters(unit)[[f'C1{channel}', f'C2{channel}']]
.sum(axis=1).resample('10min').sum(min_count=1) / 600
for unit in ['he1', 'he2']
], axis=1).sum(axis=1, skipna=False)
for channel in ['H', 'L']
})
# GCR channel
gcr = EPDData(start, end).het.science.gcr()
gcr = pd.DataFrame({
key: gcr[key].sum(axis=1, skipna=False).resample('60min').sum(min_count=1) / 3600
for key in gcr
})
for key in gcr:
gcr[f'{key} (smoothed)'] = gcr[key].rolling(10, center=True).mean()
return {
'c_counters': c_counters,
'gcr_channel': gcr
}