From 49e6f6f6d0a3340c2664792cdb5bcc059dc24ea9 Mon Sep 17 00:00:00 2001 From: jaseg Date: Sat, 30 Nov 2019 21:15:57 +0100 Subject: Add stream protocol handling This allows you to directly feed bytes into mpv without going through a file, FIFO etc. first. The new API is: @player.register_stream_protocol(name) @player.python_stream(name, size) @player.python_stream_catchall See their docstrings for their usage. --- README.rst | 18 +++++ mpv-test.py | 88 +++++++++++++++++++- mpv.py | 265 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 365 insertions(+), 6 deletions(-) diff --git a/README.rst b/README.rst index c609969..e46da2b 100644 --- a/README.rst +++ b/README.rst @@ -135,6 +135,24 @@ Playlist handling print(player.playlist) player.wait_for_playback() +Directly feeding mpv data from python +..................................... + +.. code:: python + + #!/usr/bin/env python3 + import mpv + + player = mpv.MPV() + @player.python_stream('foo') + def reader(): + with open('test.webm', 'rb') as f: + while True: + yield f.read(1024*1024) + + player.play('python://foo') + player.wait_for_playback() + PyQT embedding .............. diff --git a/mpv-test.py b/mpv-test.py index 9e83dbb..3ac8985 100755 --- a/mpv-test.py +++ b/mpv-test.py @@ -350,6 +350,91 @@ class KeyBindingTest(MpvTestCase): self.assertNotIn(b('b'), self.m._key_binding_handlers) self.assertIn(b('c'), self.m._key_binding_handlers) +class TestStreams(unittest.TestCase): + def test_python_stream(self): + handler = mock.Mock() + + m = mpv.MPV() + m.register_event_callback(handler) + + @m.python_stream('foo') + def foo_gen(): + with open(TESTVID, 'rb') as f: + yield f.read() + + @m.python_stream('bar') + def bar_gen(): + yield b'' + + m.play('python://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.EOF, 'error': mpv.ErrorCode.SUCCESS}}) + handler.reset_mock() + + m.play('python://bar') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}}) + handler.reset_mock() + + m.play('python://baz') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + m.play('foo://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + foo_gen.unregister() + + m.play('python://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + m.play('python://bar') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}}) + handler.reset_mock() + + m.terminate() + + def test_custom_stream(self): + handler = mock.Mock() + fail_mock = mock.Mock(side_effect=ValueError) + stream_mock = mock.Mock() + stream_mock.seek = mock.Mock(return_value=0) + stream_mock.read = mock.Mock(return_value=b'') + + m = mpv.MPV(video=False) + m.register_event_callback(handler) + + m.register_stream_protocol('pythonfail', fail_mock) + + @m.register_stream_protocol('pythonsuccess') + def open_fn(uri): + self.assertEqual(uri, 'pythonsuccess://foo') + return stream_mock + + m.play('pythondoesnotexist://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + m.play('pythonfail://foo') + m.wait_for_playback() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.LOADING_FAILED}}) + handler.reset_mock() + + m.play('pythonsuccess://foo') + m.wait_for_playback() + stream_mock.seek.assert_any_call(0) + stream_mock.read.assert_called() + handler.assert_any_call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.UNKNOWN_FORMAT}}) + + m.terminate() + class TestLifecycle(unittest.TestCase): def test_create_destroy(self): thread_names = lambda: [ t.name for t in threading.enumerate() ] @@ -386,8 +471,7 @@ class TestLifecycle(unittest.TestCase): m.unregister_event_callback(handler) handler.assert_has_calls([ mock.call({'reply_userdata': 0, 'error': 0, 'event_id': 6, 'event': None}), - mock.call({'reply_userdata': 0, 'error': 0, 'event_id': 9, 'event': None}), - mock.call({'reply_userdata': 0, 'error': 0, 'event_id': 7, 'event': {'reason': 4}}), + mock.call({'reply_userdata': 0, 'error': 0, 'event_id': mpv.MpvEventID.END_FILE, 'event': {'reason': mpv.MpvEventEndFile.ERROR, 'error': mpv.ErrorCode.NOTHING_TO_PLAY}}) ], any_order=True) handler.reset_mock() diff --git a/mpv.py b/mpv.py index 7e8e4ae..882a30b 100644 --- a/mpv.py +++ b/mpv.py @@ -72,6 +72,14 @@ class ErrorCode(object): PROPERTY_UNAVAILABLE = -10 PROPERTY_ERROR = -11 COMMAND = -12 + LOADING_FAILED = -13 + AO_INIT_FAILED = -14 + VO_INIT_FAILED = -15 + NOTHING_TO_PLAY = -16 + UNKNOWN_FORMAT = -17 + UNSUPPORTED = -18 + NOT_IMPLEMENTED = -19 + GENERIC = -20 EXCEPTION_DICT = { 0: None, @@ -88,7 +96,17 @@ class ErrorCode(object): -9: lambda *a: TypeError('Tried to get/set mpv property using wrong format, or passed invalid value', *a), -10: lambda *a: PropertyUnavailableError('mpv property is not available', *a), -11: lambda *a: RuntimeError('Generic error getting or setting mpv property', *a), - -12: lambda *a: SystemError('Error running mpv command', *a) } + -12: lambda *a: SystemError('Error running mpv command', *a), + -14: lambda *a: RuntimeError('Initializing the audio output failed', *a), + -15: lambda *a: RuntimeError('Initializing the video output failed'), + -16: lambda *a: RuntimeError('There was no audio or video data to play. This also happens if the file ' + 'was recognized, but did not contain any audio or video streams, or no ' + 'streams were selected.'), + -17: lambda *a: RuntimeError('When trying to load the file, the file format could not be determined, ' + 'or the file was too broken to open it'), + -18: lambda *a: ValueError('Generic error for signaling that certain system requirements are not fulfilled'), + -19: lambda *a: NotImplementedError('The API function which was called is a stub only'), + -20: lambda *a: RuntimeError('Unspecified error') } @staticmethod def default_error_handler(ec, *args): @@ -282,14 +300,24 @@ class MpvEventLogMessage(Structure): 'level': self.level.decode('utf-8'), 'text': decoder(self.text).rstrip() } -class MpvEventEndFile(c_int): - EOF_OR_INIT_FAILURE = 0 +class MpvEventEndFile(Structure): + _fields_ = [('reason', c_int), + ('error', c_int)] + + EOF = 0 RESTARTED = 1 ABORTED = 2 QUIT = 3 + ERROR = 4 + REDIRECT = 5 + + # For backwards-compatibility + @property + def value(self): + return self.reason def as_dict(self, decoder=identity_decoder): - return {'reason': self.value} + return {'reason': self.reason, 'error': self.error} class MpvEventScriptInputDispatch(Structure): _fields_ = [('arg0', c_int), @@ -305,6 +333,22 @@ class MpvEventClientMessage(Structure): def as_dict(self, decoder=identity_decoder): return { 'args': [ self.args[i].decode('utf-8') for i in range(self.num_args) ] } +StreamReadFn = CFUNCTYPE(c_int64, c_void_p, POINTER(c_char), c_uint64) +StreamSeekFn = CFUNCTYPE(c_int64, c_void_p, c_int64) +StreamSizeFn = CFUNCTYPE(c_int64, c_void_p) +StreamCloseFn = CFUNCTYPE(None, c_void_p) +StreamCancelFn = CFUNCTYPE(None, c_void_p) + +class StreamCallbackInfo(Structure): + _fields_ = [('cookie', c_void_p), + ('read', StreamReadFn), + ('seek', StreamSeekFn), + ('size', StreamSizeFn), + ('close', StreamCloseFn), ] +# ('cancel', StreamCancelFn)] + +StreamOpenFn = CFUNCTYPE(c_int, c_void_p, c_char_p, POINTER(StreamCallbackInfo)) + WakeupCallback = CFUNCTYPE(None, c_void_p) OpenGlCbUpdateFn = CFUNCTYPE(None, c_void_p) @@ -387,6 +431,8 @@ _handle_func('mpv_wakeup', [], _handle_func('mpv_set_wakeup_callback', [WakeupCallback, c_void_p], None, errcheck=None) _handle_func('mpv_get_wakeup_pipe', [], c_int, errcheck=None) +_handle_func('mpv_stream_cb_add_ro', [c_char_p, c_void_p, StreamOpenFn], c_int, ec_errcheck) + _handle_func('mpv_get_sub_api', [MpvSubApi], c_void_p, notnull_errcheck) _handle_gl_func('mpv_opengl_cb_set_update_callback', [OpenGlCbUpdateFn, c_void_p]) @@ -517,6 +563,40 @@ class _DecoderPropertyProxy(_PropertyProxy): def __setattr__(self, name, value): setattr(self.mpv, _py_to_mpv(name), value) +class GeneratorStream: + """Transform a python generator into an mpv-compatible stream object. This only supports size() and read(), and + does not support seek(), close() or cancel(). + """ + + def __init__(self, generator_fun, size=None): + self._generator_fun = generator_fun + self.size = size + + def seek(self, offset): + self._read_iter = iter(self._generator_fun()) + self._read_chunk = b'' + return 0 # We only support seeking to the first byte atm + # implementation in case seeking to arbitrary offsets would be necessary + # while offset > 0: + # offset -= len(self.read(offset)) + # return offset + + def read(self, size): + if not self._read_chunk: + try: + self._read_chunk += next(self._read_iter) + except StopIteration: + return b'' + rv, self._read_chunk = self._read_chunk[:size], self._read_chunk[size:] + return rv + + def close(self): + self._read_iter = iter([]) # make next read() call return EOF + + def cancel(self): + self._read_iter = iter([]) # make next read() call return EOF + # TODO? + class MPV(object): """See man mpv(1) for the details of the implemented commands. All mpv properties can be accessed as ``my_mpv.some_property`` and all mpv options can be accessed as ``my_mpv['some-option']``. @@ -565,6 +645,11 @@ class MPV(object): self._event_handle = _mpv_create_client(self.handle, b'py_event_handler') self._loop = partial(_event_loop, self._event_handle, self._playback_cond, self._event_callbacks, self._message_handlers, self._property_handlers, log_handler) + self._stream_protocol_cbs = {} + self._stream_protocol_frontends = collections.defaultdict(lambda: {}) + self.register_stream_protocol('python', self._python_stream_open) + self._python_streams = {} + self._python_stream_catchall = None if loglevel is not None or log_handler is not None: self.set_loglevel(loglevel or 'terminal-default') if start_event_thread: @@ -1028,6 +1113,87 @@ class MPV(object): if not self._key_binding_handlers: self.unregister_message_handler('key-binding') + def register_stream_protocol(self, proto, open_fn=None): + """ Register a custom stream protocol as documented in libmpv/stream_cb.h: + https://github.com/mpv-player/mpv/blob/master/libmpv/stream_cb.h + + proto is the protocol scheme, e.g. "foo" for "foo://" urls. + + This function can either be used with two parameters or it can be used as a decorator on the target + function. + + open_fn is a function taking an URI string and returning an mpv stream object. + open_fn may raise a ValueError to signal libmpv the URI could not be opened. + + The mpv stream protocol is as follows: + class Stream: + @property + def size(self): + return None # unknown size + return size # int with size in bytes + + def read(self, size): + ... + return read # non-empty bytes object with input + return b'' # empty byte object signals permanent EOF + + def seek(self, pos): + return new_offset # integer with new byte offset. The new offset may be before the requested offset + in case an exact seek is inconvenient. + + def close(self): + ... + + # def cancel(self): (future API versions only) + # Abort a running read() or seek() operation + # ... + + """ + + def decorator(open_fn): + @StreamOpenFn + def open_backend(_userdata, uri, cb_info): + try: + frontend = open_fn(uri.decode('utf-8')) + except ValueError: + return ErrorCode.LOADING_FAILED + + def read_backend(_userdata, buf, bufsize): + data = frontend.read(bufsize) + for i in range(len(data)): + buf[i] = data[i] + return len(data) + + cb_info.contents.cookie = None + read = cb_info.contents.read = StreamReadFn(read_backend) + close = cb_info.contents.close = StreamCloseFn(lambda _userdata: frontend.close()) + + seek, size, cancel = None, None, None + if hasattr(frontend, 'seek'): + seek = cb_info.contents.seek = StreamSeekFn(lambda _userdata, offx: frontend.seek(offx)) + if hasattr(frontend, 'size') and frontend.size is not None: + size = cb_info.contents.size = StreamSizeFn(lambda _userdata: frontend.size) + + # Future API versions only + # if hasattr(frontend, 'cancel'): + # cb_info.contents.cancel = StreamCancelFn(lambda _userdata: frontend.cancel()) + + # keep frontend and callbacks in memory forever (TODO) + frontend._registered_callbacks = [read, close, seek, size, cancel] + self._stream_protocol_frontends[proto][uri] = frontend + return 0 + + if proto in self._stream_protocol_cbs: + raise KeyError('Stream protocol already registered') + self._stream_protocol_cbs[proto] = [open_backend] + _mpv_stream_cb_add_ro(self.handle, proto.encode('utf-8'), c_void_p(), open_backend) + + return open_fn + + if open_fn is not None: + decorator(open_fn) + return decorator + # Convenience functions def play(self, filename): """Play a path or URL (requires ``ytdl`` option to be set).""" @@ -1043,6 +1209,97 @@ class MPV(object): ``MPV.loadfile(filename, 'append-play')``.""" self.loadfile(filename, 'append', **options) + # "Python stream" logic. This is some porcelain for directly playing data from python generators. + + def _python_stream_open(self, uri): + """Internal handler for python:// protocol streams registered through @python_stream(...) and + @python_stream_catchall + """ + name, = re.fullmatch('python://(.*)', uri).groups() + + if name in self._python_streams: + generator_fun, size = self._python_streams[name] + else: + if self._python_stream_catchall is not None: + generator_fun, size = self._python_stream_catchall(name) + else: + raise ValueError('Python stream name not found and no catch-all defined') + + return GeneratorStream(generator_fun, size) + + def python_stream(self, name=None, size=None): + """Register a generator for the python stream with the given name. + + name is the name, i.e. the part after the "python://" in the URI, that this generator is registered as. + size is the total number of bytes in the stream (if known). + + Any given name can only be registered once. The catch-all can also only be registered once. To unregister a + stream, call the .unregister function set on the callback. + + The generator signals EOF by returning, manually raising StopIteration or by yielding b'', an empty bytes + object. + + The generator may be called multiple times if libmpv seeks or loops. + + See also: @mpv.python_stream_catchall + + @mpv.python_stream('foobar') + def reader(): + for chunk in chunks: + yield chunk + mpv.play('python://foobar') + mpv.wait_for_playback() + reader.unregister() + """ + def register(cb): + if name in self._python_streams: + raise KeyError(f'Python stream name "{name}" is already registered') + self._python_streams[name] = (cb, size) + def unregister(): + if name not in self._python_streams or\ + self._python_streams[name][0] is not cb: # This is just a basic sanity check + raise RuntimeError('Python stream has already been unregistered') + del self._python_streams[name] + cb.unregister = unregister + return cb + return register + + def python_stream_catchall(self, cb): + """ Register a catch-all python stream to be called when no name matches can be found. Use this decorator on a + function that takes a name argument and returns a (generator, size) tuple (with size being None if unknown). + + An invalid URI can be signalled to libmpv by raising a ValueError inside the callback. + + See also: @mpv.python_stream(name, size) + + @mpv.python_stream_catchall + def catchall(name): + if not name.startswith('foo'): + raise ValueError('Unknown Name') + + def foo_reader(): + with open(name, 'rb') as f: + while True: + chunk = f.read(1024) + if not chunk: + break + yield chunk + return foo_reader, None + mpv.play('python://foo23') + mpv.wait_for_playback() + catchall.unregister() + """ + if self._python_stream_catchall is not None: + raise KeyError('A catch-all python stream is already registered') + + self._python_stream_catchall = cb + def unregister(): + if self._python_stream_catchall is not cb: + raise RuntimeError('This catch-all python stream has already been unregistered') + self._python_stream_catchall = None + cb.unregister = unregister + return cb + # Property accessors def _get_property(self, name, decoder=strict_decoder, fmt=MpvFormat.NODE): out = create_string_buffer(sizeof(MpvNode)) -- cgit