From 7343604f10d8e057d084955d4620815cc1b49f2c Mon Sep 17 00:00:00 2001 From: jaseg Date: Sun, 26 Feb 2023 20:15:27 +0100 Subject: Add tests and fix error handling for stream callbacks --- mpv.py | 28 +++++++++----- tests/test_mpv.py | 109 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+), 9 deletions(-) diff --git a/mpv.py b/mpv.py index 6918f97..350456c 100644 --- a/mpv.py +++ b/mpv.py @@ -898,10 +898,13 @@ class MPV(object): try: yield except Exception as e: - try: - fut = next(iter(self._exception_futures)) - fut.set_exception(e) - except StopIteration: + for fut in self._exception_futures: + try: + fut.set_exception(e) + break + except InvalidStateError: + pass + else: warn(f'Unhandled exception on python-mpv event loop: {e}\n{traceback.format_exc()}', RuntimeWarning) def _loop(self): @@ -1087,7 +1090,6 @@ class MPV(object): @self.event_callback(*event_types) def target_handler(evt): - try: rv = cond(evt) if rv: @@ -1801,12 +1803,17 @@ class MPV(object): except ValueError: return ErrorCode.LOADING_FAILED except Exception as e: - try: - fut = next(iter(self._exception_futures)) - fut.set_exception(e) - except StopIteration: + for fut in self._exception_futures: + try: + fut.set_exception(e) + break + except InvalidStateError: + pass + else: warnings.warn(f'Unhandled exception {e} inside stream open callback for URI {uri}\n{traceback.format_exc()}') + + return ErrorCode.LOADING_FAILED cb_info.contents.cookie = None @@ -1817,6 +1824,7 @@ class MPV(object): for i in range(len(data)): buf[i] = data[i] return len(data) + return -1 read = cb_info.contents.read = StreamReadFn(read_backend) def close_backend(_userdata): @@ -1832,12 +1840,14 @@ class MPV(object): def seek_backend(_userdata, offx): with self._enqueue_exceptions(): return frontend.seek(offx) + return ErrorCode.GENERIC seek = cb_info.contents.seek = StreamSeekFn(seek_backend) if hasattr(frontend, 'size') and frontend.size is not None: def size_backend(_userdata): with self._enqueue_exceptions(): return frontend.size + return 0 size = cb_info.contents.size = StreamSizeFn(size_backend) if hasattr(frontend, 'cancel'): diff --git a/tests/test_mpv.py b/tests/test_mpv.py index 38b0222..5f1f979 100755 --- a/tests/test_mpv.py +++ b/tests/test_mpv.py @@ -535,6 +535,115 @@ class TestStreams(unittest.TestCase): m.terminate() disp.stop() + def test_stream_open_exception(self): + disp = Xvfb() + disp.start() + m = mpv.MPV(vo=testvo, video=False) + + @m.register_stream_protocol('raiseerror') + def open_fn(uri): + raise SystemError() + + waiting = threading.Semaphore() + result = Future() + def run(): + result.set_running_or_notify_cancel() + try: + waiting.release() + m.wait_for_playback() + result.set_result(False) + except SystemError: + result.set_result(True) + except Exception: + result.set_result(False) + + t = threading.Thread(target=run, daemon=True) + t.start() + + with waiting: + time.sleep(0.2) + m.play('raiseerror://foo') + + m.wait_for_playback(catch_errors=False) + try: + assert result.result() + finally: + m.terminate() + disp.stop() + + def test_python_stream_exception(self): + disp = Xvfb() + disp.start() + m = mpv.MPV(vo=testvo) + + @m.python_stream('foo') + def foo_gen(): + with open(TESTVID, 'rb') as f: + yield f.read(100) + raise SystemError() + + waiting = threading.Semaphore() + result = Future() + def run(): + result.set_running_or_notify_cancel() + try: + waiting.release() + m.wait_for_playback() + result.set_result(False) + except SystemError: + result.set_result(True) + except Exception: + result.set_result(False) + + t = threading.Thread(target=run, daemon=True) + t.start() + + with waiting: + time.sleep(0.2) + m.play('python://foo') + + m.wait_for_playback(catch_errors=False) + try: + assert result.result() + finally: + m.terminate() + disp.stop() + + def test_stream_open_forward(self): + disp = Xvfb() + disp.start() + m = mpv.MPV(vo=testvo, video=False) + + @m.register_stream_protocol('raiseerror') + def open_fn(uri): + raise ValueError() + + waiting = threading.Semaphore() + result = Future() + def run(): + result.set_running_or_notify_cancel() + try: + waiting.release() + m.wait_for_playback() + result.set_result(True) + except Exception: + result.set_result(False) + + t = threading.Thread(target=run, daemon=True) + t.start() + + with waiting: + time.sleep(0.2) + m.play('raiseerror://foo') + + m.wait_for_playback(catch_errors=False) + try: + assert result.result() + finally: + m.terminate() + disp.stop() + + class TestLifecycle(unittest.TestCase): def test_create_destroy(self): -- cgit