diff options
Diffstat (limited to 'firmware/olsndot.py')
-rw-r--r-- | firmware/olsndot.py | 126 |
1 files changed, 0 insertions, 126 deletions
diff --git a/firmware/olsndot.py b/firmware/olsndot.py deleted file mode 100644 index 557226a..0000000 --- a/firmware/olsndot.py +++ /dev/null @@ -1,126 +0,0 @@ -#!/usr/bin/env python3 - -import serial -import struct -from cobs import cobs -from collections import namedtuple -from enum import Enum -import time - -EOP = b'\0' - -#def cobs_encode(data): -# return b''.join(bytes([len(x)+1]) + x for x in data.split(EOP)) -# -#def cobs_decode(data): -# out = b'' -# while data: -# n, *rest = data -# out += b'\0' + bytes(rest[:n-1]) -# data = rest[n-1:] -# return out[1:] - -def address_pkt(addr): - return struct.pack('<I', addr) - -def framebuf_pkt(data): - return struct.pack('<32I', data) - -class Driver: - type_drivers = {} - - def __init__(self, port, devices=None, addrs=None, baudrate=500000, timeout=0.100): - self._ser = serial.Serial(port, baudrate) - self._ser.write(b'\0') - self._ser.flushInput() - self.timeout = timeout - if addrs is not None: - self.nodes = [ type_drivers[device_type](addr, self) for addr, device_type in addrs ] - if devices is not None: - for dev in devices: - dev.driver = self - self.nodes = devices - - def probe_devices(self): - while True: - self._ser.write(b'\x00') - addr, device_type = self.recv_struct('IB', timeout=0.010) - yield addr, device_type, self.type_drivers.get[device_type] - time.sleep(0.010) - - def recv_struct(self, fmt, timeout=None): - self._ser.timeout = self.timeout if timeout is None else timeout - data = self._ser.read_until(b'\0') - return struct.unpack('<'+fmt, cobs.decode(data[:-1])) - - def send_struct(self, fmt, *args): - data = struct.pack('<'+fmt, *args) - self._ser.write(cobs.encode(data)+EOP) - - @classmethod - def register_device(drv_kls, device_type): - def wrapper(dev_kls): - drv_kls.type_drivers[device_type] = dev_kls - return dev_kls - return wrapper - -@Driver.register_device(device_type=0x01) -class Olsndot: - CMD_READ_STATUS = 0x01 - - class ColorSpec(Enum): - white = 0X00 - single_color = 0X01 - rgb = 0X02 - rgbw = 0X03 - cold_warm_white = 0X04 - wwa = 0x05 - - def __init__(self, addr, driver=None): - self.addr = addr - self._driver = driver - - @property - def driver(self): - return self._driver - - @driver.setter - def driver(self, driver): - self._driver = driver - self.fetch_status() - - def send_framebuf(self, data): - self._driver.send_struct('I', self.addr) - self._driver.send_struct('{}{}'.format(self.nchannels, self.channel_spec), *data) - - def send_cmd(self, cmd): - self._driver.send_struct('I', self.addr) - self._driver.send_struct('B', cmd) - - Status = namedtuple('Status', - ['uptime_s', 'uart_overruns', 'frame_overruns', 'invalid_frames', 'vcc_mv', 'temp_celsius']) - def fetch_status(self): - self.send_cmd(Olsndot.CMD_READ_STATUS) - (self.fw_ver, self.hw_ver, - self.nbits, chspec, cs, self.nchannels, - uptime_s, - uart_overruns, frame_overruns, invalid_frames, - vcc_mv, temp_celsius) = self._driver.recv_struct('BBBBBHIIIIHH') - self.color_spec = Olsndot.ColorSpec(cs) - self.channel_spec = chr(chspec) - return Olsndot.Status(uptime_s, uart_overruns, frame_overruns, invalid_frames, vcc_mv, temp_celsius) - - def __str__(self): - st = self.fetch_status() - return '<Olsndot {}.{}@{} {}ch*{} up {}s vcc {:4.3}V temp {}C>'.format( - self.fw_ver, self.hw_ver, self.addr, self.nchannels, self.channel_format, - st.uptime_s, st.vcc_mv/1000, st.temp_celsius) - - @property - def channel_format(self): - return '{}{}'.format(self.color_spec.name, self.nbits) - -if __name__ == '__main__': - d = Driver('/dev/serial/by-id/usb-FTDI_FT232R_USB_UART_A50285BI-if00-port0') - for addr, tid, drv in d.probe_devices(): - print(addr, tid, drv) |