summaryrefslogtreecommitdiff
path: root/tools/decode_logic_analzyer.py
blob: be1d4da49707076beca19a56841bc7e23d10740a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/usr/bin/env python

import sys
import subprocess
import binascii
from statistics import mean
from dataclasses import dataclass, fields
import struct
from pprint import pprint

from cobs import cobs

# Capture duration handed to sigrok-cli: the first command-line argument,
# or '1s' when the script is started without one.
time = sys.argv[1] if len(sys.argv) > 1 else '1s'

# The argument vector is spelled out instead of splitting a formatted string,
# so a duration that contains whitespace stays one single argument.
# check=True makes a failing sigrok-cli raise CalledProcessError.
proc = subprocess.run(
    [
        'sigrok-cli',
        '--driver', 'dreamsourcelab-dslogic',
        '--config', 'samplerate=10M',
        '--channels', '0,1',
        '--protocol-decoders', 'uart:baudrate=250000:rx=1',
        '--protocol-decoder-annotations', 'uart=rx-data,uart=tx-data',
        '--time', time,
    ],
    check=True,
    capture_output=True,
    text=True,
)

# Keep the text after the first ':' of every output line, drop lines that
# have nothing there, and parse the remainder as one hexadecimal byte value.
data = [line.partition(':')[2] for line in proc.stdout.splitlines()]
data = bytes([int(x, 16) for x in data if x])

class Serialized:
    """Mixin that builds a dataclass instance from a packed byte buffer.

    The annotation of every dataclass field describes how it is unpacked:
    either a ``struct`` format code (the unpacked value is then passed
    through ``int``) or a ``(format code, converter)`` tuple whose second
    item is the callable applied to the unpacked value. Fields are laid out
    in declaration order using the little-endian, unpadded ``'<'`` mode.
    """

    @classmethod
    def deserialize(cls, data):
        """Unpack *data* according to the class layout and return an instance.

        ``struct.unpack`` raises ``struct.error`` when the length of *data*
        does not match the layout.
        """
        raw_values = struct.unpack(cls._struct_format(), data)
        converters = cls._struct_casts()
        return cls(*(convert(raw) for convert, raw in zip(converters, raw_values)))

    @classmethod
    def _struct_format(cls):
        """Return the ``struct`` format string covering all fields."""
        parsed = cls._parse_fields()
        return parsed[0]

    @classmethod
    def _struct_casts(cls):
        """Return the list of per-field converters, in field order."""
        parsed = cls._parse_fields()
        return parsed[1]

    @classmethod
    def _parse_fields(cls):
        """Derive ``(format string, converters)`` from the field annotations."""
        layout = '<'
        converters = []
        for spec in fields(cls):
            annotation = spec.type
            # A bare annotation is a format code whose value is converted with int.
            code, converter = annotation if isinstance(annotation, tuple) else (annotation, int)
            layout += code
            converters.append(converter)
        return layout, converters

@dataclass
class Header(Serialized):
    """Packet header unpacked from the first 8 bytes of a decoded frame.

    Each annotation is a ``struct`` format code consumed by ``Serialized``;
    with the little-endian, unpadded layout the fields add up to
    4 + 1 + 1 + 1 + 1 = 8 bytes. Field order is the wire order.
    """
    # Unsigned 32-bit value; presumably a checksum -- it is not verified
    # anywhere in the visible code.
    crc: 'I'
    # Unsigned 8-bit values; by their names presumably source address,
    # destination address and a packet id -- TODO confirm against the sender.
    src: 'B'
    dst: 'B'
    pid: 'B'
    # Unsigned 8-bit type tag; the script treats value 2 as an ADCPacket body.
    packet_type: 'B'

@dataclass
class ADCPacket(Serialized):
    """ADC packet body: three counters plus a 96-byte block of raw samples.

    After construction ``samples`` is replaced by two lists (one per channel)
    of 16 signed integers each, decoded from the raw bytes.
    """
    # Unsigned 64-bit value.
    timestamp: 'Q'
    # Unsigned 32-bit values.
    sampling_interval: 'I'
    total_samples: 'I'
    sample_count: 'I'
    # 96 raw bytes, kept as bytes until __post_init__ decodes them.
    samples: ('96s', bytes)

    def __post_init__(self):
        """Decode the raw sample bytes into ``[channel][sample]`` integers.

        The buffer holds 3-byte little-endian words with the two channels
        interleaved: the word for (sample, channel) starts at byte
        ``3 * (2 * sample + channel)``.
        """
        raw = self.samples

        def to_signed24(word):
            # Reinterpret an unsigned 24-bit word as two's complement.
            return word - 0x1000000 if word >= 0x800000 else word

        decoded = []
        for channel in range(2):
            readings = []
            for sample in range(16):
                start = 3 * (2 * sample + channel)
                # Pad the 3-byte word to 4 bytes so it unpacks as '<I'.
                (word,) = struct.unpack('<I', raw[start:start + 3] + b'\0')
                readings.append(to_signed24(word))
            decoded.append(readings)
        self.samples = decoded


# Per-channel baselines. They start at 0 and are replaced by the mean of the
# valid samples whenever a packet deviates from them by more than 10000.
norm_a, norm_b = 0, 0
# Frames are delimited by zero bytes; each frame is COBS-decoded on its own.
for frame in data.split(b'\0'):
    try:
        payload = cobs.decode(frame)
        hdr = Header.deserialize(payload[:8])
        if hdr.packet_type != 2:
            # Only packet type 2 carries an ADCPacket body.
            continue
        adc = ADCPacket.deserialize(payload[8:])
    except (cobs.DecodeError, struct.error):
        print('Decoding error')
        continue

    count = adc.sample_count
    valid_a, valid_b = adc.samples[0][:count], adc.samples[1][:count]
    # default=0 keeps a packet that announces zero valid samples from
    # raising ValueError on an empty sequence.
    diff_a = max((abs(x - norm_a) for x in valid_a), default=0)
    diff_b = max((abs(x - norm_b) for x in valid_b), default=0)
    if diff_a > 10000 or diff_b > 10000:
        pprint(adc)
        norm_a = mean(valid_a)
        norm_b = mean(valid_b)
    elif any(x != 0 for x in adc.samples[0][count:] + adc.samples[1][count:]):
        # Slots beyond sample_count hold non-zero values. The label is printed
        # separately because pprint's second positional argument is the
        # output stream, not a further object to print.
        print('nonzero')
        pprint(adc)