diff options
author | jaseg <code@jaseg.net> | 2019-11-30 21:15:57 +0100 |
---|---|---|
committer | jaseg <code@jaseg.net> | 2019-12-01 21:25:04 +0100 |
commit | 49e6f6f6d0a3340c2664792cdb5bcc059dc24ea9 (patch) | |
tree | 1f34fa964ea79a0a75b753685bc72b580e8c5375 /mpv-test.py | |
parent | 3d6155871447ef5c5e4d54c39eb8ff573a3b49e9 (diff) | |
download | python-mpv-49e6f6f6d0a3340c2664792cdb5bcc059dc24ea9.tar.gz python-mpv-49e6f6f6d0a3340c2664792cdb5bcc059dc24ea9.tar.bz2 python-mpv-49e6f6f6d0a3340c2664792cdb5bcc059dc24ea9.zip |
Add stream protocol handling
This allows you to directly feed bytes into mpv without going through a
file, FIFO etc. first. The new API is:
@player.register_stream_protocol(name)
@player.python_stream(name, size)
@player.python_stream_catchall
See their docstrings for their usage.
Diffstat (limited to 'mpv-test.py')
-rwxr-xr-x | mpv-test.py | 88 |
1 files changed, 86 insertions, 2 deletions
diff --git a/mpv-test.py b/mpv-test.py index 9e83dbb..3ac8985 100755 --- a/mpv-test.py +++ b/mpv-test.py @@ -350,6 +350,91 @@ class KeyBindingTest(MpvTestCase): self.assertNotIn(b('b'), self.m._key_binding_handlers) self.assertIn(b('c'), self.m._key_binding_handlers) +class TestStreams(unittest.TestCase): + def test_python_stream(self): + handler = mock.Mock() + + m = mpv.MPV() + m.register_event_callback(handler) + + @m.python_stream('foo') + def foo_gen(): + with open(TESTVID, 'rb') as f: + yield f.read() + + @m.python_stream('bar') + def bar_gen(): + yield b'' + + m.play('python://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.EOF, 'error': mpv.ErrorCode.SUCCESS}}) + handler.reset_mock() + + m.play('python://bar') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}}) + handler.reset_mock() + + m.play('python://baz') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + m.play('foo://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + foo_gen.unregister() + + m.play('python://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + m.play('python://bar') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}}) + handler.reset_mock() + + m.terminate() + + def test_custom_stream(self): + handler = mock.Mock() + fail_mock = mock.Mock(side_effect=ValueError) + stream_mock = mock.Mock() + stream_mock.seek = mock.Mock(return_value=0) + stream_mock.read = mock.Mock(return_value=b'') + + m = mpv.MPV(video=False) + m.register_event_callback(handler) + + m.register_stream_protocol('pythonfail', fail_mock) + + @m.register_stream_protocol('pythonsuccess') + def open_fn(uri): + self.assertEqual(uri, 'pythonsuccess://foo') + return stream_mock + + m.play('pythondoesnotexist://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + m.play('pythonfail://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + m.play('pythonsuccess://foo') + m.wait_for_playback() + stream_mock.seek.assert_any_call(0) + stream_mock.read.assert_called() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}}) + + m.terminate() + class TestLifecycle(unittest.TestCase): def test_create_destroy(self): thread_names = lambda: [ t.name for t in threading.enumerate() ] @@ -386,8 +471,7 @@ class TestLifecycle(unittest.TestCase): m.unregister_event_callback(handler) handler.assert_has_calls([ mock.call({'reply_userdata': 0, 'error': 0, 'event_id': 6, 'event': None}), - mock.call({'reply_userdata': 0, 'error': 0, 'event_id': 9, 'event': None}), - mock.call({'reply_userdata': 0, 'error': 0, 'event_id': 7, 'event': {'reason': 4}}), + mock.call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.NOTHING_TO_PLAY}}) ], any_order=True) handler.reset_mock() |