Compare commits

..

No commits in common. "master" and "kiruna-consti" have entirely different histories.

16 changed files with 8 additions and 3977 deletions

View file

@ -1,261 +0,0 @@
#! /usr/bin/python3
import matplotlib.pyplot as plt
import numpy as np
import scipy.optimize as opt
from numpy import random as rd
import scipy.optimize as opt
import argparse
import sys
import struct
from datetime import datetime
parser = argparse.ArgumentParser(prog=sys.argv[0], description="Searches stdin or FILE line by line for P, H and HDORN lines")
parser.add_argument('-f', '--file')
args = parser.parse_args()
global fp
#global plot_flag
#global live
fp = args.file
#plot_flag = args.plot
#live = args.live
syear = int(input("Startyear: "))
smonth = int(input("Startmonth: "))
sday = int(input("Startday: "))
shour = int(input("Starthour: "))
smin = int(input("Startmin: "))
eyears = input("Endyeadr (leave empty to use start value): ")
if eyears:
eyear = int(eyears)
else:
eyear = syear
emonths = input("Endmonth (leave empty to use start value): ")
if emonths:
emonth = int(emonths)
else:
emonth = smonth
edays = input("Endday (leave empty to use start value): ")
if edays:
eday = int(edays)
else:
eday = sday
ehours = input("Endhour (leave empty to use start value): ")
if ehours:
ehour = int(ehours)
else:
ehour = shour
emins = input("Endminute (leave empty to use start value): ")
if emins:
emin = int(emins)
else:
emin = smin
starttime = int(datetime(syear, smonth, sday, shour, smin).strftime('%s'))
endtime = int(datetime(eyear, emonth, eday, ehour, emin).strftime('%s'))
#starttime = int(input("Starttime in unix time:"))
#endtime = int(input("Endtime in unix time:"))
col = int(input("Coloumn"))
#xs, ys = np.loadtxt("/home/constantin/user/Constantin/Universität/BEXUS/Data_Analysis/SETH/2025-10-04-inside-37e.hk", usecols=(0,col),unpack=True)
xs, ys = np.loadtxt(fp, usecols=(0,col),unpack=True)
#xs2, m3, m4 = np.loadtxt("Data_two_goes_here.txt", usecols=(0,1,2),unpack=True)
while xs[0] < starttime:
xs = xs[1:]
ys = ys[1:]
i=0
while i< len(xs) and xs[i] < endtime:
i+=1
#xs = xs[:i+1]
#ys = ys[:i+1]
for i in range(len(xs)):
xs[i] = xs[i] - starttime
def f(x,a,b,c,d):
return a*(np.exp(x*b+d))+c
#Fit für f if needed
popt,pcov=opt.curve_fit(f,xs,ys,p0=[-1,-0.0001,43,2])
#popt,pcov=opt.curve_fit(f,xs,ys)
print(popt)
a,b,c,d=popt
#Data_preparation
fig = plt.figure()
ax = fig.add_subplot(111)
ax.set_xlabel("x_label/x_label_unit")
ax.set_ylabel("y_label/$y_label_unit_latex$")
#plt.plot(xs2,rs,'r+',label="Measurement")
plt.plot(xs,ys,label="linear connection")
plt.plot(xs,f(xs,a,b,c,d),'b',label="Modell")
plt.show()
#Transformation for secondary axis
#def d2r(x):
# return x*500
#def r2d(x):
# return x/500
#Second_plot
#fig = plt.figure()d2r
#ax = fig.add_subplot(111)
#ax.set_xlabel("frquency/Hz")
#ax2=ax.secondary_yaxis('right',functions=(r2d,d2r))
#ax2.set_ylabel("phase shift/$^\circ$")
#plt.plot(xs,ys*500,'b+',label="Measurement (Phaseshift)")
#plt.plot(xs,ys*500,label="linear connection (Phaseshift)")
##plt.plot(xs,f(xs,a,b,c),'b',label="Modell")
#plt.legend()
#plt.grid()
#plt.savefig("Filename.png", dpi=300)
#plt.pyplot.savefig("Muenzwurf_00{}.png".format(), dpi=300)
#plt.close
#for i in range(len(xs)):
# if xs[i] > 960:
# ys[i] = -ys[i]
#fig = plt.figure()
#ax = fig.add_subplot(111)
#ax.set_xlabel("frquence/hz")
#ax.set_ylabel("phase shift/$^\circ$")
#plt.plot(xs,ys,'r+',label="Measurement")
#plt.plot(xs,ys,label="linear connection")
##plt.plot(xs,f(xs,a,b,c),'b',label="Modell")
#plt.legend()
#plt.grid()
#plt.savefig("Phaseshift_pm.png", dpi=300)
##plt.pyplot.savefig("Muenzwurf_00{}.png".format(), dpi=300)
#plt.close
#throw_series()
exit()
#U_y_hat/U_x_hat*R_2
xmin = 0
xmax = 40 #* np.pi
num = 1024
xs = np.linspace(xmin, xmax, num=num)
# FFT demo
#ys0 = 2/5*np.sin(6*xs)
#ys1 = 3/5*np.sin(2*xs-0.5)
ys1 = np.sin(2*xs)
#ys11 = 4/5*np.sin(2*xs)
#ys2 = -np.sin(0.5*xs+1)
ys3 = np.sin(xs)
#fk2 = ys0 + ys1 + ys2# + ys3
fk2 = ys1 + ys3
fk = np.full(num, 0., dtype=complex)
fk[2] = 1.5
#fk = np.sin(xs, dtype=complex)
ys = np.fft.fft(fk2)
#f = plt.figure()
plt.figure(figsize=(12,8))
plt.plot(xs, ys.real, label="Realteil")
plt.plot(xs, ys.imag, label="Imaginärteil")
plt.legend()
plt.savefig("fft_result_eigen.png")
plt.close()
plt.figure(figsize=(12,8))
plt.plot(xs, fk2.real, label="Realteil")
plt.plot(xs, fk2.imag, label="Imaginärteil")
plt.legend()
plt.savefig("fft_input_eigen.png")
plt.close()
ys2 = np.fft.ifft(ys)
plt.figure(figsize=(12,8))
plt.plot(xs, ys2.real, label="Realteil")
plt.plot(xs, ys2.imag, label="Imaginärteil")
plt.legend()
plt.savefig("ifft_result_eigen.png")
plt.close()
exit()
# Plots
#Rumspielen
ys = np.sin(xs)
plt.plot(xs, ys)
#plt.savefig("our_first_plot.png")
plt.close()
ys = np.sin(2*xs + 1)
plt.plot(xs, ys)
#plt.savefig("our_second_plot.png")
plt.close()
ys = np.sin(2*xs + 1)
plt.plot(xs, ys)
ys = np.sin(2*xs)
plt.plot(xs, ys)
#plt.savefig("our_third_plot.png")
plt.close()
ys0 = 2/5*np.sin(6*xs)
ys1 = 3/5*np.sin(2*xs-0.5)
ys11 = 4/5*np.sin(2*xs)
ys2 = -0*np.sin(0.5*xs+1)
ys3 = 0*np.sin(xs)
fig, axs = plt.subplots(nrows=5, sharex=True, sharey=True, figsize=(5, 13))
axs[0].set_ylim([-2.2, 2.2])
axs[0].plot(xs, ys0)
axs[1].plot(xs, ys1, label="foo")
#axs[1].plot(xs, ys11, label="bar")
axs[2].plot(xs, ys2)
axs[3].plot(xs, ys3)
axs[4].plot(xs, ys0 + ys1 + ys2 + ys3, linewidth=4, color="maroon")
#axs[0].legend()
axs[1].legend()
#axs[2].legend()
#axs[3].legend()
axs[0].axhline(0, linestyle="--", color="k", linewidth=1, alpha=0.4)
axs[1].axhline(0, linestyle="--", color="k", linewidth=1, alpha=0.4)
axs[2].axhline(0, linestyle="--", color="k", linewidth=1, alpha=0.4)
axs[3].axhline(0, linestyle="--", color="k", linewidth=1, alpha=0.4)
plt.tight_layout()
plt.savefig("img_fourier_001.png")
plt.close()

465
MHA.py
View file

