From 846f2a65ae0bc9ee1c7d58a6047a9eaf71f6e6c2 Mon Sep 17 00:00:00 2001 From: jaseg Date: Sat, 18 Jul 2020 14:21:31 +0200 Subject: Sprinkle some thread safety over event loop, add *wait_for_event --- mpv-test.py | 17 ++++--- mpv.py | 147 +++++++++++++++++++++++++++++++++++++----------------------- 2 files changed, 99 insertions(+), 65 deletions(-) diff --git a/mpv-test.py b/mpv-test.py index ecc4d18..5a74eb7 100755 --- a/mpv-test.py +++ b/mpv-test.py @@ -395,33 +395,32 @@ class KeyBindingTest(MpvTestCase): self.assertEqual(reg_test_fun.mpv_key_bindings, ['b', 'a']) def keypress_and_sync(key): - self.m.keypress(key) - self.m.frame_step() - self.m.wait_until_playing() + with self.m.prepare_and_wait_for_event('client_message'): + self.m.keypress(key) keypress_and_sync('a') handler1.assert_has_calls([ mock.call() ]) handler2.assert_has_calls([]) handler1.reset_mock() - keypress_and_sync('x') - keypress_and_sync('X') + self.m.keypress('x') + self.m.keypress('X') keypress_and_sync('b') handler1.assert_has_calls([ mock.call() ]) handler2.assert_has_calls([]) handler1.reset_mock() keypress_and_sync('c') - keypress_and_sync('B') + self.m.keypress('B') handler1.assert_has_calls([]) handler2.assert_has_calls([ mock.call() ]) handler2.reset_mock() reg_test_fun.unregister_mpv_key_bindings() - keypress_and_sync('a') + self.m.keypress('a') keypress_and_sync('c') - keypress_and_sync('x') - keypress_and_sync('A') + self.m.keypress('x') + self.m.keypress('A') handler1.assert_has_calls([]) handler2.assert_has_calls([ mock.call() ]) diff --git a/mpv.py b/mpv.py index aded4a9..a2950ce 100644 --- a/mpv.py +++ b/mpv.py @@ -23,6 +23,7 @@ import os import sys from warnings import warn from functools import partial, wraps +from contextlib import contextmanager import collections import re import traceback @@ -636,37 +637,6 @@ def _event_generator(handle): yield event -def _event_loop(event_handle, event_callbacks, message_handlers, property_handlers, log_handler): - for event in _event_generator(event_handle): - try: - devent = event.as_dict(decoder=lazy_decoder) # copy data from ctypes - eid = devent['event_id'] - for callback in event_callbacks: - callback(devent) - - if eid == MpvEventID.PROPERTY_CHANGE: - pc = devent['event'] - name, value, _fmt = pc['name'], pc['value'], pc['format'] - for handler in property_handlers[name]: - handler(name, value) - - if eid == MpvEventID.LOG_MESSAGE and log_handler is not None: - ev = devent['event'] - log_handler(ev['level'], ev['prefix'], ev['text']) - - if eid == MpvEventID.CLIENT_MESSAGE: - # {'event': {'args': ['key-binding', 'foo', 'u-', 'g']}, 'reply_userdata': 0, 'error': 0, 'event_id': 16} - target, *args = devent['event']['args'] - if target in message_handlers: - message_handlers[target](*args) - - if eid == MpvEventID.SHUTDOWN: - _mpv_detach_destroy(event_handle) - return - - except Exception as e: - traceback.print_exc() - _py_to_mpv = lambda name: name.replace('_', '-') _mpv_to_py = lambda name: name.replace('-', '_') @@ -857,13 +827,13 @@ class MPV(object): self.lazy = _DecoderPropertyProxy(self, lazy_decoder) self._event_callbacks = [] + self._event_handler_lock = threading.Lock() self._property_handlers = collections.defaultdict(lambda: []) self._quit_handlers = set() self._message_handlers = {} self._key_binding_handlers = {} self._event_handle = _mpv_create_client(self.handle, b'py_event_handler') - self._loop = partial(_event_loop, self._event_handle, self._event_callbacks, - self._message_handlers, self._property_handlers, log_handler) + self._log_handler = log_handler self._stream_protocol_cbs = {} self._stream_protocol_frontends = collections.defaultdict(lambda: {}) self.register_stream_protocol('python', self._python_stream_open) @@ -881,36 +851,68 @@ class MPV(object): self._event_thread = None self._core_shutdown = False - # This is the first callback in line, so other event callback-based mechanisms can use core_shutdown - @self.event_callback('shutdown') - def shutdown_event_callback(event): - nonlocal self - self._core_shutdown = True + + def _loop(self): + for event in _event_generator(self._event_handle): + try: + devent = event.as_dict(decoder=lazy_decoder) # copy data from ctypes + eid = devent['event_id'] + + with self._event_handler_lock: + if eid == MpvEventID.SHUTDOWN: + self._core_shutdown = True + + for callback in self._event_callbacks: + callback(devent) + + if eid == MpvEventID.PROPERTY_CHANGE: + pc = devent['event'] + name, value, _fmt = pc['name'], pc['value'], pc['format'] + for handler in self._property_handlers[name]: + handler(name, value) + + if eid == MpvEventID.LOG_MESSAGE and self._log_handler is not None: + ev = devent['event'] + self._log_handler(ev['level'], ev['prefix'], ev['text']) + + if eid == MpvEventID.CLIENT_MESSAGE: + # {'event': {'args': ['key-binding', 'foo', 'u-', 'g']}, 'reply_userdata': 0, 'error': 0, 'event_id': 16} + target, *args = devent['event']['args'] + if target in self._message_handlers: + self._message_handlers[target](*args) + + if eid == MpvEventID.SHUTDOWN: + _mpv_detach_destroy(self._event_handle) + return + + except Exception as e: + print('Exception inside python-mpv event loop:', file=sys.stderr) + traceback.print_exc() @property def core_shutdown(self): return self._core_shutdown def wait_until_paused(self): - """Waits until playback of the current title is paused or done.""" + """Waits until playback of the current title is paused or done. Raises a ShutdownError if the core is shutdown while + waiting.""" self.wait_for_property('core-idle') def wait_for_playback(self): - """Waits until playback of the current title is paused or done. - - NOTE: This function changed from an event-based implementation to a property observer-based implementation in - v0.5.0. This may cause different results in certain cases. If you find one such case, for documentation please - tell the world in an issue on the github project.""" - self.wait_until_playing() - self.wait_until_paused() + """Waits until playback of the current title is finished. Raises a ShutdownError if the core is shutdown while + waiting. + """ + self.wait_for_event('end_file') def wait_until_playing(self): - """Waits until playback of the current title has started.""" + """Waits until playback of the current title has started. Raises a ShutdownError if the core is shutdown while + waiting.""" self.wait_for_property('core-idle', lambda idle: not idle) def wait_for_property(self, name, cond=lambda val: val, level_sensitive=True): """Waits until ``cond`` evaluates to a truthy value on the named property. This can be used to wait for - properties such as ``idle_active`` indicating the player is done with regular playback and just idling around + properties such as ``idle_active`` indicating the player is done with regular playback and just idling around. + Raises a ShutdownError when the core is shutdown while waiting. """ sema = threading.Semaphore(value=0) @@ -929,8 +931,38 @@ class MPV(object): if self._core_shutdown: raise ShutdownError('libmpv core has been shutdown') + shutdown_handler.unregister_mpv_events() self.unobserve_property(name, observer) + def wait_for_event(self, *event_types, cond=lambda evt: True): + with self.prepare_and_wait_for_event(*event_types, cond=cond): + pass + + @contextmanager + def prepare_and_wait_for_event(self, *event_types, cond=lambda evt: True): + """Waits for the indicated event(s). If cond is given, waits until cond(event) is true. Raises a ShutdownError + if the core is shutdown while waiting. This also happens when 'shutdown' is in event_types. + """ + sema = threading.Semaphore(value=0) + + @self.event_callback('shutdown') + def shutdown_handler(event): + sema.release() + + @self.event_callback(*event_types) + def target_handler(evt): + if cond(evt): + sema.release() + + yield + sema.acquire() + + if self._core_shutdown: + raise ShutdownError('libmpv core has been shutdown') + + shutdown_handler.unregister_mpv_events() + target_handler.unregister_mpv_events() + def __del__(self): if self.handle: self.terminate() @@ -1369,14 +1401,17 @@ class MPV(object): my_handler.unregister_mpv_events() """ def register(callback): - types = [MpvEventID.from_str(t) if isinstance(t, str) else t for t in event_types] or MpvEventID.ANY - @wraps(callback) - def wrapper(event, *args, **kwargs): - if event['event_id'] in types: - callback(event, *args, **kwargs) - self._event_callbacks.append(wrapper) - wrapper.unregister_mpv_events = partial(self.unregister_event_callback, wrapper) - return wrapper + with self._event_handler_lock: + if self._core_shutdown: + raise ShutdownError('libmpv core has been shutdown') + types = [MpvEventID.from_str(t) if isinstance(t, str) else t for t in event_types] or MpvEventID.ANY + @wraps(callback) + def wrapper(event, *args, **kwargs): + if event['event_id'] in types: + callback(event, *args, **kwargs) + self._event_callbacks.append(wrapper) + wrapper.unregister_mpv_events = partial(self.unregister_event_callback, wrapper) + return wrapper return register @staticmethod -- cgit