summaryrefslogtreecommitdiff
path: root/prototype/fw
diff options
context:
space:
mode:
authorjaseg <git@jaseg.de>2020-11-26 17:42:05 +0100
committerjaseg <git@jaseg.de>2020-11-26 17:42:05 +0100
commit44548df52f8f5e1dcfdc7b3cfdd7024d9d3bf50a (patch)
tree9198f87b0f41ef96aa4f51db1595b9156b7955a1 /prototype/fw
parent9b5625f12ae067a1583252d41febef848f055588 (diff)
downloadihsm-44548df52f8f5e1dcfdc7b3cfdd7024d9d3bf50a.tar.gz
ihsm-44548df52f8f5e1dcfdc7b3cfdd7024d9d3bf50a.tar.bz2
ihsm-44548df52f8f5e1dcfdc7b3cfdd7024d9d3bf50a.zip
Add COBS encoder
Diffstat (limited to 'prototype/fw')
-rw-r--r--prototype/fw/.gitignore1
-rw-r--r--prototype/fw/Makefile16
-rw-r--r--prototype/fw/src/microcobs.c99
-rw-r--r--prototype/fw/src/microcobs.h15
-rw-r--r--prototype/fw/test/crc32_test.c (renamed from prototype/fw/src/crc32_test.c)0
-rw-r--r--prototype/fw/test/microcobs.py145
-rw-r--r--prototype/fw/test/microcobs_test.c72
-rw-r--r--prototype/fw/test/microcobs_test_sg.c94
8 files changed, 439 insertions, 3 deletions
diff --git a/prototype/fw/.gitignore b/prototype/fw/.gitignore
new file mode 100644
index 0000000..bee8a64
--- /dev/null
+++ b/prototype/fw/.gitignore
@@ -0,0 +1 @@
+__pycache__
diff --git a/prototype/fw/Makefile b/prototype/fw/Makefile
index 3fbd029..18e9447 100644
--- a/prototype/fw/Makefile
+++ b/prototype/fw/Makefile
@@ -196,12 +196,19 @@ $(BUILDDIR)/%.o: %.s
mkdir -p $(@D)
$(CC) $(COMMON_CFLAGS) $(CFLAGS) $(EXT_CFLAGS) -o $@ -c $<
-$(BUILDDIR)/crc32_test: src/crc32_test.c src/crc32.c
- $(HOSTCC) $(HOST_CFLAGS) -o $@ $^
+$(BUILDDIR)/crc32_test: src/crc32.c test/crc32_test.c
+ $(HOSTCC) $(HOST_CFLAGS) -o $@ -Isrc $^
+
+$(BUILDDIR)/microcobs_test_sg: src/microcobs.c test/microcobs_test_sg.c
+ $(HOSTCC) $(HOST_CFLAGS) -o $@ -Isrc $^
+
+$(BUILDDIR)/microcobs_test: src/microcobs.c test/microcobs_test.c
+ $(HOSTCC) $(HOST_CFLAGS) -o $@ -Isrc $^
.PHONY: run_tests
-run_tests: $(BUILDDIR)/crc32_test
+run_tests: $(BUILDDIR)/crc32_test $(BUILDDIR)/microcobs_test_sg $(BUILDDIR)/microcobs_test
$(PYTHON3) -m unittest test.crc32_ref
+ $(PYTHON3) -m unittest test.microcobs
venv:
test -d venv || python3 -m venv --system-site-packages venv
@@ -214,6 +221,9 @@ clean:
rm -f $(BUILDDIR)/$(BINARY:.elf=.map)
rm -f $(BUILDDIR)/$(BINARY:.elf=-symbol-sizes.dot)
rm -f $(BUILDDIR)/$(BINARY:.elf=-symbol-sizes.pdf)
+ rm -f $(BUILDDIR)/crc32_test
+ rm -f $(BUILDDIR)/microcobs_test_sg
+ rm -f $(BUILDDIR)/microcobs_test
mrproper: clean
rm -rf build
diff --git a/prototype/fw/src/microcobs.c b/prototype/fw/src/microcobs.c
new file mode 100644
index 0000000..58d5e7b
--- /dev/null
+++ b/prototype/fw/src/microcobs.c
@@ -0,0 +1,99 @@
+
+#include <stdint.h>
+#include "microcobs.h"
+#include <stdio.h>
+
+ssize_t cobs_encode_sg(const struct sg_entry input[], uint8_t *output, size_t output_len)
+{
+ size_t idx_pos = 0;
+ size_t out_pos = 1;
+ size_t sg_idx = 0;
+
+ const struct sg_entry *e = input;
+ fprintf(stderr, "\nsg_entry %016x %zd\n", e->target, e->size);
+ while (e->size >= 0) {
+ if (sg_idx == e->size) {
+ sg_idx = 0;
+ e++;
+ fprintf(stderr, "\nsg_entry %016x %zd\n", e->target, e->size);
+ continue;
+ }
+
+ if (out_pos >= output_len)
+ return -1;
+
+ uint8_t inbyte = e->target[sg_idx];
+ fprintf(stderr, "%02x ", inbyte);
+ sg_idx += 1;
+
+ if (out_pos - idx_pos >= 255) {
+ output[idx_pos] = 255;
+ idx_pos = out_pos;
+ out_pos += 1;
+
+ if (out_pos >= output_len)
+ return -1;
+ }
+
+ if (inbyte) {
+ output[out_pos] = inbyte;
+ out_pos += 1;
+ } else {
+ output[idx_pos] = out_pos - idx_pos;
+ idx_pos = out_pos;
+ out_pos += 1;
+ }
+ }
+
+ if (out_pos >= output_len)
+ return -1;
+ output[idx_pos] = out_pos - idx_pos;
+ output[out_pos] = 0x00;
+ fprintf(stderr, "\n");
+ fprintf(stderr, "Finishing %d %d %d\n", idx_pos, out_pos, out_pos - idx_pos);
+
+ return out_pos + 1;
+}
+
+ssize_t cobs_encode(const uint8_t *input, size_t input_len, uint8_t *output, size_t output_len)
+{
+ size_t idx_pos = 0;
+ size_t out_pos = 1;
+ size_t in_pos = 0;
+
+ while (in_pos < input_len) {
+ if (out_pos >= output_len)
+ return -1;
+
+ uint8_t inbyte = input[in_pos];
+ fprintf(stderr, "%02x ", inbyte);
+ in_pos += 1;
+
+ if (out_pos - idx_pos >= 255) {
+ output[idx_pos] = 255;
+ idx_pos = out_pos;
+ out_pos += 1;
+
+ if (out_pos >= output_len)
+ return -1;
+ }
+
+ if (inbyte) {
+ output[out_pos] = inbyte;
+ out_pos += 1;
+ } else {
+ output[idx_pos] = out_pos - idx_pos;
+ idx_pos = out_pos;
+ out_pos += 1;
+ }
+ }
+
+ if (out_pos >= output_len)
+ return -1;
+ output[idx_pos] = out_pos - idx_pos;
+ output[out_pos] = 0x00;
+ fprintf(stderr, "\n");
+ fprintf(stderr, "Finishing %d %d %d\n", idx_pos, out_pos, out_pos - idx_pos);
+
+ return out_pos + 1;
+}
diff --git a/prototype/fw/src/microcobs.h b/prototype/fw/src/microcobs.h
new file mode 100644
index 0000000..774a27b
--- /dev/null
+++ b/prototype/fw/src/microcobs.h
@@ -0,0 +1,15 @@
+#ifndef __MICROCOBS_H__
+#define __MICROCOBS_H__
+
+#include <stdint.h>
+#include <unistd.h>
+
+struct sg_entry {
+ uint8_t *target;
+ ssize_t size;
+};
+
+ssize_t cobs_encode_sg(const struct sg_entry input[], uint8_t *output, size_t output_len);
+ssize_t cobs_encode(const uint8_t *input, size_t input_len, uint8_t *output, size_t output_len);
+
+#endif /* __MICROCOBS_H__ */
diff --git a/prototype/fw/src/crc32_test.c b/prototype/fw/test/crc32_test.c
index e23bb37..e23bb37 100644
--- a/prototype/fw/src/crc32_test.c
+++ b/prototype/fw/test/crc32_test.c
diff --git a/prototype/fw/test/microcobs.py b/prototype/fw/test/microcobs.py
new file mode 100644
index 0000000..1d09efd
--- /dev/null
+++ b/prototype/fw/test/microcobs.py
@@ -0,0 +1,145 @@
+
+import itertools
+import unittest
+import os
+import subprocess
+import binascii
+import random
+import tempfile
+
+from cobs import cobs
+
+INTERESTING = bytes([1, 2, 3, 0, 1, 2, 3, 0, 0, 1, 2, 3, 0, 0, 0, 1, 2, 3] + list(range(256)))
+class MicrocobsScatterGatherTest(unittest.TestCase):
+ def do_test(self, data_sglist):
+ data_sglist = list(data_sglist)
+ ref = cobs.encode(b''.join(data_sglist))
+ input = b'\n'.join(binascii.hexlify(x) for x in data_sglist)
+
+ debug_file = tempfile.NamedTemporaryFile(prefix='microcobs_test_', delete=False)
+ debug_file.write(input)
+
+ try:
+ test = subprocess.check_output(os.getenv('MICROCOBS_SG_TEST_BINARY', 'build/microcobs_test_sg'),
+ input=input, stderr=subprocess.DEVNULL)
+ test = binascii.unhexlify(test.strip())
+
+ self.assertEqual(test[-1], 0, 'Missing terminating null byte')
+ self.assertEqual(ref, test[:-1], f'Mismatched output for input {debug_file.name}')
+
+ debug_file.close()
+ os.remove(debug_file.name)
+ except Exception as e:
+ raise SystemError(f'Test error for input {debug_file.name}') from e
+
+
+ def test_empty_chunks(self):
+ for i in range(256):
+ self.do_test([b''] * i)
+
+
+ def test_interspersed_empty_chunks(self):
+ testdata = INTERESTING
+ for i in range(len(testdata) + 1):
+ self.do_test([testdata[:i], b'', testdata[i:]])
+
+ def test_interspersed_double_empty_chunks(self):
+ testdata = INTERESTING
+ for i in range(len(testdata) + 1):
+ self.do_test([testdata[:i], b'', b'', testdata[i:]])
+
+ def test_one_byte_chunks(self):
+ testdata = INTERESTING
+ self.do_test(itertools.chain.from_iterable(zip([bytes([x]) for x in testdata], [b''] * len(testdata))))
+
+ def test_one_byte(self):
+ for i in range(256):
+ self.do_test([bytes([i])])
+
+ def test_lengths(self):
+ for i in range(260):
+ self.do_test([bytes([0xff] * i)])
+
+ def test_null_then_lengths(self):
+ for i in range(256):
+ self.do_test([bytes([0] + [0xff] * i)])
+
+ def test_lengths_then_null(self):
+ for i in range(256):
+ self.do_test([bytes([0xff] * i + [0])])
+
+ def test_two_byte(self):
+ for i in range(4):
+ for j in range(4):
+ self.do_test([bytes([i, j])])
+ self.do_test([bytes([i]), bytes([j])])
+
+ def test_long(self):
+ for i in range(5):
+ self.do_test([b'A' * (100 + 256*i)])
+ self.do_test([b'A' * 100] * 256*i)
+
+ def test_random(self):
+ for i in range(1000):
+ testdata = os.urandom(random.randint(0, 600))
+
+ chunks = [testdata]
+ for i in range(random.randint(0, len(testdata) // 5)):
+ idx = random.randint(0, len(chunks)-1)
+ chunk = chunks[idx]
+ bidx = random.randint(0, len(chunk))
+ chunks = chunks[:idx] + [chunk[:bidx], chunk[bidx:]] + chunks[idx+1:]
+
+ self.do_test(chunks)
+
+class MicrocobsTest(unittest.TestCase):
+ def do_test(self, data):
+ ref = cobs.encode(data)
+ input = binascii.hexlify(data)
+
+ debug_file = tempfile.NamedTemporaryFile(prefix='microcobs_test_', delete=False)
+ debug_file.write(input)
+
+ try:
+ test = subprocess.check_output(os.getenv('MICROCOBS_TEST_BINARY', 'build/microcobs_test'),
+ input=input, stderr=subprocess.DEVNULL)
+ test = binascii.unhexlify(test.strip())
+
+ self.assertEqual(test[-1], 0, 'Missing terminating null byte')
+ self.assertEqual(ref, test[:-1], f'Mismatched output for input {debug_file.name}')
+
+ debug_file.close()
+ os.remove(debug_file.name)
+ except Exception as e:
+ raise SystemError(f'Test error for input {debug_file.name}') from e
+
+
+ def test_one_byte(self):
+ for i in range(256):
+ self.do_test(bytes([i]))
+
+ def test_lengths(self):
+ for i in range(260):
+ self.do_test(bytes([0xff] * i))
+
+ def test_null_then_lengths(self):
+ for i in range(256):
+ self.do_test(bytes([0] + [0xff] * i))
+
+ def test_lengths_then_null(self):
+ for i in range(256):
+ self.do_test(bytes([0xff] * i + [0]))
+
+ def test_two_byte(self):
+ for i in range(4):
+ for j in range(4):
+ self.do_test(bytes([i, j]))
+
+ def test_long(self):
+ for i in range(5):
+ self.do_test(b'A' * (100 + 256*i))
+
+ def test_random(self):
+ for i in range(10000):
+ self.do_test(os.urandom(random.randint(0, 600)))
+
diff --git a/prototype/fw/test/microcobs_test.c b/prototype/fw/test/microcobs_test.c
new file mode 100644
index 0000000..74ee803
--- /dev/null
+++ b/prototype/fw/test/microcobs_test.c
@@ -0,0 +1,72 @@
+
+#include <stdint.h>
+#include <stdio.h>
+#include <malloc.h>
+#include <string.h>
+
+#include "microcobs.h"
+
+static int parse_hex(char c) {
+ if ('0' <= c && c <= '9')
+ return c - '0';
+ if ('a' <= c && c <= 'f')
+ return c - 'a' + 0xa;
+ if ('A' <= c && c <= 'F')
+ return c - 'A' + 0xA;
+ return -1;
+}
+
+int main(void) {
+ char buf[2];
+
+ size_t buf_size = 1024;
+ uint8_t *data_buf = malloc(buf_size);
+ if (!data_buf)
+ return -99;
+ size_t total_bytes = 0;
+
+ do {
+ ssize_t nread = fread(buf, 1, 2, stdin);
+ if (nread < 2) {
+ fprintf(stderr, "bk %d\n", nread);
+ break;
+ }
+ fprintf(stderr, "read: %d\n", nread);
+
+ int a = parse_hex(buf[0]), b = parse_hex(buf[1]);
+ if (a < 0 || b < 0)
+ fprintf(stderr, "Error: even number of hex digits expected\n");
+
+ int byte = a<<4 | b;
+
+ if (total_bytes >= buf_size) {
+ buf_size += 1024;
+ data_buf = realloc(data_buf, buf_size);
+ if (!data_buf)
+ return -99;
+ }
+
+ data_buf[total_bytes] = byte;
+ total_bytes += 1;
+ } while(23);
+
+ /* Terminate list */
+ fprintf(stderr, "Got %d bytes\n", total_bytes);
+
+ /* Reserve extra bytes for long input lines. Arbitrary length support means this cobs implemenetation is no longer
+ * fixed overhead of 1 byte, but instead variable overhead of worst-case O(1 + n/254) for input length n. */
+ size_t output_size = total_bytes*2 + 100;
+ uint8_t *output_buf = malloc(output_size);
+
+ ssize_t rv = cobs_encode(data_buf, total_bytes, output_buf, output_size);
+ if (rv > 0) {
+ for (size_t i=0; i<rv; i++) {
+ printf("%02x", output_buf[i]);
+ }
+ } else {
+ printf("NONZERO RETURN VALUE: %d\n", rv);
+ }
+ printf("\n");
+
+ return 0;
+}
diff --git a/prototype/fw/test/microcobs_test_sg.c b/prototype/fw/test/microcobs_test_sg.c
new file mode 100644
index 0000000..592dfc5
--- /dev/null
+++ b/prototype/fw/test/microcobs_test_sg.c
@@ -0,0 +1,94 @@
+
+#include <stdint.h>
+#include <stdio.h>
+#include <malloc.h>
+#include <string.h>
+
+#include "microcobs.h"
+
+static int parse_hex(char c) {
+ if ('0' <= c && c <= '9')
+ return c - '0';
+ if ('a' <= c && c <= 'f')
+ return c - 'a' + 0xa;
+ if ('A' <= c && c <= 'F')
+ return c - 'A' + 0xA;
+ return -1;
+}
+
+int main(void) {
+ char buf[2];
+
+ size_t sgl_len = 100;
+ struct sg_entry *sgl = calloc(sgl_len, sizeof(struct sg_entry));
+
+ size_t sg_i = 0;
+ size_t sgi_size = 1024;
+ size_t total_bytes = 0;
+
+ do {
+ ssize_t nread = fread(buf, 1, 1, stdin);
+ if (nread == 0)
+ break;
+ fprintf(stderr, "read: %d\n", nread);
+
+ if (buf[0] == '\n') {
+ fprintf(stderr, "Found newline for chunk %d\n", sg_i);
+ sg_i += 1;
+ sgi_size = 1024;
+ if (sg_i+1 >= sgl_len) { /* always keep last entry free for termination */
+ size_t old_sgl_len = sgl_len;
+ sgl_len += 50;
+ sgl = reallocarray(sgl, sgl_len, sizeof(struct sg_entry));
+ if (!sgl)
+ return -99;
+ memset(sgl+old_sgl_len, 0, (sgl_len - old_sgl_len) * sizeof(struct sg_entry));
+ }
+ continue;
+ }
+
+ nread = fread(buf + 1, 1, 1, stdin);
+ if (nread == 0)
+ break;
+ fprintf(stderr, "read: %d\n", nread);
+
+ int a = parse_hex(buf[0]), b = parse_hex(buf[1]);
+ if (a < 0 || b < 0)
+ fprintf(stderr, "Error: even number of hex digits expected\n");
+
+ int byte = a<<4 | b;
+ if (sgl[sg_i].target == 0) {
+ sgl[sg_i].target = malloc(sgi_size);
+ } else if (sgl[sg_i].size == sgi_size) {
+ sgi_size += 1024;
+ sgl[sg_i].target = realloc(sgl[sg_i].target, sgi_size);
+ if (!sgl[sg_i].target)
+ return -99;
+ }
+ sgl[sg_i].target[sgl[sg_i].size] = byte;
+ sgl[sg_i].size += 1;
+ total_bytes += 1;
+ } while(23);
+
+ /* Terminate list */
+ sg_i += 1;
+ sgl[sg_i].size = -1;
+ fprintf(stderr, "Got %d chunks\n", sg_i);
+
+ /* Reserve extra bytes for long input lines. Arbitrary length support means this cobs implemenetation is no longer
+ * fixed overhead of 1 byte, but instead variable overhead of worst-case O(1 + n/254) for input length n. */
+ size_t output_size = total_bytes + 10000;
+ uint8_t *output_buf = malloc(output_size);
+
+ ssize_t rv = cobs_encode_sg(sgl, output_buf, output_size);
+ if (rv > 0) {
+ for (size_t i=0; i<rv; i++) {
+ printf("%02x", output_buf[i]);
+ }
+ } else {
+ printf("NONZERO RETURN VALUE: %d\n", rv);
+ }
+ printf("\n");
+
+ return 0;
+}