summaryrefslogtreecommitdiff
path: root/mpv-test.py
diff options
context:
space:
mode:
authorjaseg <code@jaseg.net>2019-11-30 21:15:57 +0100
committerjaseg <code@jaseg.net>2019-12-01 21:25:04 +0100
commit49e6f6f6d0a3340c2664792cdb5bcc059dc24ea9 (patch)
tree1f34fa964ea79a0a75b753685bc72b580e8c5375 /mpv-test.py
parent3d6155871447ef5c5e4d54c39eb8ff573a3b49e9 (diff)
downloadpython-mpv-49e6f6f6d0a3340c2664792cdb5bcc059dc24ea9.tar.gz
python-mpv-49e6f6f6d0a3340c2664792cdb5bcc059dc24ea9.tar.bz2
python-mpv-49e6f6f6d0a3340c2664792cdb5bcc059dc24ea9.zip
Add stream protocol handling
This allows you to directly feed bytes into mpv without going through a file, FIFO etc. first. The new API is: @player.register_stream_protocol(name) @player.python_stream(name, size) @player.python_stream_catchall See their docstrings for their usage.
Diffstat (limited to 'mpv-test.py')
-rwxr-xr-xmpv-test.py88
1 files changed, 86 insertions, 2 deletions
diff --git a/mpv-test.py b/mpv-test.py
index 9e83dbb..3ac8985 100755
--- a/mpv-test.py
+++ b/mpv-test.py
@@ -350,6 +350,91 @@ class KeyBindingTest(MpvTestCase):
self.assertNotIn(b('b'), self.m._key_binding_handlers)
self.assertIn(b('c'), self.m._key_binding_handlers)
+class TestStreams(unittest.TestCase):
+ def test_python_stream(self):
+ handler = mock.Mock()
+
+ m = mpv.MPV()
+ m.register_event_callback(handler)
+
+ @m.python_stream('foo')
+ def foo_gen():
+ with open(TESTVID, 'rb') as f:
+ yield f.read()
+
+ @m.python_stream('bar')
+ def bar_gen():
+ yield b''
+
+ m.play('python://foo')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.EOF, 'error': mpv.ErrorCode.SUCCESS}})
+ handler.reset_mock()
+
+ m.play('python://bar')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}})
+ handler.reset_mock()
+
+ m.play('python://baz')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
+ handler.reset_mock()
+
+ m.play('foo://foo')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
+ handler.reset_mock()
+
+ foo_gen.unregister()
+
+ m.play('python://foo')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
+ handler.reset_mock()
+
+ m.play('python://bar')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}})
+ handler.reset_mock()
+
+ m.terminate()
+
+ def test_custom_stream(self):
+ handler = mock.Mock()
+ fail_mock = mock.Mock(side_effect=ValueError)
+ stream_mock = mock.Mock()
+ stream_mock.seek = mock.Mock(return_value=0)
+ stream_mock.read = mock.Mock(return_value=b'')
+
+ m = mpv.MPV(video=False)
+ m.register_event_callback(handler)
+
+ m.register_stream_protocol('pythonfail', fail_mock)
+
+ @m.register_stream_protocol('pythonsuccess')
+ def open_fn(uri):
+ self.assertEqual(uri, 'pythonsuccess://foo')
+ return stream_mock
+
+ m.play('pythondoesnotexist://foo')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
+ handler.reset_mock()
+
+ m.play('pythonfail://foo')
+ m.wait_for_playback()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}})
+ handler.reset_mock()
+
+ m.play('pythonsuccess://foo')
+ m.wait_for_playback()
+ stream_mock.seek.assert_any_call(0)
+ stream_mock.read.assert_called()
+ handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}})
+
+ m.terminate()
+
class TestLifecycle(unittest.TestCase):
def test_create_destroy(self):
thread_names = lambda: [ t.name for t in threading.enumerate() ]
@@ -386,8 +471,7 @@ class TestLifecycle(unittest.TestCase):
m.unregister_event_callback(handler)
handler.assert_has_calls([
mock.call({'reply_userdata': 0, 'error': 0, 'event_id': 6, 'event': None}),
- mock.call({'reply_userdata': 0, 'error': 0, 'event_id': 9, 'event': None}),
- mock.call({'reply_userdata': 0, 'error': 0, 'event_id': 7, 'event': {'reason': 4}}),
+ mock.call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.NOTHING_TO_PLAY}})
], any_order=True)
handler.reset_mock()