#! /usr/bin/python3 -i # encoding: UTF-8 from altera_ctrl import * import math def set_prompt(prompt="RPIRENA"): sys.ps1 = prompt + "> " sys.ps2 = prompt + ". " try: ip=sys.modules["__main__"].__dict__["get_ipython"]() from IPython.terminal.prompts import Prompts, Token class soloprompts(Prompts): def in_prompt_tokens(self, *wtf): return [(Token, prompt+'> ')] def continuation_prompt_tokens(self, *wtf): return [(Token, prompt+'. ')] def out_prompt_tokens(self, *wtf): return [(Token, prompt+'= ')] def rewrite_prompt_tokens(self, *wtf): return [(Token, prompt+'- ')] ip.prompts=soloprompts(ip) except ImportError: ip.prompt_manager.templates.update({ 'in': 'In [{color.number}{count}{color.prompt}] '+prompt+'> ', 'in2': ' .{dots}. '+prompt+'. ', 'out': 'Out[{color.number}{count}{color.prompt}] '+prompt+'> ', }) except NameError: pass except KeyError: pass set_prompt() DREG_ADDR = 0xc020 CORE_ADDR = 0x8040 TRIG_ADDR = 0x8100 def filter(ch, p): c = b"" for i in range(16): n = p[(i+4)&15][0] & 0x3f a = p[(i+2)&15][1] & 0x1fff b = p[(i+2)&15][2] & 0x1fff d = (n<<26)|(b<<13)|a; c += struct.pack(">6H", DREG_ADDR+1, d&0xffff, DREG_ADDR+2, d>>16, DREG_ADDR+3, 0x800|(ch*16+i)) spidev.sync_transfer(cmd=c, rsize=0) def trigconf(i, v): i *= 2 c = struct.pack(">6H", TRIG_ADDR|0xc000|i, v&0xffff, TRIG_ADDR|0xc001|i, v>>16, 0x8001, 0) spidev.sync_transfer(cmd=c, rsize=0) def i2f(i): if i<0: s = 0x20000 i = -i else: s = 0 if i<0x4000: return s|i e=1 while i>=0x4000: i>>=1 e+=1 if e>15: return s|0x1ffff return s|(e<<13)|(i&0x1fff) def f2i(f): e = (f>>13)&15 if e<2: m = f & 0x3fff else: m = (0x2000|f&0x1fff)<<(e-1) return -m if f&0x20000 else m def ftou16(f): e = f>>12 if e<2: return f return (0x1000|f&0xfff)<<(e-1) l1_trig = [[0,222222222],[0,222222222],[0,222222222],[0,222222222], [0,222222222],[0,222222222],[0,222222222],[0,222222222]] def thres(ch, thr=None, flags=None): if flags is None: flags = l1_trig[ch][0] else: l1_trig[ch][0] = flags if thr is None: thr = l1_trig[ch][1] else: l1_trig[ch][1] = thr trigconf(ch, (flags<<18)|i2f(int(thr))) NL1 = 4 def test_NL1(): global NL1 for i in range(0x100,0x180): reg(i,0x1111) sync(0x8044) r = [sync(0x8001) for i in range(50)] if r[48]==0x1111: NL1=8 elif r[40]==0x1111: NL1=4 else: raise ValueError(f"cannot find NL1 from {repr(r)}") return NL1 def l2trig(i, all=0, none=0, any=0, mask=0xf): trigconf(2*i+NL1, (all<<20)|(none<<10)|any) trigconf(2*i+NL1+1, mask) def adcmask(m=0xf): return reg(CORE_ADDR, m) def age(agemin=2, agetmin=3, agetmax=4, agemax=5): return reg(CORE_ADDR+2, (agemin<<12)|(agetmin<<8)|(agetmax<<4)|agemax) def nsamples(n=0, t=0xff): return reg(CORE_ADDR+3, (t<<8)|n) p22=[ (36, 1264, -3708), ( 1, 1713, -3708), ( 5, 3369, 0), ( 6, 3642, 0), ( 7, 3798, 0), ( 8, 3798, 0), ( 9, 3583, 0), (10, 3096, 0), (12, 992, 3321), (13, -685, 4095), (17, -4095, 0), (18, -4095, 0), (19, -4095, 0), (20, -4095, 0), (21, -4095, 0), (22, -4095, 0), ] p1=[ (20, -670, -851), ( 1, -303, -1070), ( 2, 158, -1153), ( 3, 656, -1263), ( 4, 1200, -1208), ( 5, 1721, -987), ( 6, 2147, -411), ( 7, 2325, 339), ( 8, 2123, 1337), ( 9, 1330, 2454), (10, -128, 2813), (12, -2114, 0), (13, -2114, 0), (14, -2114, 0), (15, -2114, 0), (16, -2103, 0), ] mV=14000.0 class configuration(object): rbf = "rpirena.rbf" coretemp = "/sys/class/hwmon/hwmon0/temp1_input" Vref = 3.300 monitor = ("decent", "Vprim", "Tfpga", "Tsleep", "altera", "halt", "reboot") def __init__(self, name, channels=0xf, l1=[dict(thr=100*mV, flags=0x001)]*4, l2=[dict(any=0x001, mask=0xf)]+[{}]*7, filter=p22, age=(), nsamples=(0,), **stream ): self.name = name self.channels = channels self.filter = filter self.thres = thres self.l1 = l1 self.l2 = l2 self.age = age self.nsamples = nsamples self.NL1 = 4 if len(self.l1) == 8: self.NL1 = 8 self.rbf = "rpirena_t8.rbf" self.stream = dict( i2c = False, ttys = [], pressure = 1, hk = True, counter = True ) self.NTC=[dict()]*8 self.stream.update(stream) self.iniz() def iniz(self): pass def load(self): pass def load_i2c(self): global i2c, I2C, ACC, MAG import i2c, altera_ctrl i2c.i2c.verbosity = 2 I2C = i2c.i2c() ACC = i2c.ACC() MAG = i2c.MAG() i2c._connect(altera_ctrl) for s in (MAG, ACC): for retry in range(100): try: s.conf() sleep(0.1) s.who() if not s.verify(): break except Exception as e: reg(7, 0x11f0) print(s.name, repr(e), file=sys.stderr) I2C.flight_seq(a=0x40) def __call__(self): adcmask(self.channels) age(*self.age) nsamples(*self.nsamples) for ch in range(4): filter(ch, self.filter) for ch in range(self.NL1): thres(ch, **self.l1[ch]) for tr in range(8): l2trig(tr, **self.l2[tr]) if self.stream["i2c"]: self.load_i2c() self.load() monitor_flags.update(self.monitor) threediodes = configuration("3Diodes", channels = 0xd, l1 = [ dict(thr= 10.0*mV, flags=0b1000000001), # A dict(thr= 100*mV, flags=0), # (NTC) dict(thr= 10.0*mV, flags=0b1000000010), # B dict(thr= 10.0*mV, flags=0b1000000100), # C dict(thr= 6*mV, flags=0b0000001000), # A dict(thr= 100*mV, flags=0), # NTC dict(thr= 6*mV, flags=0b0000010000), # B dict(thr= 6*mV, flags=0b0000100000), # C ], l2 = [ dict(any=0b0000000001, none=0b00110000, mask=0xd), # A¬B¬C dict(any=0b0000000010, none=0b00101000, mask=0xd), # B¬A¬C dict(any=0b0000000100, none=0b00011000, mask=0xd), # C¬A¬B dict(any=0b0000011000, none=0b00100000, mask=0xd), # AB¬C dict(any=0b0000101000, none=0b00010000, mask=0xd), # AC¬B dict(any=0b0000111000, mask=0xd), # ABC dict(any=0b0000110000, none=0b00001000, mask=0xd), # BC¬A dict(any=0b1000000000, mask=0xd), # AvBvC ], i2c = True, ttys = ["/dev/ttyAMA0"], pressure = 3 ) tanos_jr = configuration("TANOS", l1 = [ dict(thr= 9.0*mV, flags=0b0100000001), # B2 dict(thr= 12.0*mV, flags=0b1000000010), # CC dict(thr= 12.0*mV, flags=0b1000000100), # AA dict(thr= 10.0*mV, flags=0b0100001000), # B1 dict(thr= 7.0*mV, flags=0b0000010000), # B2 dict(thr= 10.0*mV, flags=0b0000100000), # CC dict(thr= 10.0*mV, flags=0b0001000000), # AA dict(thr= 8.0*mV, flags=0b0010000000), # B1 ], l2 = [ dict(any=0b0011100000), # B1 & AA & CC dict(any=0b0000001000, none=0b0001110000), # B1 & ~(others) dict(any=0b0100000000), # B1 | B2 dict(any=0b0000000010), # CC dict(any=0b0000000100), # AA dict(any=0b1000000000, none=0b0010010000 ), # (AA|CC) & ~B1 & ~B2 dict(any=0b0001100000), # AA & CC dict(any=0b0010010000), # B1 & B2 ], ) tanos_jr.Vref = 3.337 tanos_jr.NTC[4] = dict(R25=10e3) mddm = configuration("MDDM", nsamples=(1,), l1 = [ dict(thr=6*mV, flags=0b0100000001), dict(thr=6*mV, flags=0b1000000010), dict(thr=6*mV, flags=0b1000000100), dict(thr=6*mV, flags=0b0100001000), ], l2 = [ dict(any=0b1100000000), dict(any=0b0000001001), dict(any=0b0000000001), dict(any=0b0000000010), dict(any=0b0000000100), dict(any=0b0000001000), dict(any=0b0100000010), dict(any=0b0100000100), ] ) chaostest = configuration("CSRTEST", nsamples=(48,0xff), l1 = [ dict(thr=74*mV, flags=0b1000000001), # AA (H) dict(thr=79*mV, flags=0b1000000010), # CC (H) dict(thr=74*mV, flags=0b0100000100), # B1 (H) dict(thr=74*mV, flags=0b0100001000), # B2 (H) dict(thr=73*mV, flags=0b0000010000), # AA (L) dict(thr=70*mV, flags=0b0000100000), # CC (L) dict(thr=73*mV, flags=0b0001000000), # B1 (L) dict(thr=73*mV, flags=0b0010000000), # B2 (L) ], l2 = [ dict(any=0b0000010000, none=0b0000000001), # AA dict(any=0b0000100000, none=0b0000000010), # CC dict(any=0b0001000000, none=0b0000000100), # B1 dict(any=0b0010000000, none=0b0000001000), # B2 dict(any=1, none=1), # dict(any=1, none=1), # dict(any=1, none=1), # dict(any=1, none=1), # ] ) chaos = configuration("CHAOS", nsamples=(1,), l1 = [ dict(thr=7*mV, flags=0b1000000001), # AA (H) dict(thr=7*mV, flags=0b1000000010), # CC (H) dict(thr=7*mV, flags=0b0100000100), # B1 (H) dict(thr=7*mV, flags=0b0100001000), # B2 (H) dict(thr=9*mV, flags=0b0000010000), # AA (L) dict(thr=9*mV, flags=0b0000100000), # CC (L) dict(thr=9*mV, flags=0b0001000000), # B1 (L) dict(thr=9*mV, flags=0b0010000000), # B2 (L) ], l2 = [ dict(any=0b0011110000), # AA B1 B2 CC (L) dict(any=0b0011000000), # B1 B2 (L) dict(any=0b0000000001), # AA (H) dict(any=0b0000000010), # CC (H) dict(any=0b0000000100), # B1 (H) dict(any=0b0000001000), # B2 (H) dict(any=0b0000000011), # AA CC (H) dict(any=0b0000001100), # B1 B2 (H) ] ) µMustang = configuration("µM", l1 = [ dict(thr=10*mV, flags=0b0000010001), dict(thr=10*mV, flags=0b0000010010), dict(thr=10*mV, flags=0b0000100100), dict(thr=10*mV, flags=0b0000101000), ], l2= [ dict(any=0b0000010000), dict(any=0b0000100001), dict(any=0b0000000011), dict(any=0b0000001100), dict(any=0b0000110000), dict(all=0b0000001111), dict(any=0b0000100011), dict(any=0b0000011100), ], ) default = configuration("default") current = None def boot(conf=default): global current current = None print("Booting altera", file=sys.stderr) altera_from_file(fn=conf.rbf) global NL1 if conf.NL1: NL1 = conf.NL1 else: NL1 = test_NL1() print(f"Number of L1 triggers {NL1}", file=sys.stderr) print(f"Configure {conf.name}", file=sys.stderr) conf() current = conf print("Reading irena configuration from FPGA", file=sys.stderr) printtrig() def readrecordcmd(a, n, w=0): return struct.pack(">%dH"%(n+2+w), *( (a,)+(0,)*w+(0x8001,)*n+(0,) )) def readrecord(a, n, w=0, cmd=None): if cmd is None: cmd = readrecordcmd(a, n, w) return struct.unpack(">%dH"%n, spidev.sync_transfer(cmd=cmd, rsize=2*n)) def degC(hk, R1=3.3e3, R25=3.3e3, B25=4500): "adc to °C, ERT-J1VT332J: 3.3kΩ, β=4500K" from math import log R = R1*hk/(4096-hk) if R: NTC = B25/(log(R/R25)+B25/298.0) else: NTC = 999 return NTC - 273 read_hk_cmd = readrecordcmd(0x8030, 8,16) read_hk_pressure0_cmd = struct.pack(">3H", 0xc007, 0x0120, 0x837f) + read_hk_cmd read_hk_pressure1_cmd = struct.pack(">3H", 0xc007, 0x0120, 0x83ff) + read_hk_cmd read_hk_counter_cmd = struct.pack(">3H", 0xc007, 0x0120, 0x824f) + read_hk_cmd read_i2c_cmd = struct.pack(">4H", 0xc007, 0x0120, 0xc032, 0x8d40) def read_hk(cmd=read_hk_cmd): return [(i>>12, i & 0xfff) for i in readrecord(0x8030, 8,4, cmd=cmd)] # etsolopi3: Vref=3.337V def HK(hk=None, **kk): Vref = current.Vref if not hk: hk = read_hk(**kk) sys.stderr.write("""HK Vcore = %.3fV Vcc = %.3fV Vio = %.3fV Vrpi = %.3fV T = %.1f°C HK1 = %.3fV %.1f°C HK2 = %.3fV %.1f°C Ibias = %.1fnA """ % ( hk[0][1]*Vref/4096, hk[1][1]*Vref*3/4096, hk[2][1]*Vref*2/4096, hk[3][1]*Vref*2/4096, degC(hk[4][1]), hk[5][1]*Vref/4096, degC(hk[5][1]), hk[6][1]*Vref/4096, degC(hk[6][1]), hk[7][1]*Vref/3.3*0.077, )) return hk def read_pressure(unit=0): reg(0x37f| (0x80 if unit else 0)) sleep(0.5) p = readrecord(0x8015, 8) return p def read_counter(clear=True, size=3): reg(0x248 | size&3 | (4 if clear else 0)) return readrecord(0x8015, [9,16,29,16][size&3]) def pressure_calibrate(p): Word = p[1:] D = Word[4:] C=[0]*7 C[1] = Word[1] >> 1 C[2] = ((Word[3] & 0x3f)<<6) | (Word[4] & 0x3f) C[3] = Word[4]>>6 C[4] = Word[3]>>6 C[5] = ((Word[1] & 1)<<10) | (Word[2]>>6) C[6] = Word[2] & 0x3f UT1 = 8*C[5]+20224 dT = D[2] - UT1 TEMP = 200 + dT*(C[6]+50)/1024 OFF = C[2]*4 + ((C[4]-512)*dT)/4096 SENS = C[1] + (C[3]*dT)/1024 + 24576 X = (SENS * (D[1]-7168))/16384 - OFF P = X*10/32 + 2500 return (TEMP*0.1,P*0.1) EN_EVENTS = 1 EN_CLOCK = 8 EN_TxD = 2 EN_RxD = 4 def enable(on=True, what=EN_EVENTS | EN_CLOCK): if on: reg(10, what) else: reg(9, what) def sel_RxD(pin=None): if pin is None: enable(False, EN_RxD) else: enable(False, (~pin & 3) << 14) enable(True, EN_RxD | ((pin & 3) << 14)) def sel_TxD(pin=None): if pin is None: enable(False, EN_TxD) else: enable(False, (~pin & 3) << 12) enable(True, EN_TxD | ((pin & 3) << 12)) def peek(i=0): return readrecord(CORE_ADDR+8+i, 4, w=2) def readcounter(i=1): return readrecord(CORE_ADDR+12+i, 4, w=1) def readtrig(): n = 32+2*NL1 w = readrecord(CORE_ADDR+4, n) ww = [(w[i+1]<<16) | w[i] for i in range(0,n,2)] l1 = [(i>>18,f2i(i&0x3ffff)) for i in ww[0:NL1]] l2 = [(ww[i+1], ww[i]>>20, (ww[i]>>10)&0x3ff, ww[i]&0x3ff) for i in range(NL1,n//2,2)] return (l1,l2) def printtrig(t=None, f=sys.stderr, mV=None): if not t: t = readtrig() for c in range(NL1): if mV: f.write("L1 thr mask [%d]: %5.1f mV 0x%03x\n" % (c, t[0][c][1]/float(mV), t[0][c][0]) ) else: f.write("L1 thr mask [%d]: %5d 0x%03x\n" % (c, t[0][c][1], t[0][c][0]) ) for c in range(8): f.write("L2 all/none/any read [%d]: 0x%03x / 0x%03x / 0x%03x 0x%05x\n" % (c, t[1][c][1], t[1][c][2], t[1][c][3], t[1][c][0]) ) def F2ATB(f): A = f2i(f>>14) e = (f>>27) & 15 B = (f>>3) & 0x3fe if e>1: B <<= e-1 if f & 0x2000: B = -B T = f&15 return (A,T,B) class histogram(object): def __init__(self, fn, mV=14000, b0=-100, n=4000, m=4, resV=0.838214): self.mV=mV self.b0=b0/resV self.res=mV*resV self.b=[[0]*n for i in range(m)] self.fn = fn self.fmt = "%d"+" %d"*m+"\n" def __iadd__(self, x): for i,xx in enumerate(x): bb = int(xx/self.res - self.b0 - 0.333) if bb<0: bb=0 if bb>=len(self.b[i]): bb = len(self.b[i])-1 self.b[i][bb] += 1 return self def save(self): if not self.fn: return f = open(self.fn+"+", "w") resV=self.res/self.mV for i in range(len(self.b[0])): f.write(self.fmt % (((i+self.b0+0.167)*resV,)+tuple([b[i] for b in self.b]))) f.close() os.rename(self.fn+"+", self.fn) from time import time, sleep monitor_pressure = [] monitor_hk = [] monitor_flags = set() monitor_assent = 0 monitor_assent_pressure = 500 monitor_decent = 0 monitor_decent_pressure = 800 monitor_Tfpga = 0 monitor_Tfpga_temp_sleep = 70 monitor_Tfpga_temp_shutdown = 75 monitor_Vprim = 0 monitor_Vprim_Voltage = 4.2 monitor_Tsleep = 0 monitor_Tsleep_duration = 600 def shutdown(): if "altera" in monitor_flags: altera_reset() if "halt" in monitor_flags: os.system("sudo shutdown -h now") sleep(600) sys.exit() def reboot(): altera_reset() boot(current) reg(7, 0x1f0) enable() # TODO tty bits def monitor(): if "decent" in monitor_flags and monitor_pressure: p = pressure_calibrate(monitor_pressure[-1])[1] global monitor_assent, monitor_decent print(f"p {p:.1f} mbar" f" ({monitor_assent_pressure:.1f}: {monitor_assent})" f" ({monitor_decent_pressure:.1f}: {monitor_decent})", file=sys.stderr) if p < monitor_assent_pressure: monitor_assent += 1 if monitor_assent > 10 and p > monitor_decent_pressure: monitor_decent += 1 if monitor_decent > 3: shutdown() if "Vprim" in monitor_flags and monitor_hk: hk = monitor_hk[-1] Vprim = hk[3]*6.6/4096 global monitor_Vprim print(f"V_prim {Vprim:.3f} V ({monitor_Vprim_Voltage:.3f}, {monitor_Vprim})", file=sys.stderr) if Vprim < monitor_Vprim_Voltage: monitor_Vprim += 1 if monitor_Vprim > 3: shutdown() else: monitor_Vprim = 0; if "Tfpga" in monitor_flags and monitor_hk: hk = monitor_hk[-1] Tfpga = degC(hk[4], **current.NTC[4]) global monitor_Tfpga, monitor_Tsleep print(f"T_fpga {Tfpga:.1f} °C" f" ({monitor_Tfpga_temp_sleep:.1f}/{monitor_Tfpga_temp_shutdown:.1f}" f":{monitor_Tfpga}/{monitor_Tsleep})", file=sys.stderr) if Tfpga > monitor_Tfpga_temp_sleep: if "Tsleep" in monitor_flags: monitor_Tfpga += 1 if monitor_Tfpga > 3: monitor_Tsleep += 1 if monitor_Tsleep > 3: altera_reset() sleep(monitor_Tsleep_duration) reboot() monitor_Tfpga = 0 if Tfpga > monitor_Tfpga_temp_shutdown: monitor_Tfpga += 1 if monitor_Tfpga > 3: shutdown() else: monitor_Tfpga = 0 monitor_pressure[:] = [] monitor_hk[:] = [] hk_good = tuple(range(0,16,2)) hk_index = 0 def stream_hk(f, t, cmds, i2c): global hk_index tf = time() if tf >= t + 1: tt = int(math.floor(tf)) ttt = tt % 60 if i2c and ttt & 1: spidev.sync_transfer(cmd=read_i2c_cmd, rsize=0) if tt % 10: return tt ttt //= 10 hk_index = ttt if tttH", spidev.sync_transfer(cmd=read_fifo_stat_cmd, rsize=2))[0] if status==0xffff: if attempt < 2 and "reboot" in monitor_flags: reboot() return read_fifo_stat(attempt+1) else: shutdown() return 0 return status notification = None notification_messages = [] def stream_events(f, en, ttys=(), hk=True, pressure=1, counter=True, i2c=False): tty_line = [b""]*len(ttys) hk_cmds = [read_hk_cmd]*6 if counter: hk_cmds = [read_hk_counter_cmd]*6 if pressure: if pressure != 2: hk_cmds[1] = read_hk_pressure0_cmd if pressure == 1: hk_cmds[3] = read_hk_pressure0_cmd else: hk_cmds[3] = read_hk_pressure1_cmd hk_cmds[5] = read_hk_pressure0_cmd else: hk_cmds[1] = read_hk_pressure1_cmd hk_cmds[3] = read_hk_pressure1_cmd hk_cmds[5] = read_hk_pressure1_cmd hk_t = stream_hk(f, 0, hk_cmds, i2c) while True: reg(11, en) if hk: hk_t = stream_hk(f, hk_t, hk_cmds, i2c) if notification: mes = notification.trigger() if mes: notification_messages.append(mes) return mes if ttys: for i, tty in enumerate(ttys): tl = tty.read(256) if tl: tty_line[i] += tl eol = tty_line[i].rfind(b'\n') if eol>=0: f.write("TTY%d: %s\n" % (i, repr(tty_line[i][:eol+1]))) tty_line[i] = tty_line[i][eol+1:] if len(tty_line[i])>134: f.write("TTX%d: %s\n" % (i, repr(tty_line[i]))) tty_line[i]=b"" ne = 0 fifo_stat = read_fifo_stat() while fifo_stat & 0x4000 and ne<64: ne += 1 r = spidev.sync_transfer(cmd=read_event_packet_cmd, rsize=30) rr = struct.unpack(">L3H2B9H", r) # print >> sys.stderr, "f2stat", hex(f2s), [hex(i) for i in rr] if rr[0] != 0xbeefa128: f.write("XE %s\n" % " ".join(["0x%04x" % x for x in rr])) reg(7, 0x140) fifo_stat = read_fifo_stat() break fifo_stat = rr[-1] f.write("EI %d 0x%08x 0x%02x %d" % (rr[5], (rr[2]<<16)|rr[1], rr[4], rr[3])) j=6 for i in (1,2,4,8): if rr[1] & i: ATB=F2ATB((rr[j+1]<<16)|rr[j]) f.write(" %d %d %d" % ATB) j+=2 else: f.write(" 0 0 0") f.write("\n") if fifo_stat & 0x2000: ph = spidev.sync_transfer(cmd=read_mux_fifo_packet_head_cmd, rsize=2) pp = struct.unpack(">H", ph) if pp[0] not in mux_packets: f.write(f"XM 0x{pp[0]:04x}\n") reg(7, 0x120) fifo_stat = read_fifo_stat() else: ps, print_func = mux_packets[pp[0]] cmd = read_mux_fifo_packet_cmd[-2*(ps+1):] p = spidev.sync_transfer(cmd=cmd, rsize=2*ps) pp = list(pp) pp.extend(struct.unpack(f">{ps}H", p)) fifo_stat = pp[-1] print_func(f, pp) ns = 0 while fifo_stat & 0x8000 and not fifo_stat & 0x4000 and ns<64: s = spidev.sync_transfer(cmd=read_sample_packet_cmd, rsize=14) ss = struct.unpack(">4HLH", s) if ss[0] != 0x5a61: f.write("XS %s\n" % " ".join(["0x%04x" % x for x in ss])) reg(7, 0x180) fifo_stat = read_fifo_stat() break fifo_stat = ss[-1] ns += 1 f.write("S %d %d %d %d %d\n" % ( ss[4], ss[1]>>4, ((ss[1]&15)<<8)|(ss[2]>>8), ((ss[2]&255)<<4)|(ss[3]>>12), ss[3]&0xfff)) if not (fifo_stat & 0xc000): t = time() if not ne or 0.9 < t % 2.0 < 1: sleep(0.1 - t % 0.1) elif ne < 64: sleep(0.01) def stream_file(fn): conf = current f = open(fn, "a") for i,fn in enumerate(conf.stream["ttys"]): if not isinstance(fn, str): continue import serial, termios fn = fn.split(",") baud = 4800 tty = 1 if len(fn) >= 2: baud = int(fn[1]) if len(fn) >= 3: tty = int(fn[2]) s = serial.Serial(fn[0], baud, timeout=0) if tty >= 0: sel_RxD(tty) conf.stream["ttys"][i] = s reg(7, 0x1f0) en = reg(8) | EN_EVENTS | EN_CLOCK enable(True, en) print(repr(conf.stream), file=sys.stderr) print(f"enable bits are {en:#04x}", file=sys.stderr) while True: try: mes = stream_events(f, en=en, **conf.stream) if mes: f.close() sys.stderr.write("interrupted by notification: %s\n" % repr(mes)) break except IOError as e: sys.stderr.write(repr(e)+"\n") reg(7, 0x1f0) except KeyboardInterrupt: f.close() break