summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjaseg <git@jaseg.net>2018-04-21 16:22:12 +0200
committerjaseg <git@jaseg.net>2018-04-21 16:22:12 +0200
commit231ce4b2a22d2895ee0bf6db4536cc2a56dbbcbd (patch)
tree7ea1fbe05b6dfb75e9cca0eec37f2947a04bea3a
parent0c5b35fa3eecbe8c0035fdc5ca608a5c510df165 (diff)
downloadolsndot-231ce4b2a22d2895ee0bf6db4536cc2a56dbbcbd.tar.gz
olsndot-231ce4b2a22d2895ee0bf6db4536cc2a56dbbcbd.tar.bz2
olsndot-231ce4b2a22d2895ee0bf6db4536cc2a56dbbcbd.zip
Add python driver for new comms protocol
-rw-r--r--firmware/olsndot.py126
1 files changed, 126 insertions, 0 deletions
diff --git a/firmware/olsndot.py b/firmware/olsndot.py
new file mode 100644
index 0000000..016ac3a
--- /dev/null
+++ b/firmware/olsndot.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python3
+
+import serial
+import struct
+from cobs import cobs
+from collections import namedtuple
+from enum import Enum
+import time
+
+EOP = b'\0'
+
+#def cobs_encode(data):
+# return b''.join(bytes([len(x)+1]) + x for x in data.split(EOP))
+#
+#def cobs_decode(data):
+# out = b''
+# while data:
+# n, *rest = data
+# out += b'\0' + bytes(rest[:n-1])
+# data = rest[n-1:]
+# return out[1:]
+
+def address_pkt(addr):
+ return struct.pack('<I', addr)
+
+def framebuf_pkt(data):
+ return struct.pack('<32I', data)
+
+class Driver:
+ type_drivers = {}
+
+ def __init__(self, port, devices=None, addrs=None, baudrate=500000, timeout=0.100):
+ self._ser = serial.Serial(port, baudrate)
+ self._ser.write(b'\0')
+ self._ser.flushInput()
+ self.timeout = timeout
+ if addrs is not None:
+ self.nodes = [ type_drivers[device_type](addr, self) for addr, device_type in addrs ]
+ if devices is not None:
+ for dev in devices:
+ dev.driver = self
+ self.nodes = devices
+
+ def probe_devices(self):
+ while True:
+ self._ser.write(b'\x00')
+ addr, device_type = self.recv_struct('IB', timeout=0.010)
+ yield addr, device_type, self.type_drivers.get[device_type]
+ time.sleep(0.010)
+
+ def recv_struct(self, fmt, timeout=None):
+ self._ser.timeout = self.timeout if timeout is None else timeout
+ data = self._ser.read_until(b'\0')
+ return struct.unpack('<'+fmt, cobs.decode(data[:-1]))
+
+ def send_struct(self, fmt, *args):
+ data = struct.pack('<'+fmt, *args)
+ self._ser.write(cobs.encode(data)+EOP)
+
+ @classmethod
+ def register_device(drv_kls, device_type):
+ def wrapper(dev_kls):
+ drv_kls.type_drivers[device_type] = dev_kls
+ return dev_kls
+ return wrapper
+
+@Driver.register_device(device_type=0x01)
+class Olsndot:
+ CMD_READ_STATUS = 0x01
+
+ class ColorSpec(Enum):
+ white = 0X00
+ single_color = 0X01
+ rgb = 0X02
+ rgbw = 0X03
+ cold_warm_white = 0X04
+ wwa = 0x05
+
+ def __init__(self, addr, driver=None):
+ self.addr = addr
+ self._driver = driver
+
+ @property
+ def driver(self):
+ return self._driver
+
+ @driver.setter
+ def driver(self, driver):
+ self._driver = driver
+ self.fetch_status()
+
+ def send_framebuf(self, data):
+ self._driver.send_struct('I', self.addr)
+ self._driver.send_struct('{}{}'.format(self.nchannels, self.channel_spec), *data)
+
+ def send_cmd(self, cmd):
+ self._driver.send_struct('I', self.addr)
+ self._driver.send_struct('B', cmd)
+
+ Status = namedtuple('Status',
+ ['uptime_s', 'uart_overruns', 'frame_overruns', 'invalid_frames', 'vcc_mv', 'temp_celsius'])
+ def fetch_status(self):
+ self.send_cmd(Olsndot.CMD_READ_STATUS)
+ (self.fw_ver, self.hw_ver,
+ self.nbits, chspec, cs, self.nchannels,
+ uptime_s,
+ uart_overruns, frame_overruns, invalid_frames,
+ vcc_mv, temp_celsius) = self._driver.recv_struct('BBBBBHIIIIHH')
+ self.color_spec = Olsndot.ColorSpec(cs)
+ self.channel_spec = chr(chspec)
+ return Olsndot.Status(uptime_s, uart_overruns, frame_overruns, invalid_frames, vcc_mv, temp_celsius)
+
+ def __str__(self):
+ st = self.fetch_status()
+ return '<Olsndot@{} {}.{} {}ch*{} up {}s vcc {:4.3}V temp {}C>'.format(
+ self.addr, self.fw_ver, self.hw_ver, self.nchannels, self.channel_format,
+ st.uptime_s, st.vcc_mv/1000, st.temp_celsius)
+
+ @property
+ def channel_format(self):
+ return '{}{}'.format(self.color_spec.name, self.nbits)
+
+if __name__ == '__main__':
+ d = Driver('/dev/serial/by-id/usb-FTDI_FT232R_USB_UART_A50285BI-if00-port0')
+ for addr, tid, drv in d.probe_devices():
+ print(addr, tid, drv)