"""Command-sequence builder for an I2C sequencer reached through an ``ifc`` object.

The ``i2c`` base class encodes sequencer commands and writes them through
``ifc.reg``; ``irena`` renders the same sequences as text lines instead.
``ACC`` and ``MAG`` carry the register maps and default configuration of the
two devices on the bus.
"""
import sys
import time

import numpy


def _connect(ifc):
    """Attach *ifc* to the ``i2c`` class itself, so every instance shares it."""
    i2c.ifc = ifc


class I2CError(IOError):
    """Raised when a packet or device identification does not match expectations."""


def sphere(v):
    """Convert rows of (x, y, z) in the 2-D array *v* to three derived columns.

    Column 0 is the sum of squares of each row, column 1 is
    ``arctan2(x**2 + y**2, z)`` and column 2 is ``arctan2(y, x)``.  The result
    has the dtype of *v* and is rounded to 3 decimals.
    """
    # NOTE(review): column 0 is the *squared* norm and column 1 uses the
    # *squared* xy length rather than their square roots -- confirm intended.
    r = numpy.empty_like(v)
    r[:, 0] = (v**2).sum(-1)
    r[:, 1] = numpy.arctan2((v[:, :2]**2).sum(-1), v[:, 2])
    r[:, 2] = numpy.arctan2(v[:, 1], v[:, 0])
    return r.round(3)


