510 lines
15 KiB
Python
510 lines
15 KiB
Python
#! /usr/bin/ipython3 --profile=turbo_dose
|
|
|
|
import sys, time, getopt, fileinput, struct, csv, os, contextlib
|
|
import uart
|
|
from base85 import base85_encode, base85_decode
|
|
from map import memmap
|
|
from iotn424 import SFR
|
|
import flash as flash_cmd
|
|
flash_cmd = flash_cmd.flash_cmd()
|
|
sys.path[1:1] = ["./bch4369"]
|
|
from bch4369 import bch
|
|
|
|
# Split the command line into recognised options and leftover positional
# arguments.  -F/-o/-M and --tty/--output/--map each take a value; --debug
# and --galois are plain flags.
options, files = getopt.gnu_getopt(
    sys.argv[1:],
    "F:o:M:",
    ["debug", "tty=", "output=", "map=", "galois"],
)

# Defaults; the option loop further down overrides them.
tty = None              # serial device path, None = no device requested
baud = 115200           # serial speed used when opening the device
out = None              # output stream, resolved to sys.stdout if unset
debug = None            # truthy makes Debug() print diagnostics
map_fn = "hallo.map"    # file name handed to memmap()
|
|
|
|
def Debug(e, *a):
|
|
if debug:
|
|
import traceback
|
|
sys.stdout.flush()
|
|
print("xdebug", a, repr(e), file=sys.stderr)
|
|
traceback.print_exception(e, limit=-2, file=sys.stderr)
|
|
|
|
# Apply the parsed command-line options to the module-level settings.
# Option names are compared against explicit tuples: the previous substring
# tests (e.g. ``o in "-F --tty"``) only worked by accident and would match
# any future option whose name happens to be a substring.
for o, v in options:
    if o == "--debug":
        debug = True
    elif o in ("-F", "--tty"):
        # Accepts "DEVICE" or "DEVICE,BAUD".
        tty, has_baud, speed = v.partition(",")
        if has_baud:
            baud = int(speed)
            # NOTE(review): the original indentation was lost; do_clock is
            # assumed to belong to the "baud given" branch - confirm.
            do_clock = True
    elif o in ("-o", "--output"):
        if out:
            raise ValueError("cannot have multiple --outputs")
        if v == "-":
            out = sys.stdout
        elif v == "--":
            out = sys.stderr
        else:
            # Kept open for the lifetime of the process on purpose.
            out = open(v, "a")
    elif o in ("-M", "--map"):
        map_fn = v
    elif o == "--galois":
        bch.load_galois()

if not out:
    out = sys.stdout

# A single positional argument that looks like a tty device selects the
# serial port instead of being treated as an input file.
if len(files) == 1 and "/dev/tty" in files[0]:
    tty = files[0]
    files = []

mmap = memmap(map_fn)
|
|
|
|
class dose_cmd(uart.uart):
|
|
|
|
def cmd(self, c, d=None, timeout=0.2):
    """Send command *c* (with optional payload *d*) and parse the reply.

    Args:
        c: Command as ``bytes`` or ``str`` (a ``str`` is encoded first).
        d: Optional payload.  When truthy it is base85-encoded and
            appended to the command after a single space.
        timeout: Passed to every ``self.resp()`` call.

    Returns:
        ``(tag, status, data)``: the first whitespace-separated field of
        the reply, the last field parsed as a hexadecimal integer, and
        the base85-decoded middle field (``None`` unless the reply has
        exactly three fields).  When no matching reply arrives the falsy
        value from ``self.resp()`` is returned as ``(reply, None, None)``
        so callers can always unpack three values.
    """
    if not isinstance(c, bytes):
        c = c.encode()
    if d:
        c += b" " + base85_encode(d)
    self.flush()
    self.ucmd(c)

    # Skip everything that is not a "#<command letter>..." line.  Replies
    # shorter than two bytes cannot match and are skipped instead of
    # raising IndexError.
    r = self.resp(timeout)
    while r and (len(r) < 2 or r[0] != b'#'[0] or r[1] != c[0]):
        r = self.resp(timeout)
    if not r:
        return r, None, None

    fields = r.split()
    e = int(fields[-1], 16)
    d = None
    if len(fields) == 3:
        d = base85_decode(fields[1])
        if self._verbose:
            hexdump = "".join(f"{b:02x}" for b in d)
            print(f"{fields[0]}[{fields[2]}] {hexdump}", file=sys.stderr)
    # Remembered so that more() can continue the same command.
    self.last_cmd = c[0:1]
    return fields[0], e, d
