1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
#!/usr/bin/env python3
import struct
import itertools
import binascii
import string
import serial
import time
import zlib
from cobs import cobs
# Number of bytes shown per hex-dump row before the ASCII column is printed.
BYTES_PER_LINE = 16

# Minimum spacing, in seconds, between two transmitted test packets.
TX_INTERVAL = 0.01

# One 256-colour ANSI escape per payload byte position: three groups of four
# bytes, each group in its own colour.
FIELD_COLORS = [
    f'\033[38;5;{code}m' for code in (48,) * 4 + (220,) * 4 + (207,) * 4
]

# Byte values rendered verbatim in the ASCII column; everything else is '.'.
_PRINTABLE = frozenset(
    b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ '
)


def payload_crc_ok(payload):
    """Return True if *payload* ends with the CRC-32 of the bytes before it.

    The trailing four bytes are read as a little-endian unsigned integer and
    compared against ``zlib.crc32`` of everything preceding them. Payloads of
    four bytes or fewer carry no data in front of the checksum and are
    rejected.
    """
    if len(payload) <= 4:
        return False
    (received,) = struct.unpack('<I', payload[-4:])
    return zlib.crc32(payload[:-4]) == received


def format_payload(payload, field_colors):
    """Render *payload* as space-separated, colour-prefixed hex bytes.

    Byte *i* is prefixed with ``field_colors[i]``. Bytes beyond the end of
    *field_colors* get no prefix, and colour slots beyond the end of
    *payload* are shown as ``00`` so every row has at least
    ``len(field_colors)`` columns.
    """
    cells = []
    for color, byte in itertools.zip_longest(field_colors, payload):
        value = 0 if byte is None else byte
        cells.append(f'{color if color else ""}{value:02x}')
    return ' '.join(cells)


def printable_ascii(raw):
    """Return *raw* as text, with every non-printable byte replaced by '.'."""
    return ''.join(chr(b) if b in _PRINTABLE else '.' for b in raw)


def describe_packet(packet):
    """COBS-decode *packet* and return the line to print for it.

    The result is the coloured hex rendering of the decoded payload followed
    by ``OK`` or ``WRONG`` depending on its CRC, or a fixed error message if
    the bytes are not valid COBS.
    """
    try:
        payload = cobs.decode(packet)
    except cobs.DecodeError:
        return 'COBS framing error'
    verdict = 'OK' if payload_crc_ok(payload) else 'WRONG'
    return f'{format_payload(payload, FIELD_COLORS)} {verdict}'


def main():
    """Hex-dump a serial port while periodically sending test packets.

    Every received byte is printed as hex, 16 per row, with an elapsed-time
    stamp at the start of each row and an ASCII column at the end. A zero
    byte terminates a COBS frame: the row is cut short, the frame collected
    so far is decoded and its CRC verdict printed. Independently of what is
    received, a packet holding a 32-bit little-endian counter (tenths of a
    second since start) plus its CRC-32 is COBS-encoded and written,
    followed by a zero delimiter, whenever more than ``TX_INTERVAL`` seconds
    have passed since the previous one.
    """
    import argparse  # only needed when run as a script

    parser = argparse.ArgumentParser()
    parser.add_argument('port')
    parser.add_argument('-b', '--baudrate', type=int, default=115200)
    args = parser.parse_args()

    byte_count = 0          # bytes already printed on the current row
    line = bytearray()      # raw bytes of the current row, for the ASCII column
    packet = bytearray()    # bytes of the COBS frame currently being received
    # Monotonic clock: elapsed times must not jump when the wall clock is set.
    start_time = time.monotonic()
    last_tx = start_time

    # The context manager closes the port on every exit path.
    # NOTE: timeout=0 makes read() return immediately, so this loop polls the
    # port continuously instead of blocking.
    with serial.Serial(args.port, args.baudrate, timeout=0) as ser:
        try:
            while True:
                for c in ser.read():
                    if byte_count == 0:
                        elapsed = time.monotonic() - start_time
                        print(f'\033[38;5;244m{elapsed: 8.3f} \033[0m', end='')

                    # Frame delimiters are highlighted in red.
                    col = '\033[91m' if c == 0 else '\033[0m'
                    print(f'{col}{c:02x}', end=' ')
                    line.append(c)
                    byte_count += 1

                    if c == 0:
                        # End of frame: finish the hex row early, then report
                        # the decoded packet on its own line.
                        print(' ' * (BYTES_PER_LINE - byte_count), end='\033[0m\n')
                        byte_count = BYTES_PER_LINE
                        print(describe_packet(bytes(packet)))
                        packet.clear()
                    else:
                        packet.append(c)

                    if byte_count == BYTES_PER_LINE:
                        print(f'\033[93m | {printable_ascii(line)}\033[0m')
                        byte_count = 0
                        line.clear()

                now = time.monotonic()
                if now - last_tx > TX_INTERVAL:
                    seq = int((now - start_time) * 10)
                    data = struct.pack('<I', seq)
                    frame = data + struct.pack('<I', zlib.crc32(data))
                    ser.write(cobs.encode(frame) + b'\0')
                    last_tx = time.monotonic()
        except KeyboardInterrupt:
            # Leave the terminal with default colours on Ctrl-C.
            print('\033[0m')


if __name__ == '__main__':
    main()
|