diff options
Diffstat (limited to 'driver_fw/tools/decode_logic_analzyer.py')
-rw-r--r-- | driver_fw/tools/decode_logic_analzyer.py | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/driver_fw/tools/decode_logic_analzyer.py b/driver_fw/tools/decode_logic_analzyer.py new file mode 100644 index 0000000..be1d4da --- /dev/null +++ b/driver_fw/tools/decode_logic_analzyer.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python + +import sys +import subprocess +import binascii +from statistics import mean +from dataclasses import dataclass, fields +import struct +from pprint import pprint + +from cobs import cobs + +time = [*sys.argv, '1s'][1] +proc = subprocess.run(f'sigrok-cli --driver dreamsourcelab-dslogic --config samplerate=10M --channels 0,1 --protocol-decoders uart:baudrate=250000:rx=1 --protocol-decoder-annotations uart=rx-data,uart=tx-data --time {time}'.split(), check=True, capture_output=True, text=True) +data = [line.partition(':')[2] for line in proc.stdout.splitlines()] +data = bytes([int(x, 16) for x in data if x]) + +class Serialized: + @classmethod + def deserialize(kls, data): + fields = struct.unpack(kls._struct_format(), data) + mapped = [cast(val) for cast, val in zip(kls._struct_casts(), fields)] + return kls(*mapped) + + @classmethod + def _struct_format(kls): + return kls._parse_fields()[0] + + @classmethod + def _struct_casts(kls): + return kls._parse_fields()[1] + + @classmethod + def _parse_fields(kls): + fmt = '<' + casts = [] + for field in fields(kls): + if isinstance(field.type, tuple): + struct_type, cast = field.type + else: + struct_type, cast = field.type, int + fmt += struct_type + casts.append(cast) + return fmt, casts + +@dataclass +class Header(Serialized): + crc: 'I' + src: 'B' + dst: 'B' + pid: 'B' + packet_type: 'B' + +@dataclass +class ADCPacket(Serialized): + timestamp: 'Q' + sampling_interval: 'I' + total_samples: 'I' + sample_count: 'I' + samples: ('96s', bytes) + + def __post_init__(self): + data = self.samples + foo = lambda x: x if x < 0x800000 else x-0x1000000 + self.samples = [[ + foo(struct.unpack('<I', data[3*(2*sample + channel):][:3] + b'\0')[0]) + for sample in range(16) + ] for channel in range(2)] + + +norm_a, norm_b = 0, 0 +for packet in data.split(b'\0'): + try: + packet = cobs.decode(packet) + hdr = Header.deserialize(packet[:8]) + if hdr.packet_type == 2: + packet = ADCPacket.deserialize(packet[8:]) + diff_a = max([abs(x - norm_a) for x in packet.samples[0][:packet.sample_count]]) + diff_b = max([abs(x - norm_b) for x in packet.samples[1][:packet.sample_count]]) + if diff_a > 10000 or diff_b > 10000: + pprint(packet) + norm_a = mean(packet.samples[0][:packet.sample_count]) + norm_b = mean(packet.samples[1][:packet.sample_count]) + elif any(x != 0 for x in packet.samples[0][packet.sample_count:] + packet.samples[1][packet.sample_count:]): + pprint('nonzero', packet) + except (cobs.DecodeError, struct.error): + print('Decoding error') + |