rpirena/rpirena.py
stephan d23a34b653 rpirena: tanos_jr config w/ 8 L1
git-svn-id: svn+ssh://asterix.ieap.uni-kiel.de/home/subversion/stephan/solo/eda/cospi/host@9404 bc5caf13-1734-44f8-af43-603852e9ee25
2026-06-03 12:56:04 +00:00

892 lines
26 KiB
Python
Executable file

#! /usr/bin/python3 -i
# encoding: UTF-8
from altera_ctrl import *
import math
def set_prompt(prompt="RPIRENA"):
sys.ps1 = prompt + "> "
sys.ps2 = prompt + ". "
try:
ip=sys.modules["__main__"].__dict__["get_ipython"]()
from IPython.terminal.prompts import Prompts, Token
class soloprompts(Prompts):
def in_prompt_tokens(self, *wtf):
return [(Token, prompt+'> ')]
def continuation_prompt_tokens(self, *wtf):
return [(Token, prompt+'. ')]
def out_prompt_tokens(self, *wtf):
return [(Token, prompt+'= ')]
def rewrite_prompt_tokens(self, *wtf):
return [(Token, prompt+'- ')]
ip.prompts=soloprompts(ip)
except ImportError:
ip.prompt_manager.templates.update({
'in': 'In [{color.number}{count}{color.prompt}] '+prompt+'> ',
'in2': ' .{dots}. '+prompt+'. ',
'out': 'Out[{color.number}{count}{color.prompt}] '+prompt+'> ',
})
except NameError:
pass
except KeyError:
pass
set_prompt()
DREG_ADDR = 0xc020
CORE_ADDR = 0x8040
TRIG_ADDR = 0x8100
def filter(ch, p):
c = b""
for i in range(16):
n = p[(i+4)&15][0] & 0x3f
a = p[(i+2)&15][1] & 0x1fff
b = p[(i+2)&15][2] & 0x1fff
d = (n<<26)|(b<<13)|a;
c += struct.pack(">6H", DREG_ADDR+1, d&0xffff, DREG_ADDR+2, d>>16, DREG_ADDR+3, 0x800|(ch*16+i))
spidev.sync_transfer(cmd=c, rsize=0)
def trigconf(i, v):
i *= 2
c = struct.pack(">6H", TRIG_ADDR|0xc000|i, v&0xffff, TRIG_ADDR|0xc001|i, v>>16, 0x8001, 0)
spidev.sync_transfer(cmd=c, rsize=0)
def i2f(i):
if i<0:
s = 0x20000
i = -i
else:
s = 0
if i<0x4000:
return s|i
e=1
while i>=0x4000:
i>>=1
e+=1
if e>15:
return s|0x1ffff
return s|(e<<13)|(i&0x1fff)
def f2i(f):
e = (f>>13)&15
if e<2:
m = f & 0x3fff
else:
m = (0x2000|f&0x1fff)<<(e-1)
return -m if f&0x20000 else m
def ftou16(f):
e = f>>12
if e<2:
return f
return (0x1000|f&0xfff)<<(e-1)
l1_trig = [[0,222222222],[0,222222222],[0,222222222],[0,222222222],
[0,222222222],[0,222222222],[0,222222222],[0,222222222]]
def thres(ch, thr=None, flags=None):
if flags is None:
flags = l1_trig[ch][0]
else:
l1_trig[ch][0] = flags
if thr is None:
thr = l1_trig[ch][1]
else:
l1_trig[ch][1] = thr
trigconf(ch, (flags<<18)|i2f(int(thr)))
NL1 = 4
def test_NL1():
global NL1
for i in range(0x100,0x180):
reg(i,0x1111)
sync(0x8044)
r = [sync(0x8001) for i in range(50)]
if r[48]==0x1111:
NL1=8
elif r[40]==0x1111:
NL1=4
else:
raise ValueError(f"cannot find NL1 from {repr(r)}")
return NL1
def l2trig(i, all=0, none=0, any=0, mask=0xf):
trigconf(2*i+NL1, (all<<20)|(none<<10)|any)
trigconf(2*i+NL1+1, mask)
def adcmask(m=0xf):
return reg(CORE_ADDR, m)
def age(agemin=2, agetmin=3, agetmax=4, agemax=5):
return reg(CORE_ADDR+2, (agemin<<12)|(agetmin<<8)|(agetmax<<4)|agemax)
def nsamples(n=0, t=0xff):
return reg(CORE_ADDR+3, (t<<8)|n)
p22=[
(36, 1264, -3708),
( 1, 1713, -3708),
( 5, 3369, 0),
( 6, 3642, 0),
( 7, 3798, 0),
( 8, 3798, 0),
( 9, 3583, 0),
(10, 3096, 0),
(12, 992, 3321),
(13, -685, 4095),
(17, -4095, 0),
(18, -4095, 0),
(19, -4095, 0),
(20, -4095, 0),
(21, -4095, 0),
(22, -4095, 0),
]
p1=[
(20, -670, -851),
( 1, -303, -1070),
( 2, 158, -1153),
( 3, 656, -1263),
( 4, 1200, -1208),
( 5, 1721, -987),
( 6, 2147, -411),
( 7, 2325, 339),
( 8, 2123, 1337),
( 9, 1330, 2454),
(10, -128, 2813),
(12, -2114, 0),
(13, -2114, 0),
(14, -2114, 0),
(15, -2114, 0),
(16, -2103, 0),
]
mV=14000.0
class configuration(object):
rbf = "rpirena.rbf"
coretemp = "/sys/class/hwmon/hwmon0/temp1_input"
Vref = 3.300
monitor = ("decent", "Vprim", "Tfpga", "Tsleep", "altera", "halt", "reboot")
def __init__(self,
name,
channels=0xf,
l1=[dict(thr=100*mV, flags=0x001)]*4,
l2=[dict(any=0x001, mask=0xf)]+[{}]*7,
filter=p22,
age=(),
nsamples=(0,),
**stream ):
self.name = name
self.channels = channels
self.filter = filter
self.thres = thres
self.l1 = l1
self.l2 = l2
self.age = age
self.nsamples = nsamples
self.NL1 = 4
if len(self.l1) == 8:
self.NL1 = 8
self.rbf = "rpirena_t8.rbf"
self.stream = dict(
i2c = False,
ttys = [],
pressure = 1,
hk = True,
counter = True
)
self.NTC=[dict()]*8
self.stream.update(stream)
self.iniz()
def iniz(self):
pass
def load(self):
pass
def load_i2c(self):
global i2c, I2C, ACC, MAG
import i2c, altera_ctrl
i2c.i2c.verbosity = 2
I2C = i2c.i2c()
ACC = i2c.ACC()
MAG = i2c.MAG()
i2c._connect(altera_ctrl)
for s in (MAG, ACC):
for retry in range(100):
try:
s.conf()
sleep(0.1)
s.who()
if not s.verify():
break
except Exception as e:
reg(7, 0x11f0)
print(s.name, repr(e), file=sys.stderr)
I2C.flight_seq(a=0x40)
def __call__(self):
adcmask(self.channels)
age(*self.age)
nsamples(*self.nsamples)
for ch in range(4):
filter(ch, self.filter)
for ch in range(self.NL1):
thres(ch, **self.l1[ch])
for tr in range(8):
l2trig(tr, **self.l2[tr])
if self.stream["i2c"]:
self.load_i2c()
self.load()
monitor_flags.update(self.monitor)
threediodes = configuration("3Diodes",
channels = 0xd,
l1 = [
dict(thr= 10.0*mV, flags=0b1000000001), # A
dict(thr= 100*mV, flags=0), # (NTC)
dict(thr= 10.0*mV, flags=0b1000000010), # B
dict(thr= 10.0*mV, flags=0b1000000100), # C
dict(thr= 6*mV, flags=0b0000001000), # A
dict(thr= 100*mV, flags=0), # NTC
dict(thr= 6*mV, flags=0b0000010000), # B
dict(thr= 6*mV, flags=0b0000100000), # C
],
l2 = [
dict(any=0b0000000001, none=0b00110000, mask=0xd), # A¬B¬C
dict(any=0b0000000010, none=0b00101000, mask=0xd), # B¬A¬C
dict(any=0b0000000100, none=0b00011000, mask=0xd), # C¬A¬B
dict(any=0b0000011000, none=0b00100000, mask=0xd), # AB¬C
dict(any=0b0000101000, none=0b00010000, mask=0xd), # AC¬B
dict(any=0b0000111000, mask=0xd), # ABC
dict(any=0b0000110000, none=0b00001000, mask=0xd), # BC¬A
dict(any=0b1000000000, mask=0xd), # AvBvC
],
i2c = True,
ttys = ["/dev/ttyAMA0"],
pressure = 3
)
tanos_jr = configuration("TANOS",
l1 = [
dict(thr= 9.0*mV, flags=0b0100000001), # B2
dict(thr= 12.0*mV, flags=0b1000000010), # CC
dict(thr= 12.0*mV, flags=0b1000000100), # AA
dict(thr= 10.0*mV, flags=0b0100001000), # B1
dict(thr= 7.0*mV, flags=0b0000010000), # B2
dict(thr= 10.0*mV, flags=0b0000100000), # CC
dict(thr= 10.0*mV, flags=0b0001000000), # AA
dict(thr= 8.0*mV, flags=0b0010000000), # B1
],
l2 = [
dict(any=0b0011100000), # B1 & AA & CC
dict(any=0b0000001000, none=0b0001110000), # B1 & ~(others)
dict(any=0b0100000000), # B1 | B2
dict(any=0b0000000010), # CC
dict(any=0b0000000100), # AA
dict(any=0b1000000000, none=0b0010010000 ), # (AA|CC) & ~B1 & ~B2
dict(any=0b0001100000), # AA & CC
dict(any=0b0010010000), # B1 & B2
],
)
tanos_jr.Vref = 3.337
tanos_jr.NTC[4] = dict(R25=10e3)
mddm = configuration("MDDM",
nsamples=(1,),
l1 = [
dict(thr=6*mV, flags=0b0100000001),
dict(thr=6*mV, flags=0b1000000010),
dict(thr=6*mV, flags=0b1000000100),
dict(thr=6*mV, flags=0b0100001000),
],
l2 = [
dict(any=0b1100000000),
dict(any=0b0000001001),
dict(any=0b0000000001),
dict(any=0b0000000010),
dict(any=0b0000000100),
dict(any=0b0000001000),
dict(any=0b0100000010),
dict(any=0b0100000100),
]
)
chaostest = configuration("CSRTEST",
nsamples=(48,0xff),
l1 = [
dict(thr=74*mV, flags=0b1000000001), # AA (H)
dict(thr=79*mV, flags=0b1000000010), # CC (H)
dict(thr=74*mV, flags=0b0100000100), # B1 (H)
dict(thr=74*mV, flags=0b0100001000), # B2 (H)
dict(thr=73*mV, flags=0b0000010000), # AA (L)
dict(thr=70*mV, flags=0b0000100000), # CC (L)
dict(thr=73*mV, flags=0b0001000000), # B1 (L)
dict(thr=73*mV, flags=0b0010000000), # B2 (L)
],
l2 = [
dict(any=0b0000010000, none=0b0000000001), # AA
dict(any=0b0000100000, none=0b0000000010), # CC
dict(any=0b0001000000, none=0b0000000100), # B1
dict(any=0b0010000000, none=0b0000001000), # B2
dict(any=1, none=1), #
dict(any=1, none=1), #
dict(any=1, none=1), #
dict(any=1, none=1), #
]
)
chaos = configuration("CHAOS",
nsamples=(1,),
l1 = [
dict(thr=7*mV, flags=0b1000000001), # AA (H)
dict(thr=7*mV, flags=0b1000000010), # CC (H)
dict(thr=7*mV, flags=0b0100000100), # B1 (H)
dict(thr=7*mV, flags=0b0100001000), # B2 (H)
dict(thr=9*mV, flags=0b0000010000), # AA (L)
dict(thr=9*mV, flags=0b0000100000), # CC (L)
dict(thr=9*mV, flags=0b0001000000), # B1 (L)
dict(thr=9*mV, flags=0b0010000000), # B2 (L)
],
l2 = [
dict(any=0b0011110000), # AA B1 B2 CC (L)
dict(any=0b0011000000), # B1 B2 (L)
dict(any=0b0000000001), # AA (H)
dict(any=0b0000000010), # CC (H)
dict(any=0b0000000100), # B1 (H)
dict(any=0b0000001000), # B2 (H)
dict(any=0b0000000011), # AA CC (H)
dict(any=0b0000001100), # B1 B2 (H)
]
)
µMustang = configuration("µM",
l1 = [
dict(thr=10*mV, flags=0b0000010001),
dict(thr=10*mV, flags=0b0000010010),
dict(thr=10*mV, flags=0b0000100100),
dict(thr=10*mV, flags=0b0000101000),
],
l2= [
dict(any=0b0000010000),
dict(any=0b0000100001),
dict(any=0b0000000011),
dict(any=0b0000001100),
dict(any=0b0000110000),
dict(all=0b0000001111),
dict(any=0b0000100011),
dict(any=0b0000011100),
],
)
default = configuration("default")
current = None
def boot(conf=default):
global current
current = None
print("Booting altera", file=sys.stderr)
altera_from_file(fn=conf.rbf)
global NL1
if conf.NL1:
NL1 = conf.NL1
else:
NL1 = test_NL1()
print(f"Number of L1 triggers {NL1}", file=sys.stderr)
print(f"Configure {conf.name}", file=sys.stderr)
conf()
current = conf
print("Reading irena configuration from FPGA", file=sys.stderr)
printtrig()
def readrecordcmd(a, n, w=0):
return struct.pack(">%dH"%(n+2+w), *( (a,)+(0,)*w+(0x8001,)*n+(0,) ))
def readrecord(a, n, w=0, cmd=None):
if cmd is None:
cmd = readrecordcmd(a, n, w)
return struct.unpack(">%dH"%n, spidev.sync_transfer(cmd=cmd, rsize=2*n))
def degC(hk, R1=3.3e3, R25=3.3e3, B25=4500):
"adc to °C, ERT-J1VT332J: 3.3kΩ, β=4500K"
from math import log
R = R1*hk/(4096-hk)
if R:
NTC = B25/(log(R/R25)+B25/298.0)
else:
NTC = 999
return NTC - 273
read_hk_cmd = readrecordcmd(0x8030, 8,16)
read_hk_pressure0_cmd = struct.pack(">3H", 0xc007, 0x0120, 0x837f) + read_hk_cmd
read_hk_pressure1_cmd = struct.pack(">3H", 0xc007, 0x0120, 0x83ff) + read_hk_cmd
read_hk_counter_cmd = struct.pack(">3H", 0xc007, 0x0120, 0x824f) + read_hk_cmd
read_i2c_cmd = struct.pack(">4H", 0xc007, 0x0120, 0xc032, 0x8d40)
def read_hk(cmd=read_hk_cmd):
return [(i>>12, i & 0xfff) for i in readrecord(0x8030, 8,4, cmd=cmd)]
# etsolopi3: Vref=3.337V
def HK(hk=None, **kk):
Vref = current.Vref
if not hk:
hk = read_hk(**kk)
sys.stderr.write("""HK
Vcore = %.3fV
Vcc = %.3fV
Vio = %.3fV
Vrpi = %.3fV
T = %.1f°C
HK1 = %.3fV %.1f°C
HK2 = %.3fV %.1f°C
Ibias = %.1fnA
""" % (
hk[0][1]*Vref/4096,
hk[1][1]*Vref*3/4096,
hk[2][1]*Vref*2/4096,
hk[3][1]*Vref*2/4096,
degC(hk[4][1]),
hk[5][1]*Vref/4096, degC(hk[5][1]),
hk[6][1]*Vref/4096, degC(hk[6][1]),
hk[7][1]*Vref/3.3*0.077,
))
return hk
def read_pressure(unit=0):
reg(0x37f| (0x80 if unit else 0))
sleep(0.5)
p = readrecord(0x8015, 8)
return p
def read_counter(clear=True, size=3):
reg(0x248 | size&3 | (4 if clear else 0))
return readrecord(0x8015, [9,16,29,16][size&3])
def pressure_calibrate(p):
Word = p[1:]
D = Word[4:]
C=[0]*7
C[1] = Word[1] >> 1
C[2] = ((Word[3] & 0x3f)<<6) | (Word[4] & 0x3f)
C[3] = Word[4]>>6
C[4] = Word[3]>>6
C[5] = ((Word[1] & 1)<<10) | (Word[2]>>6)
C[6] = Word[2] & 0x3f
UT1 = 8*C[5]+20224
dT = D[2] - UT1
TEMP = 200 + dT*(C[6]+50)/1024
OFF = C[2]*4 + ((C[4]-512)*dT)/4096
SENS = C[1] + (C[3]*dT)/1024 + 24576
X = (SENS * (D[1]-7168))/16384 - OFF
P = X*10/32 + 2500
return (TEMP*0.1,P*0.1)
EN_EVENTS = 1
EN_CLOCK = 8
EN_TxD = 2
EN_RxD = 4
def enable(on=True, what=EN_EVENTS | EN_CLOCK):
if on:
reg(10, what)
else:
reg(9, what)
def sel_RxD(pin=None):
if pin is None:
enable(False, EN_RxD)
else:
enable(False, (~pin & 3) << 14)
enable(True, EN_RxD | ((pin & 3) << 14))
def sel_TxD(pin=None):
if pin is None:
enable(False, EN_TxD)
else:
enable(False, (~pin & 3) << 12)
enable(True, EN_TxD | ((pin & 3) << 12))
def peek(i=0):
return readrecord(CORE_ADDR+8+i, 4, w=2)
def readcounter(i=1):
return readrecord(CORE_ADDR+12+i, 4, w=1)
def readtrig():
n = 32+2*NL1
w = readrecord(CORE_ADDR+4, n)
ww = [(w[i+1]<<16) | w[i] for i in range(0,n,2)]
l1 = [(i>>18,f2i(i&0x3ffff)) for i in ww[0:NL1]]
l2 = [(ww[i+1], ww[i]>>20, (ww[i]>>10)&0x3ff, ww[i]&0x3ff) for i in range(NL1,n//2,2)]
return (l1,l2)
def printtrig(t=None, f=sys.stderr, mV=None):
if not t:
t = readtrig()
for c in range(NL1):
if mV:
f.write("L1 thr mask [%d]: %5.1f mV 0x%03x\n" %
(c, t[0][c][1]/float(mV), t[0][c][0]) )
else:
f.write("L1 thr mask [%d]: %5d 0x%03x\n" %
(c, t[0][c][1], t[0][c][0]) )
for c in range(8):
f.write("L2 all/none/any read [%d]: 0x%03x / 0x%03x / 0x%03x 0x%05x\n" %
(c, t[1][c][1], t[1][c][2], t[1][c][3], t[1][c][0]) )
def F2ATB(f):
A = f2i(f>>14)
e = (f>>27) & 15
B = (f>>3) & 0x3fe
if e>1:
B <<= e-1
if f & 0x2000:
B = -B
T = f&15
return (A,T,B)
class histogram(object):
def __init__(self, fn, mV=14000, b0=-100, n=4000, m=4, resV=0.838214):
self.mV=mV
self.b0=b0/resV
self.res=mV*resV
self.b=[[0]*n for i in range(m)]
self.fn = fn
self.fmt = "%d"+" %d"*m+"\n"
def __iadd__(self, x):
for i,xx in enumerate(x):
bb = int(xx/self.res - self.b0 - 0.333)
if bb<0:
bb=0
if bb>=len(self.b[i]):
bb = len(self.b[i])-1
self.b[i][bb] += 1
return self
def save(self):
if not self.fn:
return
f = open(self.fn+"+", "w")
resV=self.res/self.mV
for i in range(len(self.b[0])):
f.write(self.fmt % (((i+self.b0+0.167)*resV,)+tuple([b[i] for b in self.b])))
f.close()
os.rename(self.fn+"+", self.fn)
from time import time, sleep
monitor_pressure = []
monitor_hk = []
monitor_flags = set()
monitor_assent = 0
monitor_assent_pressure = 500
monitor_decent = 0
monitor_decent_pressure = 800
monitor_Tfpga = 0
monitor_Tfpga_temp_sleep = 70
monitor_Tfpga_temp_shutdown = 75
monitor_Vprim = 0
monitor_Vprim_Voltage = 4.2
monitor_Tsleep = 0
monitor_Tsleep_duration = 600
def shutdown():
if "altera" in monitor_flags:
altera_reset()
if "halt" in monitor_flags:
os.system("sudo shutdown -h now")
sleep(600)
sys.exit()
def reboot():
altera_reset()
boot(current)
reg(7, 0x1f0)
enable() # TODO tty bits
def monitor():
if "decent" in monitor_flags and monitor_pressure:
p = pressure_calibrate(monitor_pressure[-1])[1]
global monitor_assent, monitor_decent
print(f"p {p:.1f} mbar"
f" ({monitor_assent_pressure:.1f}: {monitor_assent})"
f" ({monitor_decent_pressure:.1f}: {monitor_decent})",
file=sys.stderr)
if p < monitor_assent_pressure:
monitor_assent += 1
if monitor_assent > 10 and p > monitor_decent_pressure:
monitor_decent += 1
if monitor_decent > 3:
shutdown()
if "Vprim" in monitor_flags and monitor_hk:
hk = monitor_hk[-1]
Vprim = hk[3]*6.6/4096
global monitor_Vprim
print(f"V_prim {Vprim:.3f} V ({monitor_Vprim_Voltage:.3f}, {monitor_Vprim})",
file=sys.stderr)
if Vprim < monitor_Vprim_Voltage:
monitor_Vprim += 1
if monitor_Vprim > 3:
shutdown()
else:
monitor_Vprim = 0;
if "Tfpga" in monitor_flags and monitor_hk:
hk = monitor_hk[-1]
Tfpga = degC(hk[4], **current.NTC[4])
global monitor_Tfpga, monitor_Tsleep
print(f"T_fpga {Tfpga:.1f} °C"
f" ({monitor_Tfpga_temp_sleep:.1f}/{monitor_Tfpga_temp_shutdown:.1f}"
f":{monitor_Tfpga}/{monitor_Tsleep})",
file=sys.stderr)
if Tfpga > monitor_Tfpga_temp_sleep:
if "Tsleep" in monitor_flags:
monitor_Tfpga += 1
if monitor_Tfpga > 3:
monitor_Tsleep += 1
if monitor_Tsleep > 3:
altera_reset()
sleep(monitor_Tsleep_duration)
reboot()
monitor_Tfpga = 0
if Tfpga > monitor_Tfpga_temp_shutdown:
monitor_Tfpga += 1
if monitor_Tfpga > 3:
shutdown()
else:
monitor_Tfpga = 0
monitor_pressure[:] = []
monitor_hk[:] = []
hk_good = tuple(range(0,16,2))
hk_index = 0
def stream_hk(f, t, cmds, i2c):
global hk_index
tf = time()
if tf >= t + 1:
tt = int(math.floor(tf))
ttt = tt % 60
if i2c and ttt & 1:
spidev.sync_transfer(cmd=read_i2c_cmd, rsize=0)
if tt % 10:
return tt
ttt //= 10
hk_index = ttt
if ttt<len(cmds):
cmd = cmds[ttt]
else:
cmd = read_hk_cmd
hk = read_hk(cmd=cmd)
if tuple(h[0] for h in hk) == hk_good:
#sys.stderr.write("T = %5.1f°C, Ibias = %.1fnA, Vprim = %.2fV\n" % (
# degC(hk[4][1]), hk[7][1]*0.077, hk[3][1]*6.6/4096))
hk = tuple(h[1] for h in hk)
monitor_hk.append(hk)
monitor()
f.write("H %.2f %d %d %d %d %d %d %d %d\n" % ((tf,) + hk))
else:
f.write(f"XH {tf:.2f} {repr(hk)}\n")
if current.coretemp:
try:
f.write(f"Y {int(open(current.coretemp).read())/1000:.1f}\n")
except:
pass
f.flush()
return tt
return t
read_fifo_stat_cmd = readrecordcmd(0x8005,1)
read_event_packet_cmd = readrecordcmd(0x8016, 14)[:-4] + read_fifo_stat_cmd
read_sample_packet_cmd = readrecordcmd(0x8017, 6)[:-4] + read_fifo_stat_cmd
read_mux_fifo_packet_head_cmd = readrecordcmd(0x8015, 1)
read_mux_fifo_packet_cmd = readrecordcmd(0x8015, 133)[:-4] + read_fifo_stat_cmd
def print_bate1_packet(f, pp):
monitor_pressure.append(pp)
f.write("P1 0x%04x 0x%04x 0x%04x 0x%04x 0x%04x 0x%04x\n" % tuple(pp[2:8]))
def print_bate2_packet(f, pp):
f.write("P2 0x%04x 0x%04x 0x%04x 0x%04x 0x%04x 0x%04x\n" % tuple(pp[2:8]))
def print_counter_packet(f, pp):
n = NL1 + 9
pp[1:n+1] = map(ftou16, pp[1:n+1])
pp[n+1] += pp[n+2] << 16
print("C64 3", *pp[1:n+2], file=f)
def print_counter1_packet(f, pp):
pp[-3] += pp[-2] << 16
print("C64 1", *pp[1:-2], file=f)
def print_i2c_packet(f, pp):
print("ID", *pp[:-1], file=f)
mux_packets = {
0xba7e: (8, print_bate1_packet),
0xbafe: (8, print_bate2_packet),
0xc364: (20, print_counter_packet),
0xc164: (20, print_counter1_packet),
0x12c5: (133, print_i2c_packet),
}
def read_fifo_stat(attempt=0):
status = struct.unpack(">H", spidev.sync_transfer(cmd=read_fifo_stat_cmd, rsize=2))[0]
if status==0xffff:
if attempt < 2 and "reboot" in monitor_flags:
reboot()
return read_fifo_stat(attempt+1)
else:
shutdown()
return 0
return status
notification = None
notification_messages = []
def stream_events(f, en, ttys=(), hk=True, pressure=1, counter=True, i2c=False):
tty_line = [b""]*len(ttys)
hk_cmds = [read_hk_cmd]*6
if counter:
hk_cmds = [read_hk_counter_cmd]*6
if pressure:
if pressure != 2:
hk_cmds[1] = read_hk_pressure0_cmd
if pressure == 1:
hk_cmds[3] = read_hk_pressure0_cmd
else:
hk_cmds[3] = read_hk_pressure1_cmd
hk_cmds[5] = read_hk_pressure0_cmd
else:
hk_cmds[1] = read_hk_pressure1_cmd
hk_cmds[3] = read_hk_pressure1_cmd
hk_cmds[5] = read_hk_pressure1_cmd
hk_t = stream_hk(f, 0, hk_cmds, i2c)
while True:
reg(11, en)
if hk:
hk_t = stream_hk(f, hk_t, hk_cmds, i2c)
if notification:
mes = notification.trigger()
if mes:
notification_messages.append(mes)
return mes
if ttys:
for i, tty in enumerate(ttys):
tl = tty.read(256)
if tl:
tty_line[i] += tl
eol = tty_line[i].rfind(b'\n')
if eol>=0:
f.write("TTY%d: %s\n" % (i, repr(tty_line[i][:eol+1])))
tty_line[i] = tty_line[i][eol+1:]
if len(tty_line[i])>134:
f.write("TTX%d: %s\n" % (i, repr(tty_line[i])))
tty_line[i]=b""
ne = 0
fifo_stat = read_fifo_stat()
while fifo_stat & 0x4000 and ne<64:
ne += 1
r = spidev.sync_transfer(cmd=read_event_packet_cmd, rsize=30)
rr = struct.unpack(">L3H2B9H", r)
# print >> sys.stderr, "f2stat", hex(f2s), [hex(i) for i in rr]
if rr[0] != 0xbeefa128:
f.write("XE %s\n" % " ".join(["0x%04x" % x for x in rr]))
reg(7, 0x140)
fifo_stat = read_fifo_stat()
break
fifo_stat = rr[-1]
f.write("EI %d 0x%08x 0x%02x %d" % (rr[5], (rr[2]<<16)|rr[1], rr[4], rr[3]))
j=6
for i in (1,2,4,8):
if rr[1] & i:
ATB=F2ATB((rr[j+1]<<16)|rr[j])
f.write(" %d %d %d" % ATB)
j+=2
else:
f.write(" 0 0 0")
f.write("\n")
if fifo_stat & 0x2000:
ph = spidev.sync_transfer(cmd=read_mux_fifo_packet_head_cmd, rsize=2)
pp = struct.unpack(">H", ph)
if pp[0] not in mux_packets:
f.write(f"XM 0x{pp[0]:04x}\n")
reg(7, 0x120)
fifo_stat = read_fifo_stat()
else:
ps, print_func = mux_packets[pp[0]]
cmd = read_mux_fifo_packet_cmd[-2*(ps+1):]
p = spidev.sync_transfer(cmd=cmd, rsize=2*ps)
pp = list(pp)
pp.extend(struct.unpack(f">{ps}H", p))
fifo_stat = pp[-1]
print_func(f, pp)
ns = 0
while fifo_stat & 0x8000 and not fifo_stat & 0x4000 and ns<64:
s = spidev.sync_transfer(cmd=read_sample_packet_cmd, rsize=14)
ss = struct.unpack(">4HLH", s)
if ss[0] != 0x5a61:
f.write("XS %s\n" % " ".join(["0x%04x" % x for x in ss]))
reg(7, 0x180)
fifo_stat = read_fifo_stat()
break
fifo_stat = ss[-1]
ns += 1
f.write("S %d %d %d %d %d\n" % (
ss[4],
ss[1]>>4,
((ss[1]&15)<<8)|(ss[2]>>8),
((ss[2]&255)<<4)|(ss[3]>>12),
ss[3]&0xfff))
if not (fifo_stat & 0xc000):
t = time()
if not ne or 0.9 < t % 2.0 < 1:
sleep(0.1 - t % 0.1)
elif ne < 64:
sleep(0.01)
def stream_file(fn):
conf = current
f = open(fn, "a")
for i,fn in enumerate(conf.stream["ttys"]):
if not isinstance(fn, str):
continue
import serial, termios
fn = fn.split(",")
baud = 4800
tty = 1
if len(fn) >= 2:
baud = int(fn[1])
if len(fn) >= 3:
tty = int(fn[2])
s = serial.Serial(fn[0], baud, timeout=0)
if tty >= 0:
sel_RxD(tty)
conf.stream["ttys"][i] = s
reg(7, 0x1f0)
en = reg(8) | EN_EVENTS | EN_CLOCK
enable(True, en)
print(repr(conf.stream), file=sys.stderr)
print(f"enable bits are {en:#04x}", file=sys.stderr)
while True:
try:
mes = stream_events(f, en=en, **conf.stream)
if mes:
f.close()
sys.stderr.write("interrupted by notification: %s\n" % repr(mes))
break
except IOError as e:
sys.stderr.write(repr(e)+"\n")
reg(7, 0x1f0)
except KeyboardInterrupt:
f.close()
break