summaryrefslogtreecommitdiff
path: root/doc/olsndot.py
blob: 557226a62ac993f882222d709ea7b9a541c2d75c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python3

import serial
import struct
from cobs import cobs
from collections import namedtuple
from enum import Enum
import time

# End-of-packet delimiter: a single zero byte appended after each
# COBS-encoded frame written to the serial port.
EOP = b'\x00'

#def cobs_encode(data):
#    return b''.join(bytes([len(x)+1]) + x for x in data.split(EOP))
#
#def cobs_decode(data):
#    out = b''
#    while data:
#        n, *rest = data
#        out += b'\0' + bytes(rest[:n-1])
#        data = rest[n-1:]
#    return out[1:]

def address_pkt(addr):
    """Return *addr* packed as a 4-byte little-endian unsigned integer.

    Raises ``struct.error`` when *addr* does not fit an unsigned 32-bit
    value.
    """
    layout = struct.Struct('<I')
    return layout.pack(addr)

def framebuf_pkt(data):
    """Return a frame buffer packed as 32 little-endian unsigned 32-bit words.

    Args:
        data: Iterable of exactly 32 integers, each fitting an unsigned
            32-bit value.

    Returns:
        A 128-byte ``bytes`` object.

    Raises:
        struct.error: If *data* does not hold exactly 32 values or a value
            is out of range.
    """
    # The format expects 32 separate arguments, so the sequence has to be
    # unpacked; passing it as one argument always fails.
    return struct.pack('<32I', *data)

class Driver:
    """Bus driver that exchanges COBS-framed, zero-delimited packets over a serial port.

    Device classes register themselves for a numeric device type through
    the ``register_device`` decorator; the registry is shared by all
    ``Driver`` instances.
    """

    # Maps a numeric device type to the device class handling it.
    type_drivers = {}

    def __init__(self, port, devices=None, addrs=None, baudrate=500000, timeout=0.100):
        """Open the serial port and attach the given devices.

        Args:
            port: Serial port name passed to ``serial.Serial``.
            devices: Optional collection of already constructed device
                objects. Each gets its ``driver`` attribute set to this
                driver. Takes precedence over *addrs* when both are given.
            addrs: Optional iterable of ``(addr, device_type)`` pairs; a
                device object is created for each pair from the class
                registered for ``device_type``.
            baudrate: Serial baud rate.
            timeout: Default read timeout in seconds used by ``recv_struct``.

        Raises:
            KeyError: If *addrs* names a device type with no registered class.
        """
        self._ser = serial.Serial(port, baudrate)
        # Send a lone delimiter byte, then drop whatever is pending in the
        # receive buffer, so communication starts from a clean state.
        self._ser.write(b'\0')
        self._ser.flushInput()
        self.timeout = timeout
        # Always define ``nodes`` so the attribute exists even when neither
        # ``addrs`` nor ``devices`` is supplied.
        self.nodes = []
        if addrs is not None:
            # The registry is a class attribute and must be reached through
            # ``self``; a bare name is not visible inside a method.
            self.nodes = [self.type_drivers[device_type](addr, self) for addr, device_type in addrs]
        if devices is not None:
            for dev in devices:
                dev.driver = self
            self.nodes = devices

    def probe_devices(self):
        """Endlessly probe the bus, yielding one tuple per response.

        Yields:
            ``(addr, device_type, device_class)`` where ``device_class`` is
            the class registered for ``device_type`` or ``None`` when the
            type is unknown.

        NOTE(review): an unanswered probe is not handled here; any error
        raised by ``recv_struct`` propagates and ends the generator.
        """
        while True:
            self._ser.write(b'\x00')
            addr, device_type = self.recv_struct('IB', timeout=0.010)
            yield addr, device_type, self.type_drivers.get(device_type)
            time.sleep(0.010)

    def recv_struct(self, fmt, timeout=None):
        """Read one zero-delimited packet and unpack it.

        Args:
            fmt: ``struct`` format string without byte-order prefix; the
                payload is unpacked as little-endian.
            timeout: Read timeout in seconds; falls back to ``self.timeout``
                when ``None``.

        Returns:
            The tuple produced by ``struct.unpack``.
        """
        self._ser.timeout = self.timeout if timeout is None else timeout
        data = self._ser.read_until(b'\0')
        # Strip the trailing delimiter before COBS-decoding the payload.
        return struct.unpack('<'+fmt, cobs.decode(data[:-1]))

    def send_struct(self, fmt, *args):
        """Pack *args* little-endian per *fmt*, COBS-encode and send them.

        The encoded frame is followed by the ``EOP`` delimiter.
        """
        data = struct.pack('<'+fmt, *args)
        self._ser.write(cobs.encode(data)+EOP)

    @classmethod
    def register_device(drv_kls, device_type):
        """Return a class decorator registering a device class for *device_type*.

        The decorated class is stored in ``type_drivers`` and returned
        unchanged.
        """
        def wrapper(dev_kls):
            drv_kls.type_drivers[device_type] = dev_kls
            return dev_kls
        return wrapper

