From ec28fcd9f905358759eea98161f451567135d17e Mon Sep 17 00:00:00 2001 From: jaseg Date: Sun, 27 Aug 2023 22:31:09 +0200 Subject: new driver blinkenlights --- driver_fw/tools/decode_logic_analzyer.py | 88 ++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 driver_fw/tools/decode_logic_analzyer.py (limited to 'driver_fw/tools/decode_logic_analzyer.py') diff --git a/driver_fw/tools/decode_logic_analzyer.py b/driver_fw/tools/decode_logic_analzyer.py new file mode 100644 index 0000000..be1d4da --- /dev/null +++ b/driver_fw/tools/decode_logic_analzyer.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python + +import sys +import subprocess +import binascii +from statistics import mean +from dataclasses import dataclass, fields +import struct +from pprint import pprint + +from cobs import cobs + +time = [*sys.argv, '1s'][1] +proc = subprocess.run(f'sigrok-cli --driver dreamsourcelab-dslogic --config samplerate=10M --channels 0,1 --protocol-decoders uart:baudrate=250000:rx=1 --protocol-decoder-annotations uart=rx-data,uart=tx-data --time {time}'.split(), check=True, capture_output=True, text=True) +data = [line.partition(':')[2] for line in proc.stdout.splitlines()] +data = bytes([int(x, 16) for x in data if x]) + +class Serialized: + @classmethod + def deserialize(kls, data): + fields = struct.unpack(kls._struct_format(), data) + mapped = [cast(val) for cast, val in zip(kls._struct_casts(), fields)] + return kls(*mapped) + + @classmethod + def _struct_format(kls): + return kls._parse_fields()[0] + + @classmethod + def _struct_casts(kls): + return kls._parse_fields()[1] + + @classmethod + def _parse_fields(kls): + fmt = '<' + casts = [] + for field in fields(kls): + if isinstance(field.type, tuple): + struct_type, cast = field.type + else: + struct_type, cast = field.type, int + fmt += struct_type + casts.append(cast) + return fmt, casts + +@dataclass +class Header(Serialized): + crc: 'I' + src: 'B' + dst: 'B' + pid: 'B' + packet_type: 'B' + +@dataclass +class ADCPacket(Serialized): + timestamp: 'Q' + sampling_interval: 'I' + total_samples: 'I' + sample_count: 'I' + samples: ('96s', bytes) + + def __post_init__(self): + data = self.samples + foo = lambda x: x if x < 0x800000 else x-0x1000000 + self.samples = [[ + foo(struct.unpack(' 10000 or diff_b > 10000: + pprint(packet) + norm_a = mean(packet.samples[0][:packet.sample_count]) + norm_b = mean(packet.samples[1][:packet.sample_count]) + elif any(x != 0 for x in packet.samples[0][packet.sample_count:] + packet.samples[1][packet.sample_count:]): + pprint('nonzero', packet) + except (cobs.DecodeError, struct.error): + print('Decoding error') + -- cgit