summaryrefslogtreecommitdiff
path: root/prototype/fw/tools/ser_test.py
blob: 34ef8f2d747e8e2863cf3eeaf90efe77e8ceab9c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#!/usr/bin/env python3

import struct
import itertools
import binascii
import string
import serial
import time
import zlib
import sqlite3

from cobs import cobs

# Byte values shown verbatim in the ASCII column of the hex dump; every other
# byte is rendered as '.'.
PRINTABLE = frozenset(
    b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
    b'!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ ')

# Body of a received data packet: little-endian uint32 sequence number
# followed by 16 uint16 values.
PACKET_BODY = struct.Struct('<I16H')
# Little-endian CRC-32 trailer appended to every packet body.
CRC_TRAILER = struct.Struct('<I')

# Number of bytes per line in the hex dump.
HEX_LINE_WIDTH = 16


def make_color(x):
    """Return the ANSI escape sequence selecting 256-colour foreground *x*."""
    return f'\033[38;5;{x}m'


def parse_payload(payload):
    """Validate a COBS-decoded frame and unpack its body.

    A frame is accepted when it is longer than the 4-byte CRC trailer, the
    trailer equals the little-endian CRC-32 of the preceding bytes, and the
    body has exactly the size of ``PACKET_BODY``.

    Returns ``(seq, values)`` with ``values`` being the list of 16 unpacked
    integers, or ``None`` when the frame is not a valid data packet. Frames
    with a good CRC but a different body length are rejected here instead of
    raising ``struct.error``.
    """
    crc_len = CRC_TRAILER.size
    if len(payload) <= crc_len:
        return None
    body, trailer = payload[:-crc_len], payload[-crc_len:]
    if CRC_TRAILER.pack(zlib.crc32(body)) != trailer:
        return None
    if len(body) != PACKET_BODY.size:
        return None
    seq, *values = PACKET_BODY.unpack(body)
    return seq, values


def handle_frame(packet, db, run_id, last_seq, quiet=False):
    """Decode one zero-delimited COBS frame, log it and print progress.

    Valid packets are inserted into the ``packets`` table (the stored blob is
    the decoded payload including its CRC trailer) and committed immediately.
    A repeated sequence number prints a dot; a new one is printed on a fresh
    line, preceded by the skipped numbers if the sequence jumped.

    Returns the sequence number to use as ``last_seq`` for the next frame.
    """
    try:
        payload = cobs.decode(bytes(packet))
    except cobs.DecodeError:
        if not quiet:
            print('COBS framing error', len(packet))
        return last_seq

    parsed = parse_payload(payload)
    if parsed is None:
        return last_seq
    rx_seq, _values = parsed

    db.execute('INSERT INTO packets(run_id, timestamp_us, data) VALUES (?,?,?)',
               (run_id, int(time.time()*1000000), payload))
    db.commit()

    if rx_seq == last_seq:
        print('.', end='', flush=True)
        return last_seq

    print()
    if last_seq is not None:
        gap = rx_seq - last_seq
        if gap > 2:
            # Several sequence numbers were skipped: list each of them.
            for missing in range(last_seq + 1, rx_seq):
                print(make_color(196), missing, end=' ', flush=True)
        elif gap == 2:
            # Exactly one sequence number was skipped.
            print(make_color(248), last_seq + 1, end=' ', flush=True)
    print(make_color(48), rx_seq, end=' ', flush=True)
    return rx_seq


def run_loop(ser, db, run_id, print_hex=False, quiet=False):
    """Poll the serial port forever, logging received frames and sending pings.

    Received bytes are split into frames at zero bytes and passed to
    ``handle_frame``. With *print_hex* set, every byte is also shown in a
    hex dump. Whenever more than 10 ms have passed since the last
    transmission, a COBS-encoded packet carrying a uint32 counter (tenths of
    a second since start) plus its CRC-32 is written to the port.
    """
    byte_count = 0
    line = bytearray()
    packet = bytearray()
    start_time = time.time()
    last_tx = time.time()
    last_seq = None

    while True:
        for c in ser.read():
            if print_hex:
                if byte_count == 0:
                    print(f'\033[38;5;244m{time.time() - start_time: 8.3f} \033[0m', end='')
                # Frame delimiters (zero bytes) are highlighted in red.
                col = '\033[91m' if c == 0 else '\033[0m'
                print(f'{col}{c:02x}', end=' ')
            line.append(c)
            byte_count += 1

            if c == 0:
                if print_hex:
                    print('   ' * (HEX_LINE_WIDTH - byte_count), end='\033[0m\n')
                # Force the hex line to be flushed below so that every frame
                # starts on a fresh dump line.
                byte_count = HEX_LINE_WIDTH
                last_seq = handle_frame(packet, db, run_id, last_seq, quiet=quiet)
                packet = bytearray()
            else:
                packet.append(c)

            if byte_count == HEX_LINE_WIDTH:
                if print_hex:
                    printable = ''.join(chr(b) if b in PRINTABLE else '.' for b in line)
                    print(f'\033[93m | {printable}\033[0m')
                byte_count = 0
                line = bytearray()

        if time.time() - last_tx > 0.01:
            tx_seq = int((time.time() - start_time) * 10)
            tx_body = CRC_TRAILER.pack(tx_seq)
            ser.write(cobs.encode(tx_body + CRC_TRAILER.pack(zlib.crc32(tx_body))) + b'\0')
            last_tx = time.time()


def main():
    """Parse the command line, open database and serial port, run the logger.

    Each invocation gets a run ID one higher than the largest ``run_id``
    already present in the ``packets`` table. The serial port and the
    database connection are closed when the loop is left, e.g. on Ctrl-C.
    """
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('port')
    parser.add_argument('-b', '--baudrate', type=int, default=115200)
    parser.add_argument('-p', '--print-hex', action='store_true', default=False)
    parser.add_argument('-d', '--database', default=':memory:')
    parser.add_argument('-q', '--quiet', action='store_true', default=False)
    args = parser.parse_args()

    db = sqlite3.connect(args.database)
    try:
        db.execute('''CREATE TABLE IF NOT EXISTS packets (
    run_id INTEGER,
    timestamp_us INTEGER,
    data BLOB)''')

        run_id, = db.execute('SELECT IFNULL(MAX(run_id), 0) FROM packets').fetchone()
        run_id += 1
        print(f'Run ID #{run_id}')

        # timeout=0 makes read() non-blocking so the loop can also transmit.
        ser = serial.Serial(args.port, args.baudrate, timeout=0)
        try:
            run_loop(ser, db, run_id, print_hex=args.print_hex, quiet=args.quiet)
        finally:
            ser.close()
    finally:
        db.close()


if __name__ == '__main__':
    main()