#define PY_SSIZE_T_CLEAN #include "Python.h" #include #include #include #ifndef DEBUG # define DEBUG 1 #endif #define dprintf(lvl, fmt...) if (DEBUG >= lvl) fprintf(stderr, "spidev: " fmt) static struct ssp_job { int fd; Py_ssize_t count; unsigned char *command; unsigned int flags; } job = { .fd = -1 }; #define INVERT_BITS_OUT 0x00001000 static int set_job_fd(struct ssp_job *j, int fd) { if (fd >= 0) { j->fd = fd; dprintf(2, "fd = %d\n", fd); } else if (j->fd < 0) { PyErr_SetString(PyExc_ValueError, "No file descriptor"); return -1; } return 0; } static unsigned char buf[4096]; #define MAX_TRANSFER ((Py_ssize_t)sizeof(buf)) static inline __u32 bit_reverse(__u32 i) { return (i>>7) & 0x01010101 | (i>>5) & 0x02020202 | (i>>3) & 0x04040404 | (i>>1) & 0x08080808 | (i<<1) & 0x10101010 | (i<<3) & 0x20202020 | (i<<5) & 0x40404040 | (i<<7) & 0x80808080 ; } static inline void bit_reverse_copy(void *d, void *s, int n) { n = (n+3)>>2; __u32 *dd=d, *ss=s; while (n--) *(dd++) = bit_reverse(*(ss++)); } static void hexdump(FILE *f, const char *header, const unsigned char *b, Py_ssize_t n) { fprintf(stderr, "%s", header); for (int i=0; icommand; transfer.rx_buf = (__u64) buf; if (j->count > MAX_TRANSFER) n = MAX_TRANSFER; else n = j->count; transfer.len = n; if (j->flags & INVERT_BITS_OUT) { bit_reverse_copy(buf, j->command, n); transfer.tx_buf = (__u64) buf; } else if (DEBUG >= 3) memset(buf, 0x5a, MAX_TRANSFER); if (DEBUG >= 4 || n <= 128 && DEBUG >= 3) hexdump(stderr, "TX:", j->command, n); if (n && ioctl(j->fd, SPI_IOC_MESSAGE(1), &transfer) < 0) return -1; if (DEBUG>=4 || n<=128 && DEBUG>=3) hexdump(stderr, "RX:", buf, n); dprintf(2, "tranfered %d bytes, flags 0x%x\n", n, j->flags); j->count -= n; j->command += n; return 0; } static PyObject *set_mode(PyObject *self, PyObject *args, PyObject *kw) { static char *arg_names[] = {"mode", "cpha", "cpol", "fd", NULL}; int mode = 0, cpha = 0, cpol = 0, fd = -1; if (!PyArg_ParseTupleAndKeywords(args, kw, "|ippi", arg_names, &mode, &cpha, &cpol, &fd)) return NULL; if (set_job_fd(&job, fd)) return NULL; if (cpha) mode |= SPI_CPHA; if (cpol) mode |= SPI_CPOL; __u8 val = mode & (SPI_CPHA|SPI_CPOL); if (ioctl(job.fd, SPI_IOC_WR_MODE, &val) < 0) { PyErr_SetFromErrno(PyExc_IOError); return NULL; } dprintf(1, "CPHA: 0x%lx CPOL: 0x%lx\n", mode & SPI_CPHA, mode & SPI_CPOL); Py_RETURN_NONE; } static PyObject *set_lsb_first(PyObject *self, PyObject *args, PyObject *kw) { static char *arg_names[] = {"mode", "fd", NULL}; int mode = 0, fd = -1; if (!PyArg_ParseTupleAndKeywords(args, kw, "|pi", arg_names, &mode, &fd)) return NULL; if (set_job_fd(&job, fd)) return NULL; __u8 val = mode; if (ioctl(job.fd, SPI_IOC_WR_LSB_FIRST, &val) < 0) { PyErr_SetFromErrno(PyExc_IOError); return NULL; } dprintf(1, "lsb first mode %d\n", mode); Py_RETURN_NONE; } static PyObject *set_speed(PyObject *self, PyObject *args, PyObject *kw) { static char *arg_names[] = {"hz", "fd", NULL}; int hz = 0, fd = -1; if (!PyArg_ParseTupleAndKeywords(args, kw, "|ii", arg_names, &hz, &fd)) return NULL; if (set_job_fd(&job, fd)) return NULL; if (ioctl(job.fd, SPI_IOC_WR_MAX_SPEED_HZ, &hz) < 0) { PyErr_SetFromErrno(PyExc_IOError); return NULL; } dprintf(1, "speed %d Hz\n", hz); Py_RETURN_NONE; } static PyObject *sync_transfer(PyObject *self, PyObject *args, PyObject *kw) { static char *arg_names[] = {"cmd", "csize", "rsize", "flags", "fd", NULL}; Py_buffer cmd; Py_ssize_t csize = 0, rsize=MAX_TRANSFER; int fd=-1, flags=0; if (!PyArg_ParseTupleAndKeywords(args, kw, "y*|nnii", arg_names, &cmd, &csize, &rsize, &flags, &fd)) return NULL; if (set_job_fd(&job, fd)) goto error; if (rsize>MAX_TRANSFER) { PyErr_SetString(PyExc_ValueError, "resp request too large"); goto error; } if (csize && csize > cmd.len) { PyErr_SetString(PyExc_ValueError, "cmd too short"); goto error; } if (!csize) csize = cmd.len; job.command = cmd.buf; job.flags = flags; int vsize = 0; while (csize) { vsize = csize; if (csize>MAX_TRANSFER) if (csize-MAX_TRANSFER < rsize) job.count = csize - rsize; else job.count = MAX_TRANSFER; else job.count = csize; csize -= job.count; if (do_spi_transfer(&job)<0) { PyErr_SetFromErrno(PyExc_IOError); error: PyBuffer_Release(&cmd); return NULL; } } PyBuffer_Release(&cmd); if (rsize > vsize) rsize = vsize; return Py_BuildValue("y#", buf+vsize-rsize, rsize); } static PyMethodDef methods[] = { {"set_mode", (PyCFunction)set_mode, METH_VARARGS | METH_KEYWORDS, "Set clock mode cpol and cpha."}, {"set_speed", (PyCFunction)set_speed, METH_VARARGS | METH_KEYWORDS, "Set max speed in Hz."}, {"set_lsb_first", (PyCFunction)set_lsb_first, METH_VARARGS | METH_KEYWORDS, "Set LSB first mode."}, {"sync_transfer", (PyCFunction)sync_transfer, METH_VARARGS | METH_KEYWORDS, "Synchronous write/read"}, {NULL, NULL, 0, NULL} /* Sentinel */ }; static PyModuleDef module_def = { .m_base = PyModuleDef_HEAD_INIT, .m_name = "spidev", .m_doc = "RPiRENA spi interface", .m_size = -1, .m_methods = methods, }; PyMODINIT_FUNC PyInit_spidev(void) { PyObject *m = PyModule_Create(&module_def); PyModule_AddIntMacro(m, INVERT_BITS_OUT); return m; }