summaryrefslogtreecommitdiff
path: root/prototype/fw/test/microcobs.py
diff options
context:
space:
mode:
Diffstat (limited to 'prototype/fw/test/microcobs.py')
-rw-r--r--prototype/fw/test/microcobs.py145
1 files changed, 145 insertions, 0 deletions
diff --git a/prototype/fw/test/microcobs.py b/prototype/fw/test/microcobs.py
new file mode 100644
index 0000000..1d09efd
--- /dev/null
+++ b/prototype/fw/test/microcobs.py
@@ -0,0 +1,145 @@
+
+import itertools
+import unittest
+import os
+import subprocess
+import binascii
+import random
+import tempfile
+
+from cobs import cobs
+
+INTERESTING = bytes([1, 2, 3, 0, 1, 2, 3, 0, 0, 1, 2, 3, 0, 0, 0, 1, 2, 3] + list(range(256)))
+class MicrocobsScatterGatherTest(unittest.TestCase):
+ def do_test(self, data_sglist):
+ data_sglist = list(data_sglist)
+ ref = cobs.encode(b''.join(data_sglist))
+ input = b'\n'.join(binascii.hexlify(x) for x in data_sglist)
+
+ debug_file = tempfile.NamedTemporaryFile(prefix='microcobs_test_', delete=False)
+ debug_file.write(input)
+
+ try:
+ test = subprocess.check_output(os.getenv('MICROCOBS_SG_TEST_BINARY', 'build/microcobs_test_sg'),
+ input=input, stderr=subprocess.DEVNULL)
+ test = binascii.unhexlify(test.strip())
+
+ self.assertEqual(test[-1], 0, 'Missing terminating null byte')
+ self.assertEqual(ref, test[:-1], f'Mismatched output for input {debug_file.name}')
+
+ debug_file.close()
+ os.remove(debug_file.name)
+ except Exception as e:
+ raise SystemError(f'Test error for input {debug_file.name}') from e
+
+
+ def test_empty_chunks(self):
+ for i in range(256):
+ self.do_test([b''] * i)
+
+
+ def test_interspersed_empty_chunks(self):
+ testdata = INTERESTING
+ for i in range(len(testdata) + 1):
+ self.do_test([testdata[:i], b'', testdata[i:]])
+
+ def test_interspersed_double_empty_chunks(self):
+ testdata = INTERESTING
+ for i in range(len(testdata) + 1):
+ self.do_test([testdata[:i], b'', b'', testdata[i:]])
+
+ def test_one_byte_chunks(self):
+ testdata = INTERESTING
+ self.do_test(itertools.chain.from_iterable(zip([bytes([x]) for x in testdata], [b''] * len(testdata))))
+
+ def test_one_byte(self):
+ for i in range(256):
+ self.do_test([bytes([i])])
+
+ def test_lengths(self):
+ for i in range(260):
+ self.do_test([bytes([0xff] * i)])
+
+ def test_null_then_lengths(self):
+ for i in range(256):
+ self.do_test([bytes([0] + [0xff] * i)])
+
+ def test_lengths_then_null(self):
+ for i in range(256):
+ self.do_test([bytes([0xff] * i + [0])])
+
+ def test_two_byte(self):
+ for i in range(4):
+ for j in range(4):
+ self.do_test([bytes([i, j])])
+ self.do_test([bytes([i]), bytes([j])])
+
+ def test_long(self):
+ for i in range(5):
+ self.do_test([b'A' * (100 + 256*i)])
+ self.do_test([b'A' * 100] * 256*i)
+
+ def test_random(self):
+ for i in range(1000):
+ testdata = os.urandom(random.randint(0, 600))
+
+ chunks = [testdata]
+ for i in range(random.randint(0, len(testdata) // 5)):
+ idx = random.randint(0, len(chunks)-1)
+ chunk = chunks[idx]
+ bidx = random.randint(0, len(chunk))
+ chunks = chunks[:idx] + [chunk[:bidx], chunk[bidx:]] + chunks[idx+1:]
+
+ self.do_test(chunks)
+
+class MicrocobsTest(unittest.TestCase):
+ def do_test(self, data):
+ ref = cobs.encode(data)
+ input = binascii.hexlify(data)
+
+ debug_file = tempfile.NamedTemporaryFile(prefix='microcobs_test_', delete=False)
+ debug_file.write(input)
+
+ try:
+ test = subprocess.check_output(os.getenv('MICROCOBS_TEST_BINARY', 'build/microcobs_test'),
+ input=input, stderr=subprocess.DEVNULL)
+ test = binascii.unhexlify(test.strip())
+
+ self.assertEqual(test[-1], 0, 'Missing terminating null byte')
+ self.assertEqual(ref, test[:-1], f'Mismatched output for input {debug_file.name}')
+
+ debug_file.close()
+ os.remove(debug_file.name)
+ except Exception as e:
+ raise SystemError(f'Test error for input {debug_file.name}') from e
+
+
+ def test_one_byte(self):
+ for i in range(256):
+ self.do_test(bytes([i]))
+
+ def test_lengths(self):
+ for i in range(260):
+ self.do_test(bytes([0xff] * i))
+
+ def test_null_then_lengths(self):
+ for i in range(256):
+ self.do_test(bytes([0] + [0xff] * i))
+
+ def test_lengths_then_null(self):
+ for i in range(256):
+ self.do_test(bytes([0xff] * i + [0]))
+
+ def test_two_byte(self):
+ for i in range(4):
+ for j in range(4):
+ self.do_test(bytes([i, j]))
+
+ def test_long(self):
+ for i in range(5):
+ self.do_test(b'A' * (100 + 256*i))
+
+ def test_random(self):
+ for i in range(10000):
+ self.do_test(os.urandom(random.randint(0, 600)))
+