#!/usr/bin/env python3 from time import time from binascii import hexlify import enum import struct import zlib import sys import serial from cobs import cobs class CtrlPacketTypes(enum.Enum): RESET = 1 ACK = 2 RETRANSMIT = 3 def unpack_head(fmt, data): split = struct.calcsize(fmt) return *struct.unpack(fmt, data[:split]), data[split:] def ctrl_packet(ptype, pid=0): return cobs.encode(struct.pack('BB', ptype.value, pid)) + b'\0' ctrl_reset = lambda: ctrl_packet(CtrlPacketTypes.RESET) ctrl_ack = lambda pid: ctrl_packet(CtrlPacketTypes.ACK, pid) ctrl_retransmit = lambda pid: ctrl_packet(CtrlPacketTypes.RETRANSMIT, pid) ser = serial.Serial('/dev/serial/by-id/usb-Silicon_Labs_CP2102_USB_to_UART_Bridge_Controller_0001-if00-port0', 250000, timeout=1.0) ser.write(b'foobar'*32) sys.exit(0) log = [] ser.flushInput() ser.write(ctrl_reset()) ser.flushOutput() for _ in range(100): #ser.write(cobs.encode(b'\x01\xff') + b'\0') data = ser.read_until(b'\0') if not data or data[-1] != 0x00: #print(f'{time():>7.3f} Timeout: resetting') #ser.write(cobs.encode(b'\x01\xff') + b'\0') # reset continue crc32, payload = unpack_head('I', cobs.decode(data[:-1])) pid, seq, data = unpack_head('xBH', payload) ser.write(ctrl_ack(pid)) ser.flushOutput() # Calculate byte-wise CRC32 #our_crc = zlib.crc32(bytes(b for x in payload for b in (0, 0, 0, x))) our_crc = 0 #log.append((time(), seq, crc32, our_crc, pid, data)) for time, seq, crc32, our_crc, pid, data in log: print(f'{time:>7.3f} {seq:05d} {crc32:08x} {our_crc:08x} {pid} {hexlify(data).decode()}')