|
|
|
|
def poke(self, a, d, s=None, ccp=0):
    """Write *d* to *a* by delegating to ``peek()`` with a value.

    All arguments are forwarded unchanged; the result of ``peek()`` is
    returned as is.
    """
    forwarded = {"s": s, "d": d, "ccp": ccp}
    return self.peek(a, **forwarded)
|
|
|
|
def peek(self, a=None, s=None, d=None, ccp=0):
    """Read from (or, when *d* is given, write to) device memory.

    Args:
        a: Numeric address, a key of ``SFR`` or ``mmap``, or ``None`` to
            send the bare ``M=`` command.
        s: Size in bytes.  Defaults to the size stored with the symbol,
            else 12 for reads and the payload length for writes.
        d: Value to write: an ``int`` (packed little-endian) or anything
            accepted by ``bytes()``.  ``None`` means read.
        ccp: Sent as the fourth byte of the request header.

    Returns:
        ``(name, values)`` when *a* was an ``SFR`` key and the reply
        carries 1, 2 or 4 bytes (``values`` is the ``struct.unpack``
        tuple); otherwise a 3-tuple ``(cc, nn, dd)`` derived from
        ``self.cmd()``.

    Raises:
        ValueError: *a* is a string that is neither in ``SFR`` nor in
            ``mmap``; similar known names are listed on stderr first.
    """
    name = None
    if a in SFR:
        name = a
        a, ss = SFR[a]
        if s is None:
            s = ss
    if a in mmap:
        a, ss = mmap[a]
        if ss and s is None:
            s = ss
    if isinstance(a, str):
        # Still a string: the name is unknown.  Offer substring matches.
        print(f"PEEK: unknown addr {a}", file=sys.stderr)
        for k in mmap.keys():
            if a in k:
                print(f"PEEK: in RAM we have {k}", file=sys.stderr)
        for k in SFR.keys():
            if a in k:
                print(f"PEEK: IO SFR we have {k}", file=sys.stderr)
        raise ValueError(a)

    FF = {1: "B", 2: "H", 4: "I"}
    if a is None:
        reply = self.cmd(b"M=")
    else:
        if d is None:
            c = b"M"
            d = b''
            if s is None:
                s = 12
        else:
            c = b'M!'
            if isinstance(d, int):
                # An int is always the VALUE to write.  Previously an int
                # without a 1/2/4 byte size fell through to bytes(d), which
                # produces d zero bytes instead of the value.
                if s is None:
                    s = max(1, (d.bit_length() + 7) // 8)
                if s in FF:
                    d = struct.pack(f"<{FF[s]}", d)
                else:
                    d = d.to_bytes(s, "little")
            else:
                d = bytes(d)
                if s is None:
                    s = len(d)
        # 4 byte header plus payload, zero padded and cut to 16 bytes.
        d = struct.pack("<HBB", a, s, ccp) + d + bytes(12)
        reply = self.cmd(c, d[:16])

    # Tolerate a short tuple from cmd() so a timeout is reported as
    # (reply, None, None) instead of an unpacking error.
    cc, nn, dd = (tuple(reply) + (None, None))[:3]

    if dd and nn:
        # NOTE(review): this rebinds cc to the third header field of the
        # reply, which is what the final return hands back - confirm that
        # this is intended.
        aa, ss, cc = struct.unpack("<HBB", dd[:4])
        print(f"PEEK: {aa-nn:04x}[{nn}+{ss}]: {b2hex(dd[4:4+nn])}", file=sys.stderr)
        if name is not None and nn in FF:
            d = struct.unpack(f"<{FF[nn]}", dd[4:4+nn])
            print(f"{name}: {d[0]:#x}", file=sys.stderr)
            return name, d
    return cc, nn, dd
|
|
|
|
def more(self, flags=b''):
    """Re-issue the last command letter followed by ``=`` and *flags*.

    Relies on ``self.last_cmd``, which ``cmd()`` sets after a parsed reply.
    """
    request = b"".join((self.last_cmd, b"=", flags))
    return self.cmd(request)
|
|
|
|
def read_mem(self, a, s=None):
    """Read memory starting at *a*, following continuation replies.

    The first chunk comes from ``peek()``; as long as byte 2 of the reply
    header is non-zero, further chunks are fetched with ``more()``.
    Returns the concatenated payload bytes (reply bytes 4 onwards).
    """
    _, count, reply = self.peek(a=a, s=s)
    data = reply[4:4 + count]
    while True:
        if not reply[2]:
            return data
        _, count, reply = self.more()
        data += reply[4:4 + count]
|
|
|
|
# Caches filled by adc(): the raw "adc_conf" block and the bytes read from
# address 0x1100.
_adc_conf = None
_sigrow = None

# Names for ADC input selections, keyed by the low 6 bits of a config byte.
# A tuple entry is (label, scale, offset, shift); adc() converts a reading
# with (value - offset) * scale - shift.
ADC_MUX = {
    0x00: "GND",
    0x06: "Gate",
    0x07: "Drain1",
    0x0a: "Drain2",
    0x30: "GND",
    0x31: ("Vdd", 10, 0, 0),
    0x32: "T",
    0x33: "DACREF",
}

# Reference selections, keyed by the low 3 bits of a config byte.  The first
# tuple element is the value adc() scales readings by.
ADC_REF = {
    0: (3.3, "VDD"),
    4: (1.024,),
    5: (2.048,),
    6: (2.5,),
    7: (4.096,),
}
|
|
|
|
def adc(self, start=False, read_conf=False):
    """Fetch the ADC results and convert them using the cached config.

    Args:
        start: Send ``"A!<"`` instead of ``"A<"``.
        read_conf: Force re-reading ``adc_conf`` and the 0x24 bytes at
            0x1100 even when a cached copy exists.

    Returns:
        One ``(value, label, ref)`` tuple per reported channel.  Each
        channel is also printed to stderr.
    """
    if read_conf or not self._adc_conf:
        self._adc_conf = self.read_mem("adc_conf")
        self._sigrow = self.read_mem(0x1100, 0x24)
        # presumably the temperature sensor gain/offset calibration bytes
        # - TODO confirm against the device data sheet
        g = self._sigrow[0x20]
        o = self._sigrow[0x21]
        if o & 0x80:
            o -= 0x100      # treat the offset byte as signed
        o <<= 6
        g /= 1 << 14
        self.ADC_MUX[0x32] = ("T°C", g, o, 273)

    _, nn, payload = self.cmd("A!<" if start else "A<")
    samples = struct.unpack("<8H", payload)
    conf = self._adc_conf
    r = []
    for i in range(nn):
        # Each channel uses four config bytes: byte 1 holds the reference
        # selection, byte 2 the input selection.
        pos = conf[4*i + 2] & 0x3f
        ref = self.ADC_REF.get(conf[4*i + 1] & 0x07, (1,))
        a = samples[i]
        if pos != 0x32:
            # Scale by the reference; input 0x32 is converted by its own
            # tuple below instead.
            a *= ref[0]/0x10000
        pos = self.ADC_MUX.get(pos, f"Ain{pos}")
        if isinstance(pos, tuple):
            a = (a - pos[2])*pos[1] - pos[3]
            pos = pos[0]
        r.append((a, pos, ref))
        print(f"{i} {pos:6s} {a:.4f} {ref!r}", file=sys.stderr)
    return r
|
|
|
|
def nfet_scan(self, t=1, dcs=(0x1000,)):
    """Step ``TCA0_SINGLE_CMP0`` through ``range(*dcs)`` and sample the ADC.

    For every value: poke it, sleep *t* seconds, send ``"A!"``, sleep one
    second and read ``self.adc()``.  Returns a list of
    ``(value, adc_result)`` pairs.
    """
    def measure(duty):
        self.poke("TCA0_SINGLE_CMP0", duty)
        time.sleep(t)
        self.cmd("A!")
        time.sleep(1)
        return duty, self.adc()

    return [measure(duty) for duty in range(*dcs)]
|
|
|
|
def fet_temp_log(self, fn="fet_temp_log.csv",
                 dc_start=0, dc_stop=0x1000, dc_step=0x40,
                 settle=0.5, adc_settle=0.5,
                 duration=None, sweeps=None, interval=0.0,
                 idle_dc=0, gate="TCA0_SINGLE_CMP0",
                 quiet=True, append=False):
    """Record FET characteristic curves with temperature over time to a CSV.

    The *gate* register is stepped from *dc_start* towards *dc_stop*
    (exclusive) in increments of *dc_step*.  At each step the ADC is
    triggered with ``"A!"``, read back with ``self.adc()`` and one CSV row
    is written and flushed.

    Args:
        fn: CSV file name.
        dc_start, dc_stop, dc_step: Sweep range; every value is rounded
            and clamped to 0..0xffff before it is written.
        settle: Seconds to sleep after writing *gate*.
        adc_settle: Seconds to sleep after sending ``"A!"``.
        duration: Stop once this many seconds have elapsed; checked
            between sweeps only.
        sweeps: Number of sweeps.  When both *sweeps* and *duration* are
            ``None`` a single sweep is done.
        interval: Seconds to sleep between sweeps.
        idle_dc: Value written to *gate* after each sweep and on exit.
        gate: Register name passed to ``self.poke()``.
        quiet: Discard what ``self.adc()`` prints to stderr.
        append: Append to *fn* instead of truncating it; the header is
            only written when the file is missing or empty.

    Returns:
        A list with one entry per completed sweep; each entry is a list
        of dicts with keys ``epoch``, ``t_s``, ``sweep``, ``point``,
        ``dc`` and ``channels``.

    Raises:
        ValueError: Inconsistent sweep parameters.

    Ctrl-C stops logging cleanly.  The CSV file and the null device are
    closed on every exit path, including errors while reading the ADC
    configuration or while restoring *idle_dc*.

    Usage example:
        fet_temp_log("data.csv", dc_step=0x40, duration=3600, interval=60)
    """
    if dc_step == 0:
        raise ValueError("dc_step must not be 0")
    if (dc_stop - dc_start) * dc_step <= 0:
        raise ValueError(
            f"dc_step sign does not move {dc_start:#x} -> {dc_stop:#x}")
    if sweeps is not None and sweeps <= 0:
        raise ValueError("Sweeps must be positive or None")
    if sweeps is None and duration is None:
        sweeps = 1

    def clamp_dc(x):
        """Round *x* and limit it to 0..0xffff."""
        return min(max(int(round(x)), 0), 0xffff)

    dcs = []
    dc = dc_start
    while (dc < dc_stop) if dc_step > 0 else (dc > dc_stop):
        dcs.append(clamp_dc(dc))
        dc += dc_step
    if not dcs:
        raise ValueError("Empty sweep. Check dc_start/dc_stop/dc_step")

    def row_channels(adc_rows):
        """Map channel label -> value, suffixing duplicates with #2, #3..."""
        d = {}
        for val, pos, _ref in adc_rows:
            name = str(pos)
            if name in d:
                k = 2
                while f"{name}#{k}" in d:
                    k += 1
                name = f"{name}#{k}"
            d[name] = val
        return d

    def finished():
        """True once the sweep count or the time budget is used up."""
        if sweeps is not None and n >= sweeps:
            return True
        return duration is not None and (time.time() - t0) >= duration

    all_sweeps = []
    rows_written = 0
    # The ExitStack owns both file handles so they are released however
    # this function is left.
    with contextlib.ExitStack() as stack:
        devnull = stack.enter_context(open(os.devnull, "w")) if quiet else None

        def reader():
            if quiet:
                return contextlib.redirect_stderr(devnull)
            return contextlib.nullcontext()

        # One initial read fixes the channel names and the CSV column order.
        with reader():
            first = self.adc(start=True, read_conf=True)
        chan_names = list(row_channels(first).keys())
        temp_chan = next((c for c in chan_names if c.startswith("T")), None)
        header = ["iso_time", "epoch", "t_s", "sweep", "point", "dc"] + chan_names

        write_header = True
        if append:
            try:
                write_header = os.path.getsize(fn) == 0
            except OSError:
                write_header = True
        f = stack.enter_context(open(fn, "a" if append else "w", newline=""))
        w = csv.writer(f)
        if write_header:
            w.writerow(header)
            f.flush()

        print(f"fet_temp_log: {len(dcs)} points/sweep, dc "
              f"{dc_start:#x}..{dc_stop:#x} step {dc_step:#x}; channels "
              f"{chan_names}; temp={temp_chan or 'NONE'} -> {fn}",
              file=sys.stderr)
        if temp_chan is None:
            print("fet_temp_log: No temperature channel in the ADC "
                  "Only the I-V curve will be logged.",
                  file=sys.stderr)

        t0 = time.time()
        n = 0
        try:
            while not finished():
                sweep_rows = []
                for point, dc in enumerate(dcs):
                    self.poke(gate, dc)
                    time.sleep(settle)
                    self.cmd("A!")
                    time.sleep(adc_settle)
                    with reader():
                        adc_rows = self.adc(start=False)
                    now = time.time()
                    chans = row_channels(adc_rows)
                    w.writerow([
                        time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(now)),
                        f"{now:.3f}", f"{now - t0:.3f}", n, point, dc,
                    ] + [chans.get(name, "") for name in chan_names])
                    # Flush per row so an aborted run keeps its data.
                    f.flush()
                    rows_written += 1
                    sweep_rows.append(dict(epoch=now, t_s=now - t0, sweep=n,
                                           point=point, dc=dc, channels=chans))
                    drain = chans.get("Drain1", chans.get("Drain2", "?"))
                    temp = chans.get(temp_chan, "?") if temp_chan else "n/a"
                    print(f"  sweep {n} pt {point:3d} dc={dc:#06x} "
                          f"gate={chans.get('Gate', '?')} drain={drain} "
                          f"T={temp}", file=sys.stderr)

                all_sweeps.append(sweep_rows)
                n += 1
                self.poke(gate, clamp_dc(idle_dc))

                # Checked again here so no interval is slept after the
                # final sweep.
                if finished():
                    break
                if interval > 0:
                    time.sleep(interval)
        except KeyboardInterrupt:
            print("\nfet_temp_log: Interrupted. Closing file.", file=sys.stderr)
        finally:
            self.poke(gate, clamp_dc(idle_dc))

    # rows_written also counts rows of a sweep that was interrupted.
    print(f"fet_temp_log: Wrote {rows_written} rows in "
          f"{len(all_sweeps)} sweep(s) to {fn}", file=sys.stderr)
    return all_sweeps
