Compare commits

...

5 commits

Author SHA1 Message Date
Stephan I. Böttcher
fc87dd7ead turbo.py: fix, flush
fix baudrate= parameter
flush out when tty
2024-09-22 22:52:28 +02:00
Stephan I. Böttcher
5953504546 add script turbo.py
The scripts reads from stdin, files or a tty.
Verifies the chhecksums.
Computes natual values.

TODO: more parsers
TODO: add socket to send commands to the tty.
2024-09-22 22:42:27 +02:00
Stephan I. Böttcher
3ef27fe078 new module linear_regression.py 2024-09-22 22:41:26 +02:00
Stephan I. Böttcher
358a52ee9b new module ntc.py 2024-09-22 22:39:40 +02:00
Stephan I. Böttcher
c78ee1c5fc message V was ambiguous 2024-09-22 22:38:58 +02:00
4 changed files with 420 additions and 1 deletions

View file

@ -450,7 +450,7 @@ int main()
uint8_t reset_source = RSTCTRL.RSTFR; uint8_t reset_source = RSTCTRL.RSTFR;
RSTCTRL.RSTFR = reset_source; RSTCTRL.RSTFR = reset_source;
send_str("\nV Turbo Weather V0.7\nB "); send_str("\nB Turbo Weather V0.9 ");
send_hex_byte(reset_source); send_hex_byte(reset_source);
send_eol(); send_eol();

193
src/linear_regression.py Executable file
View file

@ -0,0 +1,193 @@
#! /usr/bin/python3
import numpy
class linreg:
def __init__(self, p=1, ny=1, xoff=0, yoff=0, decay=None, xdecay=None):
self.P = p
self.p = numpy.arange(2*p+1)
self.x = numpy.zeros((2*p+1,))
self.y = numpy.zeros((ny, p+1))
self.yy = numpy.zeros((ny,))
self.w = None
self.xoff = xoff
self.yoff = yoff
self.decay = decay
self.xdecay = xdecay
self.N = 0
Ai = numpy.arange(p+1)
self.Ai = Ai+Ai.reshape((-1,1))
if xoff is True:
self.Bi = self.pascal()
def pascal(self):
p = self.p.shape[0]
Bi = numpy.zeros((p,p,p), dtype=int)
for n in range(p):
Bi[n,0,n] = 1
Bi[n,n,0] = 1
for n in range(1,p):
for k in range(1,n):
Bi[n,k,n-k] = Bi[n-1,k-1,n-k] + Bi[n-1,k, n-k-1]
return Bi
def add(self, x, y, w=1.0, decay=None):
y = numpy.array(y, dtype=float).reshape((-1,))
if decay is None:
if self.N and self.xdecay is not None:
decay = 1 - numpy.exp((self.lastx - x)/self.xdecay)
else:
decay = self.decay
self.lastx = x
if decay is not None:
self.x *= 1 - decay
self.y *= 1 - decay
self.yy *= 1 - decay
if self.xoff is True:
if self.N:
self.transform_x(x - self.x_origin)
self.x_origin = x
if self.yoff is True:
if self.N:
self.transform_y(y - self.y_origin)
self.y_origin = y
else:
self.y[:, 0] += w * y
self.yy += w * y*y
self.x[0] += w
self.N += 1
return
self.N += 1
x -= self.xoff
y -= self.yoff
x = w * numpy.power(x, self.p)
self.x += x
self.y += y[:,numpy.newaxis] * x[:self.P+1]
self.yy += w * y*y
def solve(self):
if self.N <= self.P:
return
try:
return numpy.linalg.solve(self.x[self.Ai], self.y[...,numpy.newaxis])[...,0]
except:
print(self.x[self.Ai], self.y)
raise
def solve_p(self, p):
if self.N <= p or p > self.P:
return
Ai = self.Ai[:p+1, :p+1]
y = self.y[:, :p+1, numpy.newaxis]
return numpy.linalg.solve(self.x[Ai], y)
def transform_x(self, x):
p = numpy.power(-x, self.p).reshape((-1,1))
Bi = self.Bi @ p
xx = self.x[numpy.newaxis,:] @ Bi
self.x = xx[:,0,0]
yy = self.y[:,numpy.newaxis,numpy.newaxis,:] @ Bi[:self.P+1,:self.P+1]
self.y = yy[...,0,0]
def transform_y(self, y):
self.yy += self.x[0]*y*y - 2*y*self.y[:,0]
self.y -= y * self.x[:self.P+1]
def print_result(self, r, fmt="%.4g"):
if r is None:
return
dx = self.xoff
dy = self.yoff
if dx is True:
dx = self.x_origin
if dy is True:
dy = self.y_origin
r[:,0] += dy
for i, rr in enumerate(r):
print(i, fmt % dx, " ".join([fmt % rrr for rrr in rr]))
def strtonum(s):
try:
return float(s)
except:
pass
return int(s, 0)
def main():
import sys, getopt, fileinput
options, files = getopt.gnu_getopt(sys.argv[1:], "p:n:x:y:w:d:X:Y:D:T:I")
p = 1
ny = 1
xoff = 0
yoff = 0
decay = None
xdecay = None
ix = 0
iy = 1
iw = None
id = None
w = 1.0
d = None
I = False
for o, v in options:
if o=="-p":
p = int(v)
if o=="-n":
ny = int(v)
if o=="-X":
if v=="R":
xoff = True
else:
xoff = int(v)
if o=="-Y":
if v=="R":
xoff = True
yoff = True
else:
yoff = int(v)
if o=="-D":
decay = float(v)
if o=="-T":
xdecay = float(v)
if o=="-x":
ix = int(v)
if o=="-y":
iy = int(v)
if o=="-w":
iw = int(v)
if o=="-d":
id = int(v)
if o=="-I":
I = True
LR = linreg(p=p, ny=ny, xoff=xoff, yoff=yoff, decay=decay, xdecay=xdecay)
for l in fileinput.input(files):
ll = l.split()
try:
x = strtonum(ll[ix])
y = numpy.array([strtonum(lll) for lll in ll[iy:iy+ny]])
if iw is not None:
w = strtonum(ll[iw])
if id is not None:
d = strtonum(ll[id])
except:
continue
LR.add(x, y, w, d)
if I:
print_result(LR.solve())
if not I:
print_result(LR.solve())
if __name__=="__main__":
main()

