Compare commits

..

No commits in common. "ac5972706516484ec050c1755e89c03d5942aaa4" and "d2749e74049ec07756a203f0d09c133767ee0a60" have entirely different histories.

2 changed files with 32 additions and 245 deletions

110
d3d.py
View file

@ -1,12 +1,7 @@
#! /usr/bin/python3
import os, sys, os.path, time, math
os.chdir(os.path.dirname(sys.argv[0]))
arg = sys.argv[1:] and sys.argv[1]
if arg != "nostart":
os.system("screen ./d3d.rc")
verbosity = 1
os.system("screen ./d3d.rc")
class watchdog:
period = 60
@ -33,50 +28,35 @@ class parse_status:
"parse the lines of the `irena-status` file"
verbosity = 1
def __init__(self, lines):
self.hosttime = time.time()
self.parse(lines)
self.last_rate_freq = 0
self.last_error_freq = 0
self.last_warn_amp = 0
def parse(self, lines):
rates = False
self.states = {}
self.rates = {}
self.time = 0
for l in lines:
ll = l.split()
if not ll:
continue
if ll[0]=="irena":
self.version = ll[2]
self.time = int(ll[4])
self.version = ll[2]
self.time=int(ll[4])
fhosttime = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(self.hosttime))
if verbosity >= 1:
print(f"Host time {fhosttime}, Status time {ll[5]}")
if ll[0]=="data":
rates = True
if not l[0].isdigit() or len(ll)<2:
if not l[0].isdigit():
continue
if not rates:
try:
self.states[ll[1]] = parse_state(ll)
except Exception as e:
if verbosity>=0:
print(f"parse_state: {repr(e)}")
self.rates[ll[1]] = parse_state(ll)
else:
try:
self.rates[ll[1]] = parse_rate(ll)
except Exception as e:
if verbosity>=0:
print(f"parse_rate: {repr(e)}")
self.rate[ll[1]] = parse_rate(ll)
max_age = 300
def good(self):
return self.time and len(self.states)==20 and len(self.rates)==8
def old(self):
return self.time + self.max_age < self.hosttime
@ -84,46 +64,29 @@ class parse_status:
"Try to identify failure modes and lauch mitigations"
pass
Rate_scale = 4/math.log(100)
def set_Rate(self, LED):
"Set the green LED frequency based on trigger rate"
r = self.states["T_rate"].percent()
r = self["T_rate"].value.percent()
r = max(0, min(100, r))
f = math.log(r+1)*self.Rate_scale + 0.1
f = log(r+1)/log(100)*4+0.1
LED.sine_freq(f)
if verbosity >= 1 and f != self.last_rate_freq:
print(f"Rate {self.states['T_rate'].value} LED frequency {f:.2f} Hz")
self.last_rate_freq = f
def set_Error(self, LED):
def set_ERROR(self, LED):
"Set the yellow LED based on warning or error status counts"
ne = 0
nw = 0
for s in self.states.values():
for s in self.status:
if not s.is_ok():
if s.is_warn():
nw += 1
else:
ne += 1
if ne:
f = 0.5 + min(ne,19)/2
LED.blink_freq(f)
if verbosity >= 0 and self.last_error_freq != f:
print(f"Errors: {ne} LED frequency {f:.2f} Hz")
self.last_error_freq = f
self.last_warm_amp = 0
LED.blink_freq(10 - min(ne,19)/2)
elif nw:
a = amp = 20*min(5,nw)
LED.on(a)
if verbosity >= 0 and self.last_warn_amp != a:
print(f"Warnings: {nw} LED brightness {a:.1f}%")
self.last_warm_amp = a
self.last_error_freq = 0
LED.on(amp = 20*min(5,nw))
else:
LED.off()
self.last_error_freq = 0
self.last_warm_amp = 0
class parse_state:
def __init__(self, ll):
@ -136,7 +99,7 @@ class parse_state:
self.status = ll[6]
self.reset = int(ll[7])
self.limits = list(map(float, ll[8:]))
if not self.is_ok() and verbosity>=1:
if !self.is_ok() and verbosity>=1:
print(self)
def is_ok(self):
@ -182,51 +145,40 @@ def restart_irena():
pass
last_status = 0
next_status = 0
while True:
t = time.time()
t0 = t + 10
t0 = t+10
t1 = Rate_LED.update()
t2 = Error_LED.update()
dt = t - min([x for x in [t0, t1, t2] if x and x>t])
time.sleep(max(dt, 0.01))
t2 = Rate_LED.update()
dt = t-min(t1,t2,t3)
if dt>0:
time.sleep(dt)
t = time.time()
if t < next_status:
continue
next_status = t+10
try:
with open("d3d/irena-status") as st:
status=parse_status(st.readlines(0x2000))
except Exception as e:
if verbosity >= 0:
print(repr(e))
raise
continue
if not status.good():
if verbosity >= 1:
print("no good status")
if last_status:
restart_irena()
continue
if t > last_status + 10:
try:
with open("d3d/irena-status") as st:
status=parse_status(st.readlines(hint=0x2000))
except Exception as e:
if verbosity >= 0:
print(repr(e))
continue
if last_status != status.time:
status.analyse()
status.set_Rate(Rate_LED)
status.set_Error(Error_LED)
last_status = status.time
last_status = status.time
if not dog.due():
continue
if status.old():
restart_irena()
if verbosity >= 0:
print(f"not kicking the dog because status is {status.hosttime - status.time} s old")
restart_irena()
continue
dog.kick()

