git-svn-id: svn+ssh://asterix.ieap.uni-kiel.de/home/subversion/stephan/solo/eda/cospi/host@9348 bc5caf13-1734-44f8-af43-603852e9ee25
445 lines
13 KiB
Python
445 lines
13 KiB
Python
|
||
import sys, time, numpy
|
||
|
||
def _connect(ifc):
    """Store *ifc* as the class attribute ``i2c.ifc``.

    Instances of ``i2c`` (and its subclasses) that do not carry their own
    ``ifc`` attribute will resolve ``self.ifc`` to this object.
    """
    setattr(i2c, "ifc", ifc)
|
||
|
||
class I2CError(IOError):
    """Error raised by the I²C helpers in this module.

    Used for a packet magic mismatch in ``readpacket`` and for an unexpected
    identity value in ``who``.
    """
|
||
|
||
def sphere(v):
    """Convert rows of Cartesian vectors to spherical coordinates.

    Parameters
    ----------
    v : array-like
        Two-dimensional, one vector per row; columns 0, 1 and 2 are used as
        x, y and z.

    Returns
    -------
    numpy.ndarray
        Float array of the same shape as *v*, rounded to 3 decimals, with
        column 0 = radius, column 1 = polar angle measured from the +z axis
        and column 2 = azimuth measured from the +x axis (both in radians).
    """
    # Coerce to float so that integer input does not truncate the angles
    # when they are stored into the result array.
    v = numpy.asarray(v, dtype=float)
    r = numpy.empty_like(v)
    # The radius is the Euclidean norm, not the sum of squares.
    r[:, 0] = numpy.sqrt((v**2).sum(-1))
    # The polar angle needs the *length* of the xy projection; using the
    # squared length would make the angle depend on the vector magnitude.
    r[:, 1] = numpy.arctan2(numpy.hypot(v[:, 0], v[:, 1]), v[:, 2])
    r[:, 2] = numpy.arctan2(v[:, 1], v[:, 0])
    return r.round(3)
