rpirena/phasefit.py

204 lines
5.1 KiB
Python
Raw Permalink Normal View History

#! /usr/bin/python
# encoding: UTF-8
""" phasefit
Read .EI file(s) (irena with SolO Trigger).
Collect all hits above thr*mV and make a histogram of the phase parameter.
Integrate and normalize the histogram.
Fit a polynom to the phase funtion.
Print the data and fit
"""
import sys, getopt, fileinput, math, numpy, numpy.linalg
from numpy import array, zeros, arange, newaxis
options,files = getopt.getopt(sys.argv[1:], "V:t:N:r:L:")
mV = 14000
thr = 50
nCh = 0
binW = 0.0005
rank = 3
ph_min = 0.01
ph_max = 1 - ph_min
for o,v in options:
if o=='-V':
mV = float(v)
if o=='-t':
thr = float(v)
if o=='-N':
nCh = int(v)
if o=='-r':
rank = int(v)
if o=='-L':
vv = map(float, v.split())
if len(vv)==1:
ph_min,ph_max = vv[0],1-vv[0]
else:
ph_min,ph_max = vv
class histogram(object):
def __init__(self, w):
self.h = {}
self.w = w
self.offs = 0.5
def add(self, b, n=1):
b = numpy.floor(b/self.w + self.offs)
try:
self.addb(int(b), n)
except TypeError:
self.addb(tuple(map(int, b)), n)
def addb(self, b, n=1):
if b in self.h:
self.h[b] += n
else:
self.h[b] = n
def bins(self):
bb = self.h.keys()
bb.sort()
return bb
def copy(self):
hh = histogram(self.w)
hh.offs = self.offs
return hh
def integrate(self):
s = 0
hh = self.copy()
for b in self.bins():
s = s + self.h[b]
hh.h[b] = s
return hh
def __iadd__(self, b):
self.add(b)
return self
def hist(self):
return [(b*self.w, self.h[b]) for b in self.bins()]
def normalize(self, n):
hh = self.copy()
for b in self.h:
hh.h[b] = self.h[b] / n
return hh
class regression(object):
def __init__(self, r=1):
self.n = 0
self.tb = arange(r+1)
self.ta = self.tb.reshape((-1,1)) + self.tb.reshape((1,-1))
self.tb = self.tb
self.ta = self.ta
self.B = zeros(self.tb.shape)
self.A = zeros(self.ta.shape)
self.yy = 0
def add(self, x, y, n=1):
y = array(y)
x = array(x)
n = array(n)
self.n += 1
self.yy += n*y*y
self.B = self.B + (x[...,newaxis]**self.tb) * (n * y)[...,newaxis]
self.A = self.A + (x[...,newaxis,newaxis]**self.ta) * n[...,newaxis,newaxis]
def solve(self):
self.solution = numpy.linalg.solve(self.A,self.B)
return self.solution
def evaluate(self, x):
try:
s = self.solution
except AttributeError:
s = self.solve()
return numpy.add.reduce(x**self.tb * s, axis=-1)
def rank(self):
return self.tb.shape[0]-1
def correlation(self):
if not self.rank():
return ()
dx = self.A[...,1,1]*self.A[...,0,0] - self.A[...,1,0]**2
dy = self.yy*self.A[...,0,0] - self.B[...,0]**2
r = (self.B[...,1]*self.A[...,0,0] - self.A[...,1,0]*self.B[...,0])/numpy.sqrt(dx*dy)
return self.n, self.A[...,0,0], r, dx/self.A[...,0,0], dy/self.A[...,0,0]
def reduce(self, axis=0, x=True, y=True, n=True):
rr = regression(self.rank())
rr.n = self.n
rr.tb = self.tb
rr.ta = self.ta
if x or y or n:
rr.B = numpy.add.reduce(self.B, axis=axis)
else:
rr.B = self.B
if x or n:
rr.A = numpy.add.reduce(self.A, axis=axis)
else:
rr.A = self.A
if y or n:
rr.yy = numpy.add.reduce(self.yy, axis=axis)
else:
rr.yy = self.yy
return rr
h = histogram(binW)
for l in fileinput.input(files):
if l[0:2] != 'EI':
continue
ll = [int(ll,0) for ll in l.split()[1:]]
if not nCh:
nCh = (len(ll)-4)//3
sys.stdout.write("# %d channels\n" % nCh)
if len(ll) != 4+3*nCh:
sys.stderr.write("nCh mismatch: "+l)
continue
for c in range(nCh):
if ll[4+3*c] < thr*mV:
continue
if ll[5+3*c] < 2:
continue
n = zeros((nCh,))
n[c] = 1
h.add(ll[6+3*c] / float(ll[4+3*c]), n)
hi = h.integrate()
I = hi.hist()[-1][1]
hi = hi.normalize(I)
sys.stdout.write("# Integral: %s\n" % repr(I))
r = regression(rank)
hh = hi.hist()
x,y = map(array, zip(*hh))
r.add(x[...,newaxis], y, (y>=ph_min)*(y<=ph_max))
r = r.reduce()
p = r.solve()
rr = r.reduce(x=False)
pp = rr.solve()
for n,c in enumerate(p.transpose()):
sys.stdout.write("# c%d: %s\n" % (n,repr(c)))
for n,c in enumerate(pp.transpose()):
sys.stdout.write("# cc%d: %s\n" % (n,repr(c)))
for n,c in zip(("n", "w", "r","σ²x", "σ²y"), r.correlation()):
sys.stdout.write("# %s: %s\n" % (n,repr(c)))
for x,y in hh:
n = y.shape[0]
sys.stdout.write("%g" % x)
sys.stdout.write((" %g"*n) % tuple(y))
sys.stdout.write((" %g"*n) % tuple(r.evaluate(x)))
sys.stdout.write((" %g") % rr.evaluate(x))
sys.stdout.write("\n")