diff options
author | jaseg <git@jaseg.de> | 2024-06-08 18:45:51 +0200 |
---|---|---|
committer | jaseg <git@jaseg.de> | 2024-06-08 18:45:51 +0200 |
commit | b0d059fb24a77ecb244f0722bba2c163668a7eac (patch) | |
tree | f168b2f0242992159d28bc6a054d0281d1af5f15 /tools | |
download | submit-button-fw-b0d059fb24a77ecb244f0722bba2c163668a7eac.tar.gz submit-button-fw-b0d059fb24a77ecb244f0722bba2c163668a7eac.tar.bz2 submit-button-fw-b0d059fb24a77ecb244f0722bba2c163668a7eac.zip |
Initial commit
Diffstat (limited to 'tools')
-rw-r--r-- | tools/decode_logic_analzyer.py | 88 | ||||
-rw-r--r-- | tools/extract_pinmap.py | 11 | ||||
-rw-r--r-- | tools/gen_isr_header.py | 66 | ||||
-rw-r--r-- | tools/ldparser.py | 126 | ||||
-rw-r--r-- | tools/linkmem.py | 276 | ||||
-rw-r--r-- | tools/linksize.py | 62 | ||||
-rw-r--r-- | tools/linktracer.py | 118 | ||||
-rw-r--r-- | tools/mapparse.py | 129 | ||||
-rw-r--r-- | tools/musl_include_shims/bits/alltypes.h | 23 | ||||
-rw-r--r-- | tools/musl_include_shims/endian.h | 80 | ||||
-rw-r--r-- | tools/musl_include_shims/features.h | 40 | ||||
-rw-r--r-- | tools/musl_include_shims/fp_arch.h | 6 | ||||
-rw-r--r-- | tools/musl_include_shims/libm.h | 270 | ||||
-rw-r--r-- | tools/usb_test.py | 188 |
14 files changed, 1483 insertions, 0 deletions
diff --git a/tools/decode_logic_analzyer.py b/tools/decode_logic_analzyer.py new file mode 100644 index 0000000..be1d4da --- /dev/null +++ b/tools/decode_logic_analzyer.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python + +import sys +import subprocess +import binascii +from statistics import mean +from dataclasses import dataclass, fields +import struct +from pprint import pprint + +from cobs import cobs + +time = [*sys.argv, '1s'][1] +proc = subprocess.run(f'sigrok-cli --driver dreamsourcelab-dslogic --config samplerate=10M --channels 0,1 --protocol-decoders uart:baudrate=250000:rx=1 --protocol-decoder-annotations uart=rx-data,uart=tx-data --time {time}'.split(), check=True, capture_output=True, text=True) +data = [line.partition(':')[2] for line in proc.stdout.splitlines()] +data = bytes([int(x, 16) for x in data if x]) + +class Serialized: + @classmethod + def deserialize(kls, data): + fields = struct.unpack(kls._struct_format(), data) + mapped = [cast(val) for cast, val in zip(kls._struct_casts(), fields)] + return kls(*mapped) + + @classmethod + def _struct_format(kls): + return kls._parse_fields()[0] + + @classmethod + def _struct_casts(kls): + return kls._parse_fields()[1] + + @classmethod + def _parse_fields(kls): + fmt = '<' + casts = [] + for field in fields(kls): + if isinstance(field.type, tuple): + struct_type, cast = field.type + else: + struct_type, cast = field.type, int + fmt += struct_type + casts.append(cast) + return fmt, casts + +@dataclass +class Header(Serialized): + crc: 'I' + src: 'B' + dst: 'B' + pid: 'B' + packet_type: 'B' + +@dataclass +class ADCPacket(Serialized): + timestamp: 'Q' + sampling_interval: 'I' + total_samples: 'I' + sample_count: 'I' + samples: ('96s', bytes) + + def __post_init__(self): + data = self.samples + foo = lambda x: x if x < 0x800000 else x-0x1000000 + self.samples = [[ + foo(struct.unpack('<I', data[3*(2*sample + channel):][:3] + b'\0')[0]) + for sample in range(16) + ] for channel in range(2)] + + +norm_a, norm_b = 0, 0 +for packet in data.split(b'\0'): + try: + packet = cobs.decode(packet) + hdr = Header.deserialize(packet[:8]) + if hdr.packet_type == 2: + packet = ADCPacket.deserialize(packet[8:]) + diff_a = max([abs(x - norm_a) for x in packet.samples[0][:packet.sample_count]]) + diff_b = max([abs(x - norm_b) for x in packet.samples[1][:packet.sample_count]]) + if diff_a > 10000 or diff_b > 10000: + pprint(packet) + norm_a = mean(packet.samples[0][:packet.sample_count]) + norm_b = mean(packet.samples[1][:packet.sample_count]) + elif any(x != 0 for x in packet.samples[0][packet.sample_count:] + packet.samples[1][packet.sample_count:]): + pprint('nonzero', packet) + except (cobs.DecodeError, struct.error): + print('Decoding error') + diff --git a/tools/extract_pinmap.py b/tools/extract_pinmap.py new file mode 100644 index 0000000..d2c52e8 --- /dev/null +++ b/tools/extract_pinmap.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python + +import click + +@click.command() +@click.option('sch_file') +def cli(): + + +if __name__ == '__main__': + cli() diff --git a/tools/gen_isr_header.py b/tools/gen_isr_header.py new file mode 100644 index 0000000..6382b1f --- /dev/null +++ b/tools/gen_isr_header.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 + +import subprocess +import os +import re +import datetime + +def cpp_preprocess(input_path, cpp='cpp'): + return subprocess.check_output([cpp, '-P', input_path]).decode() + +def gen_isr_header(f, cpp='cpp'): + stripped_code = cpp_preprocess(args.input, args.use_cpp) + + armed = False + for line in stripped_code.splitlines(): + line = line.strip() + + if armed: + if not line.startswith('.word'): + break + + word, value = line.split() + assert word == '.word' + if value == '0': + yield None + else: + yield value + + else: + if line.startswith('g_pfnVectors:'): + armed = True + + else: + raise ValueError('Cannot find interrupt vector definition!') + +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser() + parser.add_argument('--use-cpp', type=str, default=os.getenv('CPP', 'cpp'), help='cpp (C preprocessor) executable to use') + parser.add_argument('-g', '--generate-include-guards', action='store_true', help='Whether to generate include guards') + parser.add_argument('input', help='Input stm32****_startup.s file') + args = parser.parse_args() + + print('/* AUTOGENERATED FILE! DO NOT MODIFY! */') + print('/* Generated {datetime.datetime.utcnow()} from {args.input} */') + if args.generate_include_guards: + include_guard_id = '__ISR_HEADER_' + re.sub('[^A-Za-z0-9]', '_', args.input.split('/')[-1]) + '__' + print(f'#ifndef {include_guard_id}') + print(f'#define {include_guard_id}') + + print() + for i, handler_name in enumerate(gen_isr_header(args.input, args.use_cpp)): + if handler_name is None: + print(f'/* IRQ {i} is undefined for this part. */') + else: + print(f'void {handler_name}(void); {" " * (30-len(handler_name))} /* {i:> 3} */') + print() + + print(f'#define NUM_IRQs {i+1}') + print('extern uint32_t g_pfnVectors[NUM_IRQs];') + print('#define isr_vector g_pfnVectors') + print() + + if args.generate_include_guards: + print(f'#endif /* {include_guard_id} */') + diff --git a/tools/ldparser.py b/tools/ldparser.py new file mode 100644 index 0000000..c620fe2 --- /dev/null +++ b/tools/ldparser.py @@ -0,0 +1,126 @@ + +import sys + +import pyparsing as pp +from pyparsing import pyparsing_common as ppc + +LPAREN, RPAREN, LBRACE, RBRACE, LBROK, RBROK, COLON, SEMICOLON, EQUALS, COMMA = map(pp.Suppress, '(){}<>:;=,') + +parse_suffix_int = lambda lit: int(lit[:-1]) * (10**(3*(1 + 'kmgtpe'.find(lit[-1].lower())))) +si_suffix = pp.oneOf('k m g t p e', caseless=True) + +numeric_literal = pp.Regex('0x[0-9a-fA-F]+').setName('hex int').setParseAction(pp.tokenMap(int, 16)) \ + | (pp.Regex('[0-9]+[kKmMgGtTpPeE]')).setName('size int').setParseAction(pp.tokenMap(parse_suffix_int)) \ + | pp.Word(pp.nums).setName('int').setParseAction(pp.tokenMap(int)) +access_def = pp.Regex('[rR]?[wW]?[xX]?').setName('access literal').setParseAction(pp.tokenMap(str.lower)) + +origin_expr = pp.Suppress(pp.CaselessKeyword('ORIGIN')) + EQUALS + numeric_literal +length_expr = pp.Suppress(pp.CaselessKeyword('LENGTH')) + EQUALS + numeric_literal +mem_expr = pp.Group(ppc.identifier + LPAREN + access_def + RPAREN + COLON + origin_expr + COMMA + length_expr) +mem_contents = pp.ZeroOrMore(mem_expr) + +mem_toplevel = pp.CaselessKeyword("MEMORY") + pp.Group(LBRACE + pp.Optional(mem_contents, []) + RBRACE) + +glob = pp.Word(pp.alphanums + '._*') +match_expr = pp.Forward() +assignment = pp.Forward() +funccall = pp.Group(pp.Word(pp.alphas + '_') + LPAREN + (assignment | numeric_literal | match_expr | glob | ppc.identifier) + RPAREN + pp.Optional(SEMICOLON)) +value = numeric_literal | funccall | ppc.identifier | '.' +formula = (value + pp.oneOf('+ = * / %') + value) | value +# suppress stray semicolons +assignment << (SEMICOLON | pp.Group((ppc.identifier | '.') + EQUALS + (formula | value) + pp.Optional(SEMICOLON))) +match_expr << (glob + LPAREN + pp.OneOrMore(funccall | glob) + RPAREN) + +section_contents = pp.ZeroOrMore(assignment | funccall | match_expr); + +section_name = pp.Regex('\.[a-zA-Z0-9_.]+') +section_def = pp.Group(section_name + pp.Optional(numeric_literal) + COLON + LBRACE + pp.Group(section_contents) + + RBRACE + pp.Optional(RBROK + ppc.identifier + pp.Optional('AT' + RBROK + ppc.identifier))) +sec_contents = pp.ZeroOrMore(section_def | assignment) + +sections_toplevel = pp.Group(pp.CaselessKeyword("SECTIONS").suppress() + LBRACE + sec_contents + RBRACE) + +toplevel_elements = mem_toplevel | funccall | sections_toplevel | assignment +ldscript = pp.Group(pp.ZeroOrMore(toplevel_elements)) +ldscript.ignore(pp.cppStyleComment) + +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser() + parser.add_argument('linker_script', type=argparse.FileType('r')) + args = parser.parse_args() + + #print(mem_expr.parseString('FLASH (rx) : ORIGIN = 0x0800000, LENGTH = 512K', parseAll=True)) + # print(ldscript.parseString(''' + # /* Entry Point */ + # ENTRY(Reset_Handler) + # + # /* Highest address of the user mode stack */ + # _estack = 0x20020000; /* end of RAM */ + # /* Generate a link error if heap and stack don't fit into RAM */ + # _Min_Heap_Size = 0x200;; /* required amount of heap */ + # _Min_Stack_Size = 0x400;; /* required amount of stack */ + # ''', parseAll=True)) + + print(ldscript.parseFile(args.linker_script, parseAll=True)) + #print(funccall.parseString('KEEP(*(.isr_vector))')) + #print(section_contents.parseString(''' + # . = ALIGN(4); + # KEEP(*(.isr_vector)) /* Startup code */ + # . = ALIGN(4); + # ''', parseAll=True)) + + #print(section_def.parseString(''' + # .text : + # { + # . = ALIGN(4); + # *(.text) /* .text sections (code) */ + # *(.text*) /* .text* sections (code) */ + # *(.glue_7) /* glue arm to thumb code */ + # *(.glue_7t) /* glue thumb to arm code */ + # *(.eh_frame) + # + # KEEP (*(.init)) + # KEEP (*(.fini)) + # + # . = ALIGN(4); + # _etext = .; /* define a global symbols at end of code */ + # } >FLASH + # ''', parseAll=True)) + + #print(section_def.parseString('.ARM.extab : { *(.ARM.extab* .gnu.linkonce.armextab.*) } >FLASH', parseAll=True)) + + #print(assignment.parseString('__preinit_array_start = .', parseAll=True)) + #print(assignment.parseString('a = 23', parseAll=True)) + #print(funccall.parseString('foo (a=23)', parseAll=True)) + #print(funccall.parseString('PROVIDE_HIDDEN (__preinit_array_start = .);', parseAll=True)) + #print(section_def.parseString(''' + # .preinit_array : + # { + # PROVIDE_HIDDEN (__preinit_array_start = .); + # KEEP (*(.preinit_array*)) + # PROVIDE_HIDDEN (__preinit_array_end = .); + # } >FLASH''', parseAll=True)) + #print(match_expr.parseString('*(SORT(.init_array.*))', parseAll=True)) + #print(funccall.parseString('KEEP (*(SORT(.init_array.*)))', parseAll=True)) + #print(section_def.parseString(''' + # .init_array : + # { + # PROVIDE_HIDDEN (__init_array_start = .); + # KEEP (*(SORT(.init_array.*))) + # KEEP (*(.init_array*)) + # PROVIDE_HIDDEN (__init_array_end = .); + # } >FLASH + # ''', parseAll=True)) + + #print(match_expr.parseString('*(.ARM.extab* .gnu.linkonce.armextab.*)', parseAll=True)) + #print(formula.parseString('. + _Min_Heap_Size', parseAll=True)) + #print(assignment.parseString('. = . + _Min_Heap_Size;', parseAll=True)) + #print(sections_toplevel.parseString(''' + # SECTIONS + # { + # .ARMattributes : { } + # } + # ''', parseAll=True)) + #sys.exit(0) + diff --git a/tools/linkmem.py b/tools/linkmem.py new file mode 100644 index 0000000..452471d --- /dev/null +++ b/tools/linkmem.py @@ -0,0 +1,276 @@ + +import tempfile +import os +from os import path +import sys +import re +import subprocess +from contextlib import contextmanager +from collections import defaultdict +import colorsys + +import cxxfilt +from elftools.elf.elffile import ELFFile +from elftools.elf.enums import ENUM_ST_SHNDX +from elftools.elf.descriptions import describe_symbol_type, describe_sh_type +import libarchive +import matplotlib.cm + +@contextmanager +def chdir(newdir): + old_cwd = os.getcwd() + try: + os.chdir(newdir) + yield + finally: + os.chdir(old_cwd) + +def keep_last(it, first=None): + last = first + for elem in it: + yield last, elem + last = elem + +def delim(start, end, it, first_only=True): + found = False + for elem in it: + if end(elem): + if first_only: + return + found = False + elif start(elem): + found = True + elif found: + yield elem + +def delim_prefix(start, end, it): + yield from delim(lambda l: l.startswith(start), lambda l: end is not None and l.startswith(end), it) + +def trace_source_files(linker, cmdline, trace_sections=[], total_sections=['.text', '.data', '.rodata']): + with tempfile.TemporaryDirectory() as tempdir: + out_path = path.join(tempdir, 'output.elf') + output = subprocess.check_output([linker, '-o', out_path, f'-Wl,--print-map', *cmdline]) + lines = [ line.strip() for line in output.decode().splitlines() ] + # FIXME also find isr vector table references + + defs = {} + objs = defaultdict(lambda: 0) + aliases = {} + sec_name = None + last_loc = None + last_sym = None + line_cont = None + for last_line, line in keep_last(delim_prefix('Linker script and memory map', 'OUTPUT', lines), first=''): + if not line or line.startswith('LOAD '): + sec_name = None + continue + + # first part of continuation line + if m := re.match(r'^(\.[0-9a-zA-Z-_.]+)$', line): + line_cont = line + sec_name = None + continue + + if line_cont: + line = line_cont + ' ' + line + line_cont = None + + # -ffunction-sections/-fdata-sections section + if m := re.match(r'^(\.[0-9a-zA-Z-_.]+)\.([0-9a-zA-Z-_.]+)\s+(0x[0-9a-f]+)\s+(0x[0-9a-f]+)\s+(\S+)$', line): + sec, sym, loc, size, obj = m.groups() + *_, sym = sym.rpartition('.') + sym = cxxfilt.demangle(sym) + size = int(size, 16) + obj = path.abspath(obj) + + if sec not in total_sections: + size = 0 + + objs[obj] += size + defs[sym] = (sec, size, obj) + + sec_name, last_loc, last_sym = sec, loc, sym + continue + + # regular (no -ffunction-sections/-fdata-sections) section + if m := re.match(r'^(\.[0-9a-zA-Z-_]+)\s+(0x[0-9a-f]+)\s+(0x[0-9a-f]+)\s+(\S+)$', line): + sec, _loc, size, obj = m.groups() + size = int(size, 16) + obj = path.abspath(obj) + + if sec in total_sections: + objs[obj] += size + + sec_name = sec + last_loc, last_sym = None, None + continue + + # symbol def + if m := re.match(r'^(0x[0-9a-f]+)\s+(\S+)$', line): + loc, sym = m.groups() + sym = cxxfilt.demangle(sym) + loc = int(loc, 16) + if sym in defs: + continue + + if loc == last_loc: + assert last_sym is not None + aliases[sym] = last_sym + else: + assert sec_name + defs[sym] = (sec_name, None, obj) + last_loc, last_sym = loc, sym + + continue + + refs = defaultdict(lambda: set()) + for sym, (sec, size, obj) in defs.items(): + fn, _, member = re.match(r'^([^()]+)(\((.+)\))?$', obj).groups() + fn = path.abspath(fn) + + if member: + subprocess.check_call(['ar', 'x', '--output', tempdir, fn, member]) + fn = path.join(tempdir, member) + + with open(fn, 'rb') as f: + elf = ELFFile(f) + + symtab = elf.get_section_by_name('.symtab') + + symtab_demangled = { cxxfilt.demangle(nsym.name).replace(' ', ''): i + for i, nsym in enumerate(symtab.iter_symbols()) } + + s = set() + sec_map = { sec.name: i for i, sec in enumerate(elf.iter_sections()) } + matches = [ i for name, i in sec_map.items() if re.match(fr'\.rel\..*\.{sym}', name) ] + if matches: + sec = elf.get_section(matches[0]) + for reloc in sec.iter_relocations(): + refsym = symtab.get_symbol(reloc['r_info_sym']) + name = refsym.name if refsym.name else elf.get_section(refsym['st_shndx']).name.split('.')[-1] + s.add(name) + refs[sym] = s + + for tsec in trace_sections: + matches = [ i for name, i in sec_map.items() if name == f'.rel{tsec}' ] + s = set() + if matches: + sec = elf.get_section(matches[0]) + for reloc in sec.iter_relocations(): + refsym = symtab.get_symbol(reloc['r_info_sym']) + s.add(refsym.name) + refs[tsec.replace('.', '_')] |= s + + return objs, aliases, defs, refs + +@contextmanager +def wrap(leader='', print=print, left='{', right='}'): + print(leader, left) + yield lambda *args, **kwargs: print(' ', *args, **kwargs) + print(right) + +def mangle(name): + return re.sub('[^a-zA-Z0-9_]', '_', name) + +hexcolor = lambda r, g, b, *_a: f'#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}' +def vhex(val): + r,g,b,_a = matplotlib.cm.viridis(1.0-val) + fc = hexcolor(r, g, b) + h,s,v = colorsys.rgb_to_hsv(r,g,b) + cc = '#000000' if v > 0.8 else '#ffffff' + return fc, cc + +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser() + parser.add_argument('--trace-sections', type=str, action='append', default=[]) + parser.add_argument('--trim-stubs', type=str, action='append', default=[]) + parser.add_argument('--highlight-subdirs', type=str, default=None) + parser.add_argument('linker_binary') + parser.add_argument('linker_args', nargs=argparse.REMAINDER) + args = parser.parse_args() + + trace_sections = args.trace_sections + trace_sections_mangled = { sec.replace('.', '_') for sec in trace_sections } + objs, aliases, syms, refs = trace_source_files(args.linker_binary, args.linker_args, trace_sections) + + clusters = defaultdict(lambda: []) + for sym, (sec, size, obj) in syms.items(): + clusters[obj].append((sym, sec, size)) + + max_ssize = max(size or 0 for _sec, size, _obj in syms.values()) + max_osize = max(objs.values()) + + subdir_prefix = path.abspath(args.highlight_subdirs) + '/' if args.highlight_subdirs else '### NO HIGHLIGHT ###' + first_comp = lambda le_path: path.dirname(le_path).partition(os.sep)[0] + subdir_colors = sorted({ first_comp(obj[len(subdir_prefix):]) for obj in objs if obj.startswith(subdir_prefix) }) + subdir_colors = { path: hexcolor(*matplotlib.cm.Pastel1(i/len(subdir_colors))) for i, path in enumerate(subdir_colors) } + + subdir_sizes = defaultdict(lambda: 0) + for obj, size in objs.items(): + if not isinstance(size, int): + continue + if obj.startswith(subdir_prefix): + subdir_sizes[first_comp(obj[len(subdir_prefix):])] += size + else: + subdir_sizes['<others>'] += size + + print('Subdir sizes:', file=sys.stderr) + for subdir, size in sorted(subdir_sizes.items(), key=lambda x: x[1]): + print(f'{subdir:>20}: {size:>6,d} B', file=sys.stderr) + + def lookup_highlight(path): + if args.highlight_subdirs: + if obj.startswith(subdir_prefix): + highlight_head = first_comp(path[len(subdir_prefix):]) + return subdir_colors[highlight_head], highlight_head + else: + return '#e0e0e0', None + else: + return '#ddf7f4', None + + with wrap('digraph G', print) as lvl1print: + print('size="23.4,16.5!";') + print('graph [fontsize=40];') + print('node [fontsize=40];') + #print('ratio="fill";') + + print('rankdir=LR;') + print('ranksep=5;') + print('nodesep=0.2;') + print() + + for i, (obj, obj_syms) in enumerate(clusters.items()): + with wrap(f'subgraph cluster_{i}', lvl1print) as lvl2print: + print('style = "filled";') + highlight_color, highlight_head = lookup_highlight(obj) + print(f'bgcolor = "{highlight_color}";') + print('pencolor = none;') + fc, cc = vhex(objs[obj]/max_osize) + highlight_subdir_part = f'<font face="carlito" color="{cc}" point-size="40">{highlight_head} / </font>' if highlight_head else '' + lvl2print(f'label = <<table border="0"><tr><td border="0" cellpadding="5" bgcolor="{fc}">' + f'{highlight_subdir_part}' + f'<font face="carlito" color="{cc}"><b>{path.basename(obj)} ({objs[obj]}B)</b></font>' + f'</td></tr></table>>;') + lvl2print() + for sym, sec, size in obj_syms: + has_size = isinstance(size, int) and size > 0 + size_s = f' ({size}B)' if has_size else '' + fc, cc = vhex(size/max_ssize) if has_size else ('#ffffff', '#000000') + shape = 'box' if sec == '.text' else 'oval' + lvl2print(f'{mangle(sym)}[label = "{sym}{size_s}", style="rounded,filled", shape="{shape}", fillcolor="{fc}", fontname="carlito", fontcolor="{cc}" color=none];') + lvl1print() + + edges = set() + for start, ends in refs.items(): + for end in ends: + end = aliases.get(end, end) + if (start in syms or start in trace_sections_mangled) and end in syms: + edges.add((start, end)) + + for start, end in edges: + lvl1print(f'{mangle(start)} -> {mangle(end)} [style="bold", color="#333333"];') + + for sec in trace_sections: + lvl1print(f'{sec.replace(".", "_")} [label = "section {sec}", shape="box", style="filled,bold"];') + diff --git a/tools/linksize.py b/tools/linksize.py new file mode 100644 index 0000000..c41a951 --- /dev/null +++ b/tools/linksize.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 + +def parse_linker_script(data): + pass + +def link(groups): + defined_symbols = {} + undefined_symbols = set() + for group, files in groups: + while True: + found_something = False + + for fn in files: + symbols = load_symbols(fn) + for symbol in symbols: + if symbol in defined_symbols: + + if not group or not found_something: + break + + +if __name__ == '__main__': + + import argparse + parser = argparse.ArgumentParser() + parser.add_argument('-T', '--script', type=str, help='Linker script to use') + parser.add_argument('-o', '--output', type=str, help='Output file to produce') + args, rest = parser.parse_known_intermixed_args() + print(rest) + + addprefix = lambda *xs: [ prefix + opt for opt in xs for prefix in ('', '-Wl,') ] + START_GROUP = addprefix('-(', '--start-group') + END_GROUP = addprefix('-)', '--end-group') + GROUP_OPTS = [*START_GROUP, *END_GROUP] + input_files = [ arg for arg in rest if not arg.startswith('-') or arg in GROUP_OPTS ] + + def input_file_iter(input_files): + group = False + files = [] + for arg in input_files: + if arg in START_GROUP: + assert not group + + if files: + yield False, files # nested -Wl,--start-group + group, files = True, [] + + elif arg in END_GROUP: + assert group # missing -Wl,--start-group + if files: + yield True, files + group, files = False, [] + + else: + files.append(arg) + + assert not group # missing -Wl,--end-group + if files: + yield False, files + + + diff --git a/tools/linktracer.py b/tools/linktracer.py new file mode 100644 index 0000000..0c53a60 --- /dev/null +++ b/tools/linktracer.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 + +import re +import subprocess +import tempfile +import pprint + +ARCHIVE_RE = r'([^(]*)(\([^)]*\))?' + +def trace_source_files(linker, cmdline): + with tempfile.NamedTemporaryFile() as mapfile: + output = subprocess.check_output([linker, f'-Wl,--Map={mapfile.name}', *cmdline]) + + # intentionally use generator here + idx = 0 + lines = [ line.rstrip() for line in mapfile.read().decode().splitlines() if line.strip() ] + + for idx, line in enumerate(lines[idx:], start=idx): + #print('Dropping', line) + if line == 'Linker script and memory map': + break + + idx += 1 + objects = [] + symbols = {} + sections = {} + current_object = None + last_offset = None + last_symbol = None + cont_sec = None + cont_ind = None + current_section = None + for idx, line in enumerate(lines[idx:], start=idx): + print(f'Processing >{line}') + if line.startswith('LOAD'): + _load, obj = line.split() + objects.append(obj) + continue + + if line.startswith('OUTPUT'): + break + + m = re.match(r'^( ?)([^ ]+)? +(0x[0-9a-z]+) +(0x[0-9a-z]+)?(.*)?$', line) + if m is None: + m = re.match(r'^( ?)([^ ]+)?$', line) + if m: + cont_ind, cont_sec = m.groups() + else: + cont_ind, cont_sec = None, None + last_offset, last_symbol = None, None + continue + indent, sec, offx, size, sym_or_src = m.groups() + if sec is None: + sec = cont_sec + ind = cont_ind + cont_sec = None + cont_ind = None + print(f'vals: indent={indent} sec={sec} offx={offx} size={size} sym_or_src={sym_or_src}') + if not re.match('^[a-zA-Z_0-9<>():*]+$', sym_or_src): + continue + + if indent == '': + print(f'Section: {sec} 0x{size:x}') + current_section = sec + sections[sec] = size + last_offset = None + last_symbol = None + continue + + if offx is not None: + offx = int(offx, 16) + if size is not None: + size = int(size, 16) + + if size is not None and sym_or_src is not None: + # archive/object line + archive, _member = re.match(ARCHIVE_RE, sym_or_src).groups() + current_object = archive + last_offset = offx + else: + if sym_or_src is not None: + assert size is None + if last_offset is not None: + last_size = offx - last_offset + symbols[last_symbol] = (last_size, current_section) + print(f'Symbol: {last_symbol} 0x{last_size:x} @{current_section}') + last_offset = offx + last_symbol = sym_or_src + + idx += 1 + + for idx, line in enumerate(lines[idx:], start=idx): + if line == 'Cross Reference Table': + break + + idx += 1 + + # map which symbol was pulled from which object in the end + used_defs = {} + for line in lines: + *left, right = line.split() + + archive, _member = re.match(ARCHIVE_RE, right).groups() + if left: + used_defs[''.join(left)] = archive + + #pprint.pprint(symbols) + + +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser() + parser.add_argument('linker_binary') + parser.add_argument('linker_args', nargs=argparse.REMAINDER) + args = parser.parse_args() + + source_files = trace_source_files(args.linker_binary, args.linker_args) + diff --git a/tools/mapparse.py b/tools/mapparse.py new file mode 100644 index 0000000..c1f460a --- /dev/null +++ b/tools/mapparse.py @@ -0,0 +1,129 @@ + +import re +from collections import defaultdict, namedtuple + +Section = namedtuple('Section', ['name', 'offset', 'objects']) +ObjectEntry = namedtuple('ObjectEntry', ['filename', 'object', 'offset', 'size']) +FileEntry = namedtuple('FileEntry', ['section', 'object', 'offset', 'length']) + +class Memory: + def __init__(self, name, origin, length, attrs=''): + self.name, self.origin, self.length, self.attrs = name, origin, length, attrs + self.sections = {} + self.files = defaultdict(lambda: []) + self.totals = defaultdict(lambda: 0) + + def add_toplevel(self, name, offx, length): + self.sections[name] = Section(offx, length, []) + + def add_obj(self, name, offx, length, fn, obj): + base_section, sep, subsec = name[1:].partition('.') + base_section = '.'+base_section + if base_section in self.sections: + sec = secname, secoffx, secobjs = self.sections[base_section] + secobjs.append(ObjectEntry(fn, obj, offx, length)) + else: + sec = None + self.files[fn].append(FileEntry(sec, obj, offx, length)) + self.totals[fn] += length + +class MapFile: + def __init__(self, s): + self._lines = s.splitlines() + self.memcfg = {} + self.defaultmem = Memory('default', 0, 0xffffffffffffffff) + self._parse() + + def __getitem__(self, offx_or_name): + ''' Lookup a memory area by name or address ''' + if offx_or_name in self.memcfg: + return self.memcfg[offx_or_name] + + elif isinstance(offx_or_name, int): + for mem in self.memcfg.values(): + if mem.origin <= offx_or_name < mem.origin+mem.length: + return mem + else: + return self.defaultmem + + raise ValueError('Invalid argument type for indexing') + + def _skip(self, regex): + matcher = re.compile(regex) + for l in self: + if matcher.match(l): + break + + def __iter__(self): + while self._lines: + yield self._lines.pop(0) + + def _parse(self): + self._skip('^Memory Configuration') + + # Parse memory segmentation info + self._skip('^Name') + for l in self: + if not l: + break + name, origin, length, *attrs = l.split() + if not name.startswith('*'): + self.memcfg[name] = Memory(name, int(origin, 16), int(length, 16), attrs[0] if attrs else '') + + # Parse section information + toplevel_m = re.compile('^(\.[a-zA-Z0-9_.]+)\s+(0x[0-9a-fA-F]+)\s+(0x[0-9a-fA-F]+)') + secondlevel_m = re.compile('^ (\.[a-zA-Z0-9_.]+)\s+(0x[0-9a-fA-F]+)\s+(0x[0-9a-fA-F]+)\s+(.*)$') + secondlevel_linebreak_m = re.compile('^ (\.[a-zA-Z0-9_.]+)\n') + filelike = re.compile('^(/?[^()]*\.[a-zA-Z0-9-_]+)(\(.*\))?') + linebreak_section = None + for l in self: + # Toplevel section + match = toplevel_m.match(l) + if match: + name, offx, length = match.groups() + offx, length = int(offx, 16), int(length, 16) + self[offx].add_toplevel(name, offx, length) + + match = secondlevel_linebreak_m.match(l) + if match: + linebreak_section, = match.groups() + continue + + if linebreak_section: + l = ' {} {}'.format(linebreak_section, l) + linebreak_section = None + + # Second-level section + match = secondlevel_m.match(l) + if match: + name, offx, length, misc = match.groups() + match = filelike.match(misc) + if match: + fn, obj = match.groups() + obj = obj.strip('()') if obj else None + offx, length = int(offx, 16), int(length, 16) + self[offx].add_obj(name, offx, length, fn, obj) + + +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser(description='Parser GCC map file') + parser.add_argument('mapfile', type=argparse.FileType('r'), help='The GCC .map file to parse') + parser.add_argument('-m', '--memory', type=str, help='The memory segments to print, comma-separated') + args = parser.parse_args() + mf = MapFile(args.mapfile.read()) + args.mapfile.close() + + mems = args.memory.split(',') if args.memory else mf.memcfg.keys() + + for name in mems: + mem = mf.memcfg[name] + print('Symbols by file for memory', name) + for tot, fn in reversed(sorted( (tot, fn) for fn, tot in mem.totals.items() )): + print(' {:>8} {}'.format(tot, fn)) + for length, offx, sec, obj in reversed(sorted(( (length, offx, sec, obj) for sec, obj, offx, length in + mem.files[fn] ), key=lambda e: e[0] )): + name = sec.name if sec else None + print(' {:>8} {:>#08x} {}'.format(length, offx, obj)) + #print('{:>16} 0x{:016x} 0x{:016x} ({:>24}) {}'.format(name, origin, length, length, attrs)) + diff --git a/tools/musl_include_shims/bits/alltypes.h b/tools/musl_include_shims/bits/alltypes.h new file mode 100644 index 0000000..581ca85 --- /dev/null +++ b/tools/musl_include_shims/bits/alltypes.h @@ -0,0 +1,23 @@ + +/* shim file for musl */ + +#ifndef __MUSL_SHIM_BITS_ALLTYPES_H__ +#define __MUSL_SHIM_BITS_ALLTYPES_H__ + +#define _REDIR_TIME64 1 +#define _Addr int +#define _Int64 long long +#define _Reg int + +#define __BYTE_ORDER 1234 + +#define __LONG_MAX 0x7fffffffL + +#ifndef __cplusplus +typedef unsigned wchar_t; +#endif + +typedef float float_t; +typedef double double_t; + +#endif /* __MUSL_SHIM_BITS_ALLTYPES_H__ */ diff --git a/tools/musl_include_shims/endian.h b/tools/musl_include_shims/endian.h new file mode 100644 index 0000000..172c432 --- /dev/null +++ b/tools/musl_include_shims/endian.h @@ -0,0 +1,80 @@ +#ifndef _ENDIAN_H +#define _ENDIAN_H + +#include <features.h> + +#define __NEED_uint16_t +#define __NEED_uint32_t +#define __NEED_uint64_t + +#include <bits/alltypes.h> + +#define __PDP_ENDIAN 3412 + +#define BIG_ENDIAN __BIG_ENDIAN +#define LITTLE_ENDIAN __LITTLE_ENDIAN +#define PDP_ENDIAN __PDP_ENDIAN +#define BYTE_ORDER __BYTE_ORDER + +static __inline uint16_t __bswap16(uint16_t __x) +{ + return __x<<8 | __x>>8; +} + +static __inline uint32_t __bswap32(uint32_t __x) +{ + return __x>>24 | __x>>8&0xff00 | __x<<8&0xff0000 | __x<<24; +} + +static __inline uint64_t __bswap64(uint64_t __x) +{ + return __bswap32(__x)+0ULL<<32 | __bswap32(__x>>32); +} + +#if __BYTE_ORDER == __LITTLE_ENDIAN +#define htobe16(x) __bswap16(x) +#define be16toh(x) __bswap16(x) +#define htobe32(x) __bswap32(x) +#define be32toh(x) __bswap32(x) +#define htobe64(x) __bswap64(x) +#define be64toh(x) __bswap64(x) +#define htole16(x) (uint16_t)(x) +#define le16toh(x) (uint16_t)(x) +#define htole32(x) (uint32_t)(x) +#define le32toh(x) (uint32_t)(x) +#define htole64(x) (uint64_t)(x) +#define le64toh(x) (uint64_t)(x) +#else +#define htobe16(x) (uint16_t)(x) +#define be16toh(x) (uint16_t)(x) +#define htobe32(x) (uint32_t)(x) +#define be32toh(x) (uint32_t)(x) +#define htobe64(x) (uint64_t)(x) +#define be64toh(x) (uint64_t)(x) +#define htole16(x) __bswap16(x) +#define le16toh(x) __bswap16(x) +#define htole32(x) __bswap32(x) +#define le32toh(x) __bswap32(x) +#define htole64(x) __bswap64(x) +#define le64toh(x) __bswap64(x) +#endif + +#if defined(_GNU_SOURCE) || defined(_BSD_SOURCE) +#if __BYTE_ORDER == __LITTLE_ENDIAN +#define betoh16(x) __bswap16(x) +#define betoh32(x) __bswap32(x) +#define betoh64(x) __bswap64(x) +#define letoh16(x) (uint16_t)(x) +#define letoh32(x) (uint32_t)(x) +#define letoh64(x) (uint64_t)(x) +#else +#define betoh16(x) (uint16_t)(x) +#define betoh32(x) (uint32_t)(x) +#define betoh64(x) (uint64_t)(x) +#define letoh16(x) __bswap16(x) +#define letoh32(x) __bswap32(x) +#define letoh64(x) __bswap64(x) +#endif +#endif + +#endif diff --git a/tools/musl_include_shims/features.h b/tools/musl_include_shims/features.h new file mode 100644 index 0000000..85cfb72 --- /dev/null +++ b/tools/musl_include_shims/features.h @@ -0,0 +1,40 @@ +#ifndef _FEATURES_H +#define _FEATURES_H + +#if defined(_ALL_SOURCE) && !defined(_GNU_SOURCE) +#define _GNU_SOURCE 1 +#endif + +#if defined(_DEFAULT_SOURCE) && !defined(_BSD_SOURCE) +#define _BSD_SOURCE 1 +#endif + +#if !defined(_POSIX_SOURCE) && !defined(_POSIX_C_SOURCE) \ + && !defined(_XOPEN_SOURCE) && !defined(_GNU_SOURCE) \ + && !defined(_BSD_SOURCE) && !defined(__STRICT_ANSI__) +#define _BSD_SOURCE 1 +#define _XOPEN_SOURCE 700 +#endif + +#if __STDC_VERSION__ >= 199901L +#define __restrict restrict +#elif !defined(__GNUC__) +#define __restrict +#endif + +#if __STDC_VERSION__ >= 199901L || defined(__cplusplus) +#define __inline inline +#elif !defined(__GNUC__) +#define __inline +#endif + +#if __STDC_VERSION__ >= 201112L +#elif defined(__GNUC__) +#define _Noreturn __attribute__((__noreturn__)) +#else +#define _Noreturn +#endif + +#define __REDIR(x,y) __typeof__(x) x __asm__(#y) + +#endif diff --git a/tools/musl_include_shims/fp_arch.h b/tools/musl_include_shims/fp_arch.h new file mode 100644 index 0000000..f5bab6d --- /dev/null +++ b/tools/musl_include_shims/fp_arch.h @@ -0,0 +1,6 @@ +#ifndef __MUSL_SHIM_FP_ARCH_H__ +#define __MUSL_SHIM_FP_ARCH_H__ + +#define hidden + +#endif /* __MUSL_SHIM_FP_ARCH_H__ */ diff --git a/tools/musl_include_shims/libm.h b/tools/musl_include_shims/libm.h new file mode 100644 index 0000000..d48135d --- /dev/null +++ b/tools/musl_include_shims/libm.h @@ -0,0 +1,270 @@ +#ifndef _LIBM_H +#define _LIBM_H + +#include <stdint.h> +#include <float.h> +#include <math.h> +#include <endian.h> +#include "fp_arch.h" + +#if LDBL_MANT_DIG == 53 && LDBL_MAX_EXP == 1024 +#elif LDBL_MANT_DIG == 64 && LDBL_MAX_EXP == 16384 && __BYTE_ORDER == __LITTLE_ENDIAN +union ldshape { + long double f; + struct { + uint64_t m; + uint16_t se; + } i; +}; +#elif LDBL_MANT_DIG == 64 && LDBL_MAX_EXP == 16384 && __BYTE_ORDER == __BIG_ENDIAN +/* This is the m68k variant of 80-bit long double, and this definition only works + * on archs where the alignment requirement of uint64_t is <= 4. */ +union ldshape { + long double f; + struct { + uint16_t se; + uint16_t pad; + uint64_t m; + } i; +}; +#elif LDBL_MANT_DIG == 113 && LDBL_MAX_EXP == 16384 && __BYTE_ORDER == __LITTLE_ENDIAN +union ldshape { + long double f; + struct { + uint64_t lo; + uint32_t mid; + uint16_t top; + uint16_t se; + } i; + struct { + uint64_t lo; + uint64_t hi; + } i2; +}; +#elif LDBL_MANT_DIG == 113 && LDBL_MAX_EXP == 16384 && __BYTE_ORDER == __BIG_ENDIAN +union ldshape { + long double f; + struct { + uint16_t se; + uint16_t top; + uint32_t mid; + uint64_t lo; + } i; + struct { + uint64_t hi; + uint64_t lo; + } i2; +}; +#else +#error Unsupported long double representation +#endif + +/* Support non-nearest rounding mode. */ +#define WANT_ROUNDING 1 +/* Support signaling NaNs. */ +#define WANT_SNAN 0 + +#if WANT_SNAN +#error SNaN is unsupported +#else +#define issignalingf_inline(x) 0 +#define issignaling_inline(x) 0 +#endif + +#ifndef TOINT_INTRINSICS +#define TOINT_INTRINSICS 0 +#endif + +#if TOINT_INTRINSICS +/* Round x to nearest int in all rounding modes, ties have to be rounded + consistently with converttoint so the results match. If the result + would be outside of [-2^31, 2^31-1] then the semantics is unspecified. */ +static double_t roundtoint(double_t); + +/* Convert x to nearest int in all rounding modes, ties have to be rounded + consistently with roundtoint. If the result is not representible in an + int32_t then the semantics is unspecified. */ +static int32_t converttoint(double_t); +#endif + +/* Helps static branch prediction so hot path can be better optimized. */ +#ifdef __GNUC__ +#define predict_true(x) __builtin_expect(!!(x), 1) +#define predict_false(x) __builtin_expect(x, 0) +#else +#define predict_true(x) (x) +#define predict_false(x) (x) +#endif + +/* Evaluate an expression as the specified type. With standard excess + precision handling a type cast or assignment is enough (with + -ffloat-store an assignment is required, in old compilers argument + passing and return statement may not drop excess precision). */ + +static inline float eval_as_float(float x) +{ + float y = x; + return y; +} + +static inline double eval_as_double(double x) +{ + double y = x; + return y; +} + +/* fp_barrier returns its input, but limits code transformations + as if it had a side-effect (e.g. observable io) and returned + an arbitrary value. */ + +#ifndef fp_barrierf +#define fp_barrierf fp_barrierf +static inline float fp_barrierf(float x) +{ + volatile float y = x; + return y; +} +#endif + +#ifndef fp_barrier +#define fp_barrier fp_barrier +static inline double fp_barrier(double x) +{ + volatile double y = x; + return y; +} +#endif + +#ifndef fp_barrierl +#define fp_barrierl fp_barrierl +static inline long double fp_barrierl(long double x) +{ + volatile long double y = x; + return y; +} +#endif + +/* fp_force_eval ensures that the input value is computed when that's + otherwise unused. To prevent the constant folding of the input + expression, an additional fp_barrier may be needed or a compilation + mode that does so (e.g. -frounding-math in gcc). Then it can be + used to evaluate an expression for its fenv side-effects only. */ + +#ifndef fp_force_evalf +#define fp_force_evalf fp_force_evalf +static inline void fp_force_evalf(float x) +{ + volatile float y; + y = x; +} +#endif + +#ifndef fp_force_eval +#define fp_force_eval fp_force_eval +static inline void fp_force_eval(double x) +{ + volatile double y; + y = x; +} +#endif + +#ifndef fp_force_evall +#define fp_force_evall fp_force_evall +static inline void fp_force_evall(long double x) +{ + volatile long double y; + y = x; +} +#endif + +#define FORCE_EVAL(x) do { \ + if (sizeof(x) == sizeof(float)) { \ + fp_force_evalf(x); \ + } else if (sizeof(x) == sizeof(double)) { \ + fp_force_eval(x); \ + } else { \ + fp_force_evall(x); \ + } \ +} while(0) + +#define asuint(f) ((union{float _f; uint32_t _i;}){f})._i +#define asfloat(i) ((union{uint32_t _i; float _f;}){i})._f +#define asuint64(f) ((union{double _f; uint64_t _i;}){f})._i +#define asdouble(i) ((union{uint64_t _i; double _f;}){i})._f + +#define EXTRACT_WORDS(hi,lo,d) \ +do { \ + uint64_t __u = asuint64(d); \ + (hi) = __u >> 32; \ + (lo) = (uint32_t)__u; \ +} while (0) + +#define GET_HIGH_WORD(hi,d) \ +do { \ + (hi) = asuint64(d) >> 32; \ +} while (0) + +#define GET_LOW_WORD(lo,d) \ +do { \ + (lo) = (uint32_t)asuint64(d); \ +} while (0) + +#define INSERT_WORDS(d,hi,lo) \ +do { \ + (d) = asdouble(((uint64_t)(hi)<<32) | (uint32_t)(lo)); \ +} while (0) + +#define SET_HIGH_WORD(d,hi) \ + INSERT_WORDS(d, hi, (uint32_t)asuint64(d)) + +#define SET_LOW_WORD(d,lo) \ + INSERT_WORDS(d, asuint64(d)>>32, lo) + +#define GET_FLOAT_WORD(w,d) \ +do { \ + (w) = asuint(d); \ +} while (0) + +#define SET_FLOAT_WORD(d,w) \ +do { \ + (d) = asfloat(w); \ +} while (0) + +hidden int __rem_pio2_large(double*,double*,int,int,int); + +hidden int __rem_pio2(double,double*); +hidden double __sin(double,double,int); +hidden double __cos(double,double); +hidden double __tan(double,double,int); +hidden double __expo2(double); + +hidden int __rem_pio2f(float,double*); +hidden float __sindf(double); +hidden float __cosdf(double); +hidden float __tandf(double,int); +hidden float __expo2f(float); + +hidden int __rem_pio2l(long double, long double *); +hidden long double __sinl(long double, long double, int); +hidden long double __cosl(long double, long double); +hidden long double __tanl(long double, long double, int); + +hidden long double __polevll(long double, const long double *, int); +hidden long double __p1evll(long double, const long double *, int); + +hidden double __lgamma_r(double, int *); +hidden float __lgammaf_r(float, int *); + +/* error handling functions */ +hidden float __math_xflowf(uint32_t, float); +hidden float __math_uflowf(uint32_t); +hidden float __math_oflowf(uint32_t); +hidden float __math_divzerof(uint32_t); +hidden float __math_invalidf(float); +hidden double __math_xflow(uint32_t, double); +hidden double __math_uflow(uint32_t); +hidden double __math_oflow(uint32_t); +hidden double __math_divzero(uint32_t); +hidden double __math_invalid(double); + +#endif diff --git a/tools/usb_test.py b/tools/usb_test.py new file mode 100644 index 0000000..0f61593 --- /dev/null +++ b/tools/usb_test.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python + +import time +from pprint import pprint +from enum import Enum +from functools import cache +from dataclasses import dataclass, fields, astuple +import struct +import binascii + +import numpy as np +import click +import serial +from cobs import cobs + +class CobsSerial: + def __init__(self, port, timeout): + self.ser = serial.Serial(port, timeout=timeout) + self.ser.flushOutput() + self.ser.flushInput() + self.ser.write(bytes([0])) # synchronize + self.ser.flushOutput() + + def write_packet(self, data): + self.ser.write(cobs.encode(data)) + self.ser.write(bytes([0])) + self.ser.flushOutput() + + def read_packet(self): + data = b'' + while (b := self.ser.read(1)): + if b[0] == 0: + break + data += b + + if data: + return parse_packet(cobs.decode(data)) + else: + return None + + def command(self, command, args=b''): + self.write_packet(bytes([command.value]) + args) + return self.read_packet() + + +class SerializableEnum(Enum): + def __int__(self): + return self.value + +class PacketType(SerializableEnum): + USBP_GET_STATUS = 0 + USBP_GET_MEASUREMENTS = 1 + USBP_SET_MOTOR = 2 + +class ErrorCode(Enum): + ERR_SUCCESS = 0 + ERR_TIMEOUT = 1 + ERR_PHYSICAL_LAYER = 2 + ERR_FRAMING = 3 + ERR_PROTOCOL = 4 + ERR_DMA = 5 + ERR_BUSY = 6 + ERR_BUFFER_OVERFLOW = 7 + ERR_RX_OVERRUN = 8 + ERR_TX_OVERRUN = 9 + +class BoardConfig(Enum): + BCFG_UNCONFIGURED = 0 + BCFG_DISPLAY = 1 + BCFG_MOTOR = 2 + BCFG_MEAS = 3 + +class Serialized: + @classmethod + def deserialize(kls, data): + fields = struct.unpack(kls._struct_format(), data) + mapped = [cast(val) for cast, val in zip(kls._struct_casts(), fields)] + return kls(*mapped) + + def serialize(self): + mapped = [uncast(val) for uncast, val in zip(self._struct_uncasts(), astuple(self))] + return struct.pack(self._struct_format(), *mapped) + + @classmethod + @cache + def _struct_format(kls): + return kls._parse_fields()[0] + + @classmethod + @cache + def _struct_casts(kls): + return kls._parse_fields()[1] + + @classmethod + @cache + def _struct_uncasts(kls): + return kls._parse_fields()[2] + + @classmethod + def _parse_fields(kls): + fmt = '<' + casts = [] + uncasts = [] + for field in fields(kls): + if isinstance(field.type, tuple): + struct_type, cast, uncast, *_ = *field.type, int + else: + struct_type, cast, uncast = field.type, int, int + fmt += struct_type + casts.append(cast) + uncasts.append(uncast) + return fmt, casts, uncasts + +def timestamp(value): + return float(value) / 1e6 + +@dataclass +class StatusPacket(Serialized): + packet_type: ('B', PacketType) + sys_time_us: ('Q', timestamp) + has_lcd: ('B', bool) + has_adc: ('B', bool) + board_config: ('B', BoardConfig) + bus_addr: 'B' + last_uart_error: ('B', ErrorCode) + last_uart_error_timestamp: ('Q', timestamp) + last_uart_rx: ('Q', timestamp) + last_uart_tx: ('Q', timestamp) + last_bus_error: ('B', ErrorCode) + last_bus_error_timestamp: ('Q', timestamp) + +@dataclass +class MotorPacket(Serialized): + packet_type: ('B', PacketType) + speed_rpm: 'i' + +def parse_packet(data): + packet_type = PacketType(data[0]) + if packet_type == PacketType.USBP_GET_STATUS: + return StatusPacket.deserialize(data) + if packet_type == PacketType.USBP_GET_MEASUREMENTS: + return MeasurementPacket.deserialize(data) + else: + raise ValueError(f'Unsupported packet type {packet_type}') + +@dataclass +class MeasurementPacket(Serialized): + packet_type: ('B', PacketType) + num_channels: 'B' + _num_samples_a: 'I' + _num_samples_b: 'I' + _measurements_raw: ('240s', bytes) + + @property + def measurements(self): + return np.frombuffer(self._measurements_raw, np.dtype(np.int32).newbyteorder('<')).reshape([2, 2, -1]) + + @property + def num_samples(self): + return [self._num_samples_a, self._num_samples_b] + +@click.group() +def cli(): + pass + +@cli.command() +@click.argument('port') +@click.option('--timeout', type=float, default=1) +def probe(port, timeout): + ser = CobsSerial(port, timeout) + pprint(ser.command(PacketType.USBP_GET_STATUS)) + while True: + time.sleep(0.01) + packet = ser.command(PacketType.USBP_GET_MEASUREMENTS) + for i in range(packet.num_samples[1]): + print(packet.measurements[1,1,i], packet.num_samples[1]) + +@cli.command() +@click.argument('port') +@click.argument('speed_rpm', type=int, default=0) +@click.option('--timeout', type=float, default=1) +def motor(port, speed_rpm, timeout): + ser = CobsSerial(port, timeout) + packet = MotorPacket(PacketType.USBP_SET_MOTOR, speed_rpm) + ser.write_packet(packet.serialize()) + +if __name__ == '__main__': + cli() |