@Driver.register_device(device_type=0x01)
class Olsndot:
    """Device class for Olsndot nodes, registered with ``Driver`` as type 0x01.

    Several attributes (``fw_ver``, ``hw_ver``, ``nbits``, ``nchannels``,
    ``color_spec``, ``channel_spec``) only exist after ``fetch_status`` has
    run at least once.
    """

    # Command byte sent to request a status report.
    CMD_READ_STATUS = 0x01

    class ColorSpec(Enum):
        """Color-arrangement codes as found in the status report."""
        white = 0x00
        single_color = 0x01
        rgb = 0x02
        rgbw = 0x03
        cold_warm_white = 0x04
        wwa = 0x05

    # Part of the status report returned by ``fetch_status``. Units are
    # presumably the ones the field names suggest (seconds, millivolts,
    # degrees Celsius) -- confirm against the firmware.
    Status = namedtuple('Status',
            ['uptime_s', 'uart_overruns', 'frame_overruns', 'invalid_frames', 'vcc_mv', 'temp_celsius'])

    def __init__(self, addr, driver=None):
        """Remember the device address and, optionally, the bus driver.

        Passing *driver* here stores it without querying the device.
        """
        self._driver = driver
        self.addr = addr

    @property
    def driver(self):
        """The bus driver this device talks through."""
        return self._driver

    @driver.setter
    def driver(self, driver):
        # Assigning a driver immediately queries the device status.
        self._driver = driver
        self.fetch_status()

    def send_framebuf(self, data):
        """Send the device address followed by one frame of channel values.

        The frame format is built from ``nchannels`` and ``channel_spec``,
        both of which are set by ``fetch_status``.
        """
        bus = self._driver
        bus.send_struct('I', self.addr)
        frame_fmt = '{}{}'.format(self.nchannels, self.channel_spec)
        bus.send_struct(frame_fmt, *data)

    def send_cmd(self, cmd):
        """Send the device address followed by the one-byte command *cmd*."""
        bus = self._driver
        bus.send_struct('I', self.addr)
        bus.send_struct('B', cmd)

    def fetch_status(self):
        """Request a status report, cache device properties, return a ``Status``.

        Raises ``ValueError`` when the reported color code is not a
        ``ColorSpec`` member.
        """
        self.send_cmd(Olsndot.CMD_READ_STATUS)
        report = self._driver.recv_struct('BBBBBHIIIIHH')
        (fw_ver, hw_ver, nbits, chspec_code, color_code, nchannels,
         uptime_s, uart_overruns, frame_overruns, invalid_frames,
         vcc_mv, temp_celsius) = report

        self.fw_ver = fw_ver
        self.hw_ver = hw_ver
        self.nbits = nbits
        self.nchannels = nchannels
        self.color_spec = Olsndot.ColorSpec(color_code)
        # The channel spec byte is stored as the struct format character
        # used when packing frame buffers.
        self.channel_spec = chr(chspec_code)

        return Olsndot.Status(
            uptime_s=uptime_s,
            uart_overruns=uart_overruns,
            frame_overruns=frame_overruns,
            invalid_frames=invalid_frames,
            vcc_mv=vcc_mv,
            temp_celsius=temp_celsius,
        )

    def __str__(self):
        """Describe the device; this queries the device for a fresh status."""
        status = self.fetch_status()
        template = '<Olsndot {}.{}@{} {}ch*{} up {}s vcc {:4.3}V temp {}C>'
        supply_volts = status.vcc_mv/1000
        return template.format(
            self.fw_ver, self.hw_ver, self.addr, self.nchannels, self.channel_format,
            status.uptime_s, supply_volts, status.temp_celsius)

    @property
    def channel_format(self):
        """Color spec name followed by the bit depth, e.g. ``rgb8``."""
        spec_name = self.color_spec.name
        return '{}{}'.format(spec_name, self.nbits)

if __name__ == '__main__':
    # Open the bus on the USB serial adapter and print every item yielded
    # by the device probe until interrupted or an error occurs.
    bus = Driver('/dev/serial/by-id/usb-FTDI_FT232R_USB_UART_A50285BI-if00-port0')
    for node_addr, type_id, dev_cls in bus.probe_devices():
        print(node_addr, type_id, dev_cls)