mirror of
https://codeberg.org/SiB64/turbo_weather.git
synced 2026-06-28 23:49:51 +02:00
Compare commits
5 commits
c2823eded9
...
fc87dd7ead
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fc87dd7ead | ||
|
|
5953504546 | ||
|
|
3ef27fe078 | ||
|
|
358a52ee9b | ||
|
|
c78ee1c5fc |
4 changed files with 420 additions and 1 deletions
|
|
@ -450,7 +450,7 @@ int main()
|
|||
|
||||
uint8_t reset_source = RSTCTRL.RSTFR;
|
||||
RSTCTRL.RSTFR = reset_source;
|
||||
send_str("\nV Turbo Weather V0.7\nB ");
|
||||
send_str("\nB Turbo Weather V0.9 ");
|
||||
send_hex_byte(reset_source);
|
||||
send_eol();
|
||||
|
||||
|
|
|
|||
193
src/linear_regression.py
Executable file
193
src/linear_regression.py
Executable file
|
|
@ -0,0 +1,193 @@
|
|||
#! /usr/bin/python3
|
||||
|
||||
import numpy
|
||||
|
||||
class linreg:
|
||||
|
||||
def __init__(self, p=1, ny=1, xoff=0, yoff=0, decay=None, xdecay=None):
|
||||
self.P = p
|
||||
self.p = numpy.arange(2*p+1)
|
||||
self.x = numpy.zeros((2*p+1,))
|
||||
self.y = numpy.zeros((ny, p+1))
|
||||
self.yy = numpy.zeros((ny,))
|
||||
self.w = None
|
||||
self.xoff = xoff
|
||||
self.yoff = yoff
|
||||
self.decay = decay
|
||||
self.xdecay = xdecay
|
||||
self.N = 0
|
||||
|
||||
Ai = numpy.arange(p+1)
|
||||
self.Ai = Ai+Ai.reshape((-1,1))
|
||||
|
||||
if xoff is True:
|
||||
self.Bi = self.pascal()
|
||||
|
||||
def pascal(self):
|
||||
p = self.p.shape[0]
|
||||
Bi = numpy.zeros((p,p,p), dtype=int)
|
||||
for n in range(p):
|
||||
Bi[n,0,n] = 1
|
||||
Bi[n,n,0] = 1
|
||||
for n in range(1,p):
|
||||
for k in range(1,n):
|
||||
Bi[n,k,n-k] = Bi[n-1,k-1,n-k] + Bi[n-1,k, n-k-1]
|
||||
return Bi
|
||||
|
||||
def add(self, x, y, w=1.0, decay=None):
|
||||
|
||||
y = numpy.array(y, dtype=float).reshape((-1,))
|
||||
|
||||
if decay is None:
|
||||
if self.N and self.xdecay is not None:
|
||||
decay = 1 - numpy.exp((self.lastx - x)/self.xdecay)
|
||||
else:
|
||||
decay = self.decay
|
||||
self.lastx = x
|
||||
|
||||
if decay is not None:
|
||||
self.x *= 1 - decay
|
||||
self.y *= 1 - decay
|
||||
self.yy *= 1 - decay
|
||||
|
||||
if self.xoff is True:
|
||||
if self.N:
|
||||
self.transform_x(x - self.x_origin)
|
||||
self.x_origin = x
|
||||
if self.yoff is True:
|
||||
if self.N:
|
||||
self.transform_y(y - self.y_origin)
|
||||
self.y_origin = y
|
||||
else:
|
||||
self.y[:, 0] += w * y
|
||||
self.yy += w * y*y
|
||||
self.x[0] += w
|
||||
self.N += 1
|
||||
return
|
||||
|
||||
self.N += 1
|
||||
x -= self.xoff
|
||||
y -= self.yoff
|
||||
x = w * numpy.power(x, self.p)
|
||||
self.x += x
|
||||
self.y += y[:,numpy.newaxis] * x[:self.P+1]
|
||||
self.yy += w * y*y
|
||||
|
||||
def solve(self):
|
||||
if self.N <= self.P:
|
||||
return
|
||||
try:
|
||||
return numpy.linalg.solve(self.x[self.Ai], self.y[...,numpy.newaxis])[...,0]
|
||||
except:
|
||||
print(self.x[self.Ai], self.y)
|
||||
raise
|
||||
|
||||
def solve_p(self, p):
|
||||
if self.N <= p or p > self.P:
|
||||
return
|
||||
Ai = self.Ai[:p+1, :p+1]
|
||||
y = self.y[:, :p+1, numpy.newaxis]
|
||||
return numpy.linalg.solve(self.x[Ai], y)
|
||||
|
||||
def transform_x(self, x):
|
||||
p = numpy.power(-x, self.p).reshape((-1,1))
|
||||
Bi = self.Bi @ p
|
||||
xx = self.x[numpy.newaxis,:] @ Bi
|
||||
self.x = xx[:,0,0]
|
||||
yy = self.y[:,numpy.newaxis,numpy.newaxis,:] @ Bi[:self.P+1,:self.P+1]
|
||||
self.y = yy[...,0,0]
|
||||
|
||||
def transform_y(self, y):
|
||||
self.yy += self.x[0]*y*y - 2*y*self.y[:,0]
|
||||
self.y -= y * self.x[:self.P+1]
|
||||
|
||||
def print_result(self, r, fmt="%.4g"):
|
||||
if r is None:
|
||||
return
|
||||
dx = self.xoff
|
||||
dy = self.yoff
|
||||
if dx is True:
|
||||
dx = self.x_origin
|
||||
if dy is True:
|
||||
dy = self.y_origin
|
||||
r[:,0] += dy
|
||||
for i, rr in enumerate(r):
|
||||
print(i, fmt % dx, " ".join([fmt % rrr for rrr in rr]))
|
||||
|
||||
def strtonum(s):
|
||||
try:
|
||||
return float(s)
|
||||
except:
|
||||
pass
|
||||
return int(s, 0)
|
||||
|
||||
def main():
|
||||
|
||||
import sys, getopt, fileinput
|
||||
options, files = getopt.gnu_getopt(sys.argv[1:], "p:n:x:y:w:d:X:Y:D:T:I")
|
||||
p = 1
|
||||
ny = 1
|
||||
xoff = 0
|
||||
yoff = 0
|
||||
decay = None
|
||||
xdecay = None
|
||||
ix = 0
|
||||
iy = 1
|
||||
iw = None
|
||||
id = None
|
||||
w = 1.0
|
||||
d = None
|
||||
I = False
|
||||
for o, v in options:
|
||||
if o=="-p":
|
||||
p = int(v)
|
||||
if o=="-n":
|
||||
ny = int(v)
|
||||
if o=="-X":
|
||||
if v=="R":
|
||||
xoff = True
|
||||
else:
|
||||
xoff = int(v)
|
||||
if o=="-Y":
|
||||
if v=="R":
|
||||
xoff = True
|
||||
yoff = True
|
||||
else:
|
||||
yoff = int(v)
|
||||
if o=="-D":
|
||||
decay = float(v)
|
||||
if o=="-T":
|
||||
xdecay = float(v)
|
||||
if o=="-x":
|
||||
ix = int(v)
|
||||
if o=="-y":
|
||||
iy = int(v)
|
||||
if o=="-w":
|
||||
iw = int(v)
|
||||
if o=="-d":
|
||||
id = int(v)
|
||||
if o=="-I":
|
||||
I = True
|
||||
|
||||
LR = linreg(p=p, ny=ny, xoff=xoff, yoff=yoff, decay=decay, xdecay=xdecay)
|
||||
|
||||
for l in fileinput.input(files):
|
||||
ll = l.split()
|
||||
try:
|
||||
x = strtonum(ll[ix])
|
||||
y = numpy.array([strtonum(lll) for lll in ll[iy:iy+ny]])
|
||||
if iw is not None:
|
||||
w = strtonum(ll[iw])
|
||||
if id is not None:
|
||||
d = strtonum(ll[id])
|
||||
except:
|
||||
continue
|
||||
LR.add(x, y, w, d)
|
||||
if I:
|
||||
print_result(LR.solve())
|
||||
|
||||
if not I:
|
||||
print_result(LR.solve())
|
||||
|
||||
if __name__=="__main__":
|
||||
main()
|
||||
53
src/ntc.py
Normal file
53
src/ntc.py
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
|
||||
import math
|
||||
|
||||
class ntc:
|
||||
|
||||
"""Calculate Temperature from NTC readings
|
||||
Negative Temperature Coefficient thermistors.
|
||||
The resitance follows the law of Arrhenius
|
||||
W = kβ
|
||||
R = R₂₅ exp(β/T₂₅) exp(-β/T)
|
||||
|
||||
The NTC is biases via resistor R₁ from Voltage V₁.
|
||||
"""
|
||||
|
||||
β = 3695.0 # K
|
||||
T25 = 298.16 # K
|
||||
T0 = 273.16 # K
|
||||
R25 = 10 # kΩ
|
||||
R1 = 10 # kΩ
|
||||
V1 = 1
|
||||
res = 1/0x1000
|
||||
|
||||
log = math.log
|
||||
|
||||
def Rntc(self, a, V1=None):
|
||||
"""return NTC Resistance from ADC reading
|
||||
a: ADC reading,
|
||||
V1: full scale reading
|
||||
"""
|
||||
if not V1:
|
||||
V1 = self.V1
|
||||
a /= V1
|
||||
if a < self.res:
|
||||
a = 1
|
||||
if a > 1 - self.res:
|
||||
a = 1 - self.res
|
||||
return self.R1 / (1/a - 1)
|
||||
|
||||
def TntcR(self, R):
|
||||
"""return Temperature from NTC Resistance"""
|
||||
return self.β / (self.log(R/self.R25) + self.β/self.T25) - self.T0
|
||||
|
||||
def Tntc(self, a, V1=None):
|
||||
"""return Temperature from ADC reading"""
|
||||
return self.TntcR(self.Rntc(a, V1))
|
||||
|
||||
def TntcB(self, a, b):
|
||||
"""Bridge mode
|
||||
a: bridge voltage (ADC_T - ADC_V)
|
||||
b: reference branch voltage (ADC_V)
|
||||
see `turbo.sch`
|
||||
"""
|
||||
return self.Tntc(a + b, b * (1 + self.R1/self.R25))
|
||||
173
src/turbo.py
Executable file
173
src/turbo.py
Executable file
|
|
@ -0,0 +1,173 @@
|
|||
#! /usr/bin/python3
|
||||
|
||||
import sys, time, getopt, serial, fileinput
|
||||
import pressure, ntc, linear_regression
|
||||
|
||||
options, files = getopt.gnu_getopt(sys.argv[1:], "xF:C", ["tty=", "noise", "clock"])
|
||||
|
||||
tty = None
|
||||
out = sys.stdout
|
||||
|
||||
do_noise = False
|
||||
do_clock = False
|
||||
|
||||
for o,v in options:
|
||||
if o in "-F --tty":
|
||||
if tty:
|
||||
raise ValueError("can only do one tty")
|
||||
tty = v
|
||||
do_clock = True
|
||||
|
||||
if o in "-C --clock":
|
||||
do_clock = True
|
||||
|
||||
if o in "-x --noise":
|
||||
do_noise = True
|
||||
|
||||
if tty and files:
|
||||
raise ValueError("cannot do tty and files")
|
||||
|
||||
if len(files)==1:
|
||||
if "/dev/tty" in files[0]:
|
||||
tty = files[0]
|
||||
|
||||
if tty:
|
||||
inp = serial.Serial(port=tty, baudrate=2400)
|
||||
else:
|
||||
inp = fileinput.input(files, mode="rb")
|
||||
|
||||
checksum = 0
|
||||
|
||||
def add_checksum(line):
|
||||
global checksum
|
||||
checksum += sum(line)
|
||||
# noise(line, "s")
|
||||
|
||||
def noise(line, prefix="x"):
|
||||
print(prefix, repr(line), file=out)
|
||||
return False
|
||||
|
||||
def echo(line, *a):
|
||||
try:
|
||||
print(line.strip().decode(), *a, file=out)
|
||||
return True
|
||||
except:
|
||||
pass
|
||||
return noise(line)
|
||||
|
||||
def check_sum(line):
|
||||
global checksum
|
||||
try:
|
||||
rcs = int(line.strip()[1:], 16)
|
||||
except:
|
||||
return noise(line)
|
||||
if rcs > 0xff:
|
||||
return noise(line)
|
||||
add_checksum(line[:1])
|
||||
lcs = checksum & 0xff
|
||||
checksum = 0
|
||||
|
||||
if lcs==rcs:
|
||||
echo(line, "√")
|
||||
emit()
|
||||
else:
|
||||
echo(line, f"{lcs:02x}", "Checksum Error")
|
||||
|
||||
return False
|
||||
|
||||
Values = {}
|
||||
|
||||
NTC = ntc.ntc()
|
||||
|
||||
def voltages(line):
|
||||
ll = line.split()
|
||||
try:
|
||||
vv = {
|
||||
"Tcpu": float(ll[1]),
|
||||
"Vcpu": float(ll[2]),
|
||||
"Vbat": float(ll[3]),
|
||||
"dVntc": float(ll[4]),
|
||||
"Vntc": float(ll[5]),
|
||||
"cVntc": int(ll[6], 16),
|
||||
"Vrf": float(ll[7]),
|
||||
"cVrf": int(ll[8], 16),
|
||||
}
|
||||
except:
|
||||
return noise(line)
|
||||
|
||||
Values.update(vv)
|
||||
emit_voltages(**vv)
|
||||
|
||||
return echo(line)
|
||||
|
||||
def emit_voltages(Tcpu, Vcpu, Vbat, dVntc, Vntc, cVntc, Vrf, cVrf):
|
||||
Tntc = NTC.TntcB(dVntc, Vrf)
|
||||
Tntc2 = NTC.TntcB(cVntc-cVrf, cVrf)
|
||||
print(f"v {Tcpu:.1f}°C"
|
||||
f" {Tntc:.2f}°C"
|
||||
f" Bat {Vbat:.3f}V"
|
||||
f" Vcc {Vcpu:.3f}V"
|
||||
f" Vrf {Vrf/500:.3f}V",
|
||||
file=out)
|
||||
|
||||
freq = linear_regression.linreg(p=2, xoff=True, yoff=True, xdecay=600)
|
||||
|
||||
def clock(line):
|
||||
try:
|
||||
c = int(line.split()[1], 16)
|
||||
except:
|
||||
return noise(line, "t")
|
||||
s = None
|
||||
if do_clock:
|
||||
t = time.time()
|
||||
freq.add(t, c)
|
||||
s = freq.solve()
|
||||
if s is None:
|
||||
return echo(line)
|
||||
return echo(line, f"{c} {t:.1f}", *("%.4g" % ss for ss in s[0]))
|
||||
|
||||
def emit():
|
||||
pass
|
||||
|
||||
processes = {
|
||||
b'A': echo,
|
||||
b'B': echo,
|
||||
b'C': echo,
|
||||
b'D': echo,
|
||||
b'E': echo,
|
||||
b'F': echo,
|
||||
b'P': echo,
|
||||
b'Q': check_sum,
|
||||
b'R': echo,
|
||||
b'S': echo,
|
||||
b'T': clock,
|
||||
b'U': echo,
|
||||
b'V': voltages,
|
||||
b'W': echo,
|
||||
b'X': echo,
|
||||
}
|
||||
|
||||
while True:
|
||||
try:
|
||||
line = inp.readline()
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
if not line:
|
||||
break
|
||||
|
||||
line_key = line[0:1]
|
||||
is_noise = line_key not in processes
|
||||
if 0 in line:
|
||||
is_noise = True
|
||||
|
||||
if is_noise:
|
||||
if do_noise:
|
||||
noise(line)
|
||||
continue
|
||||
|
||||
if processes[line_key](line):
|
||||
add_checksum(line)
|
||||
|
||||
if tty:
|
||||
out.flush()
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue