aboutsummaryrefslogtreecommitdiff
path: root/fw/tools/test.py
blob: 63cb324d8ca52b73525f36d815659fda5bdebd94 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python3
import serial
import struct
from itertools import takewhile

def chunked(data, chunk_size):
    for i in range(0, len(data), chunk_size):
        yield data[i:i+chunk_size]

def frame_packet(data):
    if len(data) > 254:
        raise ValueError('Input too long')
    out = b''
    for run in data.split(b'\0'):
        out += bytes([len(run)+1])
        out += run
    out += b'\0'
    return out

def format_packet(data):
    out = b''
    for a, b, c, d, e, f, g, h in chunked(data, 8):
        ah, bh, ch, dh = a>>8,   b>>8,   c>>8,   d>>8
        eh, fh, gh, hh = e>>8,   f>>8,   g>>8,   h>>8
        al, bl, cl, dl = a&0xff, b&0xff, c&0xff, d&0xff
        el, fl, gl, hl = e&0xff, f&0xff, g&0xff, h&0xff
        # FIXME check order of high bits
        out += bytes([al, bl, cl, dl, el, fl, gl, hl,
            (ah<<6 | bh<<4 | ch<<2 | dh<<0)&0xff,
            (eh<<6 | fh<<4 | gh<<2 | hh<<0)&0xff])
    out += bytes([1, 0, 0, 0]) # global intensity
    return out

def chariter(ser):
    while True:
        yield ser.read(1)

def read_frame(ser):
    return b''.join(takewhile(lambda c: c and c[0], chariter(ser)))

def unstuff(data):
    out = b''
    while data:
        stuff = data[0]
        if out:
            out += b'\0'
        out += data[1:stuff]
        data = data[stuff:]
    return out

def receive_frame(ser):
    return unstuff(read_frame(ser))

def mac_frame(mac):
    return frame_packet(struct.pack('<I', mac))

def send_framebuffer(ser, mac, frame):
    formatted = format_packet(frame)
    framed = mac_frame(mac) + frame_packet(formatted[:162]) + frame_packet(formatted[162:])
    ser.write(framed)

def discover_macs(ser, count=20):
    found_macs = []
    while True:
        ser.flushInput()
        ser.write(b'\0')
        frame = receive_frame(ser)
        if len(frame) == 4:
            mac, = struct.unpack('<I', frame)
            if mac not in found_macs:
                print('Discovered new MAC: {:2} {:08x}'.format(len(found_macs), mac))
                found_macs.append(mac)
                if len(found_macs) == count:
                    return found_macs
        elif len(frame) != 0:
            print('Invalid frame of length {}:'.format(len(frame)), frame)
        time.sleep(0.05)

def parse_status_frame(frame):
    print('frame len:', len(frame))
    if not frame:
        return None
    (   firmware_version, 
        hardware_version, 
        digit_rows, 
        digit_cols, 
        uptime_s, 
        framerate_millifps, 
        uart_overruns, 
        frame_overruns, 
        invalid_frames, 
        vcc_mv, 
        temp_celsius, 
        nbits ) = struct.unpack('<4B5IhhB', frame)
    del frame
    return locals()

def fetch_status(ser, mac):
    ser.flushInput()
    ser.write(mac_frame(mac))
    ser.write(frame_packet(b'\x01'))
    return parse_status_frame(receive_frame(ser))

if __name__ == '__main__':
    import argparse
    import time
    from binascii import hexlify
    
    parser = argparse.ArgumentParser()
    parser.add_argument('serial')
    args = parser.parse_args()

    ser = serial.Serial(args.serial, 2000000, timeout=0.05)

    frame_len = 4*8*8
    black, red = [0]*frame_len, [255]*frame_len
    frames = \
            [black]
            #[[0]*i + [255]*(256-i) for i in range(257)]
            #[[(i + d)%256 for d in range(frame_len)] for i in range(256)]
            #[black]*10 +\
            #[red]*10 +\
            #[[i]*frame_len for i in range(256)] +\
            #[[(i + (d//8)*8) % 256*8 for d in range(frame_len)] for i in range(256)]

    #frames = [red, black]*5
    #frames = [ x for l in [[([0]*i+[255]+[0]*(7-i))*32]*2 for i in range(8)] for x in l ]
    found_macs = [0xdeadbeef] #discover_macs(ser, 1)
    mac, = found_macs

    import pprint
    while True:
        try:
            pprint.pprint(fetch_status(ser, mac))
        except e:
            print(e)
        for i, frame in enumerate(frames):
            send_framebuffer(ser, mac, frame)
            time.sleep(0.1)
        # to produce framing errors: ser.write(b'\02a\0')