rpirena/i2c.py

445 lines
13 KiB
Python
Raw Permalink Normal View History

import sys, time, numpy
def _connect(ifc):
i2c.ifc = ifc
class I2CError(IOError):
pass
def sphere(v):
r = numpy.empty_like(v)
r[:,0] = (v**2).sum(-1)
r[:,1] = numpy.arctan2((v[:,:2]**2).sum(-1), v[:,2])
r[:,2] = numpy.arctan2(v[:,1], v[:,0])
return r.round(3)
class i2c:
NOM = 0
START = 1
LAST = 2
STOP = START | LAST
ifACTIVE = 4
RESTART = ifACTIVE | START
ifACK = 8
RESUME = 0x10
SAVE = 0x20
DISCARD = SAVE
UNSTUCK = STOP | SAVE
SEND = 0x40
def _connect(self, ifc):
self.ifc = ifc
def cmdi(self, c, d=0):
self._log(5, f"I²C[{self._n+self._a:3d}]: data(0x{c:02x}, 0x{d:02x})")
self._n += 1
return self.ifc.reg(0x33, (c<<8) | d)
def cmdh(self, m, s, buf=False, rel=False):
self._log(5, f"I²C[{self._n+self._a:3d}]: head({m}, {s}, {buf}, {rel})")
self._n += 1
self.psize = s+1
self.pmagic = 0x12c0+m
return self.ifc.reg(0x33, 0x8000 | (rel<<13) | (buf<<12) | (m<<8) | s)
def cmdm(self, ms=0, restart=False, rel=False):
self._log(5, f"I²C[{self._n+self._a:3d}]: wait({ms}, {restart}, {rel})")
self._n += 1
return self.ifc.reg(0x33, 0xc000 | (rel<<13) | (restart<<12) | ms)
def cmda(self, a, n=0):
self._log(5, f"I²C addr({a}, {n})")
self._n = n
self._a = a
return self.ifc.reg(0x32, (n<<8) | a)
def reset(self):
self._a = 0
self._n = 0
self._log(5, "I²C reset")
return self.ifc.reg(7, 0x1000)
def status(self):
a = self.ifc.reg(0x32)
return a&0xff, a>>8
def readpacket(self, magic=None, size=None):
if magic is not None:
self.pmagic = magic
if size is not None:
self.psize = size
self._d = self.ifc.readfifo(1, self.pmagic, 0xffff, self.psize)
if self._d[0] != self.pmagic:
raise I2CError(f"magic mismatch {self.d[0]:04x} expect {self.pmagic:04x}")
self._log(4, f"I2C readpacket: magic={self._d[0]:04x}, len={len(self._d)}")
return self._d
ACC = 0x32 # 8 bits, SA0 = 1
MAG = 0x3c # 8 bits, SA1 = 1
def conf1(s, k, v):
s._log(3, f"I²C {s._a:03d}: conf[{s.name}/{k}] = 0x{v:02x}")
s.cmdi(s.START, s.SAD)
s.cmdi(s.NOM, s.REG[k])
s.cmdi(s.NOM, v)
s.cmdi(s.STOP)
return 4
def conf(self, a=0):
self.cmda(a)
s = 0
for k, v in self.CONF.items():
s += self.conf1(k, v)
self._log(2, f"I²C conf({self.name}), len={s}")
self.cmda(a, s)
def readi(s, k):
s.cmdi(s.START, s.SAD)
s.cmdi(s.SAVE, s.REG[k])
s.cmdi(s.START, s.SAD + 1)
s.cmdi(s.LAST, 0)
s.cmdi(s.STOP)
return 5
def read1(s, k):
s.cmda(0)
s.cmdh(s.I_am & 0b1111, 2)
s.cmda(0, s.readi(k)+1)
time.sleep(0.01)
s._v = s.readpacket()[-1] >> 8
s._log(3, f"I²C read({s.name}/{k}) = 0x{s._v:04x}")
return s._v
def verify(self):
self.cmda(0)
self.cmdh(self.I_am & 0b1111, 1 + len(self.CONF))
s = 1
for k in self.CONF:
s += self.readi(k)
self.cmda(0, s)
time.sleep(0.1)
self.readpacket()
errors = 0
for i, k in enumerate(self.CONF):
x = (self.CONF[k] << 8) | self.REG[k]
if self._d[i+2] != x:
self._log(1, f"I²C verify MISMATCH {i}: {self.name}/{k}, {x:04x}{self._d[i+2]:04x}")
errors += 1
else:
self._log(2, f"I²C verify match {i}: {self.name}/{k}, {x:04x}{self._d[i+2]:04x}")
self._log(2, f"I²C verify({self.name}), len={s}")
return errors
def readv(s, k, n=7):
s.cmda(0)
s.cmdh(s.I_am & 0b1111, (3+n)//2)
s.cmdi(s.START, s.SAD)
s.cmdi(s.SAVE, s.REG[k] | 0x80)
s.cmdi(s.START, s.SAD + 1)
s.cmdi(s.LAST, n-1)
s.cmdi(s.STOP)
s.cmda(0, 6)
time.sleep(0.001*(10+n))
s.readpacket()
vv = " ".join([f"{d:02x}" for d in s._d])
s._log(3, f"I²C vector({s.name}, {k}) = {vv}")
return s._d
def read_vector(self):
return self.readv("STATUS")
def who(self, a=None, v=None):
if a is None:
a = self.SAD
v = self.I_am
self._who = self.read1("WHO")
self._log(1, f"WHO[SAD=0x{a:02x}] = 0x{self._who:02x}")
if v is not None and self._who != v:
raise I2CError(f"who am I got {self._who:02x} expect {v:02x}")
return self._who
def _log(self, v, m):
if self.verbosity >= v:
print(m, file=sys.stderr)
verbosity = 5
def test(s):
s.cmda(8)
s.cmdh(1, 5)
s.cmdi(s.START|s.SEND, s.MAG)
s.cmdi(s.SAVE, 0x0f)
s.cmdi(s.START, s.MAG + 1)
s.cmdi(s.LAST, 0)
s.cmdi(s.STOP)
s.cmdi(s.START|s.SEND, s.ACC)
s.cmdi(s.SAVE, 0x0f)
s.cmdi(s.START, s.ACC + 1)
s.cmdi(s.LAST, 0)
s.cmdi(s.STOP)
s.cmda(8, 10)
def unstuck(s):
s.cmda(0)
s.cmdh(0, 3)
s.cmdi(s.UNSTUCK|s.SEND)
s.cmdi(s.STOP|s.SEND)
s.cmda(0, 3)
def flight_seq(s, a=0):
s.cmda(a)
s.cmdm(2, rel=True) # release the last packet
s.cmdh(5, 132, buf=True, rel=False) # [0:1] single packet, 133 words
s.cmdi(s.START, MAG.SAD)
s.cmdi(s.SAVE, MAG.REG['STATUS'] | 0x80)
s.cmdi(s.RESTART, MAG.SAD | 1)
s.cmdi(s.LAST, 6) # [2:6]
s.cmdi(s.STOP)
s.cmdi(s.START, MAG.SAD)
s.cmdi(s.SAVE, MAG.REG['TEMP'] | 0x80) # save will be ignored
s.cmdi(s.RESTART, MAG.SAD | 1)
s.cmdi(s.LAST, 1) # [6:7]
s.cmdi(s.STOP)
s.cmdi(s.START, ACC.SAD)
s.cmdi(s.SAVE, ACC.REG['STATUS_AUX'] | 0x80)
s.cmdi(s.RESTART, ACC.SAD | 1)
s.cmdi(s.LAST, 6) # [7:11]
s.cmdi(s.STOP)
s.cmdm(96)
s.cmdi(s.START, MAG.SAD)
s.cmdi(s.SAVE, MAG.REG['X'] | 0x80) # save will be ignored
s.cmdi(s.RESTART, MAG.SAD | 1)
s.cmdi(s.LAST, 5) # [11:65]
s.cmdi(s.STOP)
for n in range(17):
s.cmdm(99)
s.cmdi(s.START, MAG.SAD)
s.cmdi(s.SAVE, MAG.REG['X'] | 0x80) # save will be ignored
s.cmdi(s.RESTART, MAG.SAD | 1)
s.cmdi(s.LAST, 5) # [11:65]
s.cmdi(s.STOP)
s.cmdi(s.START, ACC.SAD)
s.cmdi(s.SAVE, ACC.REG['FIFO_SRC'])
s.cmdi(s.RESTART, ACC.SAD | 1)
s.cmdi(s.LAST, 0) # [65:66]
s.cmdi(s.STOP)
s.cmdi(s.START, ACC.SAD)
s.cmdi(s.SAVE, ACC.REG['STATUS'] | 0x80)
s.cmdi(s.RESTART, ACC.SAD | 1)
s.cmdi(s.LAST, 21*6) # words [66:130]
s.cmdi(s.STOP)
s.cmdm(82)
s.cmdi(s.START, MAG.SAD)
s.cmdi(s.SAVE, MAG.REG['X'] | 0x80) # save will be ignored
s.cmdi(s.RESTART, MAG.SAD | 1)
s.cmdi(s.LAST, 5) # words [130:133]
s.cmdi(s.STOP)
s.cmdm(97, restart=True)
s._log(2, f"flight sequence loaded @{s._a}, size {s._n}, end {s.status()[0]}")
return (s._a, s._n)
class irena(i2c):
f = sys.stdout
prefix = "@i2c/"
postfix = ""
# for 2nd FPGA: postfix = "/del[1]"
def output(self, r):
self.f.write(f"{self.prefix}{r}{self.postfix}\n")
def cmdi(self, c, d=0):
self._n += 1
flags = ""
if c & self.ifACTIVE:
flags = "/ifactive"
if c & self.START:
if c & self.LAST:
if c & self.SAVE:
r = "unstuck"
else:
r = "stop"
d = None
else:
if c & self.ifACTIVE:
r = "restart"
else:
r = "start"
flags = ""
self.reading = d & 1
self.first = True
if self.reading:
flags += "/read"
d &= 0xfe
if d == self.MAG:
flags += "/mag"
d = None
if d == self.ACC:
flags += "/acc"
d = None
elif self.reading:
if c & self.LAST:
r = "read"
else:
r = "read/continue"
if c & self.DISCARD:
flags += "/discard"
else:
if c & self.SAVE:
flags += "/save"
if self.first:
r = "register"
if d & 0x80:
flags += "/auto"
d &= 0x7f
else:
r = "write"
if c & self.SEND:
flags = "/send"+flags
if c & self.ifACK:
flags = "/ifack"+flags
if d is None:
self.output(f"{r}{flags}")
else:
self.output(f"{r}{flags}[0x{d:02x}]")
def cmdh(self, m, s, buf=False, rel=False):
self._n += 1
self.psize = s+1
self.pmagic = 0x12c0+m
flags = ""
if rel:
flags += "/release"
if buf:
flags += "/buffer"
self.output(f"header{flags}[{m},{s}]")
def cmdm(self, ms=0, restart=False, rel=False):
self._n += 1
flags = ""
if rel:
flags += "/release"
if restart:
flags += "/loop"
self.output(f"wait{flags}[{ms}]")
def cmda(self, a, n=0):
self._n = n
self._a = a
if n:
self.output(f"run[0x{a:02x},{n}]")
else:
self.output(f"addr[0x{a:02x}]")
class ACC(i2c):
name = "ACC"
SAD = i2c.ACC
I_am = 0x33
REG = {
"STATUS_AUX": 0x07,
"ADC1": 0x08,
"ADC2": 0x0a,
"TEMP": 0x0c,
"WHO": 0x0f,
"CTRL0": 0x1e,
"TEMP_CFG": 0x1f,
"CTRL1": 0x20,
"CTRL2": 0x21,
"CTRL3": 0x22,
"CTRL4": 0x23,
"CTRL5": 0x24,
"CTRL6": 0x25,
"REF": 0x26,
"STATUS": 0x27,
"X": 0x28,
"Y": 0x2a,
"Z": 0x2c,
"FIFO_CTRL": 0x2e,
"FIFO_SRC": 0x2f,
"INT1_CFG": 0x30,
"INT1_SRC": 0x31,
"INT1_THS": 0x32,
"INT1_DUR": 0x33,
"INT2_CFG": 0x34,
"INT2_SRC": 0x35,
"INT2_THS": 0x36,
"INT2_DUR": 0x37,
"CLICK_CFG": 0x38,
"CLICK_SRC": 0x39,
"CLICK_THS": 0x3a,
"TIME_LIMIT": 0x3b,
"TIME_LAT": 0x3c,
"TIME_WIN": 0x3d,
"ACT_THS": 0x3e,
"ACT_DUR": 0x3f,
}
CONF = {
"TEMP_CFG": 0xc0, # TEMP_EN enabled
"CTRL1": 0x27, # X,Y,Z enabled, ODR=10 Hz
"CTRL4": 0x88, # BDU (needed for ADC?) HR mode (12-bit)
"CTRL5": 0x40, # FIFO enable
"FIFO_CTRL": 0x94, # stream mode, FTH=20
}
def b2g(self, x):
if x & 0x8000:
x -= 0x10000
return round(x/0x4000, 4)
def read_g(self, n=32, what="g"):
d = self.readv("STATUS", 1+n*6)
if what == "raw":
return d
g = [[f"{x:04x}" for x in d[:3]]]
for i in range(n):
g.append([self.b2g(x) for x in d[3+3*i:6+3*i]])
if what=="g":
return g
return sphere(numpy.array(g[1:]))
def read_adcs(self):
"""
* https://github.com/STMicroelectronics/lis3dh-pid
* Data Format:
* Outputs are Left Justified in 2 complements
* range 800mV
* code zero means an analogue value of about 1.2V
* Voltage values smaller than centre values are positive
* (Example: 800mV = 7Fh / 127 dec)
* Voltage values bigger than centre values are negative
* (Example: 1600mV = 80h / -128 dec)
"""
return self.readv("STATUS_AUX")
class MAG(i2c):
name = "MAG"
SAD = i2c.MAG
I_am = 0x3d
REG = {
"OFFSET_X": 0x05,
"OFFSET_Y": 0x07,
"OFFSET_Z": 0x09,
"WHO": 0x0f,
"CTRL1": 0x20,
"CTRL2": 0x21,
"CTRL3": 0x22,
"CTRL4": 0x23,
"CTRL5": 0x24,
"STATUS": 0x27,
"X": 0x28,
"Y": 0x2a,
"Z": 0x2c,
"TEMP": 0x2e,
"INT_CFG": 0x30,
"INT_SRC": 0x31,
"INT_THS": 0x32,
}
CONF = {
"CTRL1": 0xfc, # TEMP enabled, UHP, ODR=80 Hz
"CTRL4": 0x0c, # UHP-Z
"CTRL3": 0x00, # continuous mode
"CTRL5": 0x40, # BDU
}