turbo_weather/dose.py
2026-06-28 18:27:04 +02:00

510 lines
15 KiB
Python

#! /usr/bin/ipython3 --profile=turbo_dose
import sys, time, getopt, fileinput, struct, csv, os, contextlib
import uart
from base85 import base85_encode, base85_decode
from map import memmap
from iotn424 import SFR
import flash as flash_cmd
flash_cmd = flash_cmd.flash_cmd()
sys.path[1:1] = ["./bch4369"]
from bch4369 import bch
options, files = getopt.gnu_getopt(sys.argv[1:], "F:o:M:", ["debug", "tty=", "output=", "map=", "galois"])
tty = None
baud = 115200
out = None
debug = None
map_fn = "hallo.map"
def Debug(e, *a):
if debug:
import traceback
sys.stdout.flush()
print("xdebug", a, repr(e), file=sys.stderr)
traceback.print_exception(e, limit=-2, file=sys.stderr)
for o,v in options:
if o == "--debug":
debug = True
if o in "-F --tty":
tty = v
v = v.split(",", 1)
if v[1:]:
tty = v[0]
baud = int(v[1])
do_clock = True
if o in "--output":
if out:
raise ValueError("cannot have multiple --outputs")
if v=="-":
out = sys.stdout
elif v=="--":
out = sys.stderr
else:
out = open(v, "a")
if o in "-M --map":
map_fn = v
if o == "--galois":
bch.load_galois()
if not out:
out = sys.stdout
if len(files)==1:
if "/dev/tty" in files[0]:
tty = files[0]
files = []
mmap = memmap(map_fn)
class dose_cmd(uart.uart):
def cmd(self, c, d=None, timeout=0.2):
if not isinstance(c, bytes):
c = c.encode()
if d:
c += b" " + base85_encode(d)
self.flush()
self.ucmd(c)
r = self.resp(timeout)
while r and (r[0] != b'#'[0] or r[1] != c[0]):
r = self.resp(timeout)
d = None
if not r:
return r, d
r = r.split()
e = int(r[-1], 16)
if len(r)==3:
d = base85_decode(r[1])
if self._verbose:
print(f"{r[0]}[{r[2]}] {"".join([f"{b:02x}" for b in d])}", file=sys.stderr)
self.last_cmd = c[0:1]
return r[0], e, d
def poke(self, a, d, s=None, ccp=0):
return self.peek(a, s=s, d=d, ccp=ccp)
def peek(self, a=None, s=None, d=None, ccp=0):
name = None
if a in SFR:
name = a
a, ss = SFR[a]
if s is None:
s = ss
if a in mmap:
a, ss = mmap[a]
if ss and s is None:
s = ss
if isinstance(a, str):
print(f"PEEK: unknown addr {a}", file=sys.stderr)
for k in mmap.keys():
if a in k:
print(f"PEEK: in RAM we have {k}", file=sys.stderr)
for k in SFR.keys():
if a in k:
print(f"PEEK: IO SFR we have {k}", file=sys.stderr)
raise ValueError(a)
FF = {1: "B", 2: "H", 4:"I"}
if a is None:
cc, nn, dd = self.cmd(b"M=")
else:
if d is None:
c = b"M"
d = b''
if s is None:
s = 12
else:
c = b'M!'
if isinstance(d, int) and s in FF:
d = struct.pack(f"<{FF[s]}", d)
else:
d = bytes(d)
if s is None:
s = len(d)
d = struct.pack("<HBB", a,s,ccp) + d + b'\0\0\0\0\0\0\0\0\0\0\0\0'
cc, nn, dd = self.cmd(c, d[:16])
if dd and nn:
aa,ss,cc = struct.unpack("<HBB", dd[:4])
print(f"PEEK: {aa-nn:04x}[{nn}+{ss}]: {b2hex(dd[4:4+nn])}", file=sys.stderr)
if name is not None and nn in FF:
d = struct.unpack(f"<{FF[nn]}", dd[4:4+nn])
print(f"{name}: {d[0]:#x}", file=sys.stderr)
return name, d
return cc, nn, dd
def more(self, flags=b''):
return self.cmd(self.last_cmd + b'=' + flags)
def read_mem(self, a, s=None):
cc, nn, dd = self.peek(a=a, s=s)
r = dd[4:4+nn]
while dd[2]:
cc, nn, dd = self.more()
r += dd[4:4+nn]
return r
_adc_conf = None
_sigrow = None
ADC_MUX = {
0: "GND",
6: "Gate",
7: "Drain1",
10: "Drain2",
0x30: "GND",
0x31: ("Vdd", 10, 0, 0),
0x32: "T",
0x33: "DACREF",
}
ADC_REF = {
0: (3.3, "VDD"),
4: (1.024,),
5: (2.048,),
6: (2.5,),
7: (4.096,),
}
def adc(self, start=False, read_conf=False):
if not self._adc_conf or read_conf:
self._adc_conf = self.read_mem("adc_conf")
self._sigrow = self.read_mem(0x1100, 0x24)
g = self._sigrow[0x20]
o = self._sigrow[0x21]
if o & 0x80:
o -= 0x100
o <<= 6
g /= 1 << 14
self.ADC_MUX[0x32] = ("T°C", g, o, 273)
c = "A!<" if start else "A<"
cc, nn, dd = self.cmd(c)
dd = struct.unpack("<8H", dd)
r = []
for i in range(nn):
pos = self._adc_conf[4*i+2] & 0x3f
ref = self._adc_conf[4*i+1] & 0x07
if ref in self.ADC_REF:
ref = self.ADC_REF[ref]
else:
ref = (1,)
a = dd[i]
if pos != 0x32:
a *= ref[0]/0x10000
if pos in self.ADC_MUX:
pos = self.ADC_MUX[pos]
else:
pos = f"Ain{pos}"
if isinstance(pos, tuple):
a = (a - pos[2])*pos[1] - pos[3]
pos = pos[0]
r.append((a, pos, ref))
print(f"{i} {pos:6s} {a:.4f} {ref!r}", file=sys.stderr)
return r
def nfet_scan(self, t=1, dcs=(0x1000,)):
r = []
for dc in range(*dcs):
self.poke("TCA0_SINGLE_CMP0", dc)
time.sleep(t)
self.cmd("A!")
time.sleep(1)
r.append((dc, self.adc()))
return r
def fet_temp_log(self, fn="fet_temp_log.csv",
dc_start=0, dc_stop=0x1000, dc_step=0x40,
settle=0.5, adc_settle=0.5,
duration=None, sweeps=None, interval=0.0,
idle_dc=0, gate="TCA0_SINGLE_CMP0",
quiet=True, append=False):
"""
Records FET characteristic curve with temperature over time to a CSV.
"""
if dc_step == 0:
raise ValueError("dc_step must not be 0")
if (dc_stop - dc_start) * dc_step <= 0:
raise ValueError(
f"dc_step sign does not move {dc_start:#x} -> {dc_stop:#x}")
if sweeps is not None and sweeps <= 0:
raise ValueError("Sweeps must be positive or None")
if sweeps is None and duration is None:
sweeps = 1
def clamp_dc(x):
x = int(round(x))
return 0 if x < 0 else 0xffff if x > 0xffff else x
dcs = []
dc = dc_start
while (dc < dc_stop) if dc_step > 0 else (dc > dc_stop):
dcs.append(clamp_dc(dc))
dc += dc_step
if not dcs:
raise ValueError("Empty sweep. Check dc_start/dc_stop/dc_step")
devnull = open(os.devnull, "w") if quiet else None
def reader():
return contextlib.redirect_stderr(devnull) if quiet \
else contextlib.nullcontext()
def row_channels(adc_rows):
d = {}
for val, pos, _ref in adc_rows:
name = str(pos)
if name in d:
k = 2
while f"{name}#{k}" in d:
k += 1
name = f"{name}#{k}"
d[name] = val
return d
with reader():
first = self.adc(start=True, read_conf=True)
chan_names = list(row_channels(first).keys())
temp_chan = next((c for c in chan_names if c.startswith("T")), None)
header = ["iso_time", "epoch", "t_s", "sweep", "point", "dc"] + chan_names
write_header = True
if append:
try:
write_header = os.path.getsize(fn) == 0
except OSError:
write_header = True
f = open(fn, "a" if append else "w", newline="")
w = csv.writer(f)
if write_header:
w.writerow(header)
f.flush()
print(f"fet_temp_log: {len(dcs)} points/sweep, dc "
f"{dc_start:#x}..{dc_stop:#x} step {dc_step:#x}; channels "
f"{chan_names}; temp={temp_chan or 'NONE'} -> {fn}",
file=sys.stderr)
if temp_chan is None:
print("fet_temp_log: No temperature channel in the ADC "
"Only the I-V curve will be logged.",
file=sys.stderr)
all_sweeps = []
t0 = time.time()
n = 0
try:
while True:
if sweeps is not None and n >= sweeps:
break
if duration is not None and (time.time() - t0) >= duration:
break
sweep_rows = []
for point, dc in enumerate(dcs):
self.poke(gate, dc)
time.sleep(settle)
self.cmd("A!")
time.sleep(adc_settle)
with reader():
adc_rows = self.adc(start=False)
now = time.time()
chans = row_channels(adc_rows)
w.writerow([
time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(now)),
f"{now:.3f}", f"{now - t0:.3f}", n, point, dc,
] + [chans.get(name, "") for name in chan_names])
f.flush()
sweep_rows.append(dict(epoch=now, t_s=now - t0, sweep=n,
point=point, dc=dc, channels=chans))
drain = chans.get("Drain1", chans.get("Drain2", "?"))
temp = chans.get(temp_chan, "?") if temp_chan else "n/a"
print(f" sweep {n} pt {point:3d} dc={dc:#06x} "
f"gate={chans.get('Gate', '?')} drain={drain} "
f"T={temp}", file=sys.stderr)
all_sweeps.append(sweep_rows)
n += 1
self.poke(gate, clamp_dc(idle_dc))
if sweeps is not None and n >= sweeps:
break
if duration is not None and (time.time() - t0) >= duration:
break
if interval > 0:
time.sleep(interval)
except KeyboardInterrupt:
print("\nfet_temp_log: Interrupted. Closing file.", file=sys.stderr)
finally:
self.poke(gate, clamp_dc(idle_dc))
f.flush()
f.close()
if devnull is not None:
devnull.close()
print(f"fet_temp_log: Wrote {sum(len(s) for s in all_sweeps)} rows in "
f"{len(all_sweeps)} sweep(s) to {fn}", file=sys.stderr)
return all_sweeps
"""
Usage example: fet_temp_log("data.csv", dc_step=0x40, duration=3600, interval=60)
"""
def flash(self, op=None, abort=False, buf=False, **aa):
if op is None:
c = "F"
if abort:
c = "F!"
if buf:
c += "<"
return self.cmd(c)
r = self.cmd("F", flash_cmd.cmd_buffer(op, **aa))
if r[1]:
raise flash_cmd.Flash_Error(op)
return r
def write2fbuffer(self, d, op="WriteBuffer2"):
for i in range(0, 512, 16):
b = i>>4
if not b:
c = "B0@%@"
elif not (b&3):
c = "B0@%"
elif b==31:
c = "B3%!"
else:
c = f"B{b&3}%"
self.cmd(c, d=d[i:i+16])
if (b&3) == 3:
if b==31:
s = 80
else:
s = 64
self.flash(op, byte=i & 0x1c0, size=s)[1]
self.wait_for_spi()
def wait_for_spi(self):
while True:
r = self.cmd("F")[1]
if not r:
break
print(f"SPI busy {r:02x}", file=sys.stderr)
def readfbuffer(self, op="ReadBuffer2"):
d = b''
for i in range(0, 512, 16):
b = i>>4
if (b&3) == 0:
if b==28:
s = 80
else:
s = 64
self.flash(op, byte=i, size=s)
self.wait_for_spi()
d += self.cmd(f"B{b&3}<!")[2]
d += self.cmd(f"B4<!")[2]
self._last_page = d
if min(d) == 255:
print(f"FLASH: page is erased")
elif bch.check_page(d):
if not bch.galois:
raise bch.BCHError("galois module not loaded for Parity Error correction")
d = bch.fix_page(d)
return d[:512]
def read_flash(self, page):
print(f"FLASH: reading from {page=}")
self.flash("Transfer2", page=page)
self.flash_status(True)
return self.readfbuffer()
def read_flash2file(self, fn, page, n):
with open(fn, "wb") as f:
while n > 0:
f.write(self.read_flash(page))
page += 1
n -= 1
def flash_status(self, blocking=False):
r = (0,0,bytes(2))
while not r[2][0] & 0x80:
self.flash("Status", what="cmdbuf")
r = self.cmd("F<")
while r[1]:
r = self.cmd("F<")
if not blocking:
break
return tuple(r[2][:2])
def write_flash(self, page, d):
print(f"FLASH: writing to {page=}")
self.write2fbuffer(d)
self.flash("Program2", page=page)
self.flash_status(True)
def write_file2flash(self, page, fn):
n = 0
with open(fn, "rb") as f:
while True:
b =f.read(512)
if not b:
break
if len(b)<512:
b += bytes(512-len(b))
self.write_flash(page+n, b)
n += 1
return n
def erase_flash_sector(self, page, op="EraseSector"):
"""
op="EraseSector" (default)
page=0: pages 0 … 7
page=8: pages 8 … 0xff
page=0x100: pages 0x100 … 0x1ff
op="ErasePage", page=n
op="EraseBlock", page=n [n&~7 … n|7]
op="EraseChip"
"""
self.flash(op, page=page)
self.flash_status(True)
def flash_power(self, on=True):
op = "PowerUp" if on else "PowerDown"
self.flash(op)
def flash_Id(self):
self.flash("Id", what="cmdbuf")
while True:
r = self.cmd("F<")
if not r[1]:
break
i = r[2][:5]
print(f"FLASH chip {b2hex(i)}", file=sys.stderr)
return i
def memsetfbuffer(self, byte=0, what=0xff, size=528, op=0x0887):
for i in range(byte, byte+size, 255):
s = byte+size - i
if s > 255:
s = 255
self.flash(op, size=s, what=what, byte=i)
self.wait_for_spi()
if tty:
tty = dose_cmd(tty, baud)
tty._export(globals())
tty._verbose = False
uart.set_prompt("TurboD")
def b2hex(b, sep=" "):
return sep.join([f"{x:02x}" for x in b])