From f9a655e7ca39a29d71baa177bac94e7be04f6936 Mon Sep 17 00:00:00 2001 From: jaseg Date: Sun, 26 Feb 2023 19:46:59 +0100 Subject: Add cross-thread exception handling for event loop and stream callbacks --- mpv.py | 122 +++++++++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 89 insertions(+), 33 deletions(-) (limited to 'mpv.py') diff --git a/mpv.py b/mpv.py index 97c73da..6918f97 100644 --- a/mpv.py +++ b/mpv.py @@ -20,6 +20,7 @@ from ctypes import * import ctypes.util import threading +import queue import os import sys from warnings import warn @@ -880,6 +881,7 @@ class MPV(object): self.register_stream_protocol('python', self._python_stream_open) self._python_streams = {} self._python_stream_catchall = None + self._exception_futures = set() self.overlay_ids = set() self.overlays = {} if loglevel is not None or log_handler is not None: @@ -891,6 +893,17 @@ class MPV(object): else: self._event_thread = None + @contextmanager + def _enqueue_exceptions(self): + try: + yield + except Exception as e: + try: + fut = next(iter(self._exception_futures)) + fut.set_exception(e) + except StopIteration: + warn(f'Unhandled exception on python-mpv event loop: {e}\n{traceback.format_exc()}', RuntimeWarning) + def _loop(self): for event in _event_generator(self._event_handle): try: @@ -901,45 +914,51 @@ class MPV(object): self._core_shutdown = True for callback in self._event_callbacks: - callback(event) + with self._enqueue_exceptions(): + callback(event) if eid == MpvEventID.PROPERTY_CHANGE: pc = event.data name, value, _fmt = pc.name, pc.value, pc.format for handler in self._property_handlers[name]: - handler(name, value) + with self._enqueue_exceptions(): + handler(name, value) if eid == MpvEventID.LOG_MESSAGE and self._log_handler is not None: ev = event.data - self._log_handler(ev.level, ev.prefix, ev.text) + with self._enqueue_exceptions(): + self._log_handler(ev.level, ev.prefix, ev.text) if eid == MpvEventID.CLIENT_MESSAGE: # {'event': {'args': ['key-binding', 'foo', 'u-', 'g']}, 'reply_userdata': 0, 'error': 0, 'event_id': 16} target, *args = event.data.args target = target.decode("utf-8") if target in self._message_handlers: - self._message_handlers[target](*args) + with self._enqueue_exceptions(): + self._message_handlers[target](*args) if eid == MpvEventID.COMMAND_REPLY: key = event.reply_userdata callback = self._command_reply_callbacks.pop(key, None) if callback: - callback(ErrorCode.exception_for_ec(event.error), event.data) + with self._enqueue_exceptions(): + callback(ErrorCode.exception_for_ec(event.error), event.data) if eid == MpvEventID.QUEUE_OVERFLOW: # cache list, since error handlers will unregister themselves for cb in list(self._command_reply_callbacks.values()): - cb(EventOverflowError('libmpv event queue has flown over because events have not been processed fast enough'), None) + with self._enqueue_exceptions(): + cb(EventOverflowError('libmpv event queue has flown over because events have not been processed fast enough'), None) if eid == MpvEventID.SHUTDOWN: _mpv_destroy(self._event_handle) for cb in list(self._command_reply_callbacks.values()): - cb(ShutdownError('libmpv core has been shutdown'), None) + with self._enqueue_exceptions(): + cb(ShutdownError('libmpv core has been shutdown'), None) return except Exception as e: - print('Exception inside python-mpv event loop:', file=sys.stderr) - traceback.print_exc() + warn(f'Unhandled {e} inside python-mpv event loop!\n{traceback.format_exc()}', RuntimeWarning) @property def core_shutdown(self): @@ -953,35 +972,35 @@ class MPV(object): if self._core_shutdown: raise ShutdownError('libmpv core has been shutdown') - def wait_until_paused(self, timeout=None): + def wait_until_paused(self, timeout=None, catch_errors=True): """Waits until playback of the current title is paused or done. Raises a ShutdownError if the core is shutdown while waiting.""" - self.wait_for_property('core-idle', timeout=timeout) + self.wait_for_property('core-idle', timeout=timeout, catch_errors=catch_errors) - def wait_for_playback(self, timeout=None): + def wait_for_playback(self, timeout=None, catch_errors=True): """Waits until playback of the current title is finished. Raises a ShutdownError if the core is shutdown while waiting. """ - self.wait_for_event('end_file', timeout=timeout) + self.wait_for_event('end_file', timeout=timeout, catch_errors=catch_errors) - def wait_until_playing(self, timeout=None): + def wait_until_playing(self, timeout=None, catch_errors=True): """Waits until playback of the current title has started. Raises a ShutdownError if the core is shutdown while waiting.""" - self.wait_for_property('core-idle', lambda idle: not idle, timeout=timeout) + self.wait_for_property('core-idle', lambda idle: not idle, timeout=timeout, catch_errors=catch_errors) - def wait_for_property(self, name, cond=lambda val: val, level_sensitive=True, timeout=None): + def wait_for_property(self, name, cond=lambda val: val, level_sensitive=True, timeout=None, catch_errors=True): """Waits until ``cond`` evaluates to a truthy value on the named property. This can be used to wait for properties such as ``idle_active`` indicating the player is done with regular playback and just idling around. Raises a ShutdownError when the core is shutdown while waiting. """ - with self.prepare_and_wait_for_property(name, cond, level_sensitive, timeout=timeout) as result: + with self.prepare_and_wait_for_property(name, cond, level_sensitive, timeout=timeout, catch_errors=catch_errors) as result: pass return result.result() - def wait_for_shutdown(self, timeout=None): + def wait_for_shutdown(self, timeout=None, catch_errors=True): '''Wait for core to shutdown (e.g. through quit() or terminate()).''' try: - self.wait_for_event(None, timeout=timeout) + self.wait_for_event(None, timeout=timeout, catch_errors=catch_errors) except ShutdownError: return @@ -999,7 +1018,7 @@ class MPV(object): return shutdown_handler.unregister_mpv_events @contextmanager - def prepare_and_wait_for_property(self, name, cond=lambda val: val, level_sensitive=True, timeout=None): + def prepare_and_wait_for_property(self, name, cond=lambda val: val, level_sensitive=True, timeout=None, catch_errors=True): """Context manager that waits until ``cond`` evaluates to a truthy value on the named property. See prepare_and_wait_for_event for usage. Raises a ShutdownError when the core is shutdown while waiting. Re-raises any errors inside ``cond``. @@ -1023,6 +1042,9 @@ class MPV(object): try: result.set_running_or_notify_cancel() + if catch_errors: + self._exception_futures.add(result) + yield result rv = cond(getattr(self, name.replace('-', '_'))) @@ -1035,18 +1057,19 @@ class MPV(object): finally: err_unregister() self.unobserve_property(name, observer) + self._exception_futures.discard(result) - def wait_for_event(self, *event_types, cond=lambda evt: True, timeout=None): + def wait_for_event(self, *event_types, cond=lambda evt: True, timeout=None, catch_errors=True): """Waits for the indicated event(s). If cond is given, waits until cond(event) is true. Raises a ShutdownError if the core is shutdown while waiting. This also happens when 'shutdown' is in event_types. Re-raises any error inside ``cond``. """ - with self.prepare_and_wait_for_event(*event_types, cond=cond, timeout=timeout) as result: + with self.prepare_and_wait_for_event(*event_types, cond=cond, timeout=timeout, catch_errors=catch_errors) as result: pass return result.result() @contextmanager - def prepare_and_wait_for_event(self, *event_types, cond=lambda evt: True, timeout=None): + def prepare_and_wait_for_event(self, *event_types, cond=lambda evt: True, timeout=None, catch_errors=True): """Context manager that waits for the indicated event(s) like wait_for_event after running. If cond is given, waits until cond(event) is true. Raises a ShutdownError if the core is shutdown while waiting. This also happens when 'shutdown' is in event_types. Re-raises any error inside ``cond``. @@ -1081,13 +1104,18 @@ class MPV(object): try: result.set_running_or_notify_cancel() + if catch_errors: + self._exception_futures.add(result) + yield result + self.check_core_alive() result.result(timeout) finally: err_unregister() target_handler.unregister_mpv_events() + self._exception_futures.discard(result) def __del__(self): if self.handle: @@ -1772,32 +1800,60 @@ class MPV(object): frontend = open_fn(uri.decode('utf-8')) except ValueError: return ErrorCode.LOADING_FAILED + except Exception as e: + try: + fut = next(iter(self._exception_futures)) + fut.set_exception(e) + except StopIteration: + warnings.warn(f'Unhandled exception {e} inside stream open callback for URI {uri}\n{traceback.format_exc()}') - def read_backend(_userdata, buf, bufsize): - data = frontend.read(bufsize) - for i in range(len(data)): - buf[i] = data[i] - return len(data) + return ErrorCode.LOADING_FAILED cb_info.contents.cookie = None + + def read_backend(_userdata, buf, bufsize): + with self._enqueue_exceptions(): + data = frontend.read(bufsize) + for i in range(len(data)): + buf[i] = data[i] + return len(data) read = cb_info.contents.read = StreamReadFn(read_backend) - close = cb_info.contents.close = StreamCloseFn(lambda _userdata: frontend.close()) + + def close_backend(_userdata): + with self._enqueue_exceptions(): + del self._stream_protocol_frontends[proto][uri] + if hasattr(frontend, 'close'): + frontend.close() + close = cb_info.contents.close = StreamCloseFn(close_backend) seek, size, cancel = None, None, None + if hasattr(frontend, 'seek'): - seek = cb_info.contents.seek = StreamSeekFn(lambda _userdata, offx: frontend.seek(offx)) + def seek_backend(_userdata, offx): + with self._enqueue_exceptions(): + return frontend.seek(offx) + seek = cb_info.contents.seek = StreamSeekFn(seek_backend) + if hasattr(frontend, 'size') and frontend.size is not None: - size = cb_info.contents.size = StreamSizeFn(lambda _userdata: frontend.size) + def size_backend(_userdata): + with self._enqueue_exceptions(): + return frontend.size + size = cb_info.contents.size = StreamSizeFn(size_backend) + if hasattr(frontend, 'cancel'): - cancel = cb_info.contents.cancel = StreamCancelFn(lambda _userdata: frontend.cancel()) + def cancel_backend(_userdata): + with self._enqueue_exceptions(): + frontend.cancel() + cancel = cb_info.contents.cancel = StreamCancelFn(cancel_backend) - # keep frontend and callbacks in memory forever (TODO) + # keep frontend and callbacks in memory until closed frontend._registered_callbacks = [read, close, seek, size, cancel] self._stream_protocol_frontends[proto][uri] = frontend return 0 if proto in self._stream_protocol_cbs: raise KeyError('Stream protocol already registered') + # keep backend in memory forever self._stream_protocol_cbs[proto] = [open_backend] _mpv_stream_cb_add_ro(self.handle, proto.encode('utf-8'), c_void_p(), open_backend) -- cgit