import itertools import unittest import os import subprocess import binascii import random import tempfile from cobs import cobs INTERESTING = bytes([1, 2, 3, 0, 1, 2, 3, 0, 0, 1, 2, 3, 0, 0, 0, 1, 2, 3] + list(range(256))) class MicrocobsScatterGatherTest(unittest.TestCase): def do_test(self, data_sglist): data_sglist = list(data_sglist) ref = cobs.encode(b''.join(data_sglist)) input = b'\n'.join(binascii.hexlify(x) for x in data_sglist) debug_file = tempfile.NamedTemporaryFile(prefix='microcobs_test_', delete=False) debug_file.write(input) try: test = subprocess.check_output(os.getenv('MICROCOBS_SG_TEST_BINARY', 'build/microcobs_test_sg'), input=input, stderr=subprocess.DEVNULL) test = binascii.unhexlify(test.strip()) self.assertEqual(test[-1], 0, 'Missing terminating null byte') self.assertEqual(ref, test[:-1], f'Mismatched output for input {debug_file.name}') debug_file.close() os.remove(debug_file.name) except Exception as e: raise SystemError(f'Test error for input {debug_file.name}') from e def test_empty_chunks(self): for i in range(256): self.do_test([b''] * i) def test_interspersed_empty_chunks(self): testdata = INTERESTING for i in range(len(testdata) + 1): self.do_test([testdata[:i], b'', testdata[i:]]) def test_interspersed_double_empty_chunks(self): testdata = INTERESTING for i in range(len(testdata) + 1): self.do_test([testdata[:i], b'', b'', testdata[i:]]) def test_one_byte_chunks(self): testdata = INTERESTING self.do_test(itertools.chain.from_iterable(zip([bytes([x]) for x in testdata], [b''] * len(testdata)))) def test_one_byte(self): for i in range(256): self.do_test([bytes([i])]) def test_lengths(self): for i in range(260): self.do_test([bytes([0xff] * i)]) def test_null_then_lengths(self): for i in range(256): self.do_test([bytes([0] + [0xff] * i)]) def test_lengths_then_null(self): for i in range(256): self.do_test([bytes([0xff] * i + [0])]) def test_two_byte(self): for i in range(4): for j in range(4): self.do_test([bytes([i, j])]) self.do_test([bytes([i]), bytes([j])]) def test_long(self): for i in range(5): self.do_test([b'A' * (100 + 256*i)]) self.do_test([b'A' * 100] * 256*i) def test_random(self): for i in range(1000): testdata = os.urandom(random.randint(0, 600)) chunks = [testdata] for i in range(random.randint(0, len(testdata) // 5)): idx = random.randint(0, len(chunks)-1) chunk = chunks[idx] bidx = random.randint(0, len(chunk)) chunks = chunks[:idx] + [chunk[:bidx], chunk[bidx:]] + chunks[idx+1:] self.do_test(chunks) class MicrocobsTest(unittest.TestCase): def do_test(self, data): ref = cobs.encode(data) input = binascii.hexlify(data) debug_file = tempfile.NamedTemporaryFile(prefix='microcobs_test_', delete=False) debug_file.write(input) try: test = subprocess.check_output(os.getenv('MICROCOBS_TEST_BINARY', 'build/microcobs_test'), input=input, stderr=subprocess.DEVNULL) test = binascii.unhexlify(test.strip()) self.assertEqual(test[-1], 0, 'Missing terminating null byte') self.assertEqual(ref, test[:-1], f'Mismatched output for input {debug_file.name}') debug_file.close() os.remove(debug_file.name) except Exception as e: raise SystemError(f'Test error for input {debug_file.name}') from e def test_one_byte(self): for i in range(256): self.do_test(bytes([i])) def test_lengths(self): for i in range(260): self.do_test(bytes([0xff] * i)) def test_null_then_lengths(self): for i in range(256): self.do_test(bytes([0] + [0xff] * i)) def test_lengths_then_null(self): for i in range(256): self.do_test(bytes([0xff] * i + [0])) def test_two_byte(self): for i in range(4): for j in range(4): self.do_test(bytes([i, j])) def test_long(self): for i in range(5): self.do_test(b'A' * (100 + 256*i)) def test_random(self): for i in range(10000): self.do_test(os.urandom(random.randint(0, 600))) class MicrocobsDecodeTest(unittest.TestCase): def do_test(self, data): enc = cobs.encode(data) input = binascii.hexlify(enc) debug_file = tempfile.NamedTemporaryFile(prefix='microcobs_test_', delete=False) debug_file.write(input) try: test = subprocess.check_output(os.getenv('MICROCOBS_DECODE_TEST_BINARY', 'build/microcobs_decode_test'), input=input, stderr=subprocess.DEVNULL) test = binascii.unhexlify(test.strip()) self.assertEqual(data, test, f'Mismatched output for input {debug_file.name}') debug_file.close() os.remove(debug_file.name) except Exception as e: raise SystemError(f'Test error for input {debug_file.name}') from e def test_one_byte(self): for i in range(256): self.do_test(bytes([i])) def test_lengths(self): for i in range(260): self.do_test(bytes([0xff] * i)) def test_null_then_lengths(self): for i in range(256): self.do_test(bytes([0] + [0xff] * i)) def test_lengths_then_null(self): for i in range(256): self.do_test(bytes([0xff] * i + [0])) def test_two_byte(self): for i in range(4): for j in range(4): self.do_test(bytes([i, j])) def test_long(self): for i in range(5): self.do_test(b'A' * (100 + 256*i)) def test_random(self): for i in range(10000): self.do_test(os.urandom(random.randint(0, 600)))