#!/usr/bin/env python3 import struct import itertools import binascii import string import serial import time import zlib import sqlite3 from cobs import cobs if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument('port') parser.add_argument('-b', '--baudrate', type=int, default=115200) parser.add_argument('-p', '--print-hex', action='store_true', default=False) parser.add_argument('-d', '--database', default=':memory:') parser.add_argument('-q', '--quiet', action='store_true', default=False) args = parser.parse_args() db = sqlite3.connect(args.database) db.execute('''CREATE TABLE IF NOT EXISTS packets ( run_id INTEGER, timestamp_us INTEGER, data BLOB)''') run_id, = db.execute('SELECT IFNULL(MAX(run_id), 0) FROM packets').fetchone() run_id += 1 print(f'Run ID #{run_id}') ser = serial.Serial(args.port, args.baudrate, timeout=0) byte_count = 0 line = b'' packet = b'' start_time = time.time() seq = 0 make_color = lambda x: f'\033[38;5;{x}m' field_colors = [ make_color(x) for x in [ 48, 48, 48, 48, 220, 220, 220, 220, 207, 207, 207, 207 ] ] last_tx = time.time() lastc = -10000 last_seq = None while True: data = ser.read() for c in data: if args.print_hex: if byte_count == 0: print(f'\033[38;5;244m{time.time() - start_time: 8.3f} \033[0m', end='') col = '\033[91m' if c == 0 else '\033[0m' # if c == (lastc - 1) % 256 else '\033[92m') if args.print_hex: print(f'{col}{c:02x}', end=' ') line += bytes([c]) byte_count += 1 if c == 0: if args.print_hex: print(' ' * (16 - byte_count), end='\033[0m\n') byte_count = 16 try: payload = cobs.decode(packet) if len(payload) > 4: crc = zlib.crc32(payload[:-4]) ref_crc = struct.pack(' 2: for i in range(last_seq+1, seq): print(make_color(196), i, end=' ', flush=True) elif seq - last_seq == 2: print(make_color(248), last_seq+1, end=' ', flush=True) print(make_color(48), seq, end=' ', flush=True) last_seq = seq else: print('.', end='', flush=True) except cobs.DecodeError: if not args.quiet: print('COBS framing error', len(packet)) packet = b'' else: packet += bytes([c]) isprint = lambda c: c in b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ ' if byte_count == 16: if args.print_hex: printable = ''.join( chr(c) if isprint(c) else '.' for c in line ) print(f'\033[93m | {printable}\033[0m') byte_count = 0 line = b'' lastc = c if time.time() - last_tx > 0.01: seq = int((time.time() - start_time) * 10) data = struct.pack('