class i2c:
    """Builds sequencer command words and writes them through ``self.ifc``.

    Subclasses are expected to provide ``name``, ``SAD``, ``I_am``, ``REG``
    and ``CONF``.  Several methods name their instance parameter ``s``
    instead of ``self``.  ``cmda`` or ``reset`` must be called before the
    other ``cmd*`` methods, because they initialise ``_a`` and ``_n``.
    """

    # Flag bits placed in the high byte of a data command (see cmdi).
    NOM = 0
    START = 1
    LAST = 2
    STOP = START | LAST
    ifACTIVE = 4
    RESTART = ifACTIVE | START
    ifACK = 8
    RESUME = 0x10
    SAVE = 0x20
    DISCARD = SAVE  # same bit as SAVE
    UNSTUCK = STOP | SAVE
    SEND = 0x40

    def _connect(self, ifc):
        """Attach *ifc* to this instance only."""
        self.ifc = ifc

    def cmdi(self, c, d=0):
        """Write a data command: flags *c* in the high byte, byte *d* in the low byte."""
        self._log(5, f"I²C[{self._n+self._a:3d}]: data(0x{c:02x}, 0x{d:02x})")
        self._n += 1
        return self.ifc.reg(0x33, (c<<8) | d)

    def cmdh(self, m, s, buf=False, rel=False):
        """Write a packet-header command and remember the expected packet shape.

        Sets ``psize = s + 1`` and ``pmagic = 0x12c0 + m``, which
        ``readpacket`` uses by default.
        """
        self._log(5, f"I²C[{self._n+self._a:3d}]: head({m}, {s}, {buf}, {rel})")
        self._n += 1
        self.psize = s+1
        self.pmagic = 0x12c0+m
        return self.ifc.reg(0x33, 0x8000 | (rel<<13) | (buf<<12) | (m<<8) | s)

    def cmdm(self, ms=0, restart=False, rel=False):
        """Write a wait command with the value *ms* and the restart/release bits."""
        self._log(5, f"I²C[{self._n+self._a:3d}]: wait({ms}, {restart}, {rel})")
        self._n += 1
        return self.ifc.reg(0x33, 0xc000 | (rel<<13) | (restart<<12) | ms)

    def cmda(self, a, n=0):
        """Write address *a* and count *n* to register 0x32; reset the local counter to *n*."""
        self._log(5, f"I²C addr({a}, {n})")
        self._n = n
        self._a = a
        return self.ifc.reg(0x32, (n<<8) | a)

    def reset(self):
        """Zero the local address/counter and write 0x1000 to register 7."""
        self._a = 0
        self._n = 0
        self._log(5, "I²C reset")
        return self.ifc.reg(7, 0x1000)

    def status(self):
        """Read register 0x32 and return ``(low byte, remaining high bits)``."""
        a = self.ifc.reg(0x32)
        return a&0xff, a>>8

    def readpacket(self, magic=None, size=None):
        """Read one packet from the FIFO and check its leading magic word.

        *magic* and *size*, when given, replace the values remembered by the
        last ``cmdh``.  The packet is stored in ``self._d`` and returned.

        Raises:
            I2CError: the first word of the packet is not the expected magic.
        """
        if magic is not None:
            self.pmagic = magic
        if size is not None:
            self.psize = size
        self._d = self.ifc.readfifo(1, self.pmagic, 0xffff, self.psize)
        if self._d[0] != self.pmagic:
            # Fixed: the message used the non-existent ``self.d`` and raised
            # AttributeError instead of the intended I2CError.
            raise I2CError(f"magic mismatch {self._d[0]:04x} expect {self.pmagic:04x}")
        self._log(4, f"I2C readpacket: magic={self._d[0]:04x}, len={len(self._d)}")
        return self._d

    ACC = 0x32  # 8 bits, SA0 = 1
    MAG = 0x3c  # 8 bits, SA1 = 1

    def conf1(s, k, v):
        """Queue a write of value *v* to register ``REG[k]``; return the command count (4)."""
        s._log(3, f"I²C {s._a:03d}: conf[{s.name}/{k}] = 0x{v:02x}")
        s.cmdi(s.START, s.SAD)
        s.cmdi(s.NOM, s.REG[k])
        s.cmdi(s.NOM, v)
        s.cmdi(s.STOP)
        return 4

    def conf(self, a=0):
        """Queue every ``CONF`` entry starting at address *a*, then issue ``cmda(a, total)``."""
        self.cmda(a)
        s = 0
        for k, v in self.CONF.items():
            s += self.conf1(k, v)
        self._log(2, f"I²C conf({self.name}), len={s}")
        self.cmda(a, s)

    def readi(s, k):
        """Queue a single-byte read of register ``REG[k]``; return the command count (5)."""
        s.cmdi(s.START, s.SAD)
        s.cmdi(s.SAVE, s.REG[k])
        s.cmdi(s.START, s.SAD + 1)
        s.cmdi(s.LAST, 0)
        s.cmdi(s.STOP)
        return 5

    def read1(s, k):
        """Read register ``REG[k]`` and return the high byte of the packet's last word."""
        s.cmda(0)
        s.cmdh(s.I_am & 0b1111, 2)
        s.cmda(0, s.readi(k)+1)  # +1 accounts for the header command
        time.sleep(0.01)
        s._v = s.readpacket()[-1] >> 8
        s._log(3, f"I²C read({s.name}/{k}) = 0x{s._v:04x}")
        return s._v

    def verify(self):
        """Read back every ``CONF`` register and return the number of mismatches.

        Each packet word from index 2 on is compared with
        ``(CONF[k] << 8) | REG[k]``.
        """
        self.cmda(0)
        self.cmdh(self.I_am & 0b1111, 1 + len(self.CONF))
        s = 1
        for k in self.CONF:
            s += self.readi(k)
        self.cmda(0, s)
        time.sleep(0.1)
        self.readpacket()
        errors = 0
        for i, k in enumerate(self.CONF):
            x = (self.CONF[k] << 8) | self.REG[k]
            if self._d[i+2] != x:
                self._log(1, f"I²C verify MISMATCH {i}: {self.name}/{k}, {x:04x} → {self._d[i+2]:04x}")
                errors += 1
            else:
                self._log(2, f"I²C verify match {i}: {self.name}/{k}, {x:04x} → {self._d[i+2]:04x}")
        self._log(2, f"I²C verify({self.name}), len={s}")
        return errors

    def readv(s, k, n=7):
        """Read *n* bytes starting at register ``REG[k]`` and return the packet words.

        Bit 7 is set on the register byte (``irena.cmdi`` labels that bit
        "auto").
        """
        s.cmda(0)
        s.cmdh(s.I_am & 0b1111, (3+n)//2)
        s.cmdi(s.START, s.SAD)
        s.cmdi(s.SAVE, s.REG[k] | 0x80)
        s.cmdi(s.START, s.SAD + 1)
        s.cmdi(s.LAST, n-1)
        s.cmdi(s.STOP)
        s.cmda(0, 6)
        time.sleep(0.001*(10+n))
        s.readpacket()
        vv = " ".join([f"{d:02x}" for d in s._d])
        s._log(3, f"I²C vector({s.name}, {k}) = {vv}")
        return s._d

    def read_vector(self):
        """Return ``readv("STATUS")`` with the default length."""
        return self.readv("STATUS")

    def who(self, a=None, v=None):
        """Read the ``WHO`` register and optionally check it against *v*.

        With no *a*, the device's own ``SAD`` and ``I_am`` are used.  *a* only
        affects the log line; the read always goes through ``read1``.

        Raises:
            I2CError: *v* is set and differs from the value read.
        """
        if a is None:
            a = self.SAD
            v = self.I_am
        self._who = self.read1("WHO")
        self._log(1, f"WHO[SAD=0x{a:02x}] = 0x{self._who:02x}")
        if v is not None and self._who != v:
            raise I2CError(f"who am I got {self._who:02x} expect {v:02x}")
        return self._who

    def _log(self, v, m):
        """Print *m* to stderr when ``verbosity`` is at least *v*."""
        if self.verbosity >= v:
            print(m, file=sys.stderr)

    verbosity = 5

    def test(s):
        """Queue a read of register 0x0f from both the MAG and ACC addresses."""
        s.cmda(8)
        s.cmdh(1, 5)
        s.cmdi(s.START|s.SEND, s.MAG)
        s.cmdi(s.SAVE, 0x0f)
        s.cmdi(s.START, s.MAG + 1)
        s.cmdi(s.LAST, 0)
        s.cmdi(s.STOP)
        s.cmdi(s.START|s.SEND, s.ACC)
        s.cmdi(s.SAVE, 0x0f)
        s.cmdi(s.START, s.ACC + 1)
        s.cmdi(s.LAST, 0)
        s.cmdi(s.STOP)
        s.cmda(8, 10)

    def unstuck(s):
        """Queue an UNSTUCK command followed by a STOP, both with SEND set."""
        s.cmda(0)
        s.cmdh(0, 3)
        s.cmdi(s.UNSTUCK|s.SEND)
        s.cmdi(s.STOP|s.SEND)
        s.cmda(0, 3)

    def flight_seq(s, a=0):
        """Load the looping acquisition sequence at address *a*.

        Uses the module-level ``MAG`` and ``ACC`` classes for addresses and
        register numbers.  Returns ``(start address, number of commands)``.
        Note that the final log line calls ``status()``, which needs ``ifc``.
        """
        s.cmda(a)
        s.cmdm(2, rel=True)  # release the last packet
        s.cmdh(5, 132, buf=True, rel=False)  # [0:1] single packet, 133 words
        s.cmdi(s.START, MAG.SAD)
        s.cmdi(s.SAVE, MAG.REG['STATUS'] | 0x80)
        s.cmdi(s.RESTART, MAG.SAD | 1)
        s.cmdi(s.LAST, 6)  # [2:6]
        s.cmdi(s.STOP)
        s.cmdi(s.START, MAG.SAD)
        s.cmdi(s.SAVE, MAG.REG['TEMP'] | 0x80)  # save will be ignored
        s.cmdi(s.RESTART, MAG.SAD | 1)
        s.cmdi(s.LAST, 1)  # [6:7]
        s.cmdi(s.STOP)
        s.cmdi(s.START, ACC.SAD)
        s.cmdi(s.SAVE, ACC.REG['STATUS_AUX'] | 0x80)
        s.cmdi(s.RESTART, ACC.SAD | 1)
        s.cmdi(s.LAST, 6)  # [7:11]
        s.cmdi(s.STOP)
        s.cmdm(96)
        s.cmdi(s.START, MAG.SAD)
        s.cmdi(s.SAVE, MAG.REG['X'] | 0x80)  # save will be ignored
        s.cmdi(s.RESTART, MAG.SAD | 1)
        s.cmdi(s.LAST, 5)  # [11:65]
        s.cmdi(s.STOP)
        # 17 further magnetometer reads; with the one above, 18 in total.
        for n in range(17):
            s.cmdm(99)
            s.cmdi(s.START, MAG.SAD)
            s.cmdi(s.SAVE, MAG.REG['X'] | 0x80)  # save will be ignored
            s.cmdi(s.RESTART, MAG.SAD | 1)
            s.cmdi(s.LAST, 5)  # [11:65]
            s.cmdi(s.STOP)
        s.cmdi(s.START, ACC.SAD)
        s.cmdi(s.SAVE, ACC.REG['FIFO_SRC'])
        s.cmdi(s.RESTART, ACC.SAD | 1)
        s.cmdi(s.LAST, 0)  # [65:66]
        s.cmdi(s.STOP)
        s.cmdi(s.START, ACC.SAD)
        s.cmdi(s.SAVE, ACC.REG['STATUS'] | 0x80)
        s.cmdi(s.RESTART, ACC.SAD | 1)
        s.cmdi(s.LAST, 21*6)  # words [66:130]
        s.cmdi(s.STOP)
        s.cmdm(82)
        s.cmdi(s.START, MAG.SAD)
        s.cmdi(s.SAVE, MAG.REG['X'] | 0x80)  # save will be ignored
        s.cmdi(s.RESTART, MAG.SAD | 1)
        s.cmdi(s.LAST, 5)  # words [130:133]
        s.cmdi(s.STOP)
        s.cmdm(97, restart=True)
        s._log(2, f"flight sequence loaded @{s._a}, size {s._n}, end {s.status()[0]}")
        return (s._a, s._n)


class irena(i2c):
    """Variant of ``i2c`` that writes each command as a text line to ``f``.

    Every line is ``prefix + command + postfix`` followed by a newline.
    """

    f = sys.stdout
    prefix = "@i2c/"
    postfix = ""  # for 2nd FPGA: postfix = "/del[1]"

    def output(self, r):
        """Write one command line *r* wrapped in ``prefix``/``postfix``."""
        self.f.write(f"{self.prefix}{r}{self.postfix}\n")

    def cmdi(self, c, d=0):
        """Render a data command as text.

        The command name depends on the flag bits in *c* and on whether the
        most recent start/restart addressed the device for reading (bit 0 of
        its address byte).
        """
        self._n += 1
        flags = ""
        if c & self.ifACTIVE:
            flags = "/ifactive"
        if c & self.START:
            if c & self.LAST:
                if c & self.SAVE:
                    r = "unstuck"
                else:
                    r = "stop"
                d = None
            else:
                if c & self.ifACTIVE:
                    r = "restart"
                else:
                    r = "start"
                # "restart" already implies ifACTIVE, so drop that flag.
                flags = ""
                self.reading = d & 1
                self.first = True
                if self.reading:
                    flags += "/read"
                    d &= 0xfe
                # Known device addresses are printed by name, without a value.
                if d == self.MAG:
                    flags += "/mag"
                    d = None
                if d == self.ACC:
                    flags += "/acc"
                    d = None
        elif self.reading:
            if c & self.LAST:
                r = "read"
            else:
                r = "read/continue"
            if c & self.DISCARD:
                flags += "/discard"
        else:
            if c & self.SAVE:
                flags += "/save"
            # NOTE(review): ``first`` is never cleared in the visible code, so
            # the "write" branch is unreachable -- confirm against the
            # consumer of this text format before changing.
            if self.first:
                r = "register"
                if d & 0x80:
                    flags += "/auto"
                    d &= 0x7f
            else:
                r = "write"
        if c & self.SEND:
            flags = "/send"+flags
        if c & self.ifACK:
            flags = "/ifack"+flags
        if d is None:
            self.output(f"{r}{flags}")
        else:
            self.output(f"{r}{flags}[0x{d:02x}]")

    def cmdh(self, m, s, buf=False, rel=False):
        """Render a header command; also records ``psize``/``pmagic`` like the base class."""
        self._n += 1
        self.psize = s+1
        self.pmagic = 0x12c0+m
        flags = ""
        if rel:
            flags += "/release"
        if buf:
            flags += "/buffer"
        self.output(f"header{flags}[{m},{s}]")

    def cmdm(self, ms=0, restart=False, rel=False):
        """Render a wait command; *restart* is printed as ``/loop``."""
        self._n += 1
        flags = ""
        if rel:
            flags += "/release"
        if restart:
            flags += "/loop"
        self.output(f"wait{flags}[{ms}]")

    def cmda(self, a, n=0):
        """Render ``run[a,n]`` when *n* is non-zero, otherwise ``addr[a]``."""
        self._n = n
        self._a = a
        if n:
            self.output(f"run[0x{a:02x},{n}]")
        else:
            self.output(f"addr[0x{a:02x}]")


class ACC(i2c):
    """Accelerometer register map and configuration.

    Presumably an ST LIS3DH, going by the link in ``read_adcs`` -- confirm.
    """

    name = "ACC"
    SAD = i2c.ACC
    I_am = 0x33
    REG = {
        "STATUS_AUX": 0x07,
        "ADC1": 0x08,
        "ADC2": 0x0a,
        "TEMP": 0x0c,
        "WHO": 0x0f,
        "CTRL0": 0x1e,
        "TEMP_CFG": 0x1f,
        "CTRL1": 0x20,
        "CTRL2": 0x21,
        "CTRL3": 0x22,
        "CTRL4": 0x23,
        "CTRL5": 0x24,
        "CTRL6": 0x25,
        "REF": 0x26,
        "STATUS": 0x27,
        "X": 0x28,
        "Y": 0x2a,
        "Z": 0x2c,
        "FIFO_CTRL": 0x2e,
        "FIFO_SRC": 0x2f,
        "INT1_CFG": 0x30,
        "INT1_SRC": 0x31,
        "INT1_THS": 0x32,
        "INT1_DUR": 0x33,
        "INT2_CFG": 0x34,
        "INT2_SRC": 0x35,
        "INT2_THS": 0x36,
        "INT2_DUR": 0x37,
        "CLICK_CFG": 0x38,
        "CLICK_SRC": 0x39,
        "CLICK_THS": 0x3a,
        "TIME_LIMIT": 0x3b,
        "TIME_LAT": 0x3c,
        "TIME_WIN": 0x3d,
        "ACT_THS": 0x3e,
        "ACT_DUR": 0x3f,
    }
    CONF = {
        "TEMP_CFG": 0xc0,  # TEMP_EN enabled
        "CTRL1": 0x27,  # X,Y,Z enabled, ODR=10 Hz
        "CTRL4": 0x88,  # BDU (needed for ADC?), HR mode (12-bit)
        "CTRL5": 0x40,  # FIFO enable
        "FIFO_CTRL": 0x94,  # stream mode, FTH=20
    }

    def b2g(self, x):
        """Interpret *x* as a signed 16-bit word and scale it by 1/0x4000 (4 decimals)."""
        if x & 0x8000:
            x -= 0x10000
        return round(x/0x4000, 4)

    def read_g(self, n=32, what="g"):
        """Read *n* three-word samples starting at ``STATUS``.

        Returns the raw packet for ``what == "raw"``; a list whose first entry
        holds the first three words as hex strings followed by *n* scaled
        triples for ``what == "g"``; otherwise ``sphere`` of the triples.
        """
        d = self.readv("STATUS", 1+n*6)
        if what == "raw":
            return d
        g = [[f"{x:04x}" for x in d[:3]]]
        for i in range(n):
            g.append([self.b2g(x) for x in d[3+3*i:6+3*i]])
        if what == "g":
            return g
        return sphere(numpy.array(g[1:]))

    def read_adcs(self):
        """Return the packet read from ``STATUS_AUX`` onwards (default length).

        * https://github.com/STMicroelectronics/lis3dh-pid
        * Data Format:
        *   Outputs are Left Justified in 2’ complements
        *   range 800mV
        *   code zero means an analogue value of about 1.2V
        *   Voltage values smaller than centre values are positive
        *     (Example: 800mV = 7Fh / 127 dec)
        *   Voltage values bigger than centre values are negative
        *     (Example: 1600mV = 80h / -128 dec)
        """
        return self.readv("STATUS_AUX")


class MAG(i2c):
    """Magnetometer register map and configuration."""

    name = "MAG"
    SAD = i2c.MAG
    I_am = 0x3d
    REG = {
        "OFFSET_X": 0x05,
        "OFFSET_Y": 0x07,
        "OFFSET_Z": 0x09,
        "WHO": 0x0f,
        "CTRL1": 0x20,
        "CTRL2": 0x21,
        "CTRL3": 0x22,
        "CTRL4": 0x23,
        "CTRL5": 0x24,
        "STATUS": 0x27,
        "X": 0x28,
        "Y": 0x2a,
        "Z": 0x2c,
        "TEMP": 0x2e,
        "INT_CFG": 0x30,
        "INT_SRC": 0x31,
        "INT_THS": 0x32,
    }
    CONF = {
        "CTRL1": 0xfc,  # TEMP enabled, UHP, ODR=80 Hz
        "CTRL4": 0x0c,  # UHP-Z
        "CTRL3": 0x00,  # continuous mode
        "CTRL5": 0x40,  # BDU
    }