diff options
Diffstat (limited to 'prototype/fw/test')
-rw-r--r-- | prototype/fw/test/crc32_test.c | 20 | ||||
-rw-r--r-- | prototype/fw/test/microcobs.py | 145 | ||||
-rw-r--r-- | prototype/fw/test/microcobs_test.c | 72 | ||||
-rw-r--r-- | prototype/fw/test/microcobs_test_sg.c | 94 |
4 files changed, 331 insertions, 0 deletions
diff --git a/prototype/fw/test/crc32_test.c b/prototype/fw/test/crc32_test.c new file mode 100644 index 0000000..e23bb37 --- /dev/null +++ b/prototype/fw/test/crc32_test.c @@ -0,0 +1,20 @@ + +#include <stdio.h> + +#include "crc32.h" + +int main(void) { + crc32_t state = crc32_reset(); + + do { + char c; + if (fread(&c, 1, 1, stdin) != 1) + break; + + state = crc32_update(state, (uint8_t)c); + } while(23); + + state = crc32_finalize(state); + printf("%08x\n", state); + return 0; +} diff --git a/prototype/fw/test/microcobs.py b/prototype/fw/test/microcobs.py new file mode 100644 index 0000000..1d09efd --- /dev/null +++ b/prototype/fw/test/microcobs.py @@ -0,0 +1,145 @@ + +import itertools +import unittest +import os +import subprocess +import binascii +import random +import tempfile + +from cobs import cobs + +INTERESTING = bytes([1, 2, 3, 0, 1, 2, 3, 0, 0, 1, 2, 3, 0, 0, 0, 1, 2, 3] + list(range(256))) +class MicrocobsScatterGatherTest(unittest.TestCase): + def do_test(self, data_sglist): + data_sglist = list(data_sglist) + ref = cobs.encode(b''.join(data_sglist)) + input = b'\n'.join(binascii.hexlify(x) for x in data_sglist) + + debug_file = tempfile.NamedTemporaryFile(prefix='microcobs_test_', delete=False) + debug_file.write(input) + + try: + test = subprocess.check_output(os.getenv('MICROCOBS_SG_TEST_BINARY', 'build/microcobs_test_sg'), + input=input, stderr=subprocess.DEVNULL) + test = binascii.unhexlify(test.strip()) + + self.assertEqual(test[-1], 0, 'Missing terminating null byte') + self.assertEqual(ref, test[:-1], f'Mismatched output for input {debug_file.name}') + + debug_file.close() + os.remove(debug_file.name) + except Exception as e: + raise SystemError(f'Test error for input {debug_file.name}') from e + + + def test_empty_chunks(self): + for i in range(256): + self.do_test([b''] * i) + + + def test_interspersed_empty_chunks(self): + testdata = INTERESTING + for i in range(len(testdata) + 1): + self.do_test([testdata[:i], b'', testdata[i:]]) + + def test_interspersed_double_empty_chunks(self): + testdata = INTERESTING + for i in range(len(testdata) + 1): + self.do_test([testdata[:i], b'', b'', testdata[i:]]) + + def test_one_byte_chunks(self): + testdata = INTERESTING + self.do_test(itertools.chain.from_iterable(zip([bytes([x]) for x in testdata], [b''] * len(testdata)))) + + def test_one_byte(self): + for i in range(256): + self.do_test([bytes([i])]) + + def test_lengths(self): + for i in range(260): + self.do_test([bytes([0xff] * i)]) + + def test_null_then_lengths(self): + for i in range(256): + self.do_test([bytes([0] + [0xff] * i)]) + + def test_lengths_then_null(self): + for i in range(256): + self.do_test([bytes([0xff] * i + [0])]) + + def test_two_byte(self): + for i in range(4): + for j in range(4): + self.do_test([bytes([i, j])]) + self.do_test([bytes([i]), bytes([j])]) + + def test_long(self): + for i in range(5): + self.do_test([b'A' * (100 + 256*i)]) + self.do_test([b'A' * 100] * 256*i) + + def test_random(self): + for i in range(1000): + testdata = os.urandom(random.randint(0, 600)) + + chunks = [testdata] + for i in range(random.randint(0, len(testdata) // 5)): + idx = random.randint(0, len(chunks)-1) + chunk = chunks[idx] + bidx = random.randint(0, len(chunk)) + chunks = chunks[:idx] + [chunk[:bidx], chunk[bidx:]] + chunks[idx+1:] + + self.do_test(chunks) + +class MicrocobsTest(unittest.TestCase): + def do_test(self, data): + ref = cobs.encode(data) + input = binascii.hexlify(data) + + debug_file = tempfile.NamedTemporaryFile(prefix='microcobs_test_', delete=False) + debug_file.write(input) + + try: + test = subprocess.check_output(os.getenv('MICROCOBS_TEST_BINARY', 'build/microcobs_test'), + input=input, stderr=subprocess.DEVNULL) + test = binascii.unhexlify(test.strip()) + + self.assertEqual(test[-1], 0, 'Missing terminating null byte') + self.assertEqual(ref, test[:-1], f'Mismatched output for input {debug_file.name}') + + debug_file.close() + os.remove(debug_file.name) + except Exception as e: + raise SystemError(f'Test error for input {debug_file.name}') from e + + + def test_one_byte(self): + for i in range(256): + self.do_test(bytes([i])) + + def test_lengths(self): + for i in range(260): + self.do_test(bytes([0xff] * i)) + + def test_null_then_lengths(self): + for i in range(256): + self.do_test(bytes([0] + [0xff] * i)) + + def test_lengths_then_null(self): + for i in range(256): + self.do_test(bytes([0xff] * i + [0])) + + def test_two_byte(self): + for i in range(4): + for j in range(4): + self.do_test(bytes([i, j])) + + def test_long(self): + for i in range(5): + self.do_test(b'A' * (100 + 256*i)) + + def test_random(self): + for i in range(10000): + self.do_test(os.urandom(random.randint(0, 600))) + diff --git a/prototype/fw/test/microcobs_test.c b/prototype/fw/test/microcobs_test.c new file mode 100644 index 0000000..74ee803 --- /dev/null +++ b/prototype/fw/test/microcobs_test.c @@ -0,0 +1,72 @@ + +#include <stdint.h> +#include <stdio.h> +#include <malloc.h> +#include <string.h> + +#include "microcobs.h" + +static int parse_hex(char c) { + if ('0' <= c && c <= '9') + return c - '0'; + if ('a' <= c && c <= 'f') + return c - 'a' + 0xa; + if ('A' <= c && c <= 'F') + return c - 'A' + 0xA; + return -1; +} + +int main(void) { + char buf[2]; + + size_t buf_size = 1024; + uint8_t *data_buf = malloc(buf_size); + if (!data_buf) + return -99; + size_t total_bytes = 0; + + do { + ssize_t nread = fread(buf, 1, 2, stdin); + if (nread < 2) { + fprintf(stderr, "bk %d\n", nread); + break; + } + fprintf(stderr, "read: %d\n", nread); + + int a = parse_hex(buf[0]), b = parse_hex(buf[1]); + if (a < 0 || b < 0) + fprintf(stderr, "Error: even number of hex digits expected\n"); + + int byte = a<<4 | b; + + if (total_bytes >= buf_size) { + buf_size += 1024; + data_buf = realloc(data_buf, buf_size); + if (!data_buf) + return -99; + } + + data_buf[total_bytes] = byte; + total_bytes += 1; + } while(23); + + /* Terminate list */ + fprintf(stderr, "Got %d bytes\n", total_bytes); + + /* Reserve extra bytes for long input lines. Arbitrary length support means this cobs implemenetation is no longer + * fixed overhead of 1 byte, but instead variable overhead of worst-case O(1 + n/254) for input length n. */ + size_t output_size = total_bytes*2 + 100; + uint8_t *output_buf = malloc(output_size); + + ssize_t rv = cobs_encode(data_buf, total_bytes, output_buf, output_size); + if (rv > 0) { + for (size_t i=0; i<rv; i++) { + printf("%02x", output_buf[i]); + } + } else { + printf("NONZERO RETURN VALUE: %d\n", rv); + } + printf("\n"); + + return 0; +} diff --git a/prototype/fw/test/microcobs_test_sg.c b/prototype/fw/test/microcobs_test_sg.c new file mode 100644 index 0000000..592dfc5 --- /dev/null +++ b/prototype/fw/test/microcobs_test_sg.c @@ -0,0 +1,94 @@ + +#include <stdint.h> +#include <stdio.h> +#include <malloc.h> +#include <string.h> + +#include "microcobs.h" + +static int parse_hex(char c) { + if ('0' <= c && c <= '9') + return c - '0'; + if ('a' <= c && c <= 'f') + return c - 'a' + 0xa; + if ('A' <= c && c <= 'F') + return c - 'A' + 0xA; + return -1; +} + +int main(void) { + char buf[2]; + + size_t sgl_len = 100; + struct sg_entry *sgl = calloc(sgl_len, sizeof(struct sg_entry)); + + size_t sg_i = 0; + size_t sgi_size = 1024; + size_t total_bytes = 0; + + do { + ssize_t nread = fread(buf, 1, 1, stdin); + if (nread == 0) + break; + fprintf(stderr, "read: %d\n", nread); + + if (buf[0] == '\n') { + fprintf(stderr, "Found newline for chunk %d\n", sg_i); + sg_i += 1; + sgi_size = 1024; + if (sg_i+1 >= sgl_len) { /* always keep last entry free for termination */ + size_t old_sgl_len = sgl_len; + sgl_len += 50; + sgl = reallocarray(sgl, sgl_len, sizeof(struct sg_entry)); + if (!sgl) + return -99; + memset(sgl+old_sgl_len, 0, (sgl_len - old_sgl_len) * sizeof(struct sg_entry)); + } + continue; + } + + nread = fread(buf + 1, 1, 1, stdin); + if (nread == 0) + break; + fprintf(stderr, "read: %d\n", nread); + + int a = parse_hex(buf[0]), b = parse_hex(buf[1]); + if (a < 0 || b < 0) + fprintf(stderr, "Error: even number of hex digits expected\n"); + + int byte = a<<4 | b; + if (sgl[sg_i].target == 0) { + sgl[sg_i].target = malloc(sgi_size); + } else if (sgl[sg_i].size == sgi_size) { + sgi_size += 1024; + sgl[sg_i].target = realloc(sgl[sg_i].target, sgi_size); + if (!sgl[sg_i].target) + return -99; + } + sgl[sg_i].target[sgl[sg_i].size] = byte; + sgl[sg_i].size += 1; + total_bytes += 1; + } while(23); + + /* Terminate list */ + sg_i += 1; + sgl[sg_i].size = -1; + fprintf(stderr, "Got %d chunks\n", sg_i); + + /* Reserve extra bytes for long input lines. Arbitrary length support means this cobs implemenetation is no longer + * fixed overhead of 1 byte, but instead variable overhead of worst-case O(1 + n/254) for input length n. */ + size_t output_size = total_bytes + 10000; + uint8_t *output_buf = malloc(output_size); + + ssize_t rv = cobs_encode_sg(sgl, output_buf, output_size); + if (rv > 0) { + for (size_t i=0; i<rv; i++) { + printf("%02x", output_buf[i]); + } + } else { + printf("NONZERO RETURN VALUE: %d\n", rv); + } + printf("\n"); + + return 0; +} |