#!/usr/bin/env python3 import os from time import time from binascii import hexlify import enum import struct import zlib import sys import sqlite3 import serial from cobs import cobs class CtrlPacketTypes(enum.Enum): RESET = 1 ACK = 2 RETRANSMIT = 3 def unpack_head(fmt, data): split = struct.calcsize(fmt) return [ *struct.unpack(fmt, data[:split]), data[split:] ] def ctrl_packet(ptype, pid=0): return cobs.encode(struct.pack('BB', ptype.value, pid)) + b'\0' ctrl_reset = lambda: ctrl_packet(CtrlPacketTypes.RESET) ctrl_ack = lambda pid: ctrl_packet(CtrlPacketTypes.ACK, pid) ctrl_retransmit = lambda pid: ctrl_packet(CtrlPacketTypes.RETRANSMIT, pid) if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument('-b', '--baudrate', type=int, default=250000) parser.add_argument('port', nargs='?', default=None) parser.add_argument('dbfile') args = parser.parse_args() if args.port is None: try: candidate, = os.listdir('/dev/serial/by-id') args.port = os.path.join('/dev/serial/by-id', candidate) print(f'No port given, guessing {args.port}') except: print('No port given and could not guess port. Exiting.') sys.exit(1) ser = serial.Serial(args.port, args.baudrate, timeout=1.0) db = sqlite3.connect(args.dbfile) db.execute('CREATE TABLE IF NOT EXISTS measurements (run_id INTEGER, rx_ts INTEGER, seq INTEGER, data BLOB)') db.execute('''CREATE TABLE IF NOT EXISTS errors ( run_id INTEGER, rx_ts INTEGER, type TEXT, seq INTEGER, pid INTEGER, pid_expected INTEGER, crc32 INTEGER, crc32_expected INTEGER, data BLOB)''') run_id, = db.execute('SELECT IFNULL(MAX(run_id), -1) + 1 FROM measurements').fetchone() ser.flushInput() ser.write(ctrl_reset()) ser.flushOutput() last_pid = None lines_written = 0 cur = db.cursor() capture_start = time() while True: #ser.write(cobs.encode(b'\x01\xff') + b'\0') data = ser.read_until(b'\0') for data in data.split(b'\0')[:-1]: # data always ends on \0 due to read_until, so split off the trailing empty bytes() try: if not data: #print(f'{time():>7.3f} Timeout: resetting') #ser.write(cobs.encode(b'\x01\xff') + b'\0') # reset ser.write(ctrl_ack(0)) # FIXME delet this cur.execute('INSERT INTO errors(run_id, rx_ts, type) VALUES (?, ?, "retransmission")', (run_id, int(time()*1000))) continue crc32, payload = unpack_head('I', cobs.decode(data)) pid, seq, data = unpack_head('xBH', payload) ts = time() # Calculate byte-wise CRC32 our_crc = zlib.crc32(bytes(b for x in payload for b in (0, 0, 0, x))) #log.append((time(), seq, crc32, our_crc, pid, data)) bars = '\u2581\u2582\u2583\u2584\u2585\u2586\u2587\u2588' sparkline = ''.join(bars[int(x/4096*8)] for x in struct.unpack('<32H', data)) print(f'\033[38;5;249m{ts-capture_start:>10.3f}', f'\033[94m{seq:05d}', f'\033[38;5;243m{crc32:08x}', f'\033[38;5;243m{our_crc:08x}', f'\033[38;5;243m{pid}', f'\033[0m{hexlify(data).decode()}', f'\033[94m{sparkline}\033[0m', end='') error = False suppress_ack = False if crc32 != our_crc: print(' \033[1;91mCRC ERROR\033[0m', end='') suppress_ack = True error = True if last_pid is not None and pid != (last_pid+1)%8: print(' \033[1;93mPID ERROR\033[0m', end='') error = True else: last_pid = pid if not suppress_ack: ser.write(ctrl_ack(pid)) ser.flushOutput() if not suppress_ack: cur.execute('INSERT INTO measurements VALUES (?, ?, ?, ?)', (run_id, int(ts*1000), seq, data)) if error: cur.execute('INSERT INTO errors VALUES (?, ?, "pid", ?, ?, ?, ?, ?, ?)', (run_id, int(ts*1000), seq, pid, (last_pid+1)%8, crc32, our_crc, data)) print() lines_written += 1 if lines_written == 80: lines_written = 0 print('\033[2J\033[H', end='') delta = ts-capture_start print(f'\033[7mRun {run_id}, capturing for {delta//3600//24:> 3.0f}:{delta//3600%24:02.0f}:{delta//60%60:02.0f}:{delta%60:06.3f}\033[0m') db.commit() except Exception as e: print(e, len(data)) ser.write(ctrl_ack(0)) # FIXME delet this