|
||
|
||
class i2c:
    """Builder for I²C command sequences sent through a register interface.

    All hardware access goes through ``self.ifc``, which must provide
    ``reg(address[, value])`` and ``readfifo(...)``.  ``_a`` holds the
    address last passed to ``cmda`` and ``_n`` is a counter that ``cmda``
    sets and every queued command increments.

    The device-specific methods (``conf``, ``read1``, ``verify``, ``readv``,
    ``who`` ...) expect the subclass to define ``name``, ``SAD``, ``I_am``,
    ``REG`` and ``CONF``.
    """

    # Flag bits of the command byte passed to cmdi().  The combinations
    # below are the ones the rest of this file uses.
    NOM = 0
    START = 1
    LAST = 2
    STOP = START | LAST
    ifACTIVE = 4
    RESTART = ifACTIVE | START
    ifACK = 8
    RESUME = 0x10
    SAVE = 0x20
    DISCARD = SAVE
    UNSTUCK = STOP | SAVE
    SEND = 0x40

    # Slave addresses (write form; the read form is address + 1).
    ACC = 0x32  # 8 bits, SA0 = 1
    MAG = 0x3c  # 8 bits, SA1 = 1

    # Messages with a level <= verbosity are printed by _log().
    verbosity = 5

    def _connect(self, ifc):
        """Attach the register interface *ifc* to this instance."""
        self.ifc = ifc

    def cmdi(self, c, d=0):
        """Queue one data command: flag byte *c* and data byte *d*.

        Writes ``(c << 8) | d`` to interface register 0x33 and returns
        whatever ``ifc.reg`` returns.
        """
        self._log(5, f"I²C[{self._n+self._a:3d}]: data(0x{c:02x}, 0x{d:02x})")
        self._n += 1
        return self.ifc.reg(0x33, (c<<8) | d)

    def cmdh(self, m, s, buf=False, rel=False):
        """Queue a packet header command.

        Remembers the packet size (``s + 1``) and magic (``0x12c0 + m``) that
        a later ``readpacket()`` will expect, then writes the header word
        (bit 15 set, *rel* in bit 13, *buf* in bit 12, *m* from bit 8, *s* in
        the low bits) to interface register 0x33.
        """
        self._log(5, f"I²C[{self._n+self._a:3d}]: head({m}, {s}, {buf}, {rel})")
        self._n += 1
        self.psize = s+1
        self.pmagic = 0x12c0+m
        return self.ifc.reg(0x33, 0x8000 | (rel<<13) | (buf<<12) | (m<<8) | s)

    def cmdm(self, ms=0, restart=False, rel=False):
        """Queue a wait command.

        Writes ``0xc000`` combined with *rel* (bit 13), *restart* (bit 12)
        and *ms* (low bits) to interface register 0x33.
        """
        self._log(5, f"I²C[{self._n+self._a:3d}]: wait({ms}, {restart}, {rel})")
        self._n += 1
        return self.ifc.reg(0x33, 0xc000 | (rel<<13) | (restart<<12) | ms)

    def cmda(self, a, n=0):
        """Set the sequence address *a* and count *n*.

        Stores both locally and writes ``(n << 8) | a`` to interface
        register 0x32.  In this file it is called with ``n == 0`` before a
        sequence is queued and with the sequence length afterwards.
        """
        self._log(5, f"I²C addr({a}, {n})")
        self._n = n
        self._a = a
        return self.ifc.reg(0x32, (n<<8) | a)

    def reset(self):
        """Clear the local address/counter and write 0x1000 to register 7."""
        self._a = 0
        self._n = 0
        self._log(5, "I²C reset")
        return self.ifc.reg(7, 0x1000)

    def status(self):
        """Read interface register 0x32; return ``(low byte, upper bits)``."""
        a = self.ifc.reg(0x32)
        return a&0xff, a>>8

    def readpacket(self, magic=None, size=None):
        """Fetch one packet from the interface FIFO and check its magic word.

        *magic* and *size*, when given, replace the values remembered from
        the last ``cmdh()``.  The packet is stored in ``self._d`` and
        returned.

        Raises:
            I2CError: if the first word of the packet is not the expected
                magic.
        """
        if magic is not None:
            self.pmagic = magic
        if size is not None:
            self.psize = size
        self._d = self.ifc.readfifo(1, self.pmagic, 0xffff, self.psize)
        if self._d[0] != self.pmagic:
            # The packet lives in self._d; formatting a non-existent self.d
            # here would turn the intended I2CError into an AttributeError.
            raise I2CError(f"magic mismatch {self._d[0]:04x} expect {self.pmagic:04x}")
        self._log(4, f"I2C readpacket: magic={self._d[0]:04x}, len={len(self._d)}")
        return self._d

    def conf1(self, k, v):
        """Queue a write of value *v* to register ``REG[k]``.

        Returns the number of commands queued (4).
        """
        self._log(3, f"I²C {self._a:03d}: conf[{self.name}/{k}] = 0x{v:02x}")
        self.cmdi(self.START, self.SAD)
        self.cmdi(self.NOM, self.REG[k])
        self.cmdi(self.NOM, v)
        self.cmdi(self.STOP)
        return 4

    def conf(self, a=0):
        """Queue writes for every entry of ``CONF`` at address *a* and run them."""
        self.cmda(a)
        s = 0
        for k, v in self.CONF.items():
            s += self.conf1(k, v)
        self._log(2, f"I²C conf({self.name}), len={s}")
        self.cmda(a, s)

    def readi(self, k):
        """Queue a single-byte read of register ``REG[k]``.

        Returns the number of commands queued (5).
        """
        self.cmdi(self.START, self.SAD)
        self.cmdi(self.SAVE, self.REG[k])
        self.cmdi(self.START, self.SAD + 1)
        self.cmdi(self.LAST, 0)
        self.cmdi(self.STOP)
        return 5

    def read1(self, k):
        """Read register ``REG[k]`` and return the upper byte of the last packet word.

        The value is also kept in ``self._v``.
        """
        self.cmda(0)
        self.cmdh(self.I_am & 0b1111, 2)
        # +1 accounts for the header command queued above.
        self.cmda(0, self.readi(k)+1)
        time.sleep(0.01)
        self._v = self.readpacket()[-1] >> 8
        self._log(3, f"I²C read({self.name}/{k}) = 0x{self._v:04x}")
        return self._v

    def verify(self):
        """Read back every register named in ``CONF`` and compare.

        Each packet word from index 2 on is expected to be
        ``(CONF[k] << 8) | REG[k]``.  Returns the number of mismatches.
        """
        self.cmda(0)
        self.cmdh(self.I_am & 0b1111, 1 + len(self.CONF))
        s = 1  # the header command
        for k in self.CONF:
            s += self.readi(k)
        self.cmda(0, s)
        time.sleep(0.1)
        self.readpacket()
        errors = 0
        for i, k in enumerate(self.CONF):
            x = (self.CONF[k] << 8) | self.REG[k]
            if self._d[i+2] != x:
                self._log(1, f"I²C verify MISMATCH {i}: {self.name}/{k}, {x:04x} → {self._d[i+2]:04x}")
                errors += 1
            else:
                self._log(2, f"I²C verify match {i}: {self.name}/{k}, {x:04x} → {self._d[i+2]:04x}")
        self._log(2, f"I²C verify({self.name}), len={s}")
        return errors

    def readv(self, k, n=7):
        """Read *n* bytes starting at register ``REG[k]`` and return the packet.

        Bit 7 is set on the register address (the flight sequence comments
        and the ``/auto`` flag in ``irena`` suggest this selects address
        auto-increment — confirm against the device datasheet).
        """
        self.cmda(0)
        self.cmdh(self.I_am & 0b1111, (3+n)//2)
        self.cmdi(self.START, self.SAD)
        self.cmdi(self.SAVE, self.REG[k] | 0x80)
        self.cmdi(self.START, self.SAD + 1)
        self.cmdi(self.LAST, n-1)
        self.cmdi(self.STOP)
        self.cmda(0, 6)
        time.sleep(0.001*(10+n))
        self.readpacket()
        vv = " ".join([f"{d:02x}" for d in self._d])
        self._log(3, f"I²C vector({self.name}, {k}) = {vv}")
        return self._d

    def read_vector(self):
        """Return ``readv("STATUS")`` (7 bytes starting at the STATUS register)."""
        return self.readv("STATUS")

    def who(self, a=None, v=None):
        """Read the WHO register and optionally check it against *v*.

        With ``a is None`` the expected value defaults to ``self.I_am``.
        NOTE(review): *a* is only used in the log message; the read itself
        always addresses ``self.SAD``.

        Raises:
            I2CError: if an expected value is set and the register differs.
        """
        if a is None:
            a = self.SAD
            v = self.I_am
        self._who = self.read1("WHO")
        self._log(1, f"WHO[SAD=0x{a:02x}] = 0x{self._who:02x}")
        if v is not None and self._who != v:
            raise I2CError(f"who am I got {self._who:02x} expect {v:02x}")
        return self._who

    def _log(self, v, m):
        """Print message *m* to stderr when ``verbosity`` is at least *v*."""
        if self.verbosity >= v:
            print(m, file=sys.stderr)

    def test(self):
        """Queue and run (at address 8) a read of register 0x0f from MAG and ACC."""
        self.cmda(8)
        self.cmdh(1, 5)
        self.cmdi(self.START|self.SEND, self.MAG)
        self.cmdi(self.SAVE, 0x0f)
        self.cmdi(self.START, self.MAG + 1)
        self.cmdi(self.LAST, 0)
        self.cmdi(self.STOP)
        self.cmdi(self.START|self.SEND, self.ACC)
        self.cmdi(self.SAVE, 0x0f)
        self.cmdi(self.START, self.ACC + 1)
        self.cmdi(self.LAST, 0)
        self.cmdi(self.STOP)
        self.cmda(8, 10)

    def unstuck(self):
        """Queue and run an UNSTUCK command followed by a STOP."""
        self.cmda(0)
        self.cmdh(0, 3)
        self.cmdi(self.UNSTUCK|self.SEND)
        self.cmdi(self.STOP|self.SEND)
        self.cmda(0, 3)

    def _burst_read(self, sad, sub, last):
        """Queue START / sub-address / RESTART / read / STOP for one device read."""
        self.cmdi(self.START, sad)
        self.cmdi(self.SAVE, sub)
        self.cmdi(self.RESTART, sad | 1)
        self.cmdi(self.LAST, last)
        self.cmdi(self.STOP)

    def flight_seq(self, a=0):
        """Queue the looping flight read-out sequence starting at address *a*.

        The bracketed ranges in the comments are the original author's
        packet word positions.  Returns ``(address, number of commands)``.
        """
        self.cmda(a)
        self.cmdm(2, rel=True)  # release the last packet
        self.cmdh(5, 132, buf=True, rel=False)  # [0:1] single packet, 133 words
        self._burst_read(MAG.SAD, MAG.REG['STATUS'] | 0x80, 6)  # [2:6]
        # save will be ignored
        self._burst_read(MAG.SAD, MAG.REG['TEMP'] | 0x80, 1)  # [6:7]
        self._burst_read(ACC.SAD, ACC.REG['STATUS_AUX'] | 0x80, 6)  # [7:11]
        self.cmdm(96)
        # First of 18 magnetometer vector reads; save will be ignored.
        self._burst_read(MAG.SAD, MAG.REG['X'] | 0x80, 5)  # [11:65]
        for _ in range(17):
            self.cmdm(99)
            # save will be ignored
            self._burst_read(MAG.SAD, MAG.REG['X'] | 0x80, 5)  # [11:65]
        self._burst_read(ACC.SAD, ACC.REG['FIFO_SRC'], 0)  # [65:66]
        self._burst_read(ACC.SAD, ACC.REG['STATUS'] | 0x80, 21*6)  # words [66:130]
        self.cmdm(82)
        # save will be ignored
        self._burst_read(MAG.SAD, MAG.REG['X'] | 0x80, 5)  # words [130:133]
        self.cmdm(97, restart=True)
        self._log(2, f"flight sequence loaded @{self._a}, size {self._n}, end {self.status()[0]}")
        return (self._a, self._n)
|
||
|
||
class irena(i2c):
    """Variant of ``i2c`` that writes each command as a text line.

    Instead of programming the interface registers, every command is
    rendered as ``prefix + text + postfix`` and written to ``f``.
    """

    f = sys.stdout
    prefix = "@i2c/"
    postfix = ""
    # for 2nd FPGA: postfix = "/del[1]"

    # Decoder state, updated by each start/restart command in cmdi():
    # `reading` is the read bit of the last address byte, `first` is true
    # until the first data byte after the address has been rendered.
    reading = False
    first = False

    def output(self, r):
        """Write one rendered command line to ``self.f``."""
        self.f.write(f"{self.prefix}{r}{self.postfix}\n")

    def cmdi(self, c, d=0):
        """Render one data command (flag byte *c*, data byte *d*) as text.

        NOTE(review): the nesting of a few statements was reconstructed from
        a copy of the file that had lost its indentation; the choices are
        marked below.
        """
        self._n += 1
        flags = ""
        if c & self.ifACTIVE:
            flags = "/ifactive"
        if c & self.START:
            if c & self.LAST:
                if c & self.SAVE:
                    r = "unstuck"
                else:
                    r = "stop"
                # NOTE(review): assumed to apply to both stop and unstuck
                # (neither carries a data byte).
                d = None
            else:
                if c & self.ifACTIVE:
                    r = "restart"
                else:
                    r = "start"
                # "restart" already expresses ifACTIVE, so drop the flag.
                flags = ""
                self.reading = d & 1
                self.first = True
                if self.reading:
                    flags += "/read"
                    d &= 0xfe
                # Known slave addresses are rendered by name, without data.
                if d == self.MAG:
                    flags += "/mag"
                    d = None
                if d == self.ACC:
                    flags += "/acc"
                    d = None
        elif self.reading:
            if c & self.LAST:
                r = "read"
            else:
                r = "read/continue"
            # NOTE(review): assumed to apply to both read variants.
            if c & self.DISCARD:
                flags += "/discard"
        else:
            if c & self.SAVE:
                flags += "/save"
            if self.first:
                r = "register"
                if d & 0x80:
                    flags += "/auto"
                    d &= 0x7f
                # Only the first byte after the address is the register
                # sub-address.  Without clearing this flag every following
                # data byte was rendered as "register" (with bit 7 stripped
                # as "/auto") and the "write" branch was unreachable.
                self.first = False
            else:
                r = "write"
        if c & self.SEND:
            flags = "/send"+flags
        if c & self.ifACK:
            flags = "/ifack"+flags
        if d is None:
            self.output(f"{r}{flags}")
        else:
            self.output(f"{r}{flags}[0x{d:02x}]")

    def cmdh(self, m, s, buf=False, rel=False):
        """Render a packet header command and remember packet size/magic."""
        self._n += 1
        self.psize = s+1
        self.pmagic = 0x12c0+m
        flags = ""
        if rel:
            flags += "/release"
        if buf:
            flags += "/buffer"
        self.output(f"header{flags}[{m},{s}]")

    def cmdm(self, ms=0, restart=False, rel=False):
        """Render a wait command; *restart* is written as ``/loop``."""
        self._n += 1
        flags = ""
        if rel:
            flags += "/release"
        if restart:
            flags += "/loop"
        self.output(f"wait{flags}[{ms}]")

    def cmda(self, a, n=0):
        """Render ``run[a,n]`` when *n* is non-zero, otherwise ``addr[a]``."""
        self._n = n
        self._a = a
        if n:
            self.output(f"run[0x{a:02x},{n}]")
        else:
            self.output(f"addr[0x{a:02x}]")
|
||
|
||
class ACC(i2c):
    """Accelerometer on the I²C bus: register map, start-up configuration
    and read-out helpers.

    The ``read_adcs`` notes reference the ST LIS3DH driver, so this is
    presumably a LIS3DH — confirm against the board documentation.
    """

    name = "ACC"
    SAD = i2c.ACC
    I_am = 0x33

    # Register name -> sub-address.
    REG = {
        "STATUS_AUX": 0x07,
        "ADC1": 0x08,
        "ADC2": 0x0a,
        "TEMP": 0x0c,
        "WHO": 0x0f,
        "CTRL0": 0x1e,
        "TEMP_CFG": 0x1f,
        "CTRL1": 0x20,
        "CTRL2": 0x21,
        "CTRL3": 0x22,
        "CTRL4": 0x23,
        "CTRL5": 0x24,
        "CTRL6": 0x25,
        "REF": 0x26,
        "STATUS": 0x27,
        "X": 0x28,
        "Y": 0x2a,
        "Z": 0x2c,
        "FIFO_CTRL": 0x2e,
        "FIFO_SRC": 0x2f,
        "INT1_CFG": 0x30,
        "INT1_SRC": 0x31,
        "INT1_THS": 0x32,
        "INT1_DUR": 0x33,
        "INT2_CFG": 0x34,
        "INT2_SRC": 0x35,
        "INT2_THS": 0x36,
        "INT2_DUR": 0x37,
        "CLICK_CFG": 0x38,
        "CLICK_SRC": 0x39,
        "CLICK_THS": 0x3a,
        "TIME_LIMIT": 0x3b,
        "TIME_LAT": 0x3c,
        "TIME_WIN": 0x3d,
        "ACT_THS": 0x3e,
        "ACT_DUR": 0x3f,
    }

    # Register name -> value; written in this order by conf() and read back
    # by verify().
    CONF = {
        "TEMP_CFG": 0xc0,  # TEMP_EN enabled
        "CTRL1": 0x27,  # X,Y,Z enabled, ODR=10 Hz
        "CTRL4": 0x88,  # BDU (needed for ADC?) HR mode (12-bit)
        "CTRL5": 0x40,  # FIFO enable
        "FIFO_CTRL": 0x94,  # stream mode, FTH=20
    }

    def b2g(self, x):
        """Interpret *x* as a signed 16-bit word and scale it by 1/0x4000.

        The result is rounded to 4 decimals.
        """
        signed = x - 0x10000 if x & 0x8000 else x
        return round(signed / 0x4000, 4)

    def read_g(self, n=32, what="g"):
        """Read *n* acceleration samples starting at the STATUS register.

        ``what == "raw"`` returns the packet unchanged.  ``what == "g"``
        returns a list whose first entry holds the first three packet words
        as hex strings, followed by one ``[x, y, z]`` list per sample
        converted with ``b2g``.  Any other value returns the samples passed
        through ``sphere()``.
        """
        words = self.readv("STATUS", 1 + n*6)
        if what == "raw":
            return words
        header = [f"{x:04x}" for x in words[:3]]
        # Sample i occupies the three words that follow the 3-word header.
        samples = [
            [self.b2g(x) for x in words[3*i + 3:3*i + 6]]
            for i in range(n)
        ]
        if what == "g":
            return [header] + samples
        return sphere(numpy.array(samples))

    def read_adcs(self):
        """Read the auxiliary block starting at STATUS_AUX and return the packet.

        Data format notes, quoted from the ST driver
        (https://github.com/STMicroelectronics/lis3dh-pid):

        * Outputs are left-justified in 2's complement.
        * Range 800 mV.
        * Code zero means an analogue value of about 1.2 V.
        * Voltage values smaller than the centre value are positive
          (example: 800 mV = 7Fh / 127 dec).
        * Voltage values bigger than the centre value are negative
          (example: 1600 mV = 80h / -128 dec).
        """
        return self.readv("STATUS_AUX")
|
||
|
||
class MAG(i2c):
    """Magnetometer on the I²C bus: register map and start-up configuration.

    NOTE(review): the WHO value 0x3d suggests an ST LIS3MDL — confirm
    against the board documentation.
    """

    name = "MAG"
    # Slave address (write form), taken from the base-class constant.
    SAD = i2c.MAG
    # Value who() expects from the WHO register; its low 4 bits are also
    # used as the packet magic offset in read1()/verify()/readv().
    I_am = 0x3d

    # Register name -> sub-address.
    REG = {
        "OFFSET_X": 0x05,
        "OFFSET_Y": 0x07,
        "OFFSET_Z": 0x09,
        "WHO": 0x0f,
        "CTRL1": 0x20,
        "CTRL2": 0x21,
        "CTRL3": 0x22,
        "CTRL4": 0x23,
        "CTRL5": 0x24,
        "STATUS": 0x27,
        "X": 0x28,
        "Y": 0x2a,
        "Z": 0x2c,
        "TEMP": 0x2e,
        "INT_CFG": 0x30,
        "INT_SRC": 0x31,
        "INT_THS": 0x32,
    }

    # Register name -> value; conf() writes these in dictionary order and
    # verify() reads them back in the same order.
    CONF = {
        "CTRL1": 0xfc, # TEMP enabled, UHP, ODR=80 Hz
        "CTRL4": 0x0c, # UHP-Z
        "CTRL3": 0x00, # continuous mode
        "CTRL5": 0x40, # BDU
    }
|