|
|
|
|
|
|
def flash(self, op=None, abort=False, buf=False, **aa):
    """Send a flash command.

    With *op* the request buffer is built by ``flash_cmd.cmd_buffer(op,
    **aa)`` and sent with ``"F"``; a non-zero status in the reply raises
    ``flash_cmd.Flash_Error(op)``.  Without *op* only a bare ``"F"`` is
    sent, with ``"!"`` appended for *abort* and ``"<"`` for *buf*.

    Returns the reply tuple of ``self.cmd()``.
    """
    if op is not None:
        reply = self.cmd("F", flash_cmd.cmd_buffer(op, **aa))
        if reply[1]:
            raise flash_cmd.Flash_Error(op)
        return reply

    code = ("F!" if abort else "F") + ("<" if buf else "")
    return self.cmd(code)
|
|
|
|
def write2fbuffer(self, d, op="WriteBuffer2"):
    """Transfer the first 512 bytes of *d* in 16-byte pieces.

    Pieces are sent with ``B<slot>`` commands, four slots per group.
    After every fourth piece the group is handed on with
    ``self.flash(op, ...)`` (64 bytes, 80 for the last group) and the code
    waits until ``wait_for_spi()`` returns.
    """
    for piece in range(32):
        offset = piece << 4
        slot = piece & 3
        if piece == 0:
            code = "B0@%@"
        elif slot == 0:
            code = "B0@%"
        elif piece == 31:
            code = "B3%!"
        else:
            code = f"B{slot}%"
        self.cmd(code, d=d[offset:offset + 16])

        if slot != 3:
            continue
        size = 80 if piece == 31 else 64
        # The reply is indexed exactly as before, although the value is
        # not used.
        self.flash(op, byte=offset & 0x1c0, size=size)[1]
        self.wait_for_spi()
