diff options
author | jaseg <git@jaseg.net> | 2018-04-21 16:22:12 +0200 |
---|---|---|
committer | jaseg <git@jaseg.net> | 2018-04-21 16:22:12 +0200 |
commit | 231ce4b2a22d2895ee0bf6db4536cc2a56dbbcbd (patch) | |
tree | 7ea1fbe05b6dfb75e9cca0eec37f2947a04bea3a | |
parent | 0c5b35fa3eecbe8c0035fdc5ca608a5c510df165 (diff) | |
download | olsndot-231ce4b2a22d2895ee0bf6db4536cc2a56dbbcbd.tar.gz olsndot-231ce4b2a22d2895ee0bf6db4536cc2a56dbbcbd.tar.bz2 olsndot-231ce4b2a22d2895ee0bf6db4536cc2a56dbbcbd.zip |
Add python driver for new comms protocol
-rw-r--r-- | firmware/olsndot.py | 126 |
1 files changed, 126 insertions, 0 deletions
diff --git a/firmware/olsndot.py b/firmware/olsndot.py new file mode 100644 index 0000000..016ac3a --- /dev/null +++ b/firmware/olsndot.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 + +import serial +import struct +from cobs import cobs +from collections import namedtuple +from enum import Enum +import time + +EOP = b'\0' + +#def cobs_encode(data): +# return b''.join(bytes([len(x)+1]) + x for x in data.split(EOP)) +# +#def cobs_decode(data): +# out = b'' +# while data: +# n, *rest = data +# out += b'\0' + bytes(rest[:n-1]) +# data = rest[n-1:] +# return out[1:] + +def address_pkt(addr): + return struct.pack('<I', addr) + +def framebuf_pkt(data): + return struct.pack('<32I', data) + +class Driver: + type_drivers = {} + + def __init__(self, port, devices=None, addrs=None, baudrate=500000, timeout=0.100): + self._ser = serial.Serial(port, baudrate) + self._ser.write(b'\0') + self._ser.flushInput() + self.timeout = timeout + if addrs is not None: + self.nodes = [ type_drivers[device_type](addr, self) for addr, device_type in addrs ] + if devices is not None: + for dev in devices: + dev.driver = self + self.nodes = devices + + def probe_devices(self): + while True: + self._ser.write(b'\x00') + addr, device_type = self.recv_struct('IB', timeout=0.010) + yield addr, device_type, self.type_drivers.get[device_type] + time.sleep(0.010) + + def recv_struct(self, fmt, timeout=None): + self._ser.timeout = self.timeout if timeout is None else timeout + data = self._ser.read_until(b'\0') + return struct.unpack('<'+fmt, cobs.decode(data[:-1])) + + def send_struct(self, fmt, *args): + data = struct.pack('<'+fmt, *args) + self._ser.write(cobs.encode(data)+EOP) + + @classmethod + def register_device(drv_kls, device_type): + def wrapper(dev_kls): + drv_kls.type_drivers[device_type] = dev_kls + return dev_kls + return wrapper + +@Driver.register_device(device_type=0x01) +class Olsndot: + CMD_READ_STATUS = 0x01 + + class ColorSpec(Enum): + white = 0X00 + single_color = 0X01 + rgb = 0X02 + rgbw = 0X03 + cold_warm_white = 0X04 + wwa = 0x05 + + def __init__(self, addr, driver=None): + self.addr = addr + self._driver = driver + + @property + def driver(self): + return self._driver + + @driver.setter + def driver(self, driver): + self._driver = driver + self.fetch_status() + + def send_framebuf(self, data): + self._driver.send_struct('I', self.addr) + self._driver.send_struct('{}{}'.format(self.nchannels, self.channel_spec), *data) + + def send_cmd(self, cmd): + self._driver.send_struct('I', self.addr) + self._driver.send_struct('B', cmd) + + Status = namedtuple('Status', + ['uptime_s', 'uart_overruns', 'frame_overruns', 'invalid_frames', 'vcc_mv', 'temp_celsius']) + def fetch_status(self): + self.send_cmd(Olsndot.CMD_READ_STATUS) + (self.fw_ver, self.hw_ver, + self.nbits, chspec, cs, self.nchannels, + uptime_s, + uart_overruns, frame_overruns, invalid_frames, + vcc_mv, temp_celsius) = self._driver.recv_struct('BBBBBHIIIIHH') + self.color_spec = Olsndot.ColorSpec(cs) + self.channel_spec = chr(chspec) + return Olsndot.Status(uptime_s, uart_overruns, frame_overruns, invalid_frames, vcc_mv, temp_celsius) + + def __str__(self): + st = self.fetch_status() + return '<Olsndot@{} {}.{} {}ch*{} up {}s vcc {:4.3}V temp {}C>'.format( + self.addr, self.fw_ver, self.hw_ver, self.nchannels, self.channel_format, + st.uptime_s, st.vcc_mv/1000, st.temp_celsius) + + @property + def channel_format(self): + return '{}{}'.format(self.color_spec.name, self.nbits) + +if __name__ == '__main__': + d = Driver('/dev/serial/by-id/usb-FTDI_FT232R_USB_UART_A50285BI-if00-port0') + for addr, tid, drv in d.probe_devices(): + print(addr, tid, drv) |