@ -1,465 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Nov 7 12:22:11 2025
@author: milanmarx
"""
import math
import datetime
import numpy as np
import argparse
import os
"""
Dieses skript lässt sich ausnahmsweise über die Komandozeile ausführen
wandelt eine AHA Datei in MHA datei um.
MHA ist immernoch ein Detektor = eine zeile aber mit housekeeping anhang wie UTC, temperatur bgo ...
"""
###############################################################################
# Temperaturberechnung von Nicolas aus seth_HK
###############################################################################
def degC(a):
"""
Rechnet ADC Wert vom NTC in Celsius um
"""
R1, R25, B25, res = 10e3, 10e3, 3940, 0x1000 # Vorwiderstand, Widerstand bei 25 grad, materialgröße in kelvin, auflösung
R = R1 * a / (res - (a & (res - 1))) # berechne aktuellen Widerstand aus ADC
T = B25 / (math.log(R / R25) + B25 / 298) - 273 # temperatur berechnen
return T
def hdorn(line):
"""
Parst eine HDORN-Zeile und liefert NUR die BGO-Temperatur Tbgo01 (Slice 0, Block 4, Wert d[3]).
Gibt eine Liste mit einem einzigen Wert [Tbgo01] zurück oder [] falls nicht vorhanden.
"""
ll = line.split() # split in HDORN, slice nr, 64 Werte
if len(ll) != 66 or ll[0] != "HDORN":
return [] # ungültige Zeile
try:
sl = int(ll[1]) # Slice-Nummer (0 oder 1)
dorn_data = [int(num) for num in ll[1:]] # extrahiert die daten
except Exception:
return []
# Nur Slice 0 ist relevant ( ich betrachte nur TBGO_01)
if sl != 0:
return []
# 8 Blöcke a 8 Werte extrahieren
dorn_data = [d & 0xfff for d in dorn_data]
dorn_data = [dorn_data[8 * i + 1 : 8 * i + 1 + 8] for i in range(8)]
# Nur Block 4 (PA + BGO)
d = dorn_data[4]
tbgo01 = degC(d[3]) # BGO-Temperatur aus Kanal 3 im Block 4
return [tbgo01]
def bate(l):
"""
Temperatur & Druck aus P-Zeilen (hex-dekodiert)
Rückgabe: [temp_C, pressure_hPa]
Falls Plausibilitätsprüfung fehlschlägt [None, None]
"""
parts = l.split()
if len(parts) < 8:
return [None, None]
try:
word = [int(num, 16) for num in parts[3:7]]
d = [int(num, 16) for num in parts[7:]]
except ValueError:
return [None, None]
# P-Zeile enthält zu wenige Werte → ungültig
if len(word) != 4 or len(d) < 2:
return [None, None]
"""
try:
word = [int(num, 16) for num in parts[3:7]]
d = [int(num, 16) for num in parts[7:]]
except ValueError:
return [None, None]
"""
c = [
word[0] >> 1,
((word[2] & 63) << 6) | (word[3] & 63),
word[3] >> 6,
word[2] >> 6,
((word[0] & 1) << 10) | (word[1] >> 6),
word[1] & 63
]
ut1 = 8 * c[4] + 20224
dT = d[1] - ut1 #frag mich nicht was hier passiert...
off = c[1] * 4 + ((c[3] - 512) * dT) / 4096
sens = c[0] + (c[2] * dT) / 1024 + 24576
x = (sens * (d[0] - 7168)) / 16384 - off
temp = (200 + dT * (c[5] + 50) / 1024) / 10
pressure = (x * 10 / 32 + 2500) / 10 + 3
# Plausibilitätsprüfung
if not (-50 < temp < 80) or not (pressure < 1200):
return [None, None]
return [temp, pressure]
###############################################################################
# Polynom für Temperatur-Korrekturfaktor von Ava
###############################################################################
pol = [2.36849137e-03, -1.06835334e-01, -5.25201198e+01, 5.30730515e+03]
def polynom(T, a, b, c, d):
"""
Polynom a*T^3 + b*T^2 + c*T + d
"""
return a * T**3 + b * T**2 + c * T + d
def temp_korr_faktor(T, pol):
"""
Korrekturfaktor = p(0) / p(T).
"""
a, b, c, d = pol
p0 = polynom(0, a, b, c, d)
pT = polynom(T, a, b, c, d)
return p0 / pT if pT != 0 else 1.0
###############################################################################
# Druck zu Höhe (Standardatmosphäre) von Hannes
###############################################################################
def pressure_to_height(P):
"""
Rechnet Druck P (Pa) in geometrische Höhe (m) um.
Schichtmodell nach US Standard Atmosphere 1976.
"""
if P is None or P <= 0:
return None
# Konstanten
R_star = 8.31432
g0 = 9.80665
g0_prime = 9.80665
P0 = 101325
r0 = 6356766
T0 = 288.15
M0 = 28.9644e-3
# Atmosphärenschichten (Basishöhe, -temp, -druck, Gradient L)
layers = [
{"h_base": 0, "T_base": T0, "P_base": P0, "L": -0.0065},
{"h_base": 11000, "L": 0},
{"h_base": 20000, "L": 0.001},
{"h_base": 32000, "L": 0.0028},
{"h_base": 47000, "L": 0},
{"h_base": 51000, "L": -0.0028},
{"h_base": 71000, "L": -0.002},
]
def molecular_scale_temp(H, H_base, T_base, L):
return T_base + L * (H - H_base)
def pressure(H, H_base, T_base, P_base, L):
if L == 0:
return P_base * np.exp(-g0_prime * M0 * (H - H_base) / (R_star * T_base))
else:
return P_base * (T_base / (T_base + L * (H - H_base))) ** (g0_prime * M0 / (R_star * L))
# Basistemperatur/-druck je Schicht fortschreiben
for i in range(len(layers)):
if i == 0:
layers[i]["T_base"] = T0
layers[i]["P_base"] = P0
else:
layers[i]["T_base"] = molecular_scale_temp(layers[i]["h_base"], layers[i-1]["h_base"], layers[i-1]["T_base"], layers[i-1]["L"])
layers[i]["P_base"] = pressure(layers[i]["h_base"], layers[i-1]["h_base"], layers[i-1]["T_base"], layers[i-1]["P_base"], layers[i-1]["L"])
# In der passenden Schicht Höhe aus P bestimmen
for i, layer in enumerate(layers):
h_base = layer["h_base"]
T_base = layer["T_base"]
P_base = layer["P_base"]
L = layer["L"]
if i < len(layers) - 1 and P <= P_base and P > layers[i + 1]["P_base"]:
if L == 0:
H = R_star * T_base / g0_prime / M0
H_diff = H * math.log(P_base / P)
else:
exponent = (P / P_base) ** (-R_star * L / (g0_prime * M0))
H_diff = (T_base * (exponent - 1)) / L
break
elif i == len(layers) - 1:
if L == 0:
H = R_star * T_base / g0_prime / M0
H_diff = H * math.log(P_base / P)
else:
exponent = (P / P_base) ** (-R_star * L / (g0_prime * M0))
H_diff = (T_base * (exponent - 1)) / L
geopot_height = h_base + H_diff
height = r0 * geopot_height / (g0 / g0_prime * r0 - geopot_height)
return height
###############################################################################
# Parser- und Exportfunktionen für MHA-Ausgabe
#(ist nach zeitstempel sortierte und erweiterte AHA Datei für den Übergang zur MHM)
###############################################################################
def parse_aha_file(filepath):
"""
Liest AHA-Datei, verknüpft EDBs mit Tbgo, Druck und UTC-Zeit.
WICHTIG: UTC wird erst bei C64-Zeilen aktiviert aufgrund der Empfehlung nur nach C64 zu binnen.
Ich weiß dass dies nicht ganz die korrekte weise ist aber durch das sortieren fallen halt die C64 zeilen weg.
Rückgabe je EDB: [zeitstempel, name, messwert, Tbgo, Druck, UTC, KorrFaktor]
"""
pending_utc = None # UTC aus H-Zeile
current_utc = None # aktive UTC-Zeit (wird erst bei C64 gesetzt)
current_tbgo = None
current_pressure = None
# Kanalzuordnung (Slice, Channel) zu Detektorkennung
channel_map = {
(1, 0): "A17", (1, 1): "A10", (1, 2): "A19", (1, 3): "B1", (1, 4): "A5",
(1, 5): "A6", (1, 6): "A14", (1, 7): "A15", (1, 8): "A16", (1, 9): "A7",
(1, 10): "A4", (1, 11): "C1", (1, 12): "B2", (1, 13): "A3", (1, 14): "A9",
(1, 15): "A18", (1, 16): "A13", (1, 17): "A20", (1, 18): "A1", (1, 19): "A2",
(1, 20): "B3", (1, 21): "A8", (1, 22): "A12", (1, 23): "A11",
(0, 0): "E2", (0, 1): "E7", (0, 2): "E20", (0, 3): "D1", (0, 4): "E14",
(0, 5): "E15", (0, 6): "E5", (0, 7): "E6", (0, 8): "E1", (0, 9): "E10",
(0, 10): "E13", (0, 11): "C2", (0, 12): "D2", (0, 13): "E18", (0, 14): "E12",
(0, 15): "E3", (0, 16): "E4", (0, 17): "E19", (0, 18): "E16", (0, 19): "E17",
(0, 20): "D3", (0, 21): "E11", (0, 22): "E9", (0, 23): "E8"
}
results = []
with open(filepath, "r", encoding="utf-8", errors="ignore") as file:
for line in file:
line = line.strip()
if not line:
continue
# Druck aus P-Zeilen
if line.startswith("P"):
vals = bate(line)
if vals and all(v is not None for v in vals):
_, current_pressure = vals
continue
if line.startswith("HDORN"):
temps = hdorn(line)
if temps: # prüfen, ob Liste nicht leer ist
current_tbgo = temps[0] # enthält Tbgo01 (in °C)
continue
# H-Zeile UTC
if line.startswith("H "):
s = line.split()
try:
m = int(float(s[1]))
pending_utc = datetime.datetime.fromtimestamp(m, datetime.timezone.utc)
except:
pass
continue
# C64 UTC wird aktiv
if line.startswith("C64"):
current_utc = pending_utc
continue
# EDB-Messungen
if not line.startswith("EDB "):
continue
parts = line.split()
if len(parts) != 11:
continue
try:
zeitstempel = int(float(parts[1]))
slice_nr = int(float(parts[2]))
kanal_nr = int(float(parts[3]))
messwert = float(parts[-1]) / 0x20000 # Skalierung
except:
continue
name = channel_map.get((slice_nr, kanal_nr))
if not name:
continue
# VALIDIERUNG: EDB nur übernehmen wenn alles vorhanden ist
invalid = (
current_tbgo is None or
current_pressure is None or
current_utc is None or
name is None or
messwert is None or
math.isnan(messwert)
)
if invalid:
continue
korr = temp_korr_faktor(current_tbgo, pol)
results.append([zeitstempel, name, messwert, current_tbgo, current_pressure, current_utc, korr])
# Sortierung inkl. Wrap (kommt unten die funktion )
results = sort_with_wraparound(results)
return results
def sort_with_wraparound(data):
"""
Erkennt Wrap-Arounds im 32-bit Zeitstempel.
- Nach Wrap-Erkennung: 500 Zeilen lang wird geprüft ob >3e9. Wenn ja, werden sie in den block davor geschrieben
- Danach 700 Zeilen lang keine erneute Wrap-Prüfung (Alternieren abpuffern)
- Jeder Block wird für sich sortiert; Ausgabe: Block1, Block2, ...
"""
if not data:
return []
blocks = [[]]
last_val = data[0][0] # zeitstempel
i = 1
skip_wrap_check = 0
while i < len(data):
zst = data[i][0] # zeitstempel des aktuellen eintrags
# Wrap erkennen (nur wenn nicht im Ignore-Fenster)
if skip_wrap_check == 0 and zst < last_val and last_val > 3_000_000_000 and zst < 100_000_000:
# potenzieller Wrap
block1 = blocks[-1]
block2 = []
# Schaue die nächsten 500 Zeilen: große Nachzügler (>3e9) noch Block 1 zuordnen, Rest Block 2
lookahead = data[i:i + 500]
for entry in lookahead:
if entry[0] > 3_000_000_000:
block1.append(entry)
else:
block2.append(entry)
blocks[-1] = block1
blocks.append(block2)
# i um 500 vorziehen, 700 Zeilen Wrap-Check pausieren
i += 500
skip_wrap_check = 700
if i < len(data):
last_val = data[i][0]
continue
# normal einsortieren: aktueller eintrag in aktuellen block schreiben
blocks[-1].append(data[i])
last_val = zst
if skip_wrap_check > 0:
skip_wrap_check -= 1 # der wrap around überspringer wird wieder zurück gezählt
i += 1
# Blöcke jeweils aufsteigend nach Zeitstempel
for b in blocks:
b.sort(key=lambda x: x[0])
# ursprünglicher Blockreihenfolge
merged = [item for block in blocks for item in block]
return merged
def export_to_MHA(data, filename):
"""
Schreibt die neue MHA-Datei (eine Zeile pro Detektor:
Spalten:
1. EDB
2. Zeitstempel
3. UTC
4. Detektorname
5. Tbgo_C
6. Druck_hPa
7. Hoehe_m (aus Druck berechnet)
8. KorrFaktor
9. Messwert_mV
"""
with open(filename, "w", encoding="utf-8") as f:
f.write("EDB Zeitstempel UTC Kanal Tbgo_C Druck_hPa Hoehe_m KorrFaktor Messwert_mV\n")
for zst, name, messwert, Tbgo, Druck, UTC, Korr in data:
# UTC-Format
utc_str = UTC.strftime("%Y-%m-%dT%H:%M:%SZ") if UTC else "NaT"
# Druck & Höhe: ganze Zahlen
Druck_int = int(round(Druck)) if Druck is not None else 0
hoehe = pressure_to_height(Druck_int * 100) if Druck_int > 0 else 0
hoehe_int = int(round(hoehe)) if hoehe is not None else 0
# Tbgo & Korr gerundet, Messwert exakt
Tbgo_str = f"{Tbgo:.2f}" if Tbgo is not None else "NaN"
Korr_str = f"{round(Korr, 2):.2f}" if Korr is not None else "NaN"
messwert_str = str(messwert) if messwert is not None else "NaN"
f.write(f"EDB {zst} {utc_str} {name} {Tbgo_str} {Druck_int} {hoehe_int} {Korr_str} {messwert_str}\n")
def main():
parser = argparse.ArgumentParser( description="Konvertiert AHA-Dateien zu MHA-Dateien mit Temperatur-, Druck- und Höhenkorrekturen.")
parser.add_argument("input", help="Pfad zur AHA-Eingabedatei")
parser.add_argument("-o", "--output", help="Name der Ausgabedatei (optional). Wenn nicht angegeben, wird '.MHA' angehängt.")
args = parser.parse_args()
infile = args.input
# Falls keine Output-Datei angegeben ist dann .MHA automatisch anhängen
if args.output:
outfile = args.output
else:
root, ext = os.path.splitext(infile)
outfile = root + ".MHA"
print(f"Lese AHA-Datei: {infile}")
data = parse_aha_file(infile)
print(f"Schreibe MHA-Datei: {outfile}")
export_to_MHA(data, outfile)
print("MHA-Datei wurde erfolgreich erstellt:", outfile)
if __name__ == "__main__":
main()

173
MHM.py
View file

@ -1,173 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Nov 8 12:48:37 2025
@author: milanmarx
"""
import os
import datetime
import argparse
###############################################################################
# Event-Gruppierung
###############################################################################
def group_events(data, time_threshold=7):
"""
Gruppiert Zeilen zu Events, basierend auf der Differenz im Zeitstempel.
Zwei Zeilen gehören ins gleiche Event, wenn der Zeitunterschied <= time_threshold ist.
hier als 7 gewählt siehe hierfür seth_bin_hist_statistic.py
"""
events = []
current_event = []
last_time = None
for zeile in data:
zeitstempel = zeile[1]
if last_time is None:
current_event = [zeile]
elif zeitstempel - last_time <= time_threshold:
current_event.append(zeile)
else:
if len(current_event) > 0:
events.append(current_event)
current_event = [zeile]
last_time = zeitstempel
events.append(current_event)
return events
###############################################################################
# Parser für vorhandene MHA-Datei (mit MHA.py erstellbar aus AHA)
###############################################################################
def parse_mha_file(filepath):
"""
Liest eine fertige MHA-Datei ein und gibt die Daten in strukturierter Form zurück
Erwartete MHA-Spalten:
EDB Zeitstempel UTC Kanal Tbgo_C Druck_hPa Hoehe_m KorrFaktor Messwert_mV
"""
data = []
with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
for line in f:
if line.startswith("EDB Zeitstempel") or not line.strip(): # header überspringen
continue
parts = line.split()
if len(parts) < 9 or parts[0] != "EDB":
continue
try:
zeitstempel = int(parts[1])
utc = None if parts[2] == "NaT" else datetime.datetime.strptime(parts[2], "%Y-%m-%dT%H:%M:%SZ")
kanal = parts[3]
Tbgo = float(parts[4]) if parts[4] != "NaN" else None
Druck = float(parts[5]) if parts[5] != "NaN" else None
Hoehe = float(parts[6]) if parts[6] != "NaN" else None
Korr = float(parts[7]) if parts[7] != "NaN" else None
Messwert = float(parts[8]) if parts[8] != "NaN" else None
except Exception:
continue
data.append(["EDB", zeitstempel, utc, kanal, Tbgo, Druck, Hoehe, Korr, Messwert])
return data
###############################################################################
# Export in MHM (jetzt ist jede Zeile ein Event)
###############################################################################
def export_to_mhm(events, filename):
"""
Exportiert gruppierte Events aus MHA-Daten in MHM-Format.
Falls ein Event mehrere Messungen desselben Kanals enthält,
wird der größte Messwert (Maximum) übernommen.
"""
kanal_order = (
[f"A{i}" for i in range(1, 21)] +
["B1", "B2", "B3", "C1", "C2", "D1", "D2", "D3"] +
[f"E{i}" for i in range(1, 21)]
)
with open(filename, "w", encoding="utf-8", errors = "ignore") as f:
header = ["EDM", "Zeitstempel", "Anzahl_Kanaele", "Druck", "Hoehe_m", "Temperatur", "UTC", "KorrFaktor"] + kanal_order
f.write(" ".join(header) + "\n")
for ev in events:
if not ev:
continue
# Basiswerte aus erster Zeile des Events
z0 = ev[0]
zeitstempel = z0[1]
utc = z0[2].strftime("%Y-%m-%dT%H:%M:%SZ") if z0[2] else "NaT"
druck = int(round(z0[5])) if z0[5] is not None else 0
hoehe = int(round(z0[6])) if z0[6] is not None else 0
temp = round(z0[4], 2) if z0[4] is not None else 0
korr = round(z0[7], 2) if z0[7] is not None else 1.0
# Kanalwerte initialisieren als Dictionary mit Listen
messwerte = {k: [] for k in kanal_order}
# Alle Messungen in Event sammeln
for _, _, _, kanal, *_rest, messwert in ev:
if kanal in messwerte and messwert is not None:
messwerte[kanal].append(round(messwert, 2))
# Maximalwert je Kanal bestimmen (0 falls keine Messung)
messwerte = {k: (max(v) if v else 0) for k, v in messwerte.items()}
# Anzahl aktiver Kanäle (ungleich 0)
anzahl_kanaele = sum(1 for v in messwerte.values() if v != 0)
# Zeile schreiben
line = (
["EDM", str(zeitstempel), str(anzahl_kanaele),
str(druck), str(hoehe), f"{temp:.2f}", utc, f"{korr:.2f}"]
+ [f"{messwerte[k]:.2f}" if messwerte[k] != 0 else "0" for k in kanal_order]
)
f.write(" ".join(line) + "\n")
###############################################################################
# Main
###############################################################################
def main():
parser = argparse.ArgumentParser(description="Konvertiert MHA-Dateien zu MHM-Dateien (Event pro Zeile).")
parser.add_argument("input",help="Pfad zur MHA-Datei")
parser.add_argument("-o", "--output", help="Ausgabedatei (optional). Falls nicht angegeben, wird '.MHM' angehängt.")
args = parser.parse_args()
infile = args.input
# Falls kein Output angegeben: .MHM anhängen
if args.output:
outfile = args.output
else:
root, ext = os.path.splitext(infile)
outfile = root + ".MHM"
print(f"Lese MHA-Datei: {infile}")
data = parse_mha_file(infile)
print(f"{len(data)} Zeilen eingelesen")
print("Bilde Events...")
events = group_events(data)
print(f"{len(events)} Events erkannt")
print(f"Schreibe MHM-Datei: {outfile}")
export_to_mhm(events, outfile)
print(f"MHM-Datei erfolgreich erstellt: {outfile}")
if __name__ == "__main__":
main()

View file

@ -12,7 +12,7 @@ def magnitutde(vector):
sum += entry**2
return np.sqrt(sum)
def residuals(coeff, vecs, type="mag"):
def residuals(coeff, vec, type="mag"):
"""
Function returns the difference between circle and ellipsis
"""
@ -20,15 +20,9 @@ def residuals(coeff, vecs, type="mag"):
r = (0.538+0.503)/2 # mean between Kiel and Kiruna
else:
r = 1
x, y, z = [], [], []
for vec in vecs:
x.append(vec[0])
y.append(vec[1])
z.append(vec[2])
x, y, z = np.array(x), np.array(y), np.array(z)
x, y, z = vec
a, b, c, x0, y0, z0 = coeff
c, z0 = 1, 0
return (r*r) - a*(x-x0)*(x-x0) - b*(y-y0)*(y-y0) - c*(z-z0)*(z-z0)
return a*(x-x0)*(x-x0) + b*(y-y0)*(y-y0) + c*(z-z0)*(z-z0) - (r*r)
def main():
@ -52,11 +46,10 @@ def main():
acc[2] = -acc[2]/acc_sens
acc_initial = [1.0, 1.0, 1.0, 0.1, 0.1, 0.1]
mag_initial = [7., 7., 1., 0.05, 0.42, 0.0]
mag_initial = [1.0, 1.0, 1.0, 0.1, 0.1, 0.1]
acc_parms = least_squares(residuals, acc_initial, args=(np.array(accs), "type=acc"), method='lm')
mag_parms = least_squares(residuals, mag_initial, args=(np.array(mags), "type=mag"), method='lm')
acc_parms = least_squares(residuals, acc_initial, args=(acc, "acc"))
mag_parms = least_squares(residuals, mag_initial, args=(mag, "mag"))
print("Mag [a,b,c,x0,y0,z0]:", [float(a) for a in mag_parms.x])
print("Acc [a,b,c,x0,y0,z0]:", [float(a) for a in acc_parms.x])

View file

@ -1,140 +0,0 @@
#! /usr/bin/python3
from matplotlib.ticker import FormatStrFormatter
import matplotlib.pyplot as plt
import matplotlib.dates as md
import struct
import datetime
import math
import numpy as np
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('filepath')
args = parser.parse_args()
fp = args.filepath
def calibrate_vector(vector, what:str):
"""
what is either mag or acc
The uints are turned to ints, then you divide by the sensitivity to get physical units.
Alignment with new coordinate system is done where gravity is (0,0,-1).
The fit coefficients determined in BA are applied.
"""
if what == "mag":
mag_sens = 6842 # LSB/gauss
mag_calib = [0.43294291744819174, 0.0022384941038381787, 1.0, 0.0037734950605782824, -14.215784558905854, 0.1] # fit coefficients
mag_raw = [struct.unpack('<h', struct.pack('<H', int(num)))[0] for num in vector]
mag_raw = [-mag_raw[0]/mag_sens, -mag_raw[1]/mag_sens, mag_raw[2]/mag_sens]
calibrated_vector = [(mag_raw[0]-mag_calib[3])*math.sqrt(mag_calib[0]), (mag_raw[1]-mag_calib[4])*math.sqrt(mag_calib[1]), (mag_raw[2]-mag_calib[5])*math.sqrt(mag_calib[2])]
if what == "acc":
acc_sens = 16000 # 1000 LSB/g and it is only a 12 bit number so divide by 16 (1hex bit) equivalent to shifting 4 bits
acc_calib = [0.972148, 0.968608, 0.974061, -0.00109779, -0.0154984, 0.0135722]
acc_raw = [struct.unpack('<h', struct.pack('<H', int(num)))[0] for num in vector] # turns int into hex, turns hex into signed int
acc_raw = [acc_raw[0]/acc_sens, acc_raw[1]/acc_sens, -acc_raw[2]/acc_sens]
calibrated_vector = [(acc_raw[0]-acc_calib[3])*math.sqrt(acc_calib[0]), (acc_raw[1]-acc_calib[4])*math.sqrt(acc_calib[1]), (acc_raw[2]-acc_calib[5])*math.sqrt(acc_calib[2])]
return calibrated_vector
def Rz(phi):
"""
Rotation matrix of angle phi around z axis
"""
return np.array([[np.cos(phi), -np.sin(phi), 0],
[np.sin(phi), np.cos(phi), 0],
[0 , 0, 1]])
def Ry(theta):
"""
Rotation matrix of angle theta around y axis
"""
return np.array([[np.cos(theta), 0, np.sin(theta)],
[0, 1, 0],
[-np.sin(theta), 0, np.cos(theta)]])
def calculate_heading(mag, phi, theta):
"""
Uses the gravity vector's phi and theta to rotate mag vector in coord system with xy parallel to ground
and z pointing to zenith. x axis is direction in which the telescope is looking, mag vector is magnetic north
thus, the heading is simply the angle between x and north. Translate this to compass.
"""
declination = 0 # Kiruna
mag_world = np.matmul(Rz(-phi), np.matmul(Ry(theta-np.pi), np.matmul(Rz(phi), mag)))
mag_world_psi = np.arctan2(mag_world[1], mag_world[0]) + declination # angle mag vector in wf, x axis in wf + delta because TN is delta left of the MN (=mag vector)
if mag_world_psi <= 0:
mag_world_psi = 2*np.pi + mag_world_psi
#mag_world_psi = abs(mag_world_psi) # because compass counts clock-wise and atan2 counter CW
else:
pass
#mag_world_psi = 2*np.pi - mag_world_psi
heading = mag_world_psi * 180/np.pi # heading in degrees (0deg = True North)
return heading
def calculate_ads(acc, mag):
if np.nan in acc or np.nan in mag:
return np.nan, np.nan, np.nan
pitch = math.atan(acc[0]/acc[2])*180/np.pi # math package returns in radians
roll = math.atan(acc[1]/acc[2])*180/np.pi
phi = math.atan2(acc[1], acc[0])
g_length_xy = math.sqrt(acc[0]**2+acc[1]**2)
theta = math.atan2(g_length_xy, acc[2])
heading = calculate_heading(mag, phi, theta)
return heading, roll, pitch # swapped pitch and roll bc ads x points to left of telescope
with open(fp, errors="ignore") as file:
heading = {}
interval_mag = []
interval_acc = []
start = datetime.datetime(2001,1,1)
cntr = 0
skip = True
for l in file:
if not skip and start == datetime.datetime(2001,1,1) and l[0] == "H":
start = datetime.datetime.fromtimestamp(int(l.split()[1]), datetime.timezone.utc)
if start != 0: # only perform when correct timestamp
if not skip and l[:8] == 'I2C-MAG ':
try:
mag_raw = [int(num) for num in l.split()[2:]]
mag = calibrate_vector(mag_raw, "mag")
interval_mag.append(mag_raw)
except:
interval_mag.append([np.nan, np.nan, np.nan])
if not skip and l[:8] == 'I2C-ACC ':
try:
acc_raw = [int(num) for num in l.split()[2:]]
acc = calibrate_vector(acc_raw, "acc")
interval_acc.append(acc)
except:
interval_acc.append([np.nan, np.nan, np.nan])
if l[:4] == "C64 ":
if skip:
skip = False
for i in range(len(interval_mag)):
cntr += 1
head, pitch, roll = calculate_ads(interval_acc[i], interval_mag[i])
heading[start + datetime.timedelta(milliseconds=cntr*100)] = [head, pitch, roll]
interval_acc = []
interval_mag = []
vals = list(heading.values())
t = list(heading)
fig, (ax1, ax2) = plt.subplots(2,1, sharex=True)
fmtdate = md.DateFormatter('%m-%d-%H:%M:%S')
ax2.xaxis.set_major_formatter(fmtdate)
fig.suptitle(fp.split("/")[-1])
ax1.plot(t, [att[0] for att in vals], ".")
ax2.plot(t, [att[1] for att in vals], ".", label="Pitch")
ax2.plot(t, [att[2] for att in vals], ".", label="Roll")
ax2.set_xlabel("Time in MM-DD-HH:MM:SS", fontsize=16)
ax2.tick_params(axis="x", labelsize=14, labelrotation=25)
ax1.set_ylabel("Heading in °", fontsize=16)
ax1.tick_params(axis="y", labelsize=14)
ax2.set_ylabel("Angle in °", fontsize=16)
ax2.tick_params(axis="y", labelsize=14)
ax1.grid(); ax2.grid()
ax2.legend()
plt.tight_layout()
plt.show()

138
power.py
View file

@ -1,138 +0,0 @@
#! /usr/bin/python3
from matplotlib.ticker import FormatStrFormatter
import matplotlib.pyplot as plt
import matplotlib.dates as md
from matplotlib.ticker import MaxNLocator
import struct
import datetime
import math
import numpy as np
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('filepath')
args = parser.parse_args()
fp = args.filepath
def degC(a):
R1, R25, B25, res = 10e3, 10e3, 3940, 0x1000
R = R1 * a / (res - (a & (res-1)))
T = B25/(math.log(R/R25) + B25/298) - 273
return T
def hdorn(l):
# HDORN 0 74 95 83 81 83 74 88 90 78 88 84 95 77 91 74 81 77 82 94 73 78 97 76 72 0 3555 1347 3551 2048 1437 3082 1239 3832 4095 4095 1500 1478 1540 1247 3105 324 327 324 323 323 323 323 323 3564 3566 3564 3564 3565 3565 3565 3564 2612 2756 2495 2631 130 2120 1289 1114
ll = l.split() # ll: list of line contents
if len(ll) != 66:
return np.nan, np.nan
try:
sl = int(ll[1]) # slice
dorn_data = [int(num) for num in ll[1:]]
except:
return np.nan, np.nan
dorn_data = [d & 0xfff for d in dorn_data]
dorn_data = [dorn_data[8*i+1 : 8*i+1 + 8] for i in range(8)] # turns dorn_data into 1 list of 8 lists with 8 entries
HK = []
for i in range(8):
d = dorn_data[i]
if i == 3:
HK.append(f"""
{sl} 3 HK ADC
Tadc = {degC(d[5]):6.2f} °C, GND = {d[0]:6.3f} V,
Vff = {d[1]:6.3f} V, Vnn = {d[2]:6.3f} V, Vpp = {d[3]:6.3f} V, Vdig = {d[4]:6.3f} V,
Vcc = {d[6]:6.3f} V, Vss = {d[7]:6.3f} V.
""")
if i == 4:
HK.append(f"""
{sl} 4 HK PA
Tpa = {degC(d[4]):6.2f} °C, Tbgo = {degC(d[3]):6.2f} °C, {degC(d[5]):6.2f} °C,
Vbias = {d[2]:6.3f} V, Ibias = {d[1]:6.1f} nA, {d[0]:6.1f} nA,
Vcc = {d[7]:6.3f} V, Vss = {d[6]:6.3f} V.
""")
if i == 7 and sl == 0:
HK.append(f"""
{sl} 7 HK PWR
na = {repr(d[:4])}
Tpwr = {degC(d[6]):.1f} °C,
Vprim = {d[5]:.1f} V, Iprim = {d[4]:.1f} mA,
Ibias = {d[7]:.1f} nA.\n
""")
Vprim = d[5]*16.3*3.3/4096
Iprim = d[4]*2000*3.3/4096
elif i == 7 and sl == 1:
HK.append(f"""
{sl} 7 HK PWR
na = {repr(d[:4])}
Text = {degC(d[6]):.1f} °C,
Ibias = {d[4]:.1f} nA.
Ibias = {d[7]:.1f} nA, Vbias = {d[5]:.1f} V.\n
""")
return Vprim, Iprim
with open(fp, errors="ignore") as file:
skip = True
Vprim = []
Iprim = []
time = []
for l in file:
if not skip and l[:2] == "H ":
try:
ts = datetime.datetime.fromtimestamp(int(l.split()[1]), datetime.timezone.utc)
if ts.year == 2025:
time.append(ts)
except:
pass
if not skip and l[:6] == 'HDORN ': # two lines, sl0 and sl1 per interval
try:
if l.split()[1] == "0":
Vp, Ip = hdorn(l)
Vprim.append(Vp)
Iprim.append(Ip)
except:
Vprim.append(np.nan)
Iprim.append(np.nan)
if l[:4] == "C64 ":
if skip:
skip = False
t, v, i = len(time), len(Vprim), len(Iprim)
#print(t,v,i)
while t > v or t > i:
time.pop()
t = len(time)
while v > t:
Vprim.pop()
v = len(Vprim)
while i > t:
Iprim.pop()
i = len(Iprim)
Iprim_fl = []
Iprim_fl.append("nan")
Iprim_fl.append("nan")
Iprim_fl.append("nan")
for i in range(3, len(Iprim)-3):
Iprim_fl.append((Iprim[i-3]+Iprim[i-2] +2*Iprim[i-1]+5*Iprim[i]+2*Iprim[i+1]+Iprim[i+2]++Iprim[i+3])/13)
Iprim_fl.append("nan")
Iprim_fl.append("nan")
Iprim_fl.append("nan")
fig, ax1 = plt.subplots()
fmtdate = md.DateFormatter('%H:%M:%S')
ax1.xaxis.set_major_formatter(fmtdate)
ax2 = ax1.twinx()
ax1.plot(time, Vprim, ".k", ms=4)
ax2.plot(time, Iprim_fl, ".r", ms=4)
#ax2.set_yticks(np.arange(150, 220, 5))
ax1.set_ylabel("Voltage in V", fontsize=16, color="k")
ax1.tick_params(axis="y", labelsize=14, labelcolor="k")
ax2.set_ylabel("Current in mA", fontsize=16, color="r")
ax2.tick_params(axis="y", labelsize=14, labelcolor="r")
ax1.set_xlabel("Time in HH:MM:SS", fontsize=16)
ax1.tick_params(axis="x", labelsize=14)
ax2.yaxis.set_major_locator(MaxNLocator(nbins=10))
#ax2.grid(True, which='major', axis='y')
plt.suptitle('Battery voltage', fontsize=16)
plt.grid()
plt.show()

View file

@ -1,56 +0,0 @@
#! /usr/bin/python3
import argparse
import numpy as np
import struct
import matplotlib.pyplot as plt
import math
parser = argparse.ArgumentParser()
parser.add_argument('-f', '--file')
args = parser.parse_args()
fp = args.file
mag_lines = []
acc_lines = []
mag_calib = [7., 7., 1., 0.05, 0.42, 0.0]
mag_sens = 6842 # LSB/gauss
acc_sens = 16000 # 1000 LSB/g
with open(fp) as f:
for l in f:
if l[:7] == "I2C-MAG":
mag_line = [struct.unpack('<h', struct.pack('<H', int(num)))[0] for num in l.split()[-3:]]
mag_line[0] = -mag_line[0]/mag_sens
mag_line[1] = -mag_line[1]/mag_sens
mag_line[2] = mag_line[2]/mag_sens
mag = [(mag_line[0]-mag_calib[3])*math.sqrt(mag_calib[0]), (mag_line[1]-mag_calib[4])*math.sqrt(mag_calib[1]), (mag_line[2]-mag_calib[5])*math.sqrt(mag_calib[2])]
mag_lines.append(mag)
if l[:7] == "I2C-ACC":
acc_line = [struct.unpack('<h', struct.pack('<H', int(num)))[0] for num in l.split()[-3:]]
acc_line[0] = acc_line[0]/acc_sens
acc_line[1] = acc_line[1]/acc_sens
acc_line[2] = -acc_line[2]/acc_sens
acc_lines.append(acc_line)
fig = plt.figure()
ax = fig.add_subplot(projection='3d')
xs = []
ys = []
zs = []
for vec in mag_lines:
xs.append(vec[0])
ys.append(vec[1])
zs.append(vec[2])
ax.scatter(xs, ys, zs) # from the docs: The z-positions. Either an array of the same length as xs and ys or a single value
ax.set_xlabel('X in Gs')
ax.set_ylabel('Y in Gs')
ax.set_zlabel('Z in Gs')
plt.show()

View file

@ -1,202 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Oct 19 18:31:14 2025
@author: milanmarx
"""
import datetime
import matplotlib.pyplot as plt
UTC_CUTOFF_HOUR = 19 # alles ab dieser Stunde wird ignoriert
# "mean" Mittelwert der Zeile
# Zahl N-tes Element (1-basiert)
# "all" Summe aller counts
# "bgo_1" (Kanäle 5, 14, 22) Mittelwert
# "bgo_2" (Kanäle 30, 39, 47) Mittelwert
#
C64_MODE = "bgo_1"
# für P-Zeilen zu Temperatur, Druck von Nicolas
def bate(l):
parts = l.split()
if len(parts) < 8:
return [None, None]
try:
word = [int(num, 16) for num in parts[3:7]]
d = [int(num, 16) for num in parts[7:]]
except ValueError:
return [None, None]
c = [
word[0] >> 1,
((word[2] & 63) << 6) | (word[3] & 63),
word[3] >> 6,
word[2] >> 6,
((word[0] & 1) << 10) | (word[1] >> 6),
word[1] & 63
]
ut1 = 8 * c[4] + 20224
dT = d[1] - ut1
off = c[1] * 4 + ((c[3] - 512) * dT) / 4096
sens = c[0] + (c[2] * dT) / 1024 + 24576
x = (sens * (d[0] - 7168)) / 16384 - off
temp = (200 + dT * (c[5] + 50) / 1024) / 10
pressure = (x * 10 / 32 + 2500) / 10 + 3
if not (-50 < temp < 80) or not (pressure < 1200):
return [None, None]
return [temp, pressure]
# Druck zu Höhe geschätzt
def pressure_to_altitude(p_mbar):
"""Berechnet Höhe in Meter aus Druck (mbar)."""
return 44330.0 * (1 - (p_mbar / 1013.25) ** 0.1903)
# Parser für C64- und P-Zeilen
def parse_aha(filename, c64_mode="mean"):
times = []
counts = []
heights = []
current_time = None
current_pressure = None
with open(filename, "r") as f:
for line in f:
line = line.strip()
if not line:
continue
# Zeit (UTC)
if line.startswith("H "):
s = line.split()
try:
ts = int(float(s[1]))
current_time = datetime.datetime.fromtimestamp(ts, datetime.timezone.utc)
except Exception:
print(f"Ungültige H-Zeile: {line}")
continue
# Druck
if line.startswith("P "):
_, p = bate(line)
if p is not None:
current_pressure = p
continue
# C64-Zeilen
if line.startswith("C64"):
if current_time is None or current_pressure is None:
continue
if current_time.hour >= UTC_CUTOFF_HOUR:
continue
parts = line.split()[1:]
if len(parts) < 2:
continue
try:
numeric_parts = [float(x) for x in parts[:-1]] # letzter Wert ignoriert (weil das ja zeit ist)
except Exception:
continue
# Auswahlmodus
value = None
if c64_mode == "mean":
value = sum(numeric_parts) / len(numeric_parts)
elif c64_mode == "all":
value = sum(numeric_parts)
elif c64_mode == "bgo_1":
indices = [4, 13, 21] # Positionen 5, 14, 22
try:
selected = [numeric_parts[i] for i in indices]
value = sum(selected) / 3
except IndexError:
continue
elif c64_mode == "bgo_2":
indices = [29, 38, 46] # Positionen 30, 39, 47
try:
selected = [numeric_parts[i] for i in indices]
value = sum(selected) / 3
except IndexError:
continue
else:
try:
idx = int(c64_mode) - 1
if 0 <= idx < len(numeric_parts):
value = numeric_parts[idx]
except Exception:
continue
if value is None:
continue
# Alle Counts in Counts/Sekunde umrechnen
value /= 12.0
height = pressure_to_altitude(current_pressure)
times.append(current_time)
counts.append(value)
heights.append(height)
return times, counts, heights
# Plot Counts vs. Höhe
def plot_counts_vs_height(counts, heights, mode):
if not counts or not heights:
print("Keine gültigen Daten gefunden.")
return
plt.figure(figsize=(10, 6))
plt.scatter(heights, counts, marker="x", alpha=0.7)
# Titel abhängig vom Modus
mode_labels = {
"mean": "Mittelwert",
"all": "Summe aller Kanäle",
"bgo_1": "BGO_1 (Kanäle 5,14,22)",
"bgo_2": "BGO_2 (Kanäle 30,39,47)",
}
title_part = mode_labels.get(mode, f"Element {mode}")
plt.title(f"C64 {title_part} (Counts pro Sekunde) vs. Höhe")
plt.xlabel("Höhe (m)")
plt.ylabel("Counts pro Sekunde")
plt.grid(True)
plt.tight_layout()
plt.show()
if __name__ == "__main__":
datei = "sdcard_filter_2025-10-08_CUT.AHA"
times, counts, heights = parse_aha(datei, c64_mode=C64_MODE)
print(f"{len(counts)} gültige C64-Zeilen gefunden (Modus: {C64_MODE})")
plot_counts_vs_height(counts, heights, C64_MODE)

View file

@ -1,350 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Nov 18 17:15:44 2025
@author: milanmarx
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
"""
Dieses Skript liest eine MHM datei ein, filtert die events mit ausgewählten cuts anhand der BGO,
plottet diese cuts und exportiert die gefilterte MHM datei wieder
"""
###############################################################################
# MHM einlesen und schreiben
###############################################################################
def read_mhm_file(filepath, verbose=True):
"""
Liest eine MHM-Datei ein und gibt data frame zurück
"""
df = pd.read_csv(filepath, sep=r"\s+")
#wenn verbose, dann Ausgabe wie viele events (zeilen) eingelesen
if verbose:
print(f"Datei '{filepath}' eingelesen.")
print(f"Anzahl eingelesener Events (Zeilen): {len(df)}")
return df
def save_filtered_mhm(df, filepath_out):
"""
Speichert einen data frame wieder als MHM-Datei
"""
df.to_csv(
filepath_out,
sep=" ",
index=False,
float_format="%.6g" #0.0 wird zu nur 0
)
print(f"Gefilterte Datei gespeichert unter: {filepath_out}")
###############################################################################
# Filter Logik und Plot
###############################################################################
def plot_and_filter_bgo(
df,
show_plots=True,
show_cuts=True,
log_axes=False,
apply_threshold = True,
bins=400,
xmax=5000,
ymax=5000,
verbose=True,
combined_bgo_plot=True # True = 2x3 in einem Plot, False = 6 Einzelplots
):
"""
Plottet BGO-Kanalpaare (B1-B2, B1-B3, B2-B3, D1-D2, D1-D3, D2-D3)
und filtert Events nach definierten Cuts.
"""
thr_map = {
"A1": 15, "A2": 15, "A3": 15, "A4": 15, "A5": 16, "A6": 15, "A7": 15, "A8": 15, "A9": 15, "A10": 15,
"A11": 15, "A12": 15, "A13": 15, "A14": 15, "A15": 15, "A16": 15, "A17": 15, "A18": 15, "A19": 15, "A20": 15,
"E1": 15, "E2": 15, "E3": 15, "E4": 15, "E5": 15, "E6": 15, "E7": 15, "E8": 15, "E9": 15, "E10": 15,
"E11": 15, "E12": 15, "E13": 15, "E14": 16, "E15": 15, "E16": 15, "E17": 15, "E18": 15, "E19": 15, "E20": 15,
"B1": 0, "B2": 0, "B3": 0, # BGO auf null aber ich filter ja mit dem ersten cut
"D1": 0, "D2": 0, "D3": 0,
"C1": 13, "C2": 12
}
df_filtered = df.copy()
# Negative Werte auf 0 setzen
for col in ["B1", "B2", "B3", "D1", "D2", "D3"]:
if col in df_filtered.columns:
df_filtered.loc[df_filtered[col] < 0, col] = 0
# Threshold anwenden
if apply_threshold:
for ch, thr in thr_map.items(): # Für jeden Kanal mit definiertem Threshold
if ch in df_filtered.columns:
before = (df_filtered[ch] > 0).sum() # Anzahl >0 vor dem Thresholding
df_filtered.loc[df_filtered[ch] < thr, ch] = 0 # Werte unterhalb der Schwelle auf 0
after = (df_filtered[ch] > 0).sum() # Anzahl >0 nach dem Thresholding
print(f"Threshold angewendet auf {ch}: {before} --> {after} gültige Werte (≥ {thr})")
# CUT 1: Alle 6 Kanäle müssen > 5.822 mV sein
cut1_mask = (df_filtered[["B1", "B2", "B3", "D1", "D2", "D3"]] > 5.822).all(axis=1)
# CUT 2: Paar-Cuts
B1, B2, B3 = df_filtered["B1"], df_filtered["B2"], df_filtered["B3"]
D1, D2, D3 = df_filtered["D1"], df_filtered["D2"], df_filtered["D3"]
mask_B12 = (B2 > 0.75*(B1 - 13)) & (B2 < B1/0.75 + 13)
mask_B13 = (B3 > 0.75*(B1 - 13)) & (B3 < B1/0.75 + 13)
mask_B23 = (B3 > 0.75*(B2 - 13)) & (B3 < B2/0.75 + 13)
mask_D12 = (D2 > 0.75*(D1 - 13)) & (D2 < D1/0.75 + 13)
mask_D13 = (D3 > 0.75*(D1 - 13)) & (D3 < D1/0.75 + 13)
mask_D23 = (D3 > 0.75*(D2 - 13)) & (D3 < D2/0.75 + 13)
# event muss alle 6 erfüllen
cut2_mask = mask_B12 & mask_B13 & mask_B23 & mask_D12 & mask_D13 & mask_D23
# Gefiltertes DataFrame
df_out = df_filtered[cut1_mask & cut2_mask].copy()
# Kanäle, die vom BGO-Cut ausgenommen sind weil das die Eck dioden sind
excluded_channels = [
"A1","A3","A16","A18","A19","A20",
"E1","E3","E16","E18","E19","E20"
]
# Ausnahme: Wenn ein eck Kanal ein Signal hat, wird das Event nicht verworfen
exception_mask = df_filtered[excluded_channels].gt(0).any(axis=1)
# Final gefiltert: Events, die die BGO Cuts erfüllen ODER einen eck Kanal haben
df_out = df_filtered[(cut1_mask & cut2_mask) | exception_mask].copy()
# verbose ausgabe
if verbose:
print("Cut-Statistik:")
n_total = len(df_filtered)
n_after_all = len(df_out)
print(f"Gesamtzahl Events vor Cuts : {n_total}")
print(f"Gesamtzahl Events nach allen Cuts : {n_after_all}")
print("Events, die Cut 1 (alle Kanäle > 5.82 mV) UND den jeweiligen Paar-Cut erfüllen:")
pair_masks = [
("B1-B2", mask_B12),
("B1-B3", mask_B13),
("B2-B3", mask_B23),
("D1-D2", mask_D12),
("D1-D3", mask_D13),
("D2-D3", mask_D23),
]
for name, pair_mask in pair_masks:
mask_combined = cut1_mask & pair_mask
n_pair = np.count_nonzero(mask_combined)
print(f" {name:5s}: {n_pair:6d} Events")
# plot
if show_plots:
pairs = [
("B1", "B2"), ("B1", "B3"), ("B2", "B3"),
("D1", "D2"), ("D1", "D3"), ("D2", "D3")
]
# gemeinsame Log/linear-Bins vorbereiten (werden für beide Modi benutzt)
def get_bins():
if log_axes:
bins_x = np.logspace(np.log10(1), np.log10(xmax), bins)
bins_y = np.logspace(np.log10(1), np.log10(ymax), bins)
else:
bins_x = bins_y = bins
return bins_x, bins_y
if combined_bgo_plot:
# 2x3 Subplots
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
axes = axes.flatten()
# Alle Achsen quadratisch skalieren
for ax in axes:
ax.set_aspect("equal", adjustable="box")
scale_str = "log axes" if log_axes else "linear axes"
fig.suptitle(
f"BGO channel pairs after cuts ({scale_str})",
fontsize=25
)
fig.text(
0.5, 0.93,
r"Cuts: $B_i,D_i$ > 5.82 mV | $0.75\cdot (B_i - 13mV) < B_j < B_i/0.75 + 13mV$",
ha="center", color="red", fontsize=20
)
hist_meshes = []
for ax, (ch_i, ch_j) in zip(axes, pairs):
x = df_filtered[ch_i].astype(float)
y = df_filtered[ch_j].astype(float)
mask_valid = (x > 0) & (y > 0)
x, y = x[mask_valid], y[mask_valid]
bins_x, bins_y = get_bins()
h = ax.hist2d(
x,
y,
bins=[bins_x, bins_y],
cmap="viridis",
norm=LogNorm(vmin=1)
)
hist_meshes.append(h[3])
ax.set_title(f"{ch_i} vs {ch_j}", fontsize=15)
ax.set_xlabel(f"{ch_i} in mV", fontsize=15)
ax.set_ylabel(f"{ch_j} in mV", fontsize=15)
# Tick-Labels größer
ax.tick_params(axis='both', which='major', labelsize=14)
ax.tick_params(axis='both', which='minor', labelsize=12)
if log_axes:
ax.set_xscale("log")
ax.set_yscale("log")
ax.set_xlim(1, xmax)
ax.set_ylim(1, ymax)
else:
ax.set_xlim(0, xmax)
ax.set_ylim(0, ymax)
if show_cuts:
# vertikale und horizontale Linie bei 5.82 mV (Cut 1)
ax.plot([5.82, 5.82], [0, ymax], "r--", lw=3)
ax.plot([0, xmax], [5.82, 5.82], "r--", lw=3)
# Linien für Paar-Cuts:
if log_axes:
xi = np.logspace(np.log10(1), np.log10(xmax), 400)
else:
xi = np.linspace(0, xmax, 400)
lower = 0.75*(xi - 13)
upper = xi/0.75 + 13
ax.plot(xi, lower, "r-", lw=3)
ax.plot(xi, upper, "r-", lw=3)
fig.subplots_adjust(right=0.88)
cbar_ax = fig.add_axes([0.90, 0.12, 0.02, 0.75])
cbar = fig.colorbar(hist_meshes[0], cax=cbar_ax)
cbar.set_label("Counts (log)", fontsize=20)
cbar.ax.tick_params(labelsize=14)
plt.show()
else:
# Einzelplots für jedes Paar
for ch_i, ch_j in pairs:
x = df_filtered[ch_i].astype(float)
y = df_filtered[ch_j].astype(float)
mask_valid = (x > 0) & (y > 0)
x, y = x[mask_valid], y[mask_valid]
plt.figure(figsize=(7, 6))
bins_x, bins_y = get_bins()
h = plt.hist2d(
x,
y,
bins=[bins_x, bins_y],
cmap="viridis",
norm=LogNorm(vmin=1)
)
plt.title(f"{ch_i} vs {ch_j}", fontsize=18)
plt.xlabel(f"{ch_i} in mV", fontsize=16)
plt.ylabel(f"{ch_j} in mV", fontsize=16)
# Tick-Labels größer
ax = plt.gca()
ax.tick_params(axis='both', which='major', labelsize=14)
ax.tick_params(axis='both', which='minor', labelsize=12)
if log_axes:
plt.xscale("log")
plt.yscale("log")
plt.xlim(1, xmax)
plt.ylim(1, ymax)
else:
plt.xlim(0, xmax)
plt.ylim(0, ymax)
if show_cuts:
# vertikale/horizontale 6.62 mV-Linien
plt.plot([5.82, 5.82], [0, ymax], "r--", lw=3)
plt.plot([0, xmax], [5.82, 5.82], "r--", lw=3)
# Paar-Cuts
if log_axes:
xi = np.logspace(np.log10(1), np.log10(xmax), 400)
else:
xi = np.linspace(0, xmax, 400)
lower = 0.75*(xi - 13)
upper = xi/0.75 + 13
plt.plot(xi, lower, "r-", lw=3)
plt.plot(xi, upper, "r-", lw=3)
cbar = plt.colorbar(h[3], label="Counts (log)")
cbar.set_label("Counts (log)", fontsize=20)
cbar.ax.tick_params(labelsize=14)
plt.tight_layout()
plt.show()
return df_out
###############################################################################
# Main
###############################################################################
def main():
datei = "2025-08-14-longtime-23e-kurz.MHM"
df = read_mhm_file(datei, verbose=True)
filtered_df = plot_and_filter_bgo(
df,
show_plots=True,
show_cuts=True,
log_axes=False,
bins=1500, #bei logskale ruhig 150 bins und x/y max 5000
xmax=500,
ymax=500,
verbose=True,
)
save_filtered_mhm(filtered_df, "2025-08-14-longtime-23e-kurz_Filtered.MHM")
if __name__ == "__main__":
main()

View file

@ -1,162 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Oct 20 10:25:33 2025
@author: milanmarx
"""
import datetime
import pandas as pd
import matplotlib.pyplot as plt
from collections import defaultdict, Counter
def parse_aha_file(filepath):
"""
Liest eine .AHA-Datei und gibt:
results als Liste [name, messwert, druck, zeit]
counts_df als DataFrame mit Spalten ["UTC", "Channel", "Count"]
zurück.
"""
def bate(l):
parts = l.split()
if len(parts) < 8:
return [None, None]
try:
word = [int(num, 16) for num in parts[3:7]]
d = [int(num, 16) for num in parts[7:]]
except ValueError:
return [None, None]
c = [
word[0] >> 1,
((word[2] & 63) << 6) | (word[3] & 63),
word[3] >> 6,
word[2] >> 6,
((word[0] & 1) << 10) | (word[1] >> 6),
word[1] & 63
]
ut1 = 8 * c[4] + 20224
dT = d[1] - ut1
off = c[1] * 4 + ((c[3] - 512) * dT) / 4096
sens = c[0] + (c[2] * dT) / 1024 + 24576
x = (sens * (d[0] - 7168)) / 16384 - off
temp = (200 + dT * (c[5] + 50) / 1024) / 10
pressure = (x * 10 / 32 + 2500) / 10 + 3
if not (-50 < temp < 80) or not (pressure < 1200):
return [None, None]
return [temp, pressure]
# Kanalzuordnung
channel_map = {
(1, 0): "A17", (1, 1): "A10", (1, 2): "A19", (1, 3): "B1 (BGO_1)", (1, 4): "A5",
(1, 5): "A6", (1, 6): "A14", (1, 7): "A15", (1, 8): "A16", (1, 9): "A7",
(1, 10): "A4", (1, 11): "C1 (HET-A)", (1, 12): "B2 (BGO_1)", (1, 13): "A3", (1, 14): "A9",
(1, 15): "A18", (1, 16): "A13", (1, 17): "A20", (1, 18): "A1", (1, 19): "A2",
(1, 20): "B3 (BGO_1)", (1, 21): "A8", (1, 22): "A12", (1, 23): "A11",
(0, 0): "E2", (0, 1): "E7", (0, 2): "E20", (0, 3): "D1 (BGO_2)", (0, 4): "E14",
(0, 5): "E15", (0, 6): "E5", (0, 7): "E6",
(0, 8): "E1", (0, 9): "E10", (0, 10): "E13", (0, 11): "C2 (HET-B)", (0, 12): "D2 (BGO_2)",
(0, 13): "E18", (0, 14): "E12", (0, 15): "E3",
(0, 16): "E4", (0, 17): "E19", (0, 18): "E16", (0, 19): "E17", (0, 20): "D3 (BGO_2)",
(0, 21): "E11", (0, 22): "E9", (0, 23): "E8"
}
results = []
temperature = pressure = None
first_valid_temp = first_valid_press = None
zeit = None
with open(filepath, "r", encoding="utf-8") as file:
for line in file:
line = line.strip()
if not line:
continue
if line.startswith("P"):
l = bate(line)
if l and all(v is not None for v in l):
temperature, pressure = l
if first_valid_temp is None:
first_valid_temp = temperature
if first_valid_press is None:
first_valid_press = pressure
continue
if line.startswith("H "):
try:
zeit = datetime.datetime.fromtimestamp(int(float(line.split()[1])), datetime.timezone.utc)
except ValueError:
continue
continue
if not line.startswith("EDB "):
continue
parts = line.split()
if len(parts) != 11:
continue
try:
slice_nr = int(float(parts[2]))
kanal_nr = int(float(parts[3]))
messwert = float(parts[-1]) / 0x20000
except ValueError:
continue
if slice_nr not in (0, 1) or not (0 <= kanal_nr <= 23):
continue
name = channel_map.get((slice_nr, kanal_nr))
if not name:
continue
temp_used = temperature if temperature is not None else first_valid_temp
press_used = pressure if pressure is not None else first_valid_press
results.append([name, messwert, press_used, zeit])
# Counts nach UTC-Zeit und Kanal
counts = defaultdict(Counter)
for name, _, _, zeit in results:
if zeit is not None:
counts[zeit][name] += 1
# In DataFrame umwandeln
rows = []
for t, chans in counts.items():
for name, c in chans.items():
rows.append({"UTC": t, "Channel": name, "Count": c})
counts_df = pd.DataFrame(rows)
return results, counts_df
def plot_channel_counts(counts_df, channel_name):
"""
Plottet die Counts eines bestimmten Kanals gegen UTC-Zeit.
"""
df = counts_df[counts_df["Channel"] == channel_name].sort_values("UTC")
if df.empty:
print(f"Keine Daten für Kanal {channel_name}.")
return
plt.figure(figsize=(10, 5))
plt.plot(df["UTC"], df["Count"], marker="x")
plt.title(f"Count-Verlauf für Kanal {channel_name}")
plt.xlabel("UTC-Zeit")
plt.ylabel("Count")
plt.grid(True)
plt.tight_layout()
plt.show()
results, counts_df = parse_aha_file("sdcard_filter_2025-10-08_CUT.AHA")
plot_channel_counts(counts_df, "D1 (BGO_2)")

View file

@ -1,401 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Oct 16 12:54:35 2025
@author: milanmarx
"""
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.colors import LogNorm
def read_mhm_file(filepath):
"""Liest eine MHM-Datei ein und gibt einen DataFrame zurück."""
return pd.read_csv(filepath, sep ="\s+")
def plot_histogram_from_mhm(
data_or_path,
plot_channels,
condition_channels=None,
bins=200,
require_all=True,
apply_bgo_correction=False,
):
"""
Plottet Histogramme aus einer MHM-Datei
data_or_path: Gib den dateipfad oder datei an
plot_channels: Gib die zu plottenden Kanäle an
condition_channels: Welche Kanäle müssen in den zulässigen events aktiv gewesen sein?
bins: bin zahl für das hist
require_all: alle angegebenden Bedingungskanäle müssen im event sein wenn True
apply_bgo_correction: Wenn True, multipliziert BGO-Kanäle mit Faktor von Ava Temp korrektur
"""
factor_column="KorrFaktor" # Spaltenname mit dem Korrekturfaktor von Ava
bgo_channels = ["B1", "B2", "B3", "D1", "D2", "D3"] # Definiert die BGO-Kanäle
if isinstance(data_or_path, pd.DataFrame): #entweder dateipfad oder data frame
df = data_or_path.copy()
filepath = "sdcard_filter_2025-10-08_CUT.AHA"
else:
df = read_mhm_file(data_or_path)
filepath = data_or_path # Merkt sich den ursprünglichen Pfad für Plot-Titel
if apply_bgo_correction: #wenn aktiv, werden BGO werte um temperaturfaktor korrigiert
for ch in bgo_channels:
if ch in df.columns and factor_column in df.columns:
df[ch] = df[ch] * df[factor_column]
# Wenn keine Bedingungskanäle übergeben wurden, wird ein leeres Set verwendet.
# Wenn welche angegeben sind, werden sie in ein Set umgewandelt, um doppelte Einträge zu vermeiden.
if condition_channels is None:
condition_channels = set()
else:
condition_channels = set(condition_channels)
# Wenn Bedingungskanäle existieren (z. B. "B1", "B2"), wird geprüft:
# - require_all=True: alle Kanäle müssen ein Signal > 0 haben
# - require_all=False: mindestens einer davon muss > 0 sein
if condition_channels:
valid_channels = [ch for ch in condition_channels if ch in df.columns] # Nur vorhandene Spalten berücksichtigen
if require_all:
mask = (df[valid_channels] > 0).all(axis=1) # Alle Kanäle aktiv
else:
mask = (df[valid_channels] > 0).any(axis=1) # Mindestens einer aktiv
df = df[mask]
# Wenn nach der Filterung keine Events mehr übrig sind, wird abgebrochen
if df.empty:
print("Keine passenden Events gefunden Plot wird übersprungen.")
return
# Erstelle ein Dictionary mit den Werten der gewünschten Plot-Kanäle.
# Nur Werte > 0 werden berücksichtigt
channel_values = {ch: [] for ch in plot_channels}
for ch in plot_channels:
if ch in df.columns:
vals = df[ch].values
vals = vals[vals > 0]
channel_values[ch].extend(vals)
fig, ax = plt.subplots(figsize=(12, 7))
# Farbpalette für verschiedene Kanäle (zyklisch wiederverwendet)
base_colors = [
'#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd',
'#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf'
]
color_map = {ch: base_colors[i % len(base_colors)] for i, ch in enumerate(plot_channels)}
# Wertebereich der x-Achse festlegen (Signal in mV)
min_val, max_val = 0, 150
range_padding = (max_val - min_val) * 0.05
hist_range = (min_val - range_padding, max_val + range_padding)
#Histogramme plotten
handles, labels = [], []
for ch in plot_channels:
values = channel_values[ch]
if len(values) > 0:
# Erstellt logarithmisches Histogramm für jeden Kanal
hist_line = ax.hist(
values, bins=bins, range=hist_range,
label=ch, color=color_map[ch],
histtype='step', linewidth=1.2, log=True
)[2][0]
handles.append(hist_line)
labels.append(ch)
# Achsenbeschriftungen und Titel
ax.set_xlabel("Signal [mV]", fontsize = 14)
ax.set_ylabel("Counts", fontsize = 14)
ax.set_title(f"Histogram from file: {filepath}", fontsize = 14)
# Die Kanäle werden in zwei Gruppen unterteilt (oben/unten, also Slice 0/1)
group1_order = [f"A{i}" for i in range(1, 21)] + [f"B{i}" for i in range(1, 4)] + ["C1"]
group2_order = [f"E{i}" for i in range(1, 21)] + [f"D{i}" for i in range(1, 4)] + ["C2"]
# Hilfsfunktion, um die Handles sortiert nach Kanalnamen zu gruppieren
def group_handles(order):
grouped_h, grouped_l = [], []
for name in order:
for h, l in zip(handles, labels):
if l == name:
grouped_h.append(h)
grouped_l.append(l)
return grouped_h, grouped_l
# Legenden für obere und untere Detektoren erzeugen
group1_handles, group1_labels = group_handles(group1_order)
group2_handles, group2_labels = group_handles(group2_order)
# Zwei getrennte Legenden an der rechten Seite des Plots platzieren
legend1 = ax.legend(group1_handles, group1_labels, loc='center left',
bbox_to_anchor=(1.02, 0.55), frameon=False, title="Slice 0 ", fontsize = 11)
legend2 = ax.legend(group2_handles, group2_labels, loc='center left',
bbox_to_anchor=(1.18, 0.55), frameon=False, title="Slice 1 ", fontsize = 11)
#Legend-Titel (Überschriften) größer machen
legend1.get_title().set_fontsize(16)
legend2.get_title().set_fontsize(16)
ax.add_artist(legend1)
# Layout anpassen und Plot anzeigen
plt.tight_layout()
plt.show()
plt.close('all')
def plot_and_filter_bgo(
df, # Eingabe-DataFrame
show_plots=True, # Ob Plots erzeugt und angezeigt werden sollen
show_cuts=True, # Ob die Cut-Linien in den Plots gezeichnet werden sollen
apply_threshold=False, # Ob die Schwellenwerte aus thr_map angewendet werden
log_axes=False, # Schaltet log-Skala + log-Bins für die Histogramme ein/aus
log_counts=True, # Schaltet logarithmische Farbskala (Counts) im 2D-Hist ein/aus
bins=400, # Anzahl der Bins pro Achse (bei log_axes: pro Dekade gleich verteilt)
xmax=5000, # Plot-Grenze in x-Richtung
ymax=5000, # Plot-Grenze in y-Richtung
):
"""
Plottet BGO-Kanalpaare (B1-B2, B1-B3, B2-B3, D1-D2, D1-D3, D2-D3)
mit logarithmischer Farbskala (Counts) und optional logarithmischer Achse.
Zwei Cuts:
1. D1, D2, D3, B1, B2, B3 > 10 mV
2. 0.85*(Di - 30) < Dj < Di/0.85 + 30 (und analog für Bi/Bj)
Nur Events, die beide Cuts erfüllen, werden zurückgegeben.
"""
# Threshold Map
thr_map = {
"A1": 13, "A2": 13, "A3": 13, "A4": 13, "A5": 13, "A6": 13, "A7": 13, "A8": 13, "A9": 13, "A10": 13,
"A11": 13, "A12": 13, "A13": 13, "A14": 13, "A15": 13, "A16": 13, "A17": 13, "A18": 13, "A19": 13, "A20": 13,
"E1": 13, "E2": 13, "E3": 13, "E4": 13, "E5": 13, "E6": 13, "E7": 13, "E8": 13, "E9": 13, "E10": 13,
"E11": 13, "E12": 13, "E13": 13, "E14": 13, "E15": 13, "E16": 13, "E17": 13, "E18": 13, "E19": 13, "E20": 13,
"B1": 0, "B2": 0, "B3": 0, # BGO auf null aber ich filter ja später 13mV
"D1": 0, "D2": 0, "D3": 0,
"C1": 15, "C2": 12
}
df_filtered = df.copy() # Arbeitskopie, damit das Original-DataFrame unverändert bleibt
# Keine negativen Werte zulassen
for col in ["B1", "B2", "B3", "D1", "D2", "D3"]:
if col in df_filtered.columns: # Nur wenn die Spalte existiert
df_filtered.loc[df_filtered[col] < 0, col] = 0 # Negative Werte auf 0 setzen
# Threshold anwenden
if apply_threshold:
for ch, thr in thr_map.items(): # Für jeden Kanal mit definiertem Threshold
if ch in df_filtered.columns:
before = (df_filtered[ch] > 0).sum() # Anzahl >0 vor dem Thresholding
df_filtered.loc[df_filtered[ch] < thr, ch] = 0 # Werte unterhalb der Schwelle auf 0
after = (df_filtered[ch] > 0).sum() # Anzahl >0 nach dem Thresholding
print(f"Threshold angewendet auf {ch}: {before}{after} gültige Werte (≥ {thr})")
# CUT 1: Alle BGO-Signale > 10 mV
cut1_mask = (df_filtered[["B1", "B2", "B3", "D1", "D2", "D3"]] > 10).all(axis=1)
# erzeugt eine boolesche Maske über alle Zeilen:
# True nur dann, wenn in B1..B3 und D1..D3 jeweils > 10 mV ist.
# CUT 2: Bereichsformeln für jedes Paar
B1, B2, B3 = df_filtered["B1"], df_filtered["B2"], df_filtered["B3"] # Kurzreferenzen für B-Kanäle
D1, D2, D3 = df_filtered["D1"], df_filtered["D2"], df_filtered["D3"] # Kurzreferenzen für D-Kanäle
# Für jedes Paar i,j: 0.85*(i - 30) < j < i/0.85 + 30
mask_B12 = (B2 > 0.85*(B1 - 30)) & (B2 < B1/0.85 + 30)
mask_B13 = (B3 > 0.85*(B1 - 30)) & (B3 < B1/0.85 + 30)
mask_B23 = (B3 > 0.85*(B2 - 30)) & (B3 < B2/0.85 + 30)
mask_D12 = (D2 > 0.85*(D1 - 30)) & (D2 < D1/0.85 + 30)
mask_D13 = (D3 > 0.85*(D1 - 30)) & (D3 < D1/0.85 + 30)
mask_D23 = (D3 > 0.85*(D2 - 30)) & (D3 < D2/0.85 + 30)
cut2_mask = mask_B12 & mask_B13 & mask_B23 & mask_D12 & mask_D13 & mask_D23
# Nur True, wenn ALLE sechs Paarbedingungen erfüllt sind.
# falls man den cut nur mit D machen will :
"""
cut1_mask = (df_filtered[[ "D1", "D2", "D3"]] > 13).all(axis=1)
# erzeugt eine boolesche Maske über alle Zeilen:
# True nur dann, wenn in B1..B3 und D1..D3 jeweils > 13 mV ist.
# CUT 2: Bereichsformeln für jedes Paar
# Kurzreferenzen für B-Kanäle
D1, D2, D3 = df_filtered["D1"], df_filtered["D2"], df_filtered["D3"] # Kurzreferenzen für D-Kanäle
# Für jedes Paar i,j: 0.85*(i - 30) < j < i/0.85 + 30
mask_D12 = (D2 > 0.85*(D1 - 30)) & (D2 < D1/0.85 + 30)
mask_D13 = (D3 > 0.85*(D1 - 30)) & (D3 < D1/0.85 + 30)
mask_D23 = (D3 > 0.85*(D2 - 30)) & (D3 < D2/0.85 + 30)
cut2_mask = mask_D12 & mask_D13 & mask_D23
# Nur True, wenn ALLE sechs Paarbedingungen erfüllt sind.
"""
total_cut_mask = cut1_mask & cut2_mask
# Gesamtmaske: beide Cut-Gruppen müssen erfüllt sein.
# Plot: B1B2, B1B3, B2B3, D1D2, D1D3, D2D3
if show_plots:
pairs = [("B1", "B2"), ("B1", "B3"), ("B2", "B3"), # Liste der Paare
("D1", "D2"), ("D1", "D3"), ("D2", "D3")]
for (ch_i, ch_j) in pairs: # Schleife über alle Paare
x = df_filtered[ch_i].astype(float) # x-Daten als float
y = df_filtered[ch_j].astype(float) # y-Daten als float
# Nur positive Werte für log erlauben
valid = (x > 0) & (y > 0)
x = x[valid] # Auf gültige Werte filtern
y = y[valid]
fig, ax = plt.subplots(figsize=(7, 6))
# Bins wählen
if log_axes: # Falls log-Skala gewünscht:
bins_x = np.logspace(np.log10(1), np.log10(xmax), bins) # Logarithmisch verteilte Binkanten X
bins_y = np.logspace(np.log10(1), np.log10(ymax), bins) # Logarithmisch verteilte Binkanten Y
else:
bins_x = bins_y = bins # Linear: nur Anzahl übergeben (automatische Kanten)
# Histogramm
h = ax.hist2d(
x, y, # Daten
bins=[bins_x, bins_y], # Bin-Kanten (logarithmisch oder Anzahl)
cmap="viridis", # Farbkarte
norm=LogNorm(vmin=1) if log_counts else None # Farbnorm: Log der Zählwerte (Counts), vmin=1
)
# Farbskala
cbar = plt.colorbar(h[3], ax=ax) # Colorbar an Achse hängen; h[3] ist das QuadMesh
cbar.set_label("Counts", fontsize=12) # Beschriftung der Colorbar
ax.set_xlabel(f"{ch_i} [mV]", fontsize=13) # x-Achsenlabel
ax.set_ylabel(f"{ch_j} [mV]", fontsize=13) # y-Achsenlabel
ax.set_title(f"{ch_i} vs {ch_j}", fontsize=13)# Plot-Titel
# Achsen
if log_axes: # Falls log-Skala:
ax.set_xscale("log") # x-Achse logarithmisch
ax.set_yscale("log") # y-Achse logarithmisch
ax.set_xlim(1, xmax) # Bereich (nicht bei 0 anfangen, log undefiniert)
ax.set_ylim(1, ymax)
else:
ax.set_xlim(0, xmax) # Linear: 0 bis max
ax.set_ylim(0, ymax)
# Cut-Linien overlay
if show_cuts: # Nur wenn gewünscht
ax.plot([10, 10], [1 if log_axes else 0, ymax], "r--", lw=1.2) # Vertikale Linie bei x=20 mV
ax.plot([1 if log_axes else 0, xmax], [10, 10], "r--", lw=1.2) # Horizontale Linie bei y=20 mV
xi = (np.logspace(np.log10(1), np.log10(xmax), 400) # x-Stützstellen für Cut-Bänder
if log_axes else
np.linspace(0, xmax, 400))
lower = 0.75*(xi - 30) # Untere Cut-Grenze
upper = xi/0.75 + 30 # Obere Cut-Grenze
ax.plot(xi, lower, "r-", lw=1) # Untere Grenze zeichnen
ax.plot(xi, upper, "r-", lw=1) # Obere Grenze zeichnen
plt.tight_layout()
plt.show()
# Rückgabe: gefilterter Datensatz
df_result = df_filtered[total_cut_mask].copy() # Nur Zeilen, die alle Cuts bestehen
print(f"Verbleibende Events nach allen Cuts: {len(df_result)} / {len(df)}")
return df_result # Gefiltertes DataFrame zurückgeben
if __name__ == "__main__":
Dioden_alle = ["A1", "A2", "A3", "A4", "A5", "A6", "A7", "A8", "A9", "A10", "A11", "A12", "A13", "A14", "A15", "A16", "A17", "A18", "A19", "A20",
"E1", "E2", "E3", "E4", "E5", "E6", "E7", "E8", "E9", "E10", "E11", "E12", "E13", "E14", "E15", "E16", "E17", "E18", "E19", "E20"]
Dioden_oben = ["A1", "A2", "A3", "A4", "A5", "A6", "A7", "A8", "A9", "A10", "A11", "A12", "A13", "A14", "A15", "A16", "A17", "A18", "A19", "A20"]
Dioden_unten = ["E1", "E2", "E3", "E4", "E5", "E6", "E7", "E8", "E9", "E10", "E11", "E12", "E13", "E14", "E15", "E16", "E17", "E18", "E19", "E20"]
BGO = ["B1", "B2", "B3", "D1", "D2", "D3"]
HET = ["C2", "C1"]
All = ["A1", "A2", "A3", "A4", "A5", "A6", "A7", "A8", "A9", "A10", "A11", "A12", "A13", "A14", "A15", "A16", "A17", "A18", "A19", "A20",
"E1", "E2", "E3", "E4", "E5", "E6", "E7", "E8", "E9", "E10", "E11", "E12", "E13", "E14", "E15", "E16", "E17", "E18", "E19", "E20",
"B1", "B2", "B3", "D1", "D2", "D3", "C2", "C1"]
datei = "2025-08-14-longtime-23eMHA.MHM"
# Datei einlesen
df = read_mhm_file(datei)
filtered_df = plot_and_filter_bgo(
df,
show_plots=True,
show_cuts=True,
apply_threshold=True,
log_axes=True,
log_counts=True,
bins=150, # empfohlen: 150 für log, 1500 für normal
xmax=5000,
ymax=5000,
)
#hist erstellen
plot_histogram_from_mhm(
filtered_df,
plot_channels=All,
apply_bgo_correction=False,
condition_channels= BGO,
require_all= True,
bins=200
)

View file

@ -1,171 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 20 12:26:18 2025
@author: milanmarx
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.odr import ODR, Model, RealData
"""
Dieses Skript liest eine Count-Table.csv ein,
wertet die Winkelabhängigkeit der Fluence aus,
führt einen ODR-Fit mit durch und plottet das Ergebnis.
"""
def analyze_fluence_vs_angle(
input_csv,
bin_width_deg=10
):
"""
Rückgabe:
dict mit
"grouped" : DataFrame mit gebinnten Mittelwerten
"fit_params" : Dictionary mit Fitparametern
"""
# CSV-Datei einlesen
df = pd.read_csv(input_csv, sep=";")
# Fluence berechnen
df["Fluence"] = df["fluence"]
# Poisson-Fehler der Counts auf die Fluenz
df["Fluence_err"] = np.sqrt(df["counts"]) / df["G_pair_MC_cm2_sr"]
# Winkel übernehmen
df["theta_deg"] = df["theta_MC_mode_deg"]
df["theta_err_deg"] = df["theta_MC_std_deg"]
# Winkel-Binning
df["theta_bin"] = (df["theta_deg"] / bin_width_deg).round() * bin_width_deg
# Gruppierung pro Winkel-Bin
# Mittelwerte innerhalb jedes Bins
grouped = df.groupby("theta_bin").agg(
theta_mean_deg=("theta_deg", "mean"),
theta_err_deg=("theta_err_deg", "mean"),
fluence_mean=("Fluence", "mean"),
fluence_err_mean=("Fluence_err", "mean")
).reset_index(drop=True)
winkel = grouped["theta_mean_deg"].values
fluence = grouped["fluence_mean"].values
angle_err = grouped["theta_err_deg"].values
fluence_err = grouped["fluence_err_mean"].values
# 7. Fitbereich fest auf unter 65°
mask = winkel <= 65
fit_winkel = winkel[mask]
fit_fluence = fluence[mask]
fit_angle_err = angle_err[mask]
fit_fluence_err = fluence_err[mask]
def f_odr(B, x):
A, n = B
return A * np.abs(np.cos(np.radians(x))) ** n
# Startwerte
A0 = np.nanmax(fit_fluence)
n0 = 2.0
p0 = [A0, n0]
# ODR-Datenobjekt
data = RealData(
fit_winkel,
fit_fluence,
sx=fit_angle_err,
sy=fit_fluence_err
)
# ODR-Fit
model = Model(f_odr)
odr = ODR(data, model, beta0=p0)
out = odr.run()
# Fit-Ergebnis extrahieren
A, n = out.beta
A_err, n_err = out.sd_beta
fit_result = {
"A": A,
"n": n,
"A_err": A_err,
"n_err": n_err
}
# Ausgabe der Fitparameter
print("\nODR-Fit (A · |cos(θ)|^n, θ ≤ 60°):")
for k, v in fit_result.items():
print(f" {k} = {v}")
# Plot
plt.figure(figsize=(9, 6))
# Messpunkte mit Fehlerbalken
plt.errorbar(
winkel,
fluence,
xerr=angle_err,
yerr=fluence_err,
fmt="o",
capsize=3,
markersize=6,
label="average Fluence"
)
# Fit-Kurve
theta_plot = np.linspace(0, 110, 600)
fit_curve = A * np.abs(np.cos(np.radians(theta_plot))) ** n
plt.plot(
theta_plot,
fit_curve,
"-",
linewidth=2,
label=f"Fit: A·|cos(θ)|ⁿ\nn = {n:.2f} ± {n_err:.2f}"
)
# Achsen und Layout
plt.xlabel("Zenith angle θ in °", fontsize=20)
plt.ylabel(r"Fluence in $\frac{1}{\mathrm{cm}^2\,\mathrm{sr}}$", fontsize=20)
plt.title(
f"A·|cos(θ)|ⁿ-Fit (bin width: {bin_width_deg}°, Fit ≤ 60°)",
fontsize=20
)
plt.ylim(bottom=0)
plt.xlim(-2, 110)
plt.tick_params(axis="both", labelsize=18, length=6)
plt.minorticks_on()
plt.grid(True)
plt.legend(fontsize=14)
plt.tight_layout()
plt.show()
return {
"grouped": grouped,
"fit_params": fit_result
}
def main():
analyze_fluence_vs_angle("AE_counts_2025-08-14-longtime-23e-kurz_Filtered.csv")
if __name__ == "__main__":
main()

View file

@ -1,271 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Nov 15 13:49:28 2025
@author: milanmarx
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
"""
Liest die beiden Geometrie Tabellen ein und eine MHM datei.
Aus diesen wird eine neue Tabelle mit counts und Fluence
"""
###############################################################################
# MHM einlesen
###############################################################################
def load_mhm(filename):
"""
liest event datei ein mit Endung .MHM (zum Verständnis MHM.py anschauen)
"""
# Einlesen der Rohdaten ohne Header
df = pd.read_csv(filename, sep=r"\s+", header=None, low_memory=False)
#hier unwichtige Daten
meta_cols = [
"EDM","Zeit","Kanaele","Druck","Hoehe","Temp","UTC","Korr"
]
# Alle Detektorkanäle definieren
A_cols = [f"A{i}" for i in range(1, 21)]
B_cols = ["B1","B2","B3"]
C_cols = ["C1","C2"]
D_cols = ["D1","D2","D3"]
E_cols = [f"E{i}" for i in range(1, 21)]
# Gesamte Spaltenliste
df.columns = meta_cols + A_cols + B_cols + C_cols + D_cols + E_cols
# Detektorkanäle zu float konvertieren
det_cols = A_cols + B_cols + C_cols + D_cols + E_cols
df[det_cols] = df[det_cols].apply(pd.to_numeric, errors="coerce").fillna(0.0)
return df
###############################################################################
#Koinzidenz Zähler
###############################################################################
def count_coincidences(df):
"""
Zählt alle Koinzidenzen zwischen:
AE (20 × 20 mögliche Kombinationen)
EC2 (20 mögliche Kombinationen)
"""
AE_counts = np.zeros((20, 20), dtype=int)
EC2_counts = np.zeros(20, dtype=int)
for _, row in df.iterrows():
# Alle AWerte
A_values = [row[f"A{i}"] for i in range(1, 21)]
E_values = [row[f"E{i}"] for i in range(1, 21)]
# Maximalen A-Kanal finden
maxA_value = max(A_values)
A_active = []
if maxA_value > 0:
maxA_index = A_values.index(maxA_value) + 1
A_active = [maxA_index] # nur 1 Kanal
# Maximalen E-Kanal finden
maxE_value = max(E_values)
E_active = []
if maxE_value > 0:
maxE_index = E_values.index(maxE_value) + 1
E_active = [maxE_index] # nur 1 Kanal
# C2 wie bisher
C2_active = row["C2"] > 0
# Sonderfall: nur C2 aktiv dann ignorieren
if C2_active and not A_active and not E_active:
continue
# AE Koinzidenz (nur 1×1 Kombination)
for a in A_active:
for e in E_active:
AE_counts[a-1, e-1] += 1
# EC2 Koinzidenzen
if C2_active:
for e in E_active:
EC2_counts[e-1] += 1
return AE_counts, EC2_counts
###############################################################################
# Geometrietabellen einlesen und Count Tabelle machen
###############################################################################
def build_AE_count_table(AE_geom_file, AE_counts):
"""
Kombiniert die geometrischen Monte-Carlo-Daten der AE-Paare
(mit den tatsächlich gemessenen Counts (AE_counts).
"""
# Geometrie-Tabelle laden
df_geom = pd.read_csv(AE_geom_file, sep=";")
rows = []
for _, row in df_geom.iterrows():
# Mehrere Koincidenzen in einem Eintrag dann Liste erzeugen
# ich will ja jetzt die counts für jede Koinzidenz
coincs = row["coincidence"].replace(" ", "").split(",")
for c in coincs:
# Axx-Eyy trennen
Aname, Ename = c.split("-")
a = int(Aname[1:])
e = int(Ename[1:])
counts = AE_counts[a-1, e-1]
rows.append({
"coincidence": c,
"phi_lab_deg": row["phi_lab_deg"],
"theta_span_lab_deg": row["theta_span_lab_deg"],
"G_pair_MC_cm2_sr": row["G_pair_MC_cm2_sr"],
"theta_MC_mode_deg": row["theta_MC_mode_deg"],
"theta_MC_mean_deg": row["theta_MC_mean_deg"],
"theta_MC_std_deg": row["theta_MC_std_deg"],
"counts": counts
})
# Gesamttabelle aller AE-Koinzidenzen
df2 = pd.DataFrame(rows)
# Gruppieren nach identischem Geometriefaktor um dann Mittelwerte/Std zu berechnen
g = df2.groupby("G_pair_MC_cm2_sr")["counts"]
df2["counts_mean"] = df2["G_pair_MC_cm2_sr"].map(g.mean().to_dict())
df2["counts_std"] = df2["G_pair_MC_cm2_sr"].map(g.std(ddof=0).to_dict())
# Fluence = gemessene counts / Geometriefaktor
df2["fluence"] = df2["counts"] / df2["G_pair_MC_cm2_sr"]
return df2
def build_CE_count_table(CE_geom_file, EC2_counts):
"""
Baut eine Tabelle aller EC2-Coincidenzen anhand der Geometrie-Tabelle.
"""
df_geom = pd.read_csv(CE_geom_file, sep=";")
rows = []
for _, row in df_geom.iterrows():
c = row["coincidence"].replace(" ", "")
Ename, _ = c.split("-")
e = int(Ename[1:]) # Nummer extrahieren
counts = EC2_counts[e-1]
rows.append({
"coincidence": c,
"phi_lab_deg": row["phi_lab_deg"],
"G_pair_MC_cm2_sr": row["G_pair_MC_cm2_sr"],
"theta_MC_mode_deg": row["theta_MC_mode_deg"],
"theta_MC_mean_deg": row["theta_MC_mean_deg"],
"theta_MC_std_deg": row["theta_MC_std_deg"],
"counts": counts
})
df2 = pd.DataFrame(rows)
# Gruppieren nach Geometriefaktor
g = df2.groupby("G_pair_MC_cm2_sr")["counts"]
df2["counts_mean"] = df2["G_pair_MC_cm2_sr"].map(g.mean().to_dict())
df2["counts_std"] = df2["G_pair_MC_cm2_sr"].map(g.std(ddof=0).to_dict())
df2["fluence"] = df2["counts"] / df2["G_pair_MC_cm2_sr"]
return df2
###############################################################################
# Heatmap plot der counts AE
###############################################################################
def plot_AE_heatmap(AE_counts, title="AE Coincidence Heatmap Filtered"):
"""
Erzeugt eine 20×20 Heatmap aller A/E-Coincidenzen.
"""
plt.figure(figsize=(10, 9))
plt.imshow(AE_counts, origin='lower', cmap='viridis')
# Colorbar
cbar = plt.colorbar()
cbar.set_label("Counts", fontsize=20)
cbar.ax.tick_params(labelsize=16)
# Achsenbeschriftungen
plt.xlabel("E-Kanal", fontsize=22)
plt.ylabel("A-Kanal", fontsize=22)
# Titel
plt.title(title, fontsize=24)
# Tick-Labels vergrößern
plt.xticks(
np.arange(20),
[f"E{i}" for i in range(1, 21)],
rotation=45,
fontsize=20
)
plt.yticks(
np.arange(20),
[f"A{i}" for i in range(1, 21)],
fontsize=20
)
plt.tight_layout()
plt.show()
###############################################################################
# Main
###############################################################################
def main():
#MHM-Datei einlesen
df_mhm = load_mhm("sdcard_filter_2025-10-08_Float_CUT_Filtered.MHM")
#Koinzidenzen zählen
AE_counts, EC2_counts = count_coincidences(df_mhm)
#Tabellen aus Geometrie + Counts erzeugen
df_AE = build_AE_count_table("AE_test.csv", AE_counts)
df_CE = build_CE_count_table("CE_test.csv", EC2_counts)
#Exportieren
#
df_AE.to_csv("AE_counts_sdcard_filter_2025-10-08_Float_CUT_Filtered.csv", sep=";", index=False)
df_CE.to_csv("CE_counts_sdcard_filter_2025-10-08_Float_CUT_Filtered.csv", sep=";", index=False)
print("AE_counts_table.csv und CE_counts_table.csv erzeugt.")
#Visualisierung
plot_AE_heatmap(AE_counts)
if __name__ == "__main__":
main()

View file

@ -1,818 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Nov 15 10:45:59 2025
@author: milanmarx
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as patches
"""
Dieses Skript gibt alle geometrisch interessanten Daten für jede Dioden Koinzidenz in einer Tabelle an
Hierbei werden manche Größen rein geometrisch, andere über die MC Simmulation bestimmt
Der Output sind zwei tabellen für jeweils A-E und E-C2 Koinzidenzen
"""
###############################################################################
# Geometrie Definition
###############################################################################
# Koordinatensystem (Dioden-Frame):
# - x-Achse: entlang der Reihen (rows)
# - y-Achse: quer dazu, entlang der Spalten (columns: links/mittig/rechts)
# - z-Achse: senkrecht zu den Diodenebenen (E bei z=0, A bei z=L)
# Später wird dieses System um die y-Achse rotiert, um den Labor-Frame zu bekommen.
# Abstand zwischen E- und A-Diodenebene:
L = 56.7 # E liegt bei z=0, A liegt bei z=L
# Abstand von der E-Diodenebene zur C2-Ebene:
L_C = L / 2.0 # = 28.35 mm
# Größe der Dioden
Dx = 10.0 # Breite in x-Richtung (mm)
Dy = 20.0 # Länge in y-Richtung (mm)
#diese vier sind nur für die unterste plot funktion
gap_y = 7.0
gap_x = 4.2
abstand_x = Dx + gap_x
abstand_y = Dy + gap_y
A_pix_cm2 = (Dx * Dy) / 100.0 # Sensitive Fläche in cm^2
# Abstand zwischen Diodenreihen in x-Richtung (bedingt durch das Gehäuse der Dioden)
x_step = 14.2
x_positions = np.arange(8) * x_step # 8 reihen
# y-Positionen der drei Diodenspalten (Columns) in mm:
y_left, y_mid, y_right = -27.0, 0.0, +27.0
# Kreisdetektor C2 :
# Radius in mm
# Verschoben in x-Richtung
C2_radius = 36.49 / 2.0 # mm
C2_x, C2_y = 49.7, 0.0 # Mittelpunkt des Kreises in der Ebene z = L_C
###############################################################################
# Dioden Position in Koordinatensystem
###############################################################################
def build_diode_map(prefix="A"):
"""
Erstellt eine Zuordnung von Diodennamen zu deren Position (x, y) im Dioden-Koordinatensystem.
Rückgabe
Dictionary der Form { 'A1': (x_mm, y_mm), 'A2': (...), ... }.
"""
mapping = {}
# Anordnung der Dioden in drei Spalten:
# Linke Spalte hat 6 Dioden, mittlere 8, rechte 6.
# Linke Spalte (6 Dioden), alle mit y = y_left
left_nums = [1, 4, 7, 10, 13, 16]
for i, num in enumerate(left_nums, start=1):
# i läuft über 1..6, x_positions[i] liefert die entsprechende x-Row-Position
mapping[f"{prefix}{num}"] = (float(x_positions[i]), float(y_left))
# Mittlere Spalte (8 Dioden), alle mit y = y_mid
mid_nums = [19, 2, 5, 8, 11, 14, 17, 20]
for i, num in enumerate(mid_nums):
mapping[f"{prefix}{num}"] = (float(x_positions[i]), float(y_mid))
# Rechte Spalte (6 Dioden), alle mit y = y_right
right_nums = [3, 6, 9, 12, 15, 18]
for i, num in enumerate(right_nums, start=1):
mapping[f"{prefix}{num}"] = (float(x_positions[i]), float(y_right))
return mapping
# Dict für A- und E-Dioden:
mapA, mapE = build_diode_map("A"), build_diode_map("E")
def compute_dc_dr(a_name, e_name):
"""
Berechnet delta in collums and rows
Liefert:
dc : Spaltenoffset (A minus E) in Einheiten des Column-Abstands (27 mm)
dr : Reihenoffset (A minus E) in Einheiten des Row-Abstands (14.2 mm)
dx_mm : Differenz der x-Positionen (Ax - Ex) in mm
dy_mm : Differenz der y-Positionen (Ay - Ey) in mm
"""
# Positionen der A- und E-Diode aus den Maps:
xA, yA = mapA[a_name]
xE, yE = mapE[e_name]
dx_mm = xA - xE
dy_mm = yA - yE
# Diskreter Row-Offset in Einheiten des Schrittmaßes x_step
dr = int(round(dx_mm / x_step))
# Diskreter Column-Offset in Einheiten des y-Abstands zwischen Spalten:
col_step = y_mid - y_left
dc = int(round(dy_mm / col_step)) if col_step != 0 else 0
return dc, dr, dx_mm, dy_mm
###############################################################################
# Geometrische bestimmung
###############################################################################
# Der "Dioden-Frame" wird durch eine Rotation um die y-Achse in das
# Labor-Koordinatensystem überführt. Der Rotationswinkel deg beschreibt
# dabei die Neigung des Detektors gegenüber dem Laborsystem.
def _rotation_matrix_y(deg): # rotationsmatrix
th = np.radians(deg)
return np.array([
[ np.cos(th), 0, np.sin(th)],
[ 0, 1, 0 ],
[-np.sin(th), 0, np.cos(th)]
])
def phi_angle_lab(dx, dy, L=L, deg=45.0):
"""
Berechnet den Azimutwinkel phi im Labor-Koordinatensystem in Grad.
(betrachtet Mittelpunkt zu Mittelpunkt)
phi ist Winkel von x Achse in richtung y achse (mathematisch positiv)
"""
v = np.array([dx, dy, L], dtype=float) # Richtungsvektor im Dioden-Frame
Ry = _rotation_matrix_y(deg)
v_lab = Ry @ v # ins Labor gedreht
x_lab, y_lab, _ = v_lab
phi = np.degrees(np.arctan2(y_lab, x_lab)) # liefert [-180..180]
return float(phi)
def theta_angle_lab(dx, dy, L=L, deg=45.0):
"""
Berechnet den Polarwinkel theta im Labor-Frame nach Rotation um die y-Achse.
(betrachtet Mittelpunkt zu Mittelpunkt)
theta ist der Winkel zwischen dem Richtungsvektor und der z-Achse
"""
v = np.array([dx, dy, L], dtype=float)
v /= np.linalg.norm(v) # Normierung des Richtungsvektors
Ry = _rotation_matrix_y(deg)
v_lab = Ry @ v # Rotation ins Labor-Frame
cos_theta = np.clip(v_lab[2] / np.linalg.norm(v_lab), -1.0, 1.0)
return float(np.degrees(np.arccos(cos_theta)))
def theta_min_max_lab(dx, dy, L=L, Dx=Dx, Dy=Dy, deg=45.0):
"""
Schätzt minimalen und maximalen Polarwinkel θ im Labor-Frame für eine
rechteckige Diode (betrachte nächsten Punkt an Null und Ecken)
"""
# Halbe Pixelbreiten
hx = Dx / 2.0
hy = Dy / 2.0
# Grenzen des Rechtecks relativ zum Ursprung
xdiff_min, xdiff_max = dx - hx, dx + hx
ydiff_min, ydiff_max = dy - hy, dy + hy
def clamp(val, lo, hi):
"""Hilfsfunktion: begrenzt 'val' auf den Bereich [lo, hi]."""
return max(lo, min(val, hi))
# Punkt im Rechteck, der dem Ursprung am nächsten liegt
x_star = clamp(0.0, xdiff_min, xdiff_max)
y_star = clamp(0.0, ydiff_min, ydiff_max)
# Kandidatenpunkte: der nächste Punkt + alle vier Ecken
candidates = [
(x_star, y_star),
(xdiff_min, ydiff_min), (xdiff_min, ydiff_max),
(xdiff_max, ydiff_min), (xdiff_max, ydiff_max)
]
Ry = _rotation_matrix_y(deg)
angles = []
for (x, y) in candidates:
v = np.array([x, y, L], dtype=float)
v /= np.linalg.norm(v)
v_lab = Ry @ v
cos_theta = np.clip(v_lab[2] / np.linalg.norm(v_lab), -1.0, 1.0)
angles.append(np.degrees(np.arccos(cos_theta)))
return float(np.min(angles)), float(np.max(angles))
###############################################################################
# MC Simmulation
###############################################################################
def geom_factor_C2(x_e, y_e, N=200_000, rng=None, deg=45.0):
if rng is None:
rng = np.random.default_rng()
# 1) Startpunkte auf der E-Diode
x0 = (rng.random(N) - 0.5) * Dx + x_e
y0 = (rng.random(N) - 0.5) * Dy + y_e
# 2) isotrope Halbraum-Emission
cos_theta = rng.random(N)
sin_theta = np.sqrt(1.0 - cos_theta**2)
phi = 2.0 * np.pi * rng.random(N)
ux = sin_theta * np.cos(phi)
uy = sin_theta * np.sin(phi)
uz = cos_theta
# 3) Flug zur C2-Ebene
t = L_C / uz
x1 = x0 + t * ux
y1 = y0 + t * uy
# 4) Trefferbedingung
dxh = x1 - C2_x
dyh = y1 - C2_y
hit = (dxh**2 + dyh**2) <= (C2_radius**2)
# 5) Zufallsvariable
X = cos_theta * hit
mean_val = np.mean(X)
var_val = np.var(X, ddof=1)
se_val = np.sqrt(var_val / N)
G = A_pix_cm2 * 2.0 * np.pi * mean_val
G_se = A_pix_cm2 * 2.0 * np.pi * se_val
# 6) Winkelstatistiken (nur Treffer)
if np.any(hit):
Ry = _rotation_matrix_y(deg)
v = np.stack([ux[hit], uy[hit], uz[hit]], axis=1)
v_lab = v @ Ry.T
cos_thetas = np.clip(
v_lab[:, 2] / np.linalg.norm(v_lab, axis=1),
-1.0, 1.0
)
thetas = np.degrees(np.arccos(cos_thetas))
theta_std = float(np.std(thetas))
theta_mean = float(np.mean(thetas))
hist, bins = np.histogram(thetas, bins=100)
theta_mode = float(bins[np.argmax(hist)])
else:
theta_std = theta_mean = theta_mode = np.nan
return {
"G_pair_MC_cm2_sr": float(G),
"G_se_cm2_sr": float(G_se),
"theta_MC_std_deg": theta_std,
"theta_MC_mean_deg": theta_mean,
"theta_MC_mode_deg": theta_mode,
}
def geom_factor_diodes(dx, dy, N=200_000, rng=None, deg=45.0):
"""
Monte-Carlo des Geometriefaktors für ein A/E-Diodenpaar.
(genau wie oben nur diesmal mit Abstand zwischen Dioden dy dx)
"""
if rng is None:
rng = np.random.default_rng()
# 1) Startpunkte gleichmäßig über die E-Diode
x0 = (rng.random(N) - 0.5) * Dx
y0 = (rng.random(N) - 0.5) * Dy
# 2) isotrope Halbraum-Emission
cos_theta = rng.random(N)
sin_theta = np.sqrt(1.0 - cos_theta**2)
phi = 2.0 * np.pi * rng.random(N)
ux = sin_theta * np.cos(phi)
uy = sin_theta * np.sin(phi)
uz = cos_theta
# 3) Schnittpunkte in der Ebene z=L
t = L / uz
x1 = x0 + t * ux
y1 = y0 + t * uy
# 4) Trefferbedingung auf A-Diode mit Zentrum (dx, dy)
hit = (np.abs(x1 - dx) <= Dx / 2.0) & (np.abs(y1 - dy) <= Dy / 2.0)
# 5) Geometriefaktor in cm^2·sr
X = cos_theta * hit # Zufallsvariable
mean_val = np.mean(X)
var_val = np.var(X, ddof=1) # Stichprobenvarianz
se_val = np.sqrt(var_val / N) # Standardfehler des Mittelwerts
G = A_pix_cm2 * 2.0 * np.pi * mean_val
G_se = A_pix_cm2 * 2.0 * np.pi * se_val
# 6) Winkelstatistiken der tatsächlich getroffenen Trajektorien
if np.any(hit):
Ry = _rotation_matrix_y(deg)
v = np.stack([ux[hit], uy[hit], uz[hit]], axis=1)
v_lab = v @ Ry.T
cos_thetas = np.clip(v_lab[:, 2] / np.linalg.norm(v_lab, axis=1), -1.0, 1.0)
thetas = np.degrees(np.arccos(cos_thetas))
theta_std = float(np.std(thetas))
hist, bin_edges = np.histogram(thetas, bins=100)
theta_mode = float(bin_edges[np.argmax(hist)])
theta_mean = float(np.mean(thetas))
return {
"G_pair_MC_cm2_sr": float(G),
"G_se_cm2_sr": float(G_se),
"theta_MC_std_deg": theta_std,
"theta_MC_mean_deg": theta_mean,
"theta_MC_mode_deg": theta_mode,
}
###############################################################################
# Tabellen Erstellung
###############################################################################
def make_AE_table(output_csv="AE_table.csv", N_MC=200_000, deg=45.0, rng=None):
"""
Erstellt eine Tabelle für A/E-Diodenpaare mit Geometriefaktor und Winkeln.
Wenn keine speziellen pairs übergeben, werden alle Koinzidenzen in eine tabelle getan
der GF und die Winkelstatistik werden für jeden theta lab nur einmal gemacht also nicht für alle 400 koinzidenzen
"""
if rng is None:
rng = np.random.default_rng()
# alle Kombinationen A×E
pairs = [(Aname, Ename) for Aname in sorted(mapA.keys())
for Ename in sorted(mapE.keys())]
# Cache, um gleiche theta_geometrien nur einmal zu simulieren:
GF_cache = {} # key: theta_lab value: G_stats-Dictionary
unique_sims = 0 # Zähler für tatsächlich durchgeführte MC-Simulationen
raw_rows = [] # temporäre Liste aller Zeilen
for Aname, Ename in pairs:
dc, dr, dx_mm, dy_mm = compute_dc_dr(Aname, Ename)
phi_lab = phi_angle_lab(dx_mm, dy_mm, L=L, deg=deg)
theta_lab = theta_angle_lab(dx_mm, dy_mm, L=L, deg=deg)
# Der Schlüssel zur Gruppierung der MC-Simulationen ist theta_lab
theta_key = round(theta_lab, 5)
if theta_key in GF_cache:
# Bereits vorhandenes Ergebnis wiederverwenden
G_stats = GF_cache[theta_key]
else:
# Neue Monte-Carlo-Simulation für diese Geometrie
G_stats = geom_factor_diodes(dx_mm, dy_mm, N=N_MC, rng=rng, deg=deg)
GF_cache[theta_key] = G_stats
unique_sims += 1
# Winkelbereich schätzung über min max
theta_min, theta_max = theta_min_max_lab(dx_mm, dy_mm, L=L, Dx=Dx, Dy=Dy, deg=deg)
theta_span = 0.5 * (theta_max - theta_min)
raw_rows.append({
"dc": dc,
"dr": dr,
"dx_mm": dx_mm,
"dy_mm": dy_mm,
"phi_lab_deg": phi_lab,
"theta_lab_deg": theta_lab,
"theta_min_lab_deg": theta_min,
"theta_max_lab_deg": theta_max,
"theta_span_lab_deg": theta_span,
"G_pair_MC_cm2_sr": G_stats["G_pair_MC_cm2_sr"],
"G_se_cm2_sr": G_stats["G_se_cm2_sr"],
"theta_MC_mode_deg": G_stats["theta_MC_mode_deg"],
"theta_MC_mean_deg": G_stats["theta_MC_mean_deg"],
"theta_MC_std_deg": G_stats["theta_MC_std_deg"],
"coincidence": f"{Aname}-{Ename}",
})
# Schritt 2: Gruppierung nach (dc, dr) nur eine Zeile pro Geometrieklasse
df_raw = pd.DataFrame(raw_rows)
group_cols = ["dc", "dr"]
grouped_rows = []
for (dc, dr), group in df_raw.groupby(group_cols):
# Alle Koinzidenzen sammeln (A-E-Paare mit dieser Geometrie)
coincidences = ", ".join(group["coincidence"].tolist())
# Da alle physikalischen Werte für diese Geometrie identisch sind,
# nehme einfach die erste Zeile
row = group.iloc[0].copy()
row["coincidence"] = coincidences
# Rundungen zur Ausgabe (Einheiten mm / Grad / cm^2 sr):
row["dx_mm"] = round(row["dx_mm"], 1)
row["dy_mm"] = round(row["dy_mm"], 1)
for key in [
"phi_lab_deg", "theta_lab_deg", "theta_min_lab_deg", "theta_max_lab_deg",
"theta_span_lab_deg", "G_pair_MC_cm2_sr",
"theta_MC_mode_deg", "theta_MC_mean_deg", "theta_MC_std_deg"
]:
row[key] = round(float(row[key]), 5)
grouped_rows.append(row)
# Finale Tabelle erzeugen
df_out = pd.DataFrame(grouped_rows)
# Sortieren nach Geometriefaktor absteigend
df_out = df_out.sort_values("G_pair_MC_cm2_sr", ascending=False)
# Schreiben in CSV
df_out.to_csv(output_csv, sep=";", index=False)
print(f"Tabelle gespeichert unter: {output_csv}")
print(f"Einzigartige MC-Simulationen (AE): {unique_sims}")
return df_out
def make_CE_table(output_csv="CE_table.csv", E_list=None, N_MC=200_000, deg=45.0, rng=None):
"""
Erstellt eine Tabelle für E-Dioden in Koincidenz mit dem HET_B
"""
if rng is None:
rng = np.random.default_rng()
if E_list is None:
# Standard: alle E-Dioden in mapE
E_list = sorted(mapE.keys())
rows = []
# Cache: gleiche theta_lab-Geometrie wie oben nur einmal Monte-Carlo
GF_cache = {}
unique_sims = 0
for Ename in E_list:
x_e, y_e = mapE[Ename]
# Differenzvektor von E-Diode zum C2-Zentrum (im Dioden-Frame)
dx = C2_x - x_e
dy = C2_y - y_e
# Gerundete Werte für die Ausgabe
dx_r = round(dx, 1)
dy_r = round(dy, 1)
# Polarwinkel theta im Labor-Frame für diese Richtung
theta_lab = theta_angle_lab(dx, dy, L=L_C, deg=deg)
# Cache-Schlüssel wie bei AE (theta_lab gerundet)
theta_key = round(theta_lab, 5)
# Monte-Carlo Simulation
if theta_key in GF_cache:
stats = GF_cache[theta_key]
else:
stats = geom_factor_C2(x_e, y_e, N=N_MC, rng=rng, deg=deg)
GF_cache[theta_key] = stats
unique_sims += 1
# Azimutwinkel phi im Labor-Frame
phi = phi_angle_lab(dx, dy, L=L_C, deg=deg)
rows.append({
"coincidence": f"{Ename}-C2",
"dx_mm": dx_r,
"dy_mm": dy_r,
"phi_lab_deg": round(phi, 5),
"theta_lab_deg": round(theta_lab, 5),
"G_pair_MC_cm2_sr": round(stats["G_pair_MC_cm2_sr"], 5),
"G_pair_MC_err_cm2_sr": round(stats["G_se_cm2_sr"], 5),
"theta_MC_mode_deg": round(stats["theta_MC_mode_deg"], 5),
"theta_MC_mean_deg": round(stats["theta_MC_mean_deg"], 5),
"theta_MC_std_deg": round(stats["theta_MC_std_deg"], 5),
})
df_out = pd.DataFrame(rows)
# Sortierung nach Geometriefaktor absteigend
df_out = df_out.sort_values("G_pair_MC_cm2_sr", ascending=False)
df_out.to_csv(output_csv, sep=";", index=False)
print(f"Tabelle gespeichert unter: {output_csv}")
print(f"CE-MC-Simulationen durchgeführt (mit Theta-Cache): {unique_sims}")
return df_out
###############################################################################
# Plot
###############################################################################
def plot_angle_distribution(dc, dr, L=L, Dx=Dx, Dy=Dy, A_pix_cm2=A_pix_cm2, N=500000, deg=45.0):
"""
Führt eine Monte-Carlo-Simulation für ein bestimmtes (dc, dr)-Paar aus
und zeigt die Winkelverteilung der tatsächlich durchflogenen Teilchen.
"""
# Umrechnung von Raster zu absoluten mm-Offsets
dx = dr * abstand_x
dy = dc * abstand_y
rng = np.random.default_rng()
x0 = (rng.random(N) - 0.5) * Dx
y0 = (rng.random(N) - 0.5) * Dy
cos_theta = rng.random(N)
sin_theta = np.sqrt(1.0 - cos_theta**2)
phi = 2.0 * np.pi * rng.random(N)
ux = sin_theta * np.cos(phi)
uy = sin_theta * np.sin(phi)
uz = cos_theta
t = L / uz
x1 = x0 + t * ux
y1 = y0 + t * uy
hit = (np.abs(x1 - dx) <= Dx / 2.0) & (np.abs(y1 - dy) <= Dy / 2.0)
# Rotation ins Labor-System
th = np.radians(deg)
Ry = np.array([
[ np.cos(th), 0, np.sin(th)],
[ 0, 1, 0 ],
[-np.sin(th), 0, np.cos(th)]
])
v = np.stack([ux[hit], uy[hit], uz[hit]], axis=1)
v_lab = v @ Ry.T
cos_thetas = np.clip(v_lab[:, 2] / np.linalg.norm(v_lab, axis=1), -1.0, 1.0)
thetas = np.degrees(np.arccos(cos_thetas))
# Statistik
theta_mean = np.mean(thetas)
theta_std = np.std(thetas)
# Modus
hist, bin_edges = np.histogram(thetas, bins=100)
mode_idx = np.argmax(hist)
theta_mode = 0.5 * (bin_edges[mode_idx] + bin_edges[mode_idx+1])
# Bin-Breite: 0.5°
theta_min = np.min(thetas)
theta_max = np.max(thetas)
bin_width = 0.2
#bins = int(np.ceil((theta_max - theta_min) / bin_width))
bins = 100
print(f"theta_min = {theta_min:.3f}°")
print(f"theta_max = {theta_max:.3f}°")
print(f"bins = {bins}")
X = cos_theta * hit
mean_val = np.mean(X)
var_val = np.var(X, ddof=1)
se_val = np.sqrt(var_val / N)
G = A_pix_cm2 * 2.0 * np.pi * mean_val
G_se = A_pix_cm2 * 2.0 * np.pi * se_val
# Plot
plt.figure(figsize=(8,5))
plt.hist(thetas, bins=bins, color="lightblue", edgecolor="gray")
# Linien
plt.axvline(theta_mean, color="red", linewidth=2, label=f"Mean = {theta_mean:.2f}°")
plt.axvline(theta_mode, color="purple", linewidth=2, label=f"Mode = {theta_mode:.2f}°")
plt.axvline(theta_mean - theta_std, color="orange", linestyle="--", label=f"Std = {theta_std:.2f}°")
plt.axvline(theta_mean + theta_std, color="orange", linestyle="--")
# Titel vorbereiten
exp_N = int(np.log10(N))
mantissa = N / 10**exp_N
# Falls du nur 10^x darstellen willst ohne Mantisse 1
if mantissa == 1:
N_str = r"$10^{" + f"{exp_N}" + "}$"
else:
N_str = r"$" + f"{mantissa:.1f}" + r"\cdot10^{" + f"{exp_N}" + "}$"
plt.title(
rf"Angle distribution for dc={dc}, dr={dr} | deg={deg}° (N={N_str})" + "\n"
f"G = {G:.4f} cm²·sr",
fontsize=15
)
plt.xlabel("θ in °", fontsize = 15)
plt.ylabel("Counts", fontsize = 15)
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()
def plot_angle_distribution_CE(
Ename,
N=500_000,
deg=45.0,
bins=100
):
"""
Winkelverteilung für EC2-Koinzidenzen (Monte-Carlo)
- Quelle: isotrope Halbraum-Emission von der E-Diode
- Detektor: C2-Kreis in Ebene z = L_C
- Winkel: Polarwinkel theta im Labor-Frame
"""
# -------------------------------------------------
# 1. E-Diodenposition
# -------------------------------------------------
x_e, y_e = mapE[Ename]
rng = np.random.default_rng()
# -------------------------------------------------
# 2. Startpunkte auf E-Diode
# -------------------------------------------------
x0 = (rng.random(N) - 0.5) * Dx + x_e
y0 = (rng.random(N) - 0.5) * Dy + y_e
# -------------------------------------------------
# 3. Isotrope Halbraum-Emission (z > 0)
# -------------------------------------------------
cos_theta = rng.random(N)
sin_theta = np.sqrt(1.0 - cos_theta**2)
phi = 2.0 * np.pi * rng.random(N)
ux = sin_theta * np.cos(phi)
uy = sin_theta * np.sin(phi)
uz = cos_theta
# -------------------------------------------------
# 4. Flug zur C2-Ebene (z = L_C)
# -------------------------------------------------
t = L_C / uz
x1 = x0 + t * ux
y1 = y0 + t * uy
# -------------------------------------------------
# 5. Trefferbedingung C2
# -------------------------------------------------
dxh = x1 - C2_x
dyh = y1 - C2_y
hit = (dxh**2 + dyh**2) <= C2_radius**2
if not np.any(hit):
print("WARNUNG: Keine Treffer auf C2")
return None
# -------------------------------------------------
# 6. Rotation ins Labor-System
# -------------------------------------------------
Ry = _rotation_matrix_y(deg)
v = np.stack([ux[hit], uy[hit], uz[hit]], axis=1)
v_lab = v @ Ry.T
cos_thetas = np.clip(
v_lab[:, 2] / np.linalg.norm(v_lab, axis=1),
-1.0, 1.0
)
thetas = np.degrees(np.arccos(cos_thetas))
# -------------------------------------------------
# 7. Statistik
# -------------------------------------------------
theta_mean = np.mean(thetas)
theta_std = np.std(thetas)
hist, bin_edges = np.histogram(thetas, bins=bins)
mode_idx = np.argmax(hist)
theta_mode = 0.5 * (bin_edges[mode_idx] + bin_edges[mode_idx + 1])
# -------------------------------------------------
# 8. Geometriefaktor
# -------------------------------------------------
X = cos_theta[hit]
mean_val = np.mean(X)
var_val = np.var(X, ddof=1)
se_val = np.sqrt(var_val / N)
G = A_pix_cm2 * 2.0 * np.pi * mean_val
G_se = A_pix_cm2 * 2.0 * np.pi * se_val
# -------------------------------------------------
# 9. Plot
# -------------------------------------------------
plt.figure(figsize=(8, 5))
plt.hist(thetas, bins=bins, edgecolor="gray")
plt.axvline(theta_mean, color="red", linewidth=2,
label=f"Mean = {theta_mean:.2f}°")
plt.axvline(theta_mode, color="purple", linewidth=2,
label=f"Mode = {theta_mode:.2f}°")
plt.axvline(theta_mean - theta_std, color="orange",
linestyle="--", label=f"Std = {theta_std:.2f}°")
plt.axvline(theta_mean + theta_std, color="orange",
linestyle="--")
plt.title(
f"Angle distribution {Ename}C2 | deg={deg}°\n"
f"G = 0.17755 cm²·sr", fontsize = 15 # hier hab ich den GF aus der Tabelle eingetragen weil die simmulation irgendiwe bei den C2-E koinzidenzen nicht funktioniert
)
#plt.title(
# f"Angle distribution {Ename}C2 | deg={deg}°\n"
# f"G = {G:.4f} cm²·sr", fontsize = 15
#)
plt.xlabel("θ in °", fontsize = 15)
plt.ylabel("Counts", fontsize = 15)
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()
print(f"EC2 Treffer: {hit.sum()} / {N}")
print(f"Mean θ = {theta_mean:.3f}°")
print(f"Mode θ = {theta_mode:.3f}°")
print(f"Std θ = {theta_std:.3f}°")
return thetas
###############################################################################
# Main
###############################################################################
def main():
#plot_angle_distribution_CE("E20", deg=45, N=100_000_000)
plot_angle_distribution(0, 0, N = 100_000_000, deg = 90)
rng = np.random.default_rng(123) #dann ist es vergleichbar
df_AE = make_AE_table(
output_csv="AE_test=.csv",
N_MC=50_000_000, # Anzahl Monte-Carlo-Samples pro theta-Geometrie
deg=45.0,
rng=rng
)
df_CE = make_CE_table(
output_csv="CE_test.csv",
N_MC=50_000_000, # Anzahl Monte-Carlo-Samples pro theta-Geometrie
deg=45.0,
rng=rng
)
print(df_AE.head())
print(df_CE.head())
if __name__ == "__main__":
main()

View file

@ -142,7 +142,7 @@ def calibrate_vectors(acc_raw, mag_raw):
"""
mag_sens = 6842 # LSB/gauss
acc_sens = 16000 # 1000 LSB/g and it is only a 12 bit number so divide by 16 (1hex bit) equivalent to shifting 4 bits
mag_calib = [37.23933025452744, 41.29799068099641, 1.0, 0.07392279199604498, 0.4336054231239714, 0.0] # fit coefficients from BA thesis
mag_calib = [1.02305, 1.13811, 1.16184, 0.0265714, -0.188856, -0.177294] # fit coefficients from BA thesis
acc_calib = [0.972148, 0.968608, 0.974061, -0.00109779, -0.0154984, 0.0135722]
acc_raw = [struct.unpack('<h', struct.pack('<H', int(num)))[0] for num in acc_raw] # turns int into hex, turns hex into signed int
mag_raw = [struct.unpack('<h', struct.pack('<H', int(num)))[0] for num in mag_raw]
@ -260,13 +260,11 @@ def main(input):
ax32.set_ylabel('Roll and Pitch in °', fontsize=12)
ax4.set_ylabel('Temperatures in °C', fontsize=12)
ax31.set_xlabel('Time in dd-hh:mm:ss', fontsize=12); ax4.set_xlabel('Time in dd-hh:mm:ss', fontsize=12)
ax1.tick_params(axis="x", labelrotation=25); ax21.tick_params(axis="x",labelrotation=25)
ax31.tick_params(axis="x", labelrotation=25); ax4.tick_params(axis="x", labelrotation=25)
ax1.grid(); ax21.grid(); ax31.grid(); ax4.grid()
ax1.legend(bbox_to_anchor=(1,1), loc="upper left");ax32.legend(bbox_to_anchor=(1,1), loc="upper left");ax4.legend(bbox_to_anchor=(1,1), loc="upper left")
if fp:
plt.suptitle(fp.split('/')[-1], fontsize=12)
plt.subplots_adjust(wspace=0.5,hspace=0.33, top=0.95, bottom=0.13)
plt.subplots_adjust(wspace=0.5)
plt.show()
for l in input:
if i == -1: # skip the first interval because it may be incomplete

View file

@ -1,352 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Oct 22 17:07:38 2025
@author: milanmarx
"""
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import numpy as np
import math
from scipy.optimize import curve_fit
"""
Dieses Skript liest eine MHM datei ein und berechnet die counts pro sekunde für verschiedene kanäle
und kanalgruppen wie BGO. Fittet ein Gaisser Hillas Modell an die daten
"""
thr_map = {
"A1": 15, "A2": 15, "A3": 15, "A4": 15, "A5": 16, "A6": 15, "A7": 15, "A8": 15, "A9": 15, "A10": 15,
"A11": 15, "A12": 15, "A13": 15, "A14": 15, "A15": 15, "A16": 15, "A17": 15, "A18": 15, "A19": 15, "A20": 15,
"E1": 15, "E2": 15, "E3": 15, "E4": 15, "E5": 15, "E6": 15, "E7": 15, "E8": 15, "E9": 15, "E10": 15,
"E11": 15, "E12": 15, "E13": 15, "E14": 16, "E15": 15, "E16": 15, "E17": 15, "E18": 15, "E19": 15, "E20": 15,
"B1": 5.82, "B2": 5.82, "B3": 5.82,
"D1": 5.82, "D2": 5.82, "D3": 5.82,
"C1": 13, "C2": 12
}
###############################################################################
# MHM einlesen
###############################################################################
# Datei einlesen
def read_mhm(filepath: str, cutoff_time=None) -> pd.DataFrame:
# Liest eine MHM (space-separiert) in einen DataFrame.
df = pd.read_csv(filepath, sep=r"\s+")
# Parse der UTC-Spalte zu echten Timestamps ungültige werden NaT.
df["UTC"] = pd.to_datetime(df["UTC"], utc=True, errors="coerce")
# Optional: zeitliche Begrenzung (es bleiben nur Zeilen mit UTC <= cutoff)
if cutoff_time:
cutoff = pd.to_datetime(cutoff_time, utc=True)
df = df[df["UTC"] <= cutoff]
return df
#gruppen die geplottet werden können
groups = {
"BGO_1": ["B1", "B2", "B3"],
"BGO_2": ["D1", "D2", "D3"],
"All": [f"A{i}" for i in range(1, 21)] + [f"E{i}" for i in range(1, 21)] +
["B1", "B2", "B3", "D1", "D2", "D3", "C1", "C2"]
}
###############################################################################
# counts pro sekunde bestimmen
###############################################################################
#Counts pro Sekunde berechnen
def compute_counts_per_second(df, use_threshold=True, average_blocks=1):
#kopie um Original-DF nicht zu verändern
df = df.copy()
# Nur die Kanäle betrachten, die in thr_map definiert sind
channels = [c for c in df.columns if c in thr_map.keys()]
results = []
# Nach UTC-Zeitstempel gruppieren (jede UTC entspricht einem 12-Sekunden-Block)
grouped = list(df.groupby("UTC"))
# Blöcke bündeln, um über größere Intervalle zu mitteln (average_blocks * 12 s)
for i in range(0, len(grouped), average_blocks):
# UTC-Liste der zusammengefassten Blöcke
utc_block = [g[0] for g in grouped[i:i + average_blocks]]
# Daten der Blöcke zusammenführen
combined = pd.concat([g[1] for g in grouped[i:i + average_blocks]])
utc_avg = utc_block[0]
entry = {"UTC": utc_avg}
for ch in channels:
vals = combined[ch].astype(float)
if use_threshold:
# Schwellenwert pro Kanal
thr = thr_map[ch]
# Für BGO-Kanäle (B1-3, D1-3): Temperatur-Korrektur anwenden
if ch in ["B1", "B2", "B3", "D1", "D2", "D3"]:
vals = vals * combined["KorrFaktor"].astype(float)
# Zähle „aktive“ Messungen (Wert > Threshold) und normiere auf Counts/s:
# Jede Zeile repräsentiert 12 s also durch (12 * average_blocks) teilen.
count = (vals > thr).sum() / (12 * average_blocks)
else:
# Alternative: alles ungleich 0 zählt als Treffer (ohne Threshold)
count = (vals != 0).sum() / (12 * average_blocks)
entry[ch] = count
# Housekeeping-Mittelwerte fürs fenster
entry["Druck"] = combined["Druck"].mean()
entry["Temperatur"] = combined["Temperatur"].mean()
results.append(entry)
# Rückgabe: Zeitreihe der Counts/s je Kanal (+ Druck/Temperatur)
return pd.DataFrame(results)
# Kanal oder Gruppe auswählen
def get_group_counts(df_counts, selection):
# Falls eine vordefinierte Gruppe gewählt ist:
if selection in groups:
chs = groups[selection]
# Sub-DataFrame nur mit den Kanälen der Gruppe
subset = df_counts[chs]
# Gruppenmittelwert (hier: Summe/Anzahl Kanäle → mittlere Counts/s über die Gruppe)
mean_counts = subset.sum(axis=1) / len(chs)
# Rückgabe: UTC + HK + aggregierte Counts
return df_counts[["UTC", "Druck", "Temperatur"]].assign(Counts=mean_counts)
# Falls ein einzelner Kanalname gewählt ist:
elif selection in df_counts.columns:
# Rückgabe: UTC + HK + die Counts-Spalte unter generischem Namen „Counts“
return df_counts[["UTC", "Druck", "Temperatur", selection]].rename(columns={selection: "Counts"})
else:
# Weder Gruppe noch Kanal bekannt dann Fehler
raise ValueError(f"Unbekannte Auswahl: {selection}")
###############################################################################
# Plots erstellen
###############################################################################
# Plotfunktionen
def plot_multiple_counts_vs_pressure(df_counts, selections, title="Counts/s vs Pressure"):
plt.figure(figsize=(10, 6))
for sel in selections:
df_sel = get_group_counts(df_counts, sel)
plt.scatter(df_sel["Druck"], df_sel["Counts"], label=sel, marker='x')
plt.xlabel("Pressure", fontsize = 15)
plt.ylabel("Counts in 1/s", fontsize = 15)
plt.title(title, fontsize = 15)
plt.xticks(fontsize=15)
plt.yticks(fontsize=15)
plt.legend()
plt.grid(True)
plt.gca().invert_xaxis()
plt.tight_layout()
plt.show()
def plot_multiple_counts_vs_time(df_counts, selections, title="Counts/s vs UTC-Zeit"):
plt.figure(figsize=(12, 7))
for sel in selections:
df_sel = get_group_counts(df_counts, sel)
plt.scatter(df_sel["UTC"], df_sel["Counts"], label=sel, marker='x')
plt.xlabel("UTC-time", fontsize = 20)
plt.ylabel("Counts in 1/s", fontsize = 20)
plt.title(title, fontsize = 25)
plt.xticks(fontsize=20,rotation = 45, ha = "right")
plt.yticks(fontsize=20)
plt.legend(fontsize = 17)
plt.grid(True)
plt.tight_layout()
plt.show()
def plot_housekeeping(df_counts, title="Housekeeping: pressure & temperature vs UTC"):
fig, ax1 = plt.subplots(figsize=(12, 7))
ax1.set_xlabel("UTC-time", fontsize=20)
ax1.set_ylabel("Pressure in hPa", color="tab:blue", fontsize=20)
ax1.plot(df_counts["UTC"], df_counts["Druck"],
color="tab:blue", label="pressure in hPa")
ax1.tick_params(axis="y", labelcolor="tab:blue", labelsize=20)
ax1.tick_params(axis="x", labelsize=18)
# Rotation der UTC-Ticks
plt.setp(ax1.get_xticklabels(), rotation=45, ha="right")
ax2 = ax1.twinx()
ax2.set_ylabel(
"BGO-B temperature in °C",
color="tab:red",
fontsize=20,
labelpad=12 # minimal größerer Abstand zur Achse
)
ax2.plot(df_counts["UTC"], df_counts["Temperatur"],
color="tab:red", label="Temperature in °C")
ax2.tick_params(axis="y", labelcolor="tab:red", labelsize=20)
plt.title(title, fontsize=25)
plt.tight_layout()
plt.show()
###############################################################################
# Gaisser Hillas Fit
###############################################################################
def gaisser_hillas(X, Nmax, Xmax, X0, Lambda):
# Verhindere numerische Probleme, falls X <= X0 ist (dort ist die Funktion formal nicht definiert)
X = np.array(X)
# Um Probleme im Bereich X <= X0 zu vermeiden, setze dort die Rate extrem klein
safe = X > X0
N = np.zeros_like(X, dtype=float)
# Nur für X > X0 berechnen wir die GaisserHillas-Formel
term1 = ((X[safe] - X0) / (Xmax - X0)) ** ((Xmax - X0) / Lambda)
term2 = np.exp((Xmax - X[safe]) / Lambda)
N[safe] = Nmax * term1 * term2
return N
def fit_gaisser_hillas(df_sel):
# DataFrame kopieren
df = df_sel.copy()
# 2) Atmosphärische Tiefe X [g/cm^2]
g = 9.80665 # m/s^2
df["X"] = (df["Druck"] * 100.0) / g / 10.0
# 3) Fitdaten
X_data = df["X"].values
I_data = df["Counts"].values
mask = I_data > 0
X_fit = X_data[mask]
I_fit = I_data[mask]
if len(X_fit) < 5:
print("Zu wenige Punkte mit Counts > 0 für einen sinnvollen GaisserHillas-Fit.")
return None, None
# 4) Startwerte
Nmax_guess = np.max(I_fit)
Xmax_guess = X_fit[np.argmax(I_fit)]
X0_guess = Xmax_guess - 120.0
Lambda_guess = 70.0
p0 = [Nmax_guess, Xmax_guess, X0_guess, Lambda_guess]
# 5) Fit
popt, pcov = curve_fit(
gaisser_hillas,
X_fit,
I_fit,
p0=p0,
maxfev=10000
)
Nmax, Xmax, X0, Lambda = popt
# Fehler aus Kovarianzmatrix
perr = np.sqrt(np.diag(pcov))
Xmax_err = perr[1]
# 7) Fitkurve
X_plot = np.linspace(np.min(X_fit), np.max(X_fit), 500)
I_plot = gaisser_hillas(X_plot, *popt)
# 8) Konsolenausgabe (jetzt mit Höhenfehler)
print("\n---------------------------")
print(" GaisserHillas Fit")
print("---------------------------")
print(f"Nmax = {Nmax:.3f} (Counts/s)")
print(f"Xmax = {Xmax:.2f} ± {Xmax_err:.2f} g/cm²")
print(f"X0 = {X0:.2f} g/cm²")
print(f"Lambda = {Lambda:.2f} g/cm²")
print("---------------------------\n")
# 9) Plot (unverändert)
plt.figure(figsize=(10, 6))
plt.scatter(X_data, I_data, s=10, alpha=0.5, label="Data points")
plt.plot(X_plot, I_plot, color="red", linewidth=2, label="Fit-function")
plt.axvline(
Xmax,
color="green",
linestyle="--",
label=rf"$\chi$ max = {Xmax:.1f} $\pm$ {Xmax_err:.1f} g/cm²"
)
plt.xlabel(r"atmospheric depth $\chi$ in g/cm²", fontsize=20)
plt.ylabel("Counts in 1/s", fontsize=20)
plt.title(" Fit on counts vs. atmospheric depth", fontsize=25)
plt.grid(True)
plt.legend(fontsize=17)
plt.xticks(fontsize=20)
plt.yticks(fontsize=20)
plt.gca().invert_xaxis()
plt.tight_layout()
plt.show()
return popt
###############################################################################
# Main
###############################################################################
def main():
datei = "sdcard_filter_2025-10-08_CUT_Regener_Pfotzer.MHM" # Eingabedatei
use_threshold = True # True: zähle nur Werte > Kanal-Threshold; False: zähle alle ungleich 0
cutoff_time = None#"2025-10-08T19:00:00Z" # None = kein Cut
average_blocks = 1 # Aggregationsfenster: 1 Block = 12s; 2 = 24s; 5 = 60s; ...
selections = [ "BGO_2"] # Welche Kanäle/Gruppen geplottet werden (Liste möglich)
df = read_mhm(datei, cutoff_time=cutoff_time) # Datei einlesen (+ optionaler Zeitfilter)
df_counts = compute_counts_per_second(df, use_threshold=use_threshold, average_blocks=average_blocks)
# df_counts enthält: UTC, Druck, Temperatur, sowie pro Kanal die Counts/s im Aggregationsraster
plot_multiple_counts_vs_pressure(df_counts, selections, title="Counts/s vs Pressure")
plot_multiple_counts_vs_time(df_counts, selections, title="Counts per second of BGO1 and BGO2 vs. UTC-time")
plot_housekeeping(df_counts) # Druck/Temperatur zeitlich
df_sel = get_group_counts(df_counts, selections[0])
fit_gaisser_hillas(df_sel)
if __name__ == "__main__":
main()