From c67f7d626b76238e561304b53f31afdad7e2f671 Mon Sep 17 00:00:00 2001 From: jaseg Date: Thu, 18 Mar 2021 10:11:41 +0100 Subject: fw: dump accelerometer measurements over uart --- prototype/fw/tools/ser_test.py | 57 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 10 deletions(-) (limited to 'prototype/fw/tools') diff --git a/prototype/fw/tools/ser_test.py b/prototype/fw/tools/ser_test.py index ebd95ed..34ef8f2 100644 --- a/prototype/fw/tools/ser_test.py +++ b/prototype/fw/tools/ser_test.py @@ -7,6 +7,7 @@ import string import serial import time import zlib +import sqlite3 from cobs import cobs @@ -15,8 +16,21 @@ if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('port') parser.add_argument('-b', '--baudrate', type=int, default=115200) + parser.add_argument('-p', '--print-hex', action='store_true', default=False) + parser.add_argument('-d', '--database', default=':memory:') + parser.add_argument('-q', '--quiet', action='store_true', default=False) args = parser.parse_args() + db = sqlite3.connect(args.database) + db.execute('''CREATE TABLE IF NOT EXISTS packets ( + run_id INTEGER, + timestamp_us INTEGER, + data BLOB)''') + + run_id, = db.execute('SELECT IFNULL(MAX(run_id), 0) FROM packets').fetchone() + run_id += 1 + print(f'Run ID #{run_id}') + ser = serial.Serial(args.port, args.baudrate, timeout=0) byte_count = 0 @@ -29,18 +43,22 @@ if __name__ == '__main__': field_colors = [ make_color(x) for x in [ 48, 48, 48, 48, 220, 220, 220, 220, 207, 207, 207, 207 ] ] last_tx = time.time() lastc = -10000 + last_seq = None while True: data = ser.read() for c in data: - #if byte_count == 0: - #print(f'\033[38;5;244m{time.time() - start_time: 8.3f} \033[0m', end='') + if args.print_hex: + if byte_count == 0: + print(f'\033[38;5;244m{time.time() - start_time: 8.3f} \033[0m', end='') col = '\033[91m' if c == 0 else '\033[0m' # if c == (lastc - 1) % 256 else '\033[92m') - #print(f'{col}{c:02x}', end=' ') + if args.print_hex: + print(f'{col}{c:02x}', end=' ') line += bytes([c]) byte_count += 1 if c == 0: - #print(' ' * (16 - byte_count), end='\033[0m\n') + if args.print_hex: + print(' ' * (16 - byte_count), end='\033[0m\n') byte_count = 16 try: payload = cobs.decode(packet) @@ -51,18 +69,37 @@ if __name__ == '__main__': else: crc_ok = False ref_crc = '' - if len(payload) == 12: - print(' '.join(f'{col if col else ""}{c if c else 0:02x}' for col, c in itertools.zip_longest(field_colors, - payload)), 'OK' if crc_ok else f'WRONG') + #if len(payload) == 40: + # print(' '.join(f'{col if col else ""}{c if c else 0:02x}' for col, c in itertools.zip_longest(field_colors, + # payload)), 'OK' if crc_ok else f'WRONG') + if crc_ok: + seq, *data = struct.unpack(' 2: + for i in range(last_seq+1, seq): + print(make_color(196), i, end=' ', flush=True) + elif seq - last_seq == 2: + print(make_color(248), last_seq+1, end=' ', flush=True) + print(make_color(48), seq, end=' ', flush=True) + last_seq = seq + else: + print('.', end='', flush=True) except cobs.DecodeError: - print('COBS framing error') + if not args.quiet: + print('COBS framing error', len(packet)) packet = b'' else: packet += bytes([c]) isprint = lambda c: c in b'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ ' if byte_count == 16: - printable = ''.join( chr(c) if isprint(c) else '.' for c in line ) - #print(f'\033[93m | {printable}\033[0m') + if args.print_hex: + printable = ''.join( chr(c) if isprint(c) else '.' for c in line ) + print(f'\033[93m | {printable}\033[0m') byte_count = 0 line = b'' lastc = c -- cgit