|
|
|
|
def wait_for_spi(self):
    """Poll with ``"F"`` until the reply status is falsy.

    Every non-zero status is reported on stderr before polling again.
    """
    while (r := self.cmd("F")[1]):
        print(f"SPI busy {r:02x}", file=sys.stderr)
|
|
|
|
def readfbuffer(self, op="ReadBuffer2"):
    """Read the buffer back in 16-byte pieces and return 512 bytes.

    Before each group of four pieces ``self.flash(op, ...)`` requests 64
    bytes (80 for the last group).  One extra piece is read from slot 4 at
    the end.  The complete data is kept in ``self._last_page``.  An
    all-0xff page is reported as erased; otherwise a truthy
    ``bch.check_page()`` result leads to ``bch.fix_page()``.

    Raises:
        bch.BCHError: ``bch.check_page()`` is truthy but ``bch.galois``
            is not set.
    """
    page = b''
    for piece in range(32):
        slot = piece & 3
        if slot == 0:
            size = 80 if piece == 28 else 64
            self.flash(op, byte=piece << 4, size=size)
            self.wait_for_spi()
        page += self.cmd(f"B{slot}<!")[2]
    page += self.cmd("B4<!")[2]
    self._last_page = page

    if min(page) == 255:
        print("FLASH: page is erased")
    elif bch.check_page(page):
        if not bch.galois:
            raise bch.BCHError("galois module not loaded for Parity Error correction")
        page = bch.fix_page(page)
    return page[:512]