165
led.py
View file

@ -1,165 +0,0 @@
from time import time, sleep
from math import sin, pi as π
class LED():
"Control an LED via a linux sysfs PWM at `path`"
def __init__(self, path, nsine=64, period=None):
self.nsine = nsine
self.waveform = [50*(1 + sin(2*π*n/nsine)) for n in range(nsine)]
self.path = path
if period:
self.set_period(period)
else:
self.get_period()
self.off()
self.next_update = 0
def write_led(self, key, value):
try:
with open(f"{self.path}/{key}", "w") as f:
f.write(f"{value}\n")
except IOError as e:
print(f"write {self.path}/{key} = {value} : {repr(e)}",
file=sys.stderr)
def read_led(self, key):
try:
with open(f"{self.path}/{key}") as f:
return int(f.read())
except Exception as e:
print(f"read {self.path}/{key} : {repr(e)}",
file=sys.stderr)
def enable(self, on=1):
if self.mode:
return
# do not disable PWM, the LED may light up
self.write_led("enable", 1)
def set_period(self, p):
self.period = p
self.write_led("period", p)
def get_period(self):
self.period = self.read_led("period")
return self.period
def set_dc(self, dc):
p = self.period
if not isinstance(dc, int):
dc = int(dc/100 * p)
if dc >= p:
dc = p
if dc < 0:
dc = 0
self.duty_cycle = dc
self.write_led("duty_cycle", dc);
def get_dc(self):
self.duty_cycle = self.read_led("duty_cycle")
return self.duty_cycle / self.period
OFF = 0
ON = 1
BLINK = 2
SINE = 3
FLASH = 4
def on(self, amp=100):
self.enable(None)
self.mode = self.ON
self.set_dc(float(amp))
def off(self):
self.mode = self.OFF
self.enable(0)
self.set_dc(0)
def update(self):
if self.mode == self.BLINK:
return self.update_blink()
elif self.mode == self.SINE:
return self.update_sine()
elif self.mode == self.FLASH:
return self.update_flash()
return None
def sine_freq(self, f):
if self.mode != self.SINE:
self.enable(None)
self.phase = 0
self.mode = self.SINE
self.dt = 1/f/self.nsine
return self.update_sine()
def update_sine(self):
if self.next():
self.phase += 1
self.phase %= self.nsine
self.set_dc(self.waveform[self.phase])
return self.next_update
def blink_freq(self, f, amp=100):
if self.mode != self.BLINK:
self.enable(None)
self.phase = False
self.mode = self.BLINK
self.amp = float(amp)
self.dt = 0.5/f
return self.update_blink()
def update_blink(self):
if self.next():
self.phase = not self.phase
if self.phase:
self.set_dc(self.amp)
else:
self.set_dc(0)
return self.next_update
def flash(self, tau=1, amp=100):
self.mode = self.FLASH
self.enable(None)
self.amp = float(amp)
self.set_dc(self.amp)
if tau > 1:
self.decay = 0.99
self.dt = 0.01*tau
else:
self.dt = 0.01
self.decay = 1 - self.dt/tau
return self.update_flash()
def update_flash(self):
if self.next():
self.amp *= self.decay
self.set_dc(self.amp)
if self.amp < 1/256:
self.off()
return None
return self.next_update
def next(self):
dt = self.dt
t = time()
if t < self.next_update:
return False
self.next_update += dt
if t > self.next_update:
self.next_update += int((t-self.next_update)/dt)*dt
return True
def run(self, duration=0):
t = time()
tend = t + duration
try:
while not duration or t < tend:
t1 = self.update()
dt = t1-t
sleep(max(dt, 0.01))
t = time()
except KeyboardInterrupt:
pass