diff options
Diffstat (limited to 'prototype/fw/test/microcobs.py')
-rw-r--r-- | prototype/fw/test/microcobs.py | 145 |
1 files changed, 145 insertions, 0 deletions
diff --git a/prototype/fw/test/microcobs.py b/prototype/fw/test/microcobs.py new file mode 100644 index 0000000..1d09efd --- /dev/null +++ b/prototype/fw/test/microcobs.py @@ -0,0 +1,145 @@ + +import itertools +import unittest +import os +import subprocess +import binascii +import random +import tempfile + +from cobs import cobs + +INTERESTING = bytes([1, 2, 3, 0, 1, 2, 3, 0, 0, 1, 2, 3, 0, 0, 0, 1, 2, 3] + list(range(256))) +class MicrocobsScatterGatherTest(unittest.TestCase): + def do_test(self, data_sglist): + data_sglist = list(data_sglist) + ref = cobs.encode(b''.join(data_sglist)) + input = b'\n'.join(binascii.hexlify(x) for x in data_sglist) + + debug_file = tempfile.NamedTemporaryFile(prefix='microcobs_test_', delete=False) + debug_file.write(input) + + try: + test = subprocess.check_output(os.getenv('MICROCOBS_SG_TEST_BINARY', 'build/microcobs_test_sg'), + input=input, stderr=subprocess.DEVNULL) + test = binascii.unhexlify(test.strip()) + + self.assertEqual(test[-1], 0, 'Missing terminating null byte') + self.assertEqual(ref, test[:-1], f'Mismatched output for input {debug_file.name}') + + debug_file.close() + os.remove(debug_file.name) + except Exception as e: + raise SystemError(f'Test error for input {debug_file.name}') from e + + + def test_empty_chunks(self): + for i in range(256): + self.do_test([b''] * i) + + + def test_interspersed_empty_chunks(self): + testdata = INTERESTING + for i in range(len(testdata) + 1): + self.do_test([testdata[:i], b'', testdata[i:]]) + + def test_interspersed_double_empty_chunks(self): + testdata = INTERESTING + for i in range(len(testdata) + 1): + self.do_test([testdata[:i], b'', b'', testdata[i:]]) + + def test_one_byte_chunks(self): + testdata = INTERESTING + self.do_test(itertools.chain.from_iterable(zip([bytes([x]) for x in testdata], [b''] * len(testdata)))) + + def test_one_byte(self): + for i in range(256): + self.do_test([bytes([i])]) + + def test_lengths(self): + for i in range(260): + self.do_test([bytes([0xff] * i)]) + + def test_null_then_lengths(self): + for i in range(256): + self.do_test([bytes([0] + [0xff] * i)]) + + def test_lengths_then_null(self): + for i in range(256): + self.do_test([bytes([0xff] * i + [0])]) + + def test_two_byte(self): + for i in range(4): + for j in range(4): + self.do_test([bytes([i, j])]) + self.do_test([bytes([i]), bytes([j])]) + + def test_long(self): + for i in range(5): + self.do_test([b'A' * (100 + 256*i)]) + self.do_test([b'A' * 100] * 256*i) + + def test_random(self): + for i in range(1000): + testdata = os.urandom(random.randint(0, 600)) + + chunks = [testdata] + for i in range(random.randint(0, len(testdata) // 5)): + idx = random.randint(0, len(chunks)-1) + chunk = chunks[idx] + bidx = random.randint(0, len(chunk)) + chunks = chunks[:idx] + [chunk[:bidx], chunk[bidx:]] + chunks[idx+1:] + + self.do_test(chunks) + +class MicrocobsTest(unittest.TestCase): + def do_test(self, data): + ref = cobs.encode(data) + input = binascii.hexlify(data) + + debug_file = tempfile.NamedTemporaryFile(prefix='microcobs_test_', delete=False) + debug_file.write(input) + + try: + test = subprocess.check_output(os.getenv('MICROCOBS_TEST_BINARY', 'build/microcobs_test'), + input=input, stderr=subprocess.DEVNULL) + test = binascii.unhexlify(test.strip()) + + self.assertEqual(test[-1], 0, 'Missing terminating null byte') + self.assertEqual(ref, test[:-1], f'Mismatched output for input {debug_file.name}') + + debug_file.close() + os.remove(debug_file.name) + except Exception as e: + raise SystemError(f'Test error for input {debug_file.name}') from e + + + def test_one_byte(self): + for i in range(256): + self.do_test(bytes([i])) + + def test_lengths(self): + for i in range(260): + self.do_test(bytes([0xff] * i)) + + def test_null_then_lengths(self): + for i in range(256): + self.do_test(bytes([0] + [0xff] * i)) + + def test_lengths_then_null(self): + for i in range(256): + self.do_test(bytes([0xff] * i + [0])) + + def test_two_byte(self): + for i in range(4): + for j in range(4): + self.do_test(bytes([i, j])) + + def test_long(self): + for i in range(5): + self.do_test(b'A' * (100 + 256*i)) + + def test_random(self): + for i in range(10000): + self.do_test(os.urandom(random.randint(0, 600))) + |