|
|
|
|
def read_flash(self, page):
    """Issue ``Transfer2`` for *page*, wait for completion and read it.

    Returns whatever ``self.readfbuffer()`` returns.
    """
    print(f"FLASH: reading from {page=}")
    self.flash("Transfer2", page=page)
    self.flash_status(True)
    content = self.readfbuffer()
    return content
|
|
|
|
def read_flash2file(self, fn, page, n):
    """Write *n* consecutive pages, starting at *page*, to binary file *fn*.

    The file is created (and truncated) even when *n* is not positive.
    """
    with open(fn, "wb") as f:
        current, remaining = page, n
        while remaining > 0:
            f.write(self.read_flash(current))
            current += 1
            remaining -= 1
|
|
|
|
def flash_status(self, blocking=False):
    """Query the flash status and return its first two bytes as a tuple.

    A ``Status`` request is issued and ``"F<"`` polled until its reply
    status is zero.  With *blocking* the whole query is repeated until
    bit 7 of the first status byte is set.
    """
    reply = (0, 0, bytes(2))
    while True:
        if reply[2][0] & 0x80:
            break
        self.flash("Status", what="cmdbuf")
        reply = self.cmd("F<")
        while reply[1]:
            reply = self.cmd("F<")
        if not blocking:
            break
    return tuple(reply[2][:2])
