aboutsummaryrefslogtreecommitdiff
path: root/driver_fw/tools/decode_logic_analzyer.py
diff options
context:
space:
mode:
Diffstat (limited to 'driver_fw/tools/decode_logic_analzyer.py')
-rw-r--r--driver_fw/tools/decode_logic_analzyer.py88
1 files changed, 88 insertions, 0 deletions
diff --git a/driver_fw/tools/decode_logic_analzyer.py b/driver_fw/tools/decode_logic_analzyer.py
new file mode 100644
index 0000000..be1d4da
--- /dev/null
+++ b/driver_fw/tools/decode_logic_analzyer.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+
+import sys
+import subprocess
+import binascii
+from statistics import mean
+from dataclasses import dataclass, fields
+import struct
+from pprint import pprint
+
+from cobs import cobs
+
+time = [*sys.argv, '1s'][1]
+proc = subprocess.run(f'sigrok-cli --driver dreamsourcelab-dslogic --config samplerate=10M --channels 0,1 --protocol-decoders uart:baudrate=250000:rx=1 --protocol-decoder-annotations uart=rx-data,uart=tx-data --time {time}'.split(), check=True, capture_output=True, text=True)
+data = [line.partition(':')[2] for line in proc.stdout.splitlines()]
+data = bytes([int(x, 16) for x in data if x])
+
+class Serialized:
+ @classmethod
+ def deserialize(kls, data):
+ fields = struct.unpack(kls._struct_format(), data)
+ mapped = [cast(val) for cast, val in zip(kls._struct_casts(), fields)]
+ return kls(*mapped)
+
+ @classmethod
+ def _struct_format(kls):
+ return kls._parse_fields()[0]
+
+ @classmethod
+ def _struct_casts(kls):
+ return kls._parse_fields()[1]
+
+ @classmethod
+ def _parse_fields(kls):
+ fmt = '<'
+ casts = []
+ for field in fields(kls):
+ if isinstance(field.type, tuple):
+ struct_type, cast = field.type
+ else:
+ struct_type, cast = field.type, int
+ fmt += struct_type
+ casts.append(cast)
+ return fmt, casts
+
+@dataclass
+class Header(Serialized):
+ crc: 'I'
+ src: 'B'
+ dst: 'B'
+ pid: 'B'
+ packet_type: 'B'
+
+@dataclass
+class ADCPacket(Serialized):
+ timestamp: 'Q'
+ sampling_interval: 'I'
+ total_samples: 'I'
+ sample_count: 'I'
+ samples: ('96s', bytes)
+
+ def __post_init__(self):
+ data = self.samples
+ foo = lambda x: x if x < 0x800000 else x-0x1000000
+ self.samples = [[
+ foo(struct.unpack('<I', data[3*(2*sample + channel):][:3] + b'\0')[0])
+ for sample in range(16)
+ ] for channel in range(2)]
+
+
+norm_a, norm_b = 0, 0
+for packet in data.split(b'\0'):
+ try:
+ packet = cobs.decode(packet)
+ hdr = Header.deserialize(packet[:8])
+ if hdr.packet_type == 2:
+ packet = ADCPacket.deserialize(packet[8:])
+ diff_a = max([abs(x - norm_a) for x in packet.samples[0][:packet.sample_count]])
+ diff_b = max([abs(x - norm_b) for x in packet.samples[1][:packet.sample_count]])
+ if diff_a > 10000 or diff_b > 10000:
+ pprint(packet)
+ norm_a = mean(packet.samples[0][:packet.sample_count])
+ norm_b = mean(packet.samples[1][:packet.sample_count])
+ elif any(x != 0 for x in packet.samples[0][packet.sample_count:] + packet.samples[1][packet.sample_count:]):
+ pprint('nonzero', packet)
+ except (cobs.DecodeError, struct.error):
+ print('Decoding error')
+