#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Tue Aug 26 13:47:24 2025 @author: ava Parse ED-data for DORN files, create histograms and optional calculations Trigger-enabled event histograms added Filename suffix via -nameadd """ import numpy as np import argparse import sys import pandas as pd from pathlib import Path from DORNconfiguration import NMAHEPAM_channels, SETH_channels, AHEPAM_channels from event_filters import prepare_trigger_indices, check_trigger # --------------------------- # Argumente # --------------------------- parser = argparse.ArgumentParser(description="Parse ED-data for DORN files with triggers") parser.add_argument("file", type=str) parser.add_argument("-map", type=str, default="ALL", choices=["ALL", "NMAHEPAM", "SETH", "AHEPAM"]) parser.add_argument("-eventhist", action="store_true") parser.add_argument("-Bhist", action="store_true", help="Histogram for B-channels + sum") parser.add_argument("-trigger", nargs="+", default=None, help="Trigger selection: e.g. B1, allB, allV, V1") parser.add_argument("-time", type=int, default=200, help="Event time window threshold") parser.add_argument("-nameadd", type=str, default="", help="Optional suffix for output filename") args = parser.parse_args() file = args.file if not Path(file).is_file(): print("No valid file:", file) sys.exit() filename = Path(file).stem Path("hists").mkdir(parents=True, exist_ok=True) # --------------------------- # Eventhist default # --------------------------- minX_event = -100 maxX_event = 5000 resX_event = 0.838214 * 3 # --------------------------- # B-Histogramm Parameter # --------------------------- minX_B = -5 maxX_B = 300 resX_B = 0.838214 / 2 # --------------------------- # Helper # --------------------------- def apply_calibration(raw_value, name, u_dict): return raw_value * u_dict.get(name, 1.0) # --------------------------- # Mapping # --------------------------- def Evaluate_mapping(mapping_name): if mapping_name == "ALL": columns = [f"0-{i}" for i in range(24)] + [f"1-{i}" for i in range(24)] return columns, lambda ch, plane: ch + plane * 24, None, None elif mapping_name == "NMAHEPAM": channels = NMAHEPAM_channels() elif mapping_name == "SETH": channels = SETH_channels() elif mapping_name == "AHEPAM": channels = AHEPAM_channels() else: print("Unknown mapping") sys.exit() valid_channels = [ch for ch in channels if ch["name"]] columns = [ch["name"] for ch in valid_channels] def resolver(ch, sl): hwc_index = ch + sl * 24 for idx, ch_dict in enumerate(valid_channels): if ch_dict["hwc"] == hwc_index: return idx return None return columns, resolver, valid_channels, None # --------------------------- # Generic Event Histogram # --------------------------- def create_event_histogram(columns, resolver, mapping_channels, time_threshold=200, triggers=None, minX=minX_event, maxX=maxX_event, resX=resX_event, B_only=False): if B_only: channels_to_use = ["B1","B2","B3","B4","B5","B6"] n_cols = len(channels_to_use) + 1 bins = int((maxX_B - minX_B) / resX_B) hist = np.zeros((bins+1, n_cols + 1)) hist[:,0] = np.linspace(minX_B, maxX_B, bins+1) else: bins = int((maxX - minX) / resX) hist = np.zeros((bins+1, len(columns)+1)) hist[:,0] = np.linspace(minX, maxX, bins+1) channels_to_use = columns u_dict = {ch["name"]: ch["u"] for ch in (mapping_channels or [])} thr_dict = {ch["name"]: ch["thr"] for ch in (mapping_channels or [])} # Trigger vorbereiten (IMMER mit allen Kanälen!) trigchans = None if triggers: from event_filters import NMAHEPAM_triggers trigchans = prepare_trigger_indices(triggers, columns, NMAHEPAM_triggers) current_event = [] event_start = None with open(file, "r", encoding="utf-8", errors="ignore") as f: for line in f: if not line.startswith("ED"): continue parts = line.split() try: time = int(float(parts[1])) sli = int(parts[2]) cha = int(parts[3]) raw = float(parts[-1]) / 0x20000 except: continue # 🔧 FIX: IMMER alle Kanäle auflösen idx = resolver(cha, sli) if idx is None: continue name = columns[idx] val = apply_calibration(raw, name, u_dict) if event_start is None: event_start = time if abs(time - event_start) > time_threshold: event_dict = {n:v for n,v in current_event} if (trigchans is None) or check_trigger(event_dict, columns, trigchans, thr_dict): if B_only: fill_B_event(hist, event_dict, channels_to_use, minX_B, maxX_B, resX_B) else: fill_event(hist, current_event, columns, minX, maxX, resX) current_event = [] event_start = time current_event.append((name, val)) # letztes Event if current_event: event_dict = {n:v for n,v in current_event} if (trigchans is None) or check_trigger(event_dict, columns, trigchans, thr_dict): if B_only: fill_B_event(hist, event_dict, channels_to_use, minX_B, maxX_B, resX_B) else: fill_event(hist, current_event, columns, minX, maxX, resX) return hist # --------------------------- # Fill functions # --------------------------- def fill_event(hist, event, columns, minX, maxX, resX): for name, val in event: if val is None: continue if minX <= val <= maxX: x = int((val - minX) / resX) hist[x, columns.index(name)+1] += 1 def fill_B_event(hist, event_dict, B_channels, minX, maxX, resX): for i, b in enumerate(B_channels): val = event_dict.get(b, 0) if minX <= val <= maxX: x = int((val - minX) / resX) hist[x, i+1] += 1 sum_val = sum(event_dict.get(b, 0) for b in B_channels) if minX <= sum_val <= maxX: x = int((sum_val - minX) / resX) hist[x, -1] += 1 # --------------------------- # Save # --------------------------- def save_hist(hist, columns, suffix): add = f"_{args.nameadd}" if args.nameadd else "" df = pd.DataFrame(hist, columns=["value"] + columns) fname = f"hists/{filename}{add}.{suffix}" df.to_csv(fname, sep=" ", index=False) print(fname, "created") # --------------------------- # MAIN # --------------------------- def main(): columns, resolver, channels, _ = Evaluate_mapping(args.map) if args.eventhist: hist = create_event_histogram(columns, resolver, channels, time_threshold=args.time, triggers=args.trigger) save_hist(hist, columns, "eventhist") if args.Bhist: hist = create_event_histogram(columns, resolver, channels, time_threshold=args.time, triggers=args.trigger, B_only=True) save_hist(hist, ["B1","B2","B3","B4","B5","B6","SUM"], "Bhist") if __name__ == "__main__": main()