53
src/ntc.py Normal file
View file

@ -0,0 +1,53 @@
import math
class ntc:
"""Calculate Temperature from NTC readings
Negative Temperature Coefficient thermistors.
The resitance follows the law of Arrhenius
W =
R = R₂₅ exp(β/T₂₅) exp(-β/T)
The NTC is biases via resistor R₁ from Voltage V₁.
"""
β = 3695.0 # K
T25 = 298.16 # K
T0 = 273.16 # K
R25 = 10 # kΩ
R1 = 10 # kΩ
V1 = 1
res = 1/0x1000
log = math.log
def Rntc(self, a, V1=None):
"""return NTC Resistance from ADC reading
a: ADC reading,
V1: full scale reading
"""
if not V1:
V1 = self.V1
a /= V1
if a < self.res:
a = 1
if a > 1 - self.res:
a = 1 - self.res
return self.R1 / (1/a - 1)
def TntcR(self, R):
"""return Temperature from NTC Resistance"""
return self.β / (self.log(R/self.R25) + self.β/self.T25) - self.T0
def Tntc(self, a, V1=None):
"""return Temperature from ADC reading"""
return self.TntcR(self.Rntc(a, V1))
def TntcB(self, a, b):
"""Bridge mode
a: bridge voltage (ADC_T - ADC_V)
b: reference branch voltage (ADC_V)
see `turbo.sch`
"""
return self.Tntc(a + b, b * (1 + self.R1/self.R25))

173
src/turbo.py Executable file
View file

@ -0,0 +1,173 @@
#! /usr/bin/python3
import sys, time, getopt, serial, fileinput
import pressure, ntc, linear_regression
options, files = getopt.gnu_getopt(sys.argv[1:], "xF:C", ["tty=", "noise", "clock"])
tty = None
out = sys.stdout
do_noise = False
do_clock = False
for o,v in options:
if o in "-F --tty":
if tty:
raise ValueError("can only do one tty")
tty = v
do_clock = True
if o in "-C --clock":
do_clock = True
if o in "-x --noise":
do_noise = True
if tty and files:
raise ValueError("cannot do tty and files")
if len(files)==1:
if "/dev/tty" in files[0]:
tty = files[0]
if tty:
inp = serial.Serial(port=tty, baudrate=2400)
else:
inp = fileinput.input(files, mode="rb")
checksum = 0
def add_checksum(line):
global checksum
checksum += sum(line)
# noise(line, "s")
def noise(line, prefix="x"):
print(prefix, repr(line), file=out)
return False
def echo(line, *a):
try:
print(line.strip().decode(), *a, file=out)
return True
except:
pass
return noise(line)
def check_sum(line):
global checksum
try:
rcs = int(line.strip()[1:], 16)
except:
return noise(line)
if rcs > 0xff:
return noise(line)
add_checksum(line[:1])
lcs = checksum & 0xff
checksum = 0
if lcs==rcs:
echo(line, "")
emit()
else:
echo(line, f"{lcs:02x}", "Checksum Error")
return False
Values = {}
NTC = ntc.ntc()
def voltages(line):
ll = line.split()
try:
vv = {
"Tcpu": float(ll[1]),
"Vcpu": float(ll[2]),
"Vbat": float(ll[3]),
"dVntc": float(ll[4]),
"Vntc": float(ll[5]),
"cVntc": int(ll[6], 16),
"Vrf": float(ll[7]),
"cVrf": int(ll[8], 16),
}
except:
return noise(line)
Values.update(vv)
emit_voltages(**vv)
return echo(line)
def emit_voltages(Tcpu, Vcpu, Vbat, dVntc, Vntc, cVntc, Vrf, cVrf):
Tntc = NTC.TntcB(dVntc, Vrf)
Tntc2 = NTC.TntcB(cVntc-cVrf, cVrf)
print(f"v {Tcpu:.1f}°C"
f" {Tntc:.2f}°C"
f" Bat {Vbat:.3f}V"
f" Vcc {Vcpu:.3f}V"
f" Vrf {Vrf/500:.3f}V",
file=out)
freq = linear_regression.linreg(p=2, xoff=True, yoff=True, xdecay=600)
def clock(line):
try:
c = int(line.split()[1], 16)
except:
return noise(line, "t")
s = None
if do_clock:
t = time.time()
freq.add(t, c)
s = freq.solve()
if s is None:
return echo(line)
return echo(line, f"{c} {t:.1f}", *("%.4g" % ss for ss in s[0]))
def emit():
pass
processes = {
b'A': echo,
b'B': echo,
b'C': echo,
b'D': echo,
b'E': echo,
b'F': echo,
b'P': echo,
b'Q': check_sum,
b'R': echo,
b'S': echo,
b'T': clock,
b'U': echo,
b'V': voltages,
b'W': echo,
b'X': echo,
}
while True:
try:
line = inp.readline()
except KeyboardInterrupt:
break
if not line:
break
line_key = line[0:1]
is_noise = line_key not in processes
if 0 in line:
is_noise = True
if is_noise:
if do_noise:
noise(line)
continue
if processes[line_key](line):
add_checksum(line)
if tty:
out.flush()