#!/usr/bin/env python import sys import subprocess import binascii from statistics import mean from dataclasses import dataclass, fields import struct from pprint import pprint from cobs import cobs time = [*sys.argv, '1s'][1] proc = subprocess.run(f'sigrok-cli --driver dreamsourcelab-dslogic --config samplerate=10M --channels 0,1 --protocol-decoders uart:baudrate=250000:rx=1 --protocol-decoder-annotations uart=rx-data,uart=tx-data --time {time}'.split(), check=True, capture_output=True, text=True) data = [line.partition(':')[2] for line in proc.stdout.splitlines()] data = bytes([int(x, 16) for x in data if x]) class Serialized: @classmethod def deserialize(kls, data): fields = struct.unpack(kls._struct_format(), data) mapped = [cast(val) for cast, val in zip(kls._struct_casts(), fields)] return kls(*mapped) @classmethod def _struct_format(kls): return kls._parse_fields()[0] @classmethod def _struct_casts(kls): return kls._parse_fields()[1] @classmethod def _parse_fields(kls): fmt = '<' casts = [] for field in fields(kls): if isinstance(field.type, tuple): struct_type, cast = field.type else: struct_type, cast = field.type, int fmt += struct_type casts.append(cast) return fmt, casts @dataclass class Header(Serialized): crc: 'I' src: 'B' dst: 'B' pid: 'B' packet_type: 'B' @dataclass class ADCPacket(Serialized): timestamp: 'Q' sampling_interval: 'I' total_samples: 'I' sample_count: 'I' samples: ('96s', bytes) def __post_init__(self): data = self.samples foo = lambda x: x if x < 0x800000 else x-0x1000000 self.samples = [[ foo(struct.unpack(' 10000 or diff_b > 10000: pprint(packet) norm_a = mean(packet.samples[0][:packet.sample_count]) norm_b = mean(packet.samples[1][:packet.sample_count]) elif any(x != 0 for x in packet.samples[0][packet.sample_count:] + packet.samples[1][packet.sample_count:]): pprint('nonzero', packet) except (cobs.DecodeError, struct.error): print('Decoding error')