Compare commits

...

6 commits

Author SHA1 Message Date
Stephan I. Böttcher
6f80ed5d64 uart_cks: clear after send_cks() 2024-10-05 20:40:42 +02:00
Stephan I. Böttcher
6d49e5b420 uart_tx: s/call/rcall/ 2024-10-05 20:39:52 +02:00
Stephan I. Böttcher
c5ae9b885c cmdsocket: fix cmder.open() 2024-10-05 20:38:31 +02:00
Stephan I. Böttcher
50669bd76d cmd.py: help docu typo 2024-10-05 20:37:42 +02:00
Stephan I. Böttcher
79ebe18db0 cmd.py: ttycmd, … 2024-10-05 17:53:24 +02:00
Stephan I. Böttcher
0f424efdaa new script cmd.py
use cmdcocket to send commands to turbo.py
2024-10-05 12:47:52 +02:00
4 changed files with 482 additions and 12 deletions

View file

@ -496,9 +496,10 @@ int main()
send_calib_adc(i);
send_eol();
}
if (config.send & (SEND_ADC_VOLT|SEND_ADC_HEX))
if (config.power & POWER_LINE)
if (config.power & POWER_LINE) {
send_cks();
uart_cks = 0;
}
}
command();
@ -563,8 +564,6 @@ int main()
continue;
}
uart_cks = 0;
if (config.send & SEND_CONFIG && !config_clock--) {
send_config = 1;
config_clock = config.confp;

448
src/cmd.py Executable file
View file

@ -0,0 +1,448 @@
#! /usr/bin/python3
import sys, time, struct
import cmdsocket
class turbocmd(cmdsocket.cmder):
pace_delay = 0.1
pace_next = 0
def pace(self):
t = time.time()
if t < self.pace_next:
time.sleep(self.pace_next - t)
self.pace_next = t+self.pace_delay
def set_pace(seld, d):
self.pace_delay = d
def clock(self, t=None):
if t is None:
t = time.time()
tt = struct.unpack("4B", struct.pack("<L", int(t)))
self.write(b"K"+b"".join([b"%02x" % ttt for ttt in tt]))
SEND = {
"CONFIG": 0x01,
"BATED": 0x02,
"BATEW": 0x04,
"CLOCK": 0x08,
"CALIB": 0x10,
"ADC_HEX": 0x20,
"ADC_VOLT": 0x40,
"ADC": 0x60,
"DEBUG": 0x80,
}
POWER = {
"DOWN": 0x01,
"DOWN_CLI": 0x02,
"STOP_MCLK": 0x04,
"LED": 0x08,
"STDBY": 0x10,
"RX": 0x20,
"RF": 0x40,
"LINE": 0x80,
}
TRIGGER = {
"ONCE": 0x01,
"CONT": 0x02,
"UART": 0x04,
"CLOCK": 0x08,
"BREAK": 0x10,
"IMMED": 0x20,
}
UART = {
"TX": 0x40,
"RX": 0x80,
"SFD": 0x10,
"ODME": 0x08,
"CLKX2": 0x02,
"GAUTO": 0x04,
"LAUTO": 0x06,
}
REGS = {
"magic": (0x00, 1,),
"version": (0x01, 1,),
"triggers": (0x02, 1, TRIGGER),
"send": (0x03, 1, SEND),
"power": (0x04, 1, POWER),
"calib_test": (0x05, 1,),
"spi_div": (0x06, 1,),
"mclk_delay": (0x07, 1,),
"period": (0x08, 1,),
"confp": (0x09, 1,),
"cpu_clk": (0x0a, 1,),
"mclk_period": (0x0b, 1,),
"baud_div": (0x0c, 2,),
"uart_mode": (0x0e, 1, UART),
"pit_period": (0x0f, 1,),
"immediate": (0x10, 1,),
"pad": (0x11, 7,),
"line_preable": (0x18, 7,),
"zero": (0x1f, 1,),
}
def fconf(self, idx):
for k in self.REGS:
if self.REGS[k][0] == idx:
return self.REGS[k]
num_base = 0
def number(self, a, idx=None, kw=True):
"""return an index, value or bit
None, idx a config index symbol
-1, val a number
0, val a bit value symbol
bit values are searched for the provided index, or the next
if no index is provided, any bit value may be returned
"""
try:
return (-1, int(a, self.num_base))
except ValueError:
if not kw:
raise
if a.lower() in self.REGS:
return (None, self.REGS[a.lower()][0])
a = a.upper()
try:
return (idx, self.fconf(idx)[2][a])
except:
pass
if idx is not None:
idx += 1
return (idx, self.fconf(idx)[2][a])
for v in self.REGS.values():
try:
return (v[0], v[2][a])
except:
pass
# raise
return (None, self.REGS[a][0])
def config(self, cmd, magic, reg, *a):
keywords = cmd != b'E'
ii, idx = self.number(reg, kw=keywords)
b = [magic, idx]
partial = False
if ii is not None and ii >= 0:
b[1:1] = [ii]
partial = True
idx = ii
elif not a:
if cmd==b'C':
aa = self.find_symbol("config")
if cmd==b'U':
aa = self.find_symbol("USERROW")
if cmd==b'E':
aa = self.find_symbol("EEPROM")
self.write(b'M%04x' % (aa+idx))
return
for aa in a:
try:
ii, bb = self.number(aa, idx, kw=keywords)
except Exception as e:
raise ValueError(f"{aa}: invalid config item\n{e}")
if ii==idx:
# got a bit for this config byte
if not partial:
b.append(bb)
partial = True
else:
b[-1] |= bb
continue
if ii is None:
# got a config index
# those need to be consecutive
if bb == idx:
continue
if bb != idx+1:
raise IndexError(f"{aa} nonconsecutive configs")
if not partial:
b.append(0)
partial = False
idx = bb
continue
if ii == idx+1:
# got a bit for the next config byte
if not partial:
b.append(0)
b.append(bb)
partial = True
idx = ii
continue
if ii < 0:
# got a number. Either a byte or a word
b.append(bb & 0xff)
s = 1
if keywords:
try:
s = self.fconf(idx)[1]
except:
pass
if s==2:
b.append(bb >> 8)
if s==4:
b.append((bb >> 8) & 0xff)
b.append((bb >> 16) & 0xff)
b.append((bb >> 24) & 0xff)
idx += s
partial = False
self.write(cmd + b"".join([b"%02x" % (bb&0xff) for bb in b]))
def mem(self, a, v=None):
aa = a.split("+", 1)
a = self.find_symbol(aa[0])
if len(aa)==2:
a += int(aa[1], self.num_base)
if v is None:
self.write(b'M%04x' % a)
return
v = int(v, self.num_base)
self.write(b'W%04x %02x' % (a & 0xffff, v & 0xff))
def value(self, cmd, v):
v = int(v, self.num_base)
self.write(cmd + b'%02x' % (v & 0xff))
# ATtiny424 IO
symbols = {
"VPORTA": 0x0000,
"VPORTB": 0x0004,
"VPORTC": 0x0008,
"RSTCTRL": 0x0040,
"SLPCTRL": 0x0050,
"CLKCTRL": 0x0060,
"BOD": 0x0080,
"VREF": 0x00A0,
"WDT": 0x0100,
"CPUINT": 0x0110,
"CRCSCAN": 0x0120,
"RTC": 0x0140,
"EVSYS": 0x0180,
"CCL": 0x01C0,
"PORTA": 0x0400,
"PORTB": 0x0420,
"PORTC": 0x0440,
"PORTMUX": 0x05E0,
"ADC0": 0x0600,
"AC0": 0x0680,
"USART0": 0x0800,
"USART1": 0x0820,
"TWI0": 0x08A0,
"SPI0": 0x08C0,
"TCA0": 0x0A00,
"TCB0": 0x0A80,
"TCB1": 0x0A90,
"SYSCFG": 0x0F00,
"NVMCTRL": 0x1000,
"SIGROW": 0x1100,
"FUSE": 0x1280,
"LOCK_BIT": 0x128A,
"USERROW": 0x1300,
"EEPROM": 0x1400,
}
def find_symbol(self, s):
try:
return int(s, self.num_base)
except:
pass
if s not in self.symbols:
if not self.elf_read:
self.read_elf()
return self.symbols[s]
elf = "bate.elf"
elf_read = False
def read_elf(self):
self.elf_read = True
import subprocess
r = subprocess.run(["/usr/bin/nm", self.elf], text=True, stdout=subprocess.PIPE)
for l in r.stdout.split("\n"):
ll = l.split()
if len(ll)==3 and ll[1] in "BD":
self.symbols[ll[2]] = int(ll[0], 16) & 0xffff
ADC_MODE = {"DIFF": 0x51, "NORM": 0xd1}
ADC_REF = {"VDD": 0x50, "1V": 0x54, "2V": 0x55, "2.5V": 0x56, "4V": 0x57}
ADC_INP = {"GND": 0x30, "VDD": 0x31, "TEMP": 0x32, "RFP": 4, "NTC": 6, "BAT": 7}
ADC_CONF = {
"mode": (0, 1, ADC_MODE),
"ref": (1, 1, ADC_REF),
"inp": (2, 1, ADC_INP),
"inn": (3, 1, ADC_INP),
"offset": (4, 1, {}),
"shifts": (5, 1, {}),
"calib": (6, 2, {}),
}
def adc_conf(self, reg, *a):
"1 [mode] NORM [ref] 1V [inp] NTC [inn] RFP"
n = int(reg, 0)
r = None
rr = self.ADC_CONF["mode"]
b = []
for aa in a:
print(aa,n,r,rr,b)
if aa in self.ADC_CONF:
r = self.ADC_CONF[aa]
if not b:
rr = r
elif r[0] != rr[0]+len(b):
raise ValueError(f"{aa} adc registers must be consecutive")
continue
if r is None:
for rrr in self.ADC_CONF.values():
if rrr[0] == rr[0]+len(b):
r = rrr
break
if r is None:
raise ValueError(f"{aa} adc register undefined")
try:
i = int(aa, self.num_base)
except:
i = r[2][aa]
b.append(i & 0xff)
if r[1] == 2:
b.append(i >> 8)
r = None
if not b:
a = self.symbols["EEPROM"] + 8*n + rr[0]
self.write(b'M%04x' % (a & 0xffff))
return
b[:0] = [0x9d, 8*n + rr[0]]
self.write(b'E' + b"".join([b"%02x" % (bb & 0xff) for bb in b]))
class ttycmd(turbocmd):
def open(self, path=None):
if path == '-':
path = None
self.path = path
def close(self):
pass
def write(self, m):
self.pace()
m = self.format(m)
if self.path:
with open(self.path, "wb") as f:
f.write(m)
else:
sys.stdout.write(m.decode())
def usage():
print(sys.argv[0], """{options} {args}
format and send commands to the turbo weather station
-h print this help
-q be quiet
-v be verbose, echo commands to stderr
-s «path» open tty or socket
-x args are hexadecimal
-b «base» args are in base «base»
-p prefix messages with newline
-d «delay» (float) seconds to wait between messages
-R reboot the µC
-K set clock
-M «addr» read a byte
-T «n» do «n» immediate triggers
-D «n» do «n» testdata triggers
-W «» write a byte
-C «» write config
-U «» write userrow config
-E «» write to EEPROM
-A «» write/read ADC config in EEPROM
a single command option requiring args «» must be last.
config write commands are limitted to eight bytes, including address and magic
-C «reg» [«value»|{«bits»}]
-A «idx» «reg» «value»
config registers may be numbers or symbols
config values may be symbolic «bits» or numbers
adc registers are lowercare symbols
adc values may be numbers or symbols
values must match the consecutive sequence of registers
redundant matching register names may be interleaved
a config bit symbol may refer to the following register
a config bit symbol may implicitly select the inital register
""", file=sys.stderr)
def main(argv):
import getopt
options, args = getopt.getopt(argv, "hxb:pvqs:d:RKC:U:E:M:W:T:D:A:")
path = None
for o,v in options:
if o=="-s":
path = v
if o=="-h":
usage()
return
if path == '-':
path = None
if path is None or path.startswith('/dev/tty'):
cmd = ttycmd(path)
else:
cmd = turbocmd(path)
last_cmd = None
last_args = []
for o,v in options:
if last_cmd:
raise ValueError(f"{o}: command option after long command")
if o=="-x":
cmd.num_base = 16
if o=="-b":
cmd.num_base = int(v)
if o=="-p":
cmd.bol = b'\n'
if o=="-q":
cmd.verbose = False
if o=="-v":
cmd.verbose = True
if o=="-d":
cmd.set_pace(float(v))
if o=="-K":
cmd.clock()
if o=="-C":
last_cmd = cmd.config
last_args = [b'C', 0xba]
if o=="-U":
last_cmd = cmd.config
last_args = [b'U', 0x9d]
if o=="-E":
last_cmd = cmd.config
last_args = [b'E', 0x9d]
if o=="-M":
cmd.mem(v, None)
if o=="-W":
last_cmd = cmd.mem
if o=="-R":
cmd.value(b'R', 0xd8)
if o=="-D":
cmd.value(b'D', v)
if o=="-T":
cmd.value(b'T', v)
if o=="-A":
last_cmd = cmd.adc_conf
if v and last_cmd:
last_args.append(v)
last_args.extend(args)
if last_cmd:
last_cmd(*last_args)
if __name__=="__main__":
main(sys.argv[1:])

View file

@ -89,22 +89,41 @@ class cmder(cmd_socket):
raise ValueError("socket is not closed")
self.s = socket.socket(socket.AF_UNIX)
self.s.connect(path)
self.path = path
def write(self, m, eol=b'\n'):
eol = b'\n'
bol = b''
verbose = True
def format(self, m):
if isinstance(m, str):
m = m.encode()
if eol and m[-1:] != eol:
m += eol
if self.eol and m[-1:] != self.eol:
m += self.eol
if self.bol and m[:1] != self.bol:
m = self.bol + m
if self.verbose:
print(f"{self.path}.send({m})", file=sys.stderr)
return m
def write(self, m):
self.pace()
m = self.format(m)
if self.s:
self.s.send(m)
def pace(self):
pass
def main():
import getopt, time
options, messages = getopt.gnu_getopt(sys.argv[1:], "s:d:rfn")
options, messages = getopt.gnu_getopt(sys.argv[1:], "s:d:rfnN")
path = ()
delay = 0.1
receiver = False
force = False
eol = b'\n'
bol = None
for o,v in options:
if o=="-s":
path = (v,)
@ -116,6 +135,8 @@ def main():
force = True
if o=="-n":
eol = None
if o=="-N":
bol = b'\n'
if receiver:
s = cmd_receiver(*path, blocking=True, force=force)
@ -126,10 +147,12 @@ def main():
s.close()
return
s = cmder(*path)
s.eol = eol
s.bol = bol
for m in messages:
if delay:
time.sleep(delay)
s.write(m, eol)
s.write(m)
if __name__=="__main__":
main()

View file

@ -141,7 +141,7 @@ command:
mov r18, r20
1:
ld r22, X+
call send_char22
rcall send_char22
subi r18, 1
brne 1b
ldi r24, lo8(uart_rx)