1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
#!/usr/bin/env python
import sys
import subprocess
import binascii
from statistics import mean
from dataclasses import dataclass, fields
import struct
from pprint import pprint
from cobs import cobs
time = [*sys.argv, '1s'][1]
proc = subprocess.run(f'sigrok-cli --driver dreamsourcelab-dslogic --config samplerate=10M --channels 0,1 --protocol-decoders uart:baudrate=250000:rx=1 --protocol-decoder-annotations uart=rx-data,uart=tx-data --time {time}'.split(), check=True, capture_output=True, text=True)
data = [line.partition(':')[2] for line in proc.stdout.splitlines()]
data = bytes([int(x, 16) for x in data if x])
class Serialized:
@classmethod
def deserialize(kls, data):
fields = struct.unpack(kls._struct_format(), data)
mapped = [cast(val) for cast, val in zip(kls._struct_casts(), fields)]
return kls(*mapped)
@classmethod
def _struct_format(kls):
return kls._parse_fields()[0]
@classmethod
def _struct_casts(kls):
return kls._parse_fields()[1]
@classmethod
def _parse_fields(kls):
fmt = '<'
casts = []
for field in fields(kls):
if isinstance(field.type, tuple):
struct_type, cast = field.type
else:
struct_type, cast = field.type, int
fmt += struct_type
casts.append(cast)
return fmt, casts
@dataclass
class Header(Serialized):
crc: 'I'
src: 'B'
dst: 'B'
pid: 'B'
packet_type: 'B'
@dataclass
class ADCPacket(Serialized):
timestamp: 'Q'
sampling_interval: 'I'
total_samples: 'I'
sample_count: 'I'
samples: ('96s', bytes)
def __post_init__(self):
data = self.samples
foo = lambda x: x if x < 0x800000 else x-0x1000000
self.samples = [[
foo(struct.unpack('<I', data[3*(2*sample + channel):][:3] + b'\0')[0])
for sample in range(16)
] for channel in range(2)]
norm_a, norm_b = 0, 0
for packet in data.split(b'\0'):
try:
packet = cobs.decode(packet)
hdr = Header.deserialize(packet[:8])
if hdr.packet_type == 2:
packet = ADCPacket.deserialize(packet[8:])
diff_a = max([abs(x - norm_a) for x in packet.samples[0][:packet.sample_count]])
diff_b = max([abs(x - norm_b) for x in packet.samples[1][:packet.sample_count]])
if diff_a > 10000 or diff_b > 10000:
pprint(packet)
norm_a = mean(packet.samples[0][:packet.sample_count])
norm_b = mean(packet.samples[1][:packet.sample_count])
elif any(x != 0 for x in packet.samples[0][packet.sample_count:] + packet.samples[1][packet.sample_count:]):
pprint('nonzero', packet)
except (cobs.DecodeError, struct.error):
print('Decoding error')
|