diff options
Diffstat (limited to 'tests/test_mpv.py')
-rwxr-xr-x | tests/test_mpv.py | 752 |
1 files changed, 752 insertions, 0 deletions
diff --git a/tests/test_mpv.py b/tests/test_mpv.py new file mode 100755 index 0000000..cb6297c --- /dev/null +++ b/tests/test_mpv.py @@ -0,0 +1,752 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# vim: ts=4 sw=4 et + +import unittest +from unittest import mock +import math +import threading +from contextlib import contextmanager +from functools import wraps +import gc +import os.path +import os +import sys +import time +import io +import platform +import ctypes + +import mpv + +from xvfbwrapper import Xvfb + + +# stdout magic to suppress useless libmpv warning messages in unittest output +# See https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/ +@contextmanager +def devnull_libmpv(): + """ Redirect libmpv stdout into /dev/null while still allowing python to print to stdout as usual """ + if platform.system() != 'Linux': + # This is only a linux-specific convenience function. + yield + return + + libc = ctypes.CDLL("libc.so.6") + + stderr_fd, stdout_fd = sys.stderr.fileno(), sys.stdout.fileno() + sys.stderr.flush() + sys.stdout.flush() + libc.fflush(None) + + # Preserve a copy so python can continue printing + stderr_copy, stdout_copy = os.dup(stderr_fd), os.dup(stdout_fd) + sys.stderr = io.TextIOWrapper(open(stderr_copy, 'wb', closefd=False), write_through=True) + sys.stdout = io.TextIOWrapper(open(stdout_copy, 'wb', closefd=False)) + with open(os.devnull, 'w') as devnull: + os.dup2(devnull.fileno(), stderr_fd) # closes old stderr + os.dup2(devnull.fileno(), stdout_fd) # closes old stdout + + yield + + sys.stderr.flush() + sys.stdout.flush() + libc.fflush(None) + os.dup2(stderr_copy, stderr_fd) + os.dup2(stdout_copy, stdout_fd) + os.close(stderr_copy) + os.close(stdout_copy) + sys.stderr = io.TextIOWrapper(open(stderr_fd, 'wb', closefd=False), write_through=True) + sys.stdout = io.TextIOWrapper(open(stdout_fd, 'wb', closefd=False)) + +TESTVID = os.path.join(os.path.dirname(__file__), 'test.webm') +MPV_ERRORS = [ l(ec) for ec, l in mpv.ErrorCode.EXCEPTION_DICT.items() if l ] + +class MpvTestCase(unittest.TestCase): + def setUp(self): + self.disp = Xvfb() + self.disp.start() + self.m = mpv.MPV(vo='x11') + + def tearDown(self): + self.m.terminate() + self.disp.stop() + +class TestProperties(MpvTestCase): + @contextmanager + def swallow_mpv_errors(self, exception_exceptions=[]): + try: + yield + except Exception as e: + if any(e.args[:2] == ex.args for ex in MPV_ERRORS): + if e.args[1] not in exception_exceptions: + raise + else: + raise + + @devnull_libmpv() + def test_read(self): + self.m.loop = 'inf' + self.m.play(TESTVID) + while self.m.core_idle: + time.sleep(0.05) + for name in sorted(self.m.property_list): + name = name.replace('-', '_') + with self.subTest(property_name=name), self.swallow_mpv_errors([ + mpv.ErrorCode.PROPERTY_UNAVAILABLE, + mpv.ErrorCode.PROPERTY_ERROR, + mpv.ErrorCode.PROPERTY_NOT_FOUND]): + getattr(self.m, name) + + @devnull_libmpv() + def test_write(self): + self.m.loop = 'inf' + self.m.play(TESTVID) + while self.m.core_idle: + time.sleep(0.05) + check_canaries = lambda: os.path.exists('100') or os.path.exists('foo') + for name in sorted(self.m.property_list): + # See issue #108 and upstream mpv issues #7919 and #7920. + if name in ('demuxer', 'audio-demuxer', 'audio-files'): + continue + # These may cause files to be created + if name in ('external-file', 'heartbeat-cmd', 'wid', 'dump-stats', 'log-file') or name.startswith('input-'): + continue + name = name.replace('-', '_') + old_canaries = check_canaries() + with self.subTest(property_name=name), self.swallow_mpv_errors([ + mpv.ErrorCode.PROPERTY_UNAVAILABLE, + mpv.ErrorCode.PROPERTY_ERROR, + mpv.ErrorCode.PROPERTY_FORMAT, + mpv.ErrorCode.PROPERTY_NOT_FOUND]): # This is due to a bug with option-mapped properties in mpv 0.18.1 + setattr(self.m, name, 100) + setattr(self.m, name, 1) + setattr(self.m, name, 0) + setattr(self.m, name, -1) + setattr(self.m, name, 1) + setattr(self.m, name, 1.0) + setattr(self.m, name, 0.0) + setattr(self.m, name, -1.0) + setattr(self.m, name, float('nan')) + setattr(self.m, name, 'foo') + setattr(self.m, name, '') + setattr(self.m, name, 'bazbazbaz'*1000) + setattr(self.m, name, b'foo') + setattr(self.m, name, b'') + setattr(self.m, name, b'bazbazbaz'*1000) + setattr(self.m, name, True) + setattr(self.m, name, False) + if not old_canaries and check_canaries(): + raise UserWarning('Property test for {} produced files on file system, might not be safe.'.format(name)) + + def test_property_bounce(self): + self.m.aid = False + self.assertEqual(self.m.audio, False) + self.m.aid = 'auto' + self.assertEqual(self.m.audio, 'auto') + self.m.aid = 'no' + self.assertEqual(self.m.audio, False) + self.m.audio = 'auto' + self.assertEqual(self.m.aid, 'auto') + self.m.audio = False + self.assertEqual(self.m.aid, False) + self.m.audio = 'auto' + self.assertEqual(self.m.aid, 'auto') + self.m.audio = 'no' + self.assertEqual(self.m.aid, False) + + def test_array_property_bounce(self): + self.m.alang = 'en' + self.assertEqual(self.m.alang, ['en']) + self.m.alang = 'de' + self.assertEqual(self.m.alang, ['de']) + self.m.alang = ['de', 'en'] + self.assertEqual(self.m.alang, ['de', 'en']) + self.m.alang = 'de,en' + self.assertEqual(self.m.alang, ['de', 'en']) + self.m.alang = ['de,en'] + self.assertEqual(self.m.alang, ['de,en']) + + def test_osd_property_bounce(self): + self.m.alang = ['en'] + self.assertEqual(self.m.osd.alang, 'en') + self.m.alang = ['de'] + self.assertEqual(self.m.osd.alang, 'de') + self.m.alang = ['en', 'de'] + self.assertEqual(self.m.osd.alang, 'en,de') + + def test_raw_property_bounce(self): + self.m.alang = 'en' + self.assertEqual(self.m.raw.alang, [b'en']) + self.m.alang = 'de' + self.assertEqual(self.m.raw.alang, [b'de']) + self.m.alang = ['de', 'en'] + self.assertEqual(self.m.raw.alang, [b'de', b'en']) + self.m.alang = 'de,en' + self.assertEqual(self.m.raw.alang, [b'de', b'en']) + self.m.alang = ['de,en'] + self.assertEqual(self.m.raw.alang, [b'de,en']) + + def test_property_decoding_invalid_utf8(self): + invalid_utf8 = b'foo\xc3\x28bar' + self.m.alang = invalid_utf8 + self.assertEqual(self.m.raw.alang, [invalid_utf8]) + with self.assertRaises(UnicodeDecodeError): + self.m.strict.alang + with self.assertRaises(UnicodeDecodeError): + # alang is considered safe and pasted straight into the OSD string. But OSD strings should always be valid + # UTF-8. This test may be removed in case OSD encoding sanitization is handled differently in the future. + self.m.osd.alang + + def test_property_decoding_valid_utf8(self): + valid_utf8 = 'pröpérty' + self.m.alang = valid_utf8 + self.assertEqual(self.m.alang, [valid_utf8]) + self.assertEqual(self.m.raw.alang, [valid_utf8.encode('utf-8')]) + self.assertEqual(self.m.osd.alang, valid_utf8) + self.assertEqual(self.m.strict.alang, [valid_utf8]) + + def test_property_decoding_multi(self): + valid_utf8 = 'pröpérty' + invalid_utf8 = b'foo\xc3\x28bar' + self.m.alang = [valid_utf8, 'foo', invalid_utf8] + self.assertEqual(self.m.alang, [valid_utf8, 'foo', invalid_utf8]) + self.assertEqual(self.m.raw.alang, [valid_utf8.encode('utf-8'), b'foo', invalid_utf8]) + with self.assertRaises(UnicodeDecodeError): + self.m.strict.alang + with self.assertRaises(UnicodeDecodeError): + # See comment in test_property_decoding_invalid_utf8 + self.m.osd.alang + + @devnull_libmpv() + def test_option_read(self): + self.m.loop = 'inf' + self.m.play(TESTVID) + while self.m.core_idle: + time.sleep(0.05) + for name in sorted(self.m): + with self.subTest(option_name=name), self.swallow_mpv_errors([ + mpv.ErrorCode.PROPERTY_UNAVAILABLE, mpv.ErrorCode.PROPERTY_NOT_FOUND, mpv.ErrorCode.PROPERTY_ERROR]): + self.m[name] + + def test_multivalued_option(self): + self.m['external-files'] = ['test.webm', b'test.webm'] + self.assertEqual(self.m['external-files'], ['test.webm', 'test.webm']) + + +class ObservePropertyTest(MpvTestCase): + @devnull_libmpv() + def test_observe_property(self): + handler = mock.Mock() + + m = self.m + m.observe_property('vid', handler) + + time.sleep(0.1) + m.play(TESTVID) + + time.sleep(0.5) #couple frames + m.unobserve_property('vid', handler) + + time.sleep(0.1) #couple frames + m.terminate() # needed for synchronization of event thread + handler.assert_has_calls([mock.call('vid', 'auto')]) + + @devnull_libmpv() + def test_property_observer_decorator(self): + handler = mock.Mock() + + m = self.m + m.play(TESTVID) + + m.slang = 'ru' + m.mute = True + + @m.property_observer('mute') + @m.property_observer('slang') + def foo(*args, **kwargs): + handler(*args, **kwargs) + + m.mute = False + m.slang = 'jp' + self.assertEqual(m.mute, False) + self.assertEqual(m.slang, ['jp']) + + # Wait for tick. AFAICT property events are only generated at regular + # intervals, and if we change a property too fast we don't get any + # events. This is a limitation of the upstream API. + time.sleep(0.1) + # Another API limitation is that the order of property change events on + # different properties does not necessarily exactly match the order in + # which these properties were previously accessed. Thus, any_order. + handler.assert_has_calls([ + mock.call('mute', False), + mock.call('slang', ['jp'])], + any_order=True) + handler.reset_mock() + + m.mute = True + m.slang = 'ru' + self.assertEqual(m.mute, True) + self.assertEqual(m.slang, ['ru']) + + time.sleep(0.05) + foo.unobserve_mpv_properties() + + m.mute = False + m.slang = 'jp' + m.mute = True + m.slang = 'ru' + m.terminate() # needed for synchronization of event thread + handler.assert_has_calls([ + mock.call('mute', True), + mock.call('slang', ['ru'])], + any_order=True) + +class KeyBindingTest(MpvTestCase): + def test_register_direct_cmd(self): + self.m.register_key_binding('a', 'playlist-clear') + self.assertEqual(self.m._key_binding_handlers, {}) + self.m.register_key_binding('Ctrl+Shift+a', 'playlist-clear') + self.m.unregister_key_binding('a') + self.m.unregister_key_binding('Ctrl+Shift+a') + + def test_register_direct_fun(self): + b = mpv.MPV._binding_name + + def reg_test_fun(state, name, char): + pass + + self.m.register_key_binding('a', reg_test_fun) + self.assertIn(b('a'), self.m._key_binding_handlers) + self.assertEqual(self.m._key_binding_handlers[b('a')], reg_test_fun) + + self.m.unregister_key_binding('a') + self.assertNotIn(b('a'), self.m._key_binding_handlers) + + def test_register_direct_bound_method(self): + b = mpv.MPV._binding_name + + class RegTestCls: + def method(self, state, name, char): + pass + instance = RegTestCls() + + self.m.register_key_binding('a', instance.method) + self.assertIn(b('a'), self.m._key_binding_handlers) + self.assertEqual(self.m._key_binding_handlers[b('a')], instance.method) + + self.m.unregister_key_binding('a') + self.assertNotIn(b('a'), self.m._key_binding_handlers) + + def test_register_decorator_fun(self): + b = mpv.MPV._binding_name + + @self.m.key_binding('a') + def reg_test_fun(state, name, char): + pass + self.assertEqual(reg_test_fun.mpv_key_bindings, ['a']) + self.assertIn(b('a'), self.m._key_binding_handlers) + self.assertEqual(self.m._key_binding_handlers[b('a')], reg_test_fun) + + reg_test_fun.unregister_mpv_key_bindings() + self.assertNotIn(b('a'), self.m._key_binding_handlers) + + def test_register_decorator_fun_chaining(self): + b = mpv.MPV._binding_name + + @self.m.key_binding('a') + @self.m.key_binding('b') + def reg_test_fun(state, name, char): + pass + + @self.m.key_binding('c') + def reg_test_fun_2_stay_intact(state, name, char): + pass + + self.assertEqual(reg_test_fun.mpv_key_bindings, ['b', 'a']) + self.assertIn(b('a'), self.m._key_binding_handlers) + self.assertIn(b('b'), self.m._key_binding_handlers) + self.assertIn(b('c'), self.m._key_binding_handlers) + self.assertEqual(self.m._key_binding_handlers[b('a')], reg_test_fun) + self.assertEqual(self.m._key_binding_handlers[b('b')], reg_test_fun) + + reg_test_fun.unregister_mpv_key_bindings() + self.assertNotIn(b('a'), self.m._key_binding_handlers) + self.assertNotIn(b('b'), self.m._key_binding_handlers) + self.assertIn(b('c'), self.m._key_binding_handlers) + + def test_register_simple_decorator_fun_chaining(self): + self.m.loop = 'inf' + self.m.play(TESTVID) + self.m.wait_until_playing() + + handler1, handler2 = mock.Mock(), mock.Mock() + + @self.m.on_key_press('a') + @self.m.on_key_press('b') + def reg_test_fun(*args, **kwargs): + handler1(*args, **kwargs) + + @self.m.on_key_press('c') + def reg_test_fun_2_stay_intact(*args, **kwargs): + handler2(*args, **kwargs) + + self.assertEqual(reg_test_fun.mpv_key_bindings, ['b', 'a']) + + def keypress_and_sync(key): + with self.m.prepare_and_wait_for_event('client_message'): + self.m.keypress(key) + + keypress_and_sync('a') + handler1.assert_has_calls([ mock.call() ]) + handler2.assert_has_calls([]) + handler1.reset_mock() + + self.m.keypress('x') + self.m.keypress('X') + keypress_and_sync('b') + handler1.assert_has_calls([ mock.call() ]) + handler2.assert_has_calls([]) + handler1.reset_mock() + + keypress_and_sync('c') + self.m.keypress('B') + handler1.assert_has_calls([]) + handler2.assert_has_calls([ mock.call() ]) + handler2.reset_mock() + + reg_test_fun.unregister_mpv_key_bindings() + self.m.keypress('a') + keypress_and_sync('c') + self.m.keypress('x') + self.m.keypress('A') + handler1.assert_has_calls([]) + handler2.assert_has_calls([ mock.call() ]) + +class TestStreams(unittest.TestCase): + @devnull_libmpv() + def test_python_stream(self): + handler = mock.Mock() + + disp = Xvfb() + disp.start() + m = mpv.MPV() + m.register_event_callback(handler) + + @m.python_stream('foo') + def foo_gen(): + with open(TESTVID, 'rb') as f: + yield f.read() + + @m.python_stream('bar') + def bar_gen(): + yield b'' + + m.play('python://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.EOF, 'error': mpv.ErrorCode.SUCCESS}}) + handler.reset_mock() + + m.play('python://bar') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}}) + handler.reset_mock() + + m.play('python://baz') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + m.play('foo://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + foo_gen.unregister() + + m.play('python://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + m.play('python://bar') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}}) + handler.reset_mock() + + m.terminate() + disp.stop() + + def test_custom_stream(self): + handler = mock.Mock() + fail_mock = mock.Mock(side_effect=ValueError) + stream_mock = mock.Mock() + stream_mock.seek = mock.Mock(return_value=0) + stream_mock.read = mock.Mock(return_value=b'') + + disp = Xvfb() + disp.start() + m = mpv.MPV(video=False) + m.register_event_callback(handler) + + m.register_stream_protocol('pythonfail', fail_mock) + + @m.register_stream_protocol('pythonsuccess') + def open_fn(uri): + self.assertEqual(uri, 'pythonsuccess://foo') + return stream_mock + + m.play('pythondoesnotexist://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + m.play('pythonfail://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + m.play('pythonsuccess://foo') + m.wait_for_playback() + stream_mock.seek.assert_any_call(0) + stream_mock.read.assert_called() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}}) + + m.terminate() + disp.stop() + +class TestLifecycle(unittest.TestCase): + def test_create_destroy(self): + thread_names = lambda: [ t.name for t in threading.enumerate() ] + self.assertNotIn('MPVEventHandlerThread', thread_names()) + m = mpv.MPV() + self.assertIn('MPVEventHandlerThread', thread_names()) + m.terminate() + self.assertNotIn('MPVEventHandlerThread', thread_names()) + + def test_flags(self): + with self.assertRaises(AttributeError): + mpv.MPV('this-option-does-not-exist') + m = mpv.MPV('cursor-autohide-fs-only', 'fs', video=False) + self.assertTrue(m.fullscreen) + self.assertEqual(m.cursor_autohide, 1000) + m.terminate() + + def test_options(self): + with self.assertRaises(AttributeError): + mpv.MPV(this_option_does_not_exists=23) + m = mpv.MPV(osd_level=0, loop='inf', deinterlace=False) + self.assertEqual(m.osd_level, 0) + # For compatibility with mpv master (v0.32.0-585-gfba1c681b8) accept both + self.assertIn(m.loop, ['inf', True]) + self.assertEqual(m.deinterlace, False) + m.terminate() + + def test_event_callback(self): + handler = mock.Mock() + m = mpv.MPV(video=False) + m.register_event_callback(handler) + m.play(TESTVID) + m.wait_for_playback() + + m.unregister_event_callback(handler) + handler.assert_has_calls([ + mock.call({'reply_userdata': 0, 'error': 0, 'event_id': 6, 'event': None}), + mock.call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.NOTHING_TO_PLAY}}) + ], any_order=True) + handler.reset_mock() + + m.terminate() + handler.assert_not_called() + + @devnull_libmpv() + def test_wait_for_property_negative(self): + self.disp = Xvfb() + self.disp.start() + m = mpv.MPV() + m.play(TESTVID) + def run(): + nonlocal self + try: + m.wait_for_property('mute') + self.fail() + except mpv.ShutdownError: + pass + t = threading.Thread(target=run, daemon=True) + t.start() + time.sleep(1) + m.terminate() + t.join() + self.disp.stop() + + @devnull_libmpv() + def test_wait_for_property_positive(self): + self.disp = Xvfb() + self.disp.start() + handler = mock.Mock() + m = mpv.MPV() + m.play(TESTVID) + def run(): + nonlocal self + m.wait_for_property('mute') + handler() + t = threading.Thread(target=run, daemon=True) + t.start() + m.wait_until_playing() + m.mute = True + t.join() + m.terminate() + handler.assert_called() + self.disp.stop() + + @devnull_libmpv() + def test_wait_for_event(self): + self.disp = Xvfb() + self.disp.start() + handler = mock.Mock() + m = mpv.MPV() + m.play(TESTVID) + def run(): + nonlocal self + try: + m.wait_for_event('seek') + self.fail() + except mpv.ShutdownError: + pass + t = threading.Thread(target=run, daemon=True) + t.start() + time.sleep(1) + m.terminate() + t.join() + self.disp.stop() + + @devnull_libmpv() + def test_wait_for_property_shutdown(self): + self.disp = Xvfb() + self.disp.start() + handler = mock.Mock() + m = mpv.MPV() + m.play(TESTVID) + with self.assertRaises(mpv.ShutdownError): + # level_sensitive=false needed to prevent get_property on dead + # handle + with m.prepare_and_wait_for_property('mute', level_sensitive=False): + m.terminate() + self.disp.stop() + + @devnull_libmpv() + def test_wait_for_event_shutdown(self): + self.disp = Xvfb() + self.disp.start() + handler = mock.Mock() + m = mpv.MPV() + m.play(TESTVID) + with self.assertRaises(mpv.ShutdownError): + with m.prepare_and_wait_for_event('seek'): + m.terminate() + self.disp.stop() + + @devnull_libmpv() + def test_log_handler(self): + handler = mock.Mock() + self.disp = Xvfb() + self.disp.start() + m = mpv.MPV(log_handler=handler) + m.play(TESTVID) + # Wait for playback to start + m.wait_until_playing() + m.command("print-text", 'This is a python-mpv test') + m.wait_for_playback() + m.terminate() + for call in handler.mock_calls: + _1, (a, b, c), _2 = call + if a == 'info' and b == 'cplayer' and 'This is a python-mpv test' in c: + break + else: + self.fail('"Test log entry not found in log handler calls: '+','.join(repr(call) for call in handler.mock_calls)) + self.disp.stop() + + +class CommandTests(MpvTestCase): + + def test_loadfile_with_subtitles(self): + handler = mock.Mock() + self.m.property_observer('sub-text')(handler) + + self.m.loadfile(TESTVID, sub_file='sub_test.srt') + + self.m.wait_for_playback() + handler.assert_any_call('sub-text', 'This is\na subtitle test.') + handler.assert_any_call('sub-text', 'This is the second subtitle line.') + + def test_sub_add(self): + handler = mock.Mock() + self.m.property_observer('sub-text')(handler) + + self.m.loadfile(TESTVID) + self.m.wait_until_playing() + self.m.sub_add('sub_test.srt') + + self.m.wait_for_playback() + handler.assert_any_call('sub-text', 'This is\na subtitle test.') + handler.assert_any_call('sub-text', 'This is the second subtitle line.') + + +class RegressionTests(MpvTestCase): + + def test_unobserve_property_runtime_error(self): + """ + Ensure a `RuntimeError` is not thrown within + `unobserve_property`. + """ + handler = mock.Mock() + + self.m.observe_property('loop', handler) + + try: + self.m.unobserve_property('loop', handler) + except RuntimeError: + self.fail( + """ + "RuntimeError" exception thrown within + `unobserve_property` + """, + ) + + def test_instance_method_property_observer(self): + """ + Ensure that bound method objects can be used as property observers. + See issue #26 + """ + handler = mock.Mock() + m = self.m + + class T(object): + def t(self, *args, **kw): + handler(*args, **kw) + t = T() + + m.slang = 'ru' + time.sleep(0.5) + + m.observe_property('slang', t.t) + time.sleep(0.5) + + m.slang = 'jp' + time.sleep(0.5) + + m.slang = 'ru' + time.sleep(0.5) + + m.unobserve_property('slang', t.t) + time.sleep(0.5) + + m.slang = 'jp' + m.slang = 'ru' + m.terminate() # needed for synchronization of event thread + handler.assert_has_calls([mock.call('slang', ['jp']), mock.call('slang', ['ru'])]) + + +if __name__ == '__main__': + unittest.main() |