|
|
|
|
def write_flash(self, page, d):
    """Load *d* via ``write2fbuffer()`` and issue ``Program2`` for *page*.

    Blocks in ``flash_status(True)`` before returning.
    """
    print(f"FLASH: writing to {page=}")
    steps = (
        lambda: self.write2fbuffer(d),
        lambda: self.flash("Program2", page=page),
        lambda: self.flash_status(True),
    )
    for step in steps:
        step()
|
|
|
|
def write_file2flash(self, page, fn):
    """Write file *fn* to consecutive pages starting at *page*.

    The file is read in 512-byte chunks; a short final chunk is padded
    with zero bytes.  Returns the number of pages written.
    """
    count = 0
    with open(fn, "rb") as f:
        while chunk := f.read(512):
            self.write_flash(page + count, chunk.ljust(512, b"\0"))
            count += 1
    return count
|
|
|
|
def erase_flash_sector(self, page, op="EraseSector"):
    """Issue erase operation *op* for *page* and wait until it is done.

    Supported operations and what *page* selects:

    op="EraseSector" (default)
        page=0:     pages 0 … 7
        page=8:     pages 8 … 0xff
        page=0x100: pages 0x100 … 0x1ff
        …
    op="ErasePage", page=n
    op="EraseBlock", page=n    [n&~7 … n|7]
    op="EraseChip"
    """
    request = {"page": page}
    self.flash(op, **request)
    self.flash_status(True)
|
|
|
|
def flash_power(self, on=True):
    """Send ``PowerUp`` when *on* is truthy, ``PowerDown`` otherwise."""
    self.flash("PowerUp" if on else "PowerDown")
|
|
|
|
def flash_Id(self):
    """Request the flash ``Id`` and return the first five reply bytes.

    ``"F<"`` is polled until its status is zero; the identifier is also
    printed to stderr as hex.
    """
    self.flash("Id", what="cmdbuf")
    reply = self.cmd("F<")
    while reply[1]:
        reply = self.cmd("F<")
    i = reply[2][:5]
    print(f"FLASH chip {b2hex(i)}", file=sys.stderr)
    return i
|
|
|
|
def memsetfbuffer(self, byte=0, what=0xff, size=528, op=0x0887):
    """Issue *op* over ``[byte, byte+size)`` in runs of at most 255 bytes.

    Each run is sent via ``self.flash(op, size=..., what=what, byte=...)``
    and followed by ``wait_for_spi()``.
    """
    end = byte + size
    for start in range(byte, end, 255):
        run = min(255, end - start)
        self.flash(op, size=run, what=what, byte=start)
        self.wait_for_spi()
|
|
|
|
# Open the device only when a tty was selected on the command line.
if tty:
    # Replace the device path by the live connection object and publish its
    # methods into this module's namespace via _export().
    tty = dose_cmd(tty, baud)
    tty._export(globals())
    tty._verbose = False

# NOTE(review): the original indentation was lost; the prompt is assumed to
# be set unconditionally - confirm.
uart.set_prompt("TurboD")
|
|
|
|
def b2hex(b, sep=" "):
    """Format the integers in *b* as two-digit lowercase hex, joined by *sep*."""
    return sep.join(format(x, "02x") for